Expose data-quality semantics before rolling contract-first further

Phase 3 adds concrete data-quality states to the contract surface so weekend runs, stale market data, partial payloads, and provider/config mismatches stop collapsing into generic success or failure. The backend now carries those diagnostics from quant/llm runners through the legacy executor contract, while the frontend reads decision/confidence fields from result or compat instead of assuming legacy top-level payloads.

Constraint: existing recommendation/task files and current dashboard routes must remain readable during migration
Rejected: infer data quality only in the service layer | loses source-specific evidence and violates the executor/orchestrator boundary
Rejected: leave frontend on top-level decision fields | breaks as soon as contract-first payloads become the default
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: keep new data-quality states explicit in contract metadata and route all UI reads through result/compat helpers
Tested: python -m pytest orchestrator/tests/test_quant_runner.py orchestrator/tests/test_llm_runner.py orchestrator/tests/test_signals.py orchestrator/tests/test_application_service.py orchestrator/tests/test_trading_graph_config.py web_dashboard/backend/tests/test_executors.py web_dashboard/backend/tests/test_services_migration.py web_dashboard/backend/tests/test_api_smoke.py web_dashboard/backend/tests/test_main_api.py web_dashboard/backend/tests/test_portfolio_api.py -q
Tested: python -m compileall orchestrator tradingagents web_dashboard/backend
Tested: npm run build (web_dashboard/frontend)
Not-tested: real exchange holiday calendars beyond weekend detection
Not-tested: real provider-backed end-to-end runs for provider_mismatch and stale-data scenarios
This commit is contained in:
陈少杰 2026-04-14 00:37:35 +08:00
parent 0c70bae58b
commit 4cb9d98544
16 changed files with 482 additions and 31 deletions

View File

@ -7,9 +7,13 @@ class ReasonCode(str, Enum):
QUANT_INIT_FAILED = "quant_init_failed"
QUANT_SIGNAL_FAILED = "quant_signal_failed"
QUANT_NO_DATA = "quant_no_data"
NON_TRADING_DAY = "non_trading_day"
PARTIAL_DATA = "partial_data"
STALE_DATA = "stale_data"
LLM_INIT_FAILED = "llm_init_failed"
LLM_SIGNAL_FAILED = "llm_signal_failed"
LLM_UNKNOWN_RATING = "llm_unknown_rating"
PROVIDER_MISMATCH = "provider_mismatch"
BOTH_SIGNALS_UNAVAILABLE = "both_signals_unavailable"

View File

@ -10,6 +10,12 @@ from orchestrator.contracts.result_contract import Signal, build_error_signal
logger = logging.getLogger(__name__)
def _build_data_quality(state: str, **details):
payload = {"state": state}
payload.update({key: value for key, value in details.items() if value is not None})
return payload
class LLMRunner:
def __init__(self, config: OrchestratorConfig):
self._config = config
@ -28,6 +34,24 @@ class LLMRunner:
self._graph = TradingAgentsGraph(**graph_kwargs)
return self._graph
def _detect_provider_mismatch(self):
    """Return mismatch details when llm_provider and backend_url disagree, else None.

    Only the two unambiguous conflicts are flagged: an anthropic provider
    pointed at a non-anthropic endpoint, or a known non-anthropic provider
    pointed at an anthropic endpoint. Missing config yields None (no claim).
    """
    cfg = self._config.trading_agents_config or {}
    provider = str(cfg.get("llm_provider", "")).lower()
    base_url = str(cfg.get("backend_url", "") or "").lower()
    if not (provider and base_url):
        return None
    anthropic_endpoint = "/anthropic" in base_url
    conflicting = (
        (provider == "anthropic" and not anthropic_endpoint)
        or (provider in {"openai", "openrouter", "ollama", "xai"} and anthropic_endpoint)
    )
    if conflicting:
        return {
            "provider": provider,
            "backend_url": cfg.get("backend_url"),
        }
    return None
def get_signal(self, ticker: str, date: str) -> Signal:
"""获取指定股票在指定日期的 LLM 信号,带缓存。"""
safe_ticker = ticker.replace("/", "_") # sanitize for filesystem (e.g. BRK/B)
@ -47,6 +71,21 @@ class LLMRunner:
metadata=data,
)
mismatch = self._detect_provider_mismatch()
if mismatch is not None:
return build_error_signal(
ticker=ticker,
source="llm",
reason_code=ReasonCode.PROVIDER_MISMATCH.value,
message=(
f"provider '{mismatch['provider']}' does not match backend_url "
f"'{mismatch['backend_url']}'"
),
metadata={
"data_quality": _build_data_quality("provider_mismatch", **mismatch),
},
)
try:
_final_state, processed_signal = self._get_graph().propagate(ticker, date)
rating = processed_signal if isinstance(processed_signal, str) else str(processed_signal)
@ -60,6 +99,7 @@ class LLMRunner:
"timestamp": now.isoformat(),
"ticker": ticker,
"date": date,
"data_quality": _build_data_quality("ok"),
}
with open(cache_path, "w", encoding="utf-8") as f:
json.dump(cache_data, f, ensure_ascii=False, indent=2)
@ -74,11 +114,21 @@ class LLMRunner:
)
except Exception as e:
logger.error("LLMRunner: propagate failed for %s %s: %s", ticker, date, e)
reason_code = ReasonCode.LLM_SIGNAL_FAILED.value
if "Unsupported LLM provider" in str(e):
reason_code = ReasonCode.PROVIDER_MISMATCH.value
return build_error_signal(
ticker=ticker,
source="llm",
reason_code=ReasonCode.LLM_SIGNAL_FAILED.value,
reason_code=reason_code,
message=str(e),
metadata={
"data_quality": _build_data_quality(
"provider_mismatch" if reason_code == ReasonCode.PROVIDER_MISMATCH.value else "unknown",
provider=(self._config.trading_agents_config or {}).get("llm_provider"),
backend_url=(self._config.trading_agents_config or {}).get("backend_url"),
),
},
)
def _map_rating(self, rating: str) -> tuple[int, float]:

View File

@ -47,25 +47,31 @@ class TradingOrchestrator:
quant_sig: Optional[Signal] = None
llm_sig: Optional[Signal] = None
degradation_reasons: list[str] = []
source_diagnostics: dict[str, dict] = {}
if self._quant is None and self._quant_unavailable_reason:
degradation_reasons.append(self._quant_unavailable_reason)
source_diagnostics["quant"] = {"reason_code": self._quant_unavailable_reason}
if self._llm is None and self._llm_unavailable_reason:
degradation_reasons.append(self._llm_unavailable_reason)
source_diagnostics["llm"] = {"reason_code": self._llm_unavailable_reason}
# Get quant signal
if self._quant is not None:
try:
quant_sig = self._quant.get_signal(ticker, date)
if quant_sig.degraded:
reason_code = signal_reason_code(quant_sig) or ReasonCode.QUANT_SIGNAL_FAILED.value
degradation_reasons.append(
signal_reason_code(quant_sig) or ReasonCode.QUANT_SIGNAL_FAILED.value
reason_code
)
source_diagnostics["quant"] = self._build_source_diagnostic(quant_sig, reason_code)
logger.warning("TradingOrchestrator: quant signal degraded for %s %s", ticker, date)
quant_sig = None
except Exception as e:
logger.error("TradingOrchestrator: quant get_signal failed: %s", e)
degradation_reasons.append(ReasonCode.QUANT_SIGNAL_FAILED.value)
source_diagnostics["quant"] = {"reason_code": ReasonCode.QUANT_SIGNAL_FAILED.value}
quant_sig = None
# Get llm signal
@ -73,21 +79,72 @@ class TradingOrchestrator:
try:
llm_sig = self._llm.get_signal(ticker, date)
if llm_sig.degraded:
reason_code = signal_reason_code(llm_sig) or ReasonCode.LLM_SIGNAL_FAILED.value
degradation_reasons.append(
signal_reason_code(llm_sig) or ReasonCode.LLM_SIGNAL_FAILED.value
reason_code
)
source_diagnostics["llm"] = self._build_source_diagnostic(llm_sig, reason_code)
logger.warning("TradingOrchestrator: llm signal degraded for %s %s", ticker, date)
llm_sig = None
except Exception as e:
logger.error("TradingOrchestrator: llm get_signal failed: %s", e)
degradation_reasons.append(ReasonCode.LLM_SIGNAL_FAILED.value)
source_diagnostics["llm"] = {"reason_code": ReasonCode.LLM_SIGNAL_FAILED.value}
llm_sig = None
# merge raises ValueError if both None
if quant_sig is None and llm_sig is None:
degradation_reasons.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value)
return self._merger.merge(
final_signal = self._merger.merge(
quant_sig,
llm_sig,
degradation_reasons=degradation_reasons,
)
data_quality = self._summarize_data_quality(source_diagnostics)
metadata = dict(final_signal.metadata)
if source_diagnostics:
metadata["source_diagnostics"] = source_diagnostics
if data_quality:
metadata["data_quality"] = data_quality
final_signal.metadata = metadata
return final_signal
@staticmethod
def _build_source_diagnostic(signal: Signal, reason_code: str) -> dict:
    """Collect per-source degradation evidence from a degraded signal.

    Always records the reason code; attaches the signal's data_quality
    payload when present (even if empty) and any non-empty error detail.
    """
    details = {"reason_code": reason_code}
    quality = signal.metadata.get("data_quality")
    if quality is not None:
        details["data_quality"] = quality
    failure = signal.metadata.get("error")
    if failure:
        details["error"] = failure
    return details
@staticmethod
def _summarize_data_quality(source_diagnostics: dict[str, dict]) -> Optional[dict]:
states: list[tuple[str, dict]] = []
for source, diagnostic in source_diagnostics.items():
data_quality = diagnostic.get("data_quality")
if isinstance(data_quality, dict) and data_quality.get("state"):
states.append((source, data_quality))
if not states:
return None
priority = {
"provider_mismatch": 0,
"non_trading_day": 1,
"stale_data": 2,
"partial_data": 3,
}
source, selected = sorted(
states,
key=lambda item: priority.get(item[1].get("state"), 999),
)[0]
summary = dict(selected)
summary["source"] = source
summary["issues"] = [
{"source": issue_source, **issue_quality}
for issue_source, issue_quality in states
]
return summary

View File

@ -5,6 +5,7 @@ import sys
from datetime import datetime, timezone, timedelta
from typing import Any
import pandas as pd
import yfinance as yf
from orchestrator.config import OrchestratorConfig
@ -14,6 +15,12 @@ from orchestrator.contracts.result_contract import Signal, build_error_signal
logger = logging.getLogger(__name__)
def _build_data_quality(state: str, **details: Any) -> dict[str, Any]:
payload = {"state": state}
payload.update({key: value for key, value in details.items() if value is not None})
return payload
class QuantRunner:
def __init__(self, config: OrchestratorConfig):
if not config.quant_backtest_path:
@ -39,20 +46,99 @@ class QuantRunner:
start_dt = end_dt - timedelta(days=60)
start_str = start_dt.strftime("%Y-%m-%d")
df = yf.download(ticker, start=start_str, end=date, progress=False, auto_adjust=True)
end_exclusive = (end_dt + timedelta(days=1)).strftime("%Y-%m-%d")
df = yf.download(ticker, start=start_str, end=end_exclusive, progress=False, auto_adjust=True)
if df.empty:
logger.warning("No price data for %s between %s and %s", ticker, start_str, date)
if end_dt.weekday() >= 5:
return build_error_signal(
ticker=ticker,
source="quant",
reason_code=ReasonCode.NON_TRADING_DAY.value,
message=f"{date} is not a trading day",
metadata={
"start_date": start_str,
"end_date": date,
"data_quality": _build_data_quality(
"non_trading_day",
requested_date=date,
),
},
)
return build_error_signal(
ticker=ticker,
source="quant",
reason_code=ReasonCode.QUANT_NO_DATA.value,
message=f"no price data between {start_str} and {date}",
metadata={"start_date": start_str, "end_date": date},
metadata={
"start_date": start_str,
"end_date": date,
},
)
# 标准化列名为小写
df.columns = [c[0].lower() if isinstance(c, tuple) else c.lower() for c in df.columns]
required_columns = {"open", "high", "low", "close"}
missing_columns = sorted(required_columns - set(df.columns))
if missing_columns:
return build_error_signal(
ticker=ticker,
source="quant",
reason_code=ReasonCode.PARTIAL_DATA.value,
message=f"missing price columns: {', '.join(missing_columns)}",
metadata={
"start_date": start_str,
"end_date": date,
"data_quality": _build_data_quality(
"partial_data",
missing_fields=missing_columns,
),
},
)
df.index = pd.to_datetime(df.index)
available_dates = df.index.normalize()
requested_date = pd.Timestamp(end_dt.date())
if requested_date not in available_dates:
last_available_ts = df.index.max()
last_available_date = (
last_available_ts.strftime("%Y-%m-%d")
if hasattr(last_available_ts, "strftime")
else str(last_available_ts)
)
if end_dt.weekday() >= 5:
return build_error_signal(
ticker=ticker,
source="quant",
reason_code=ReasonCode.NON_TRADING_DAY.value,
message=f"{date} is not a trading day",
metadata={
"start_date": start_str,
"end_date": date,
"data_quality": _build_data_quality(
"non_trading_day",
requested_date=date,
last_available_date=last_available_date,
),
},
)
return build_error_signal(
ticker=ticker,
source="quant",
reason_code=ReasonCode.STALE_DATA.value,
message=f"latest price data stops at {last_available_date}",
metadata={
"start_date": start_str,
"end_date": date,
"data_quality": _build_data_quality(
"stale_data",
requested_date=date,
last_available_date=last_available_date,
),
},
)
# 用最佳参数创建 BollingerStrategy 实例
# Lazy import: requires quant_backtest_path to be in sys.path (set in __init__)
from strategies.momentum import BollingerStrategy
@ -117,7 +203,16 @@ class QuantRunner:
confidence=confidence,
source="quant",
timestamp=datetime.now(timezone.utc),
metadata={"params": params, "sharpe_ratio": sharpe, "max_sharpe": max_sharpe},
metadata={
"params": params,
"sharpe_ratio": sharpe,
"max_sharpe": max_sharpe,
"data_quality": _build_data_quality(
"ok",
requested_date=date,
last_available_date=date,
),
},
)
def _load_best_params(self) -> dict:

View File

@ -82,3 +82,20 @@ def test_get_signal_returns_reason_code_on_propagate_failure(monkeypatch, tmp_pa
assert signal.degraded is True
assert signal.reason_code == ReasonCode.LLM_SIGNAL_FAILED.value
assert signal.metadata["error"] == "graph unavailable"
def test_get_signal_returns_provider_mismatch_before_graph_init(tmp_path):
    """A provider/backend_url conflict must short-circuit before any graph build."""
    config = OrchestratorConfig(
        cache_dir=str(tmp_path),
        trading_agents_config={
            "llm_provider": "anthropic",
            "backend_url": "https://api.openai.com/v1",
        },
    )
    result = LLMRunner(config).get_signal("AAPL", "2024-01-02")
    assert result.degraded is True
    assert result.reason_code == ReasonCode.PROVIDER_MISMATCH.value
    assert result.metadata["data_quality"]["state"] == "provider_mismatch"

View File

@ -1,6 +1,7 @@
"""Tests for QuantRunner._calc_confidence()."""
import json
import sqlite3
import pandas as pd
import pytest
from orchestrator.config import OrchestratorConfig
@ -74,3 +75,61 @@ def test_get_signal_returns_reason_code_when_no_data(runner, monkeypatch):
assert signal.degraded is True
assert signal.reason_code == ReasonCode.QUANT_NO_DATA.value
def test_get_signal_marks_non_trading_day_on_weekend(runner, monkeypatch):
    """An empty download on a Saturday must be reported as non_trading_day."""
    def _empty_download(*_args, **_kwargs):
        return pd.DataFrame()

    monkeypatch.setattr("orchestrator.quant_runner.yf.download", _empty_download)
    result = runner.get_signal("AAPL", "2024-01-06")  # 2024-01-06 is a Saturday
    assert result.degraded is True
    assert result.reason_code == ReasonCode.NON_TRADING_DAY.value
    assert result.metadata["data_quality"]["state"] == "non_trading_day"
def test_get_signal_marks_stale_data_when_requested_day_missing(runner, monkeypatch):
    """Prices ending before the requested weekday must be reported as stale_data."""
    frame = pd.DataFrame(
        {
            "Open": [10.0],
            "High": [11.0],
            "Low": [9.0],
            "Close": [10.5],
            "Volume": [1000],
        },
        index=pd.to_datetime(["2024-01-01"]),
    )

    def _stale_download(*_args, **_kwargs):
        return frame

    monkeypatch.setattr("orchestrator.quant_runner.yf.download", _stale_download)
    result = runner.get_signal("AAPL", "2024-01-02")
    assert result.degraded is True
    assert result.reason_code == ReasonCode.STALE_DATA.value
    assert result.metadata["data_quality"]["state"] == "stale_data"
def test_get_signal_marks_partial_data_when_required_columns_missing(runner, monkeypatch):
    """A payload missing a required OHLC column must be reported as partial_data."""
    frame = pd.DataFrame(
        {
            "Open": [10.0],
            "Low": [9.0],
            "Close": [10.5],
            "Volume": [1000],
        },  # no "High" column
        index=pd.to_datetime(["2024-01-02"]),
    )

    def _partial_download(*_args, **_kwargs):
        return frame

    monkeypatch.setattr("orchestrator.quant_runner.yf.download", _partial_download)
    result = runner.get_signal("AAPL", "2024-01-02")
    assert result.degraded is True
    assert result.reason_code == ReasonCode.PARTIAL_DATA.value
    assert result.metadata["data_quality"]["state"] == "partial_data"

View File

@ -333,6 +333,11 @@ def _normalize_recommendation_record(record: dict, *, date: Optional[str] = None
},
"degraded": quant_signal is None or llm_signal is None,
},
"degradation": normalized.get("degradation") or {
"degraded": quant_signal is None or llm_signal is None,
"reason_codes": [],
},
"data_quality": normalized.get("data_quality"),
"compat": {
"analysis_date": date_value,
"decision": decision,

View File

@ -306,11 +306,15 @@ class AnalysisService:
quant_signal = output.quant_signal
llm_signal = output.llm_signal
confidence = output.confidence
data_quality = output.data_quality
degrade_reason_codes = list(output.degrade_reason_codes)
else:
decision = "HOLD"
quant_signal = None
llm_signal = None
confidence = None
data_quality = None
degrade_reason_codes = []
for line in (stdout or "").splitlines():
if line.startswith("SIGNAL_DETAIL:"):
try:
@ -328,7 +332,7 @@ class AnalysisService:
"ticker": ticker,
"name": stock.get("name", ticker),
"date": date,
"status": "completed",
"status": "degraded_success" if (degrade_reason_codes or data_quality or quant_signal is None or llm_signal is None) else "completed",
"created_at": datetime.now().isoformat(),
"result": {
"decision": decision,
@ -351,6 +355,11 @@ class AnalysisService:
},
"degraded": quant_signal is None or llm_signal is None,
},
"degradation": {
"degraded": bool(degrade_reason_codes) or quant_signal is None or llm_signal is None,
"reason_codes": degrade_reason_codes,
},
"data_quality": data_quality,
"compat": {
"analysis_date": date,
"decision": decision,

View File

@ -41,6 +41,25 @@ trading_config["project_dir"] = os.path.join(repo_root, "tradingagents")
trading_config["results_dir"] = os.path.join(repo_root, "results")
trading_config["max_debate_rounds"] = 1
trading_config["max_risk_discuss_rounds"] = 1
if os.environ.get("TRADINGAGENTS_LLM_PROVIDER"):
trading_config["llm_provider"] = os.environ["TRADINGAGENTS_LLM_PROVIDER"]
elif os.environ.get("ANTHROPIC_BASE_URL"):
trading_config["llm_provider"] = "anthropic"
elif os.environ.get("OPENAI_BASE_URL"):
trading_config["llm_provider"] = "openai"
if os.environ.get("TRADINGAGENTS_BACKEND_URL"):
trading_config["backend_url"] = os.environ["TRADINGAGENTS_BACKEND_URL"]
elif os.environ.get("ANTHROPIC_BASE_URL"):
trading_config["backend_url"] = os.environ["ANTHROPIC_BASE_URL"]
elif os.environ.get("OPENAI_BASE_URL"):
trading_config["backend_url"] = os.environ["OPENAI_BASE_URL"]
if os.environ.get("TRADINGAGENTS_MODEL"):
trading_config["deep_think_llm"] = os.environ["TRADINGAGENTS_MODEL"]
trading_config["quick_think_llm"] = os.environ["TRADINGAGENTS_MODEL"]
if os.environ.get("TRADINGAGENTS_DEEP_MODEL"):
trading_config["deep_think_llm"] = os.environ["TRADINGAGENTS_DEEP_MODEL"]
if os.environ.get("TRADINGAGENTS_QUICK_MODEL"):
trading_config["quick_think_llm"] = os.environ["TRADINGAGENTS_QUICK_MODEL"]
print("STAGE:analysts", flush=True)
print("STAGE:research", flush=True)
@ -105,7 +124,13 @@ report_path.write_text(report_content)
print("STAGE:portfolio", flush=True)
signal_detail = json.dumps({"llm_signal": llm_signal, "quant_signal": quant_signal, "confidence": confidence})
result_meta = json.dumps({
"degrade_reason_codes": list(getattr(result, "degrade_reason_codes", ())),
"data_quality": (result.metadata or {}).get("data_quality"),
"source_diagnostics": (result.metadata or {}).get("source_diagnostics"),
})
print("SIGNAL_DETAIL:" + signal_detail, flush=True)
print("RESULT_META:" + result_meta, flush=True)
print("ANALYSIS_COMPLETE:" + signal, flush=True)
"""
@ -125,6 +150,9 @@ class AnalysisExecutionOutput:
llm_signal: Optional[str]
confidence: Optional[float]
report_path: Optional[str] = None
degrade_reason_codes: tuple[str, ...] = ()
data_quality: Optional[dict] = None
source_diagnostics: Optional[dict] = None
contract_version: str = CONTRACT_VERSION
executor_type: str = DEFAULT_EXECUTOR_TYPE
@ -138,17 +166,24 @@ class AnalysisExecutionOutput:
elapsed_seconds: int,
current_stage: str = "portfolio",
) -> dict:
degraded = bool(self.degrade_reason_codes) or bool(self.data_quality) or self.quant_signal is None or self.llm_signal is None
return {
"contract_version": self.contract_version,
"task_id": task_id,
"ticker": ticker,
"date": date,
"status": "completed",
"status": "degraded_success" if degraded else "completed",
"progress": 100,
"current_stage": current_stage,
"created_at": created_at,
"elapsed_seconds": elapsed_seconds,
"elapsed": elapsed_seconds,
"degradation": {
"degraded": degraded,
"reason_codes": list(self.degrade_reason_codes),
"source_diagnostics": self.source_diagnostics or {},
},
"data_quality": self.data_quality,
"result": {
"decision": self.decision,
"confidence": self.confidence,
@ -168,7 +203,7 @@ class AnalysisExecutionOutput:
"available": self.llm_signal is not None,
},
},
"degraded": self.quant_signal is None or self.llm_signal is None,
"degraded": degraded,
"report": {
"path": self.report_path,
"available": bool(self.report_path),
@ -325,7 +360,11 @@ class LegacySubprocessAnalysisExecutor:
quant_signal = None
llm_signal = None
confidence = None
degrade_reason_codes: tuple[str, ...] = ()
data_quality = None
source_diagnostics = None
seen_signal_detail = False
seen_result_meta = False
seen_complete = False
for line in stdout_lines:
@ -338,6 +377,15 @@ class LegacySubprocessAnalysisExecutor:
quant_signal = detail.get("quant_signal")
llm_signal = detail.get("llm_signal")
confidence = detail.get("confidence")
elif line.startswith("RESULT_META:"):
seen_result_meta = True
try:
detail = json.loads(line.split(":", 1)[1].strip())
except Exception as exc:
raise AnalysisExecutorError("failed to parse RESULT_META payload") from exc
degrade_reason_codes = tuple(detail.get("degrade_reason_codes") or ())
data_quality = detail.get("data_quality")
source_diagnostics = detail.get("source_diagnostics")
elif line.startswith("ANALYSIS_COMPLETE:"):
seen_complete = True
decision = line.split(":", 1)[1].strip()
@ -360,6 +408,9 @@ class LegacySubprocessAnalysisExecutor:
llm_signal=llm_signal,
confidence=confidence,
report_path=report_path,
degrade_reason_codes=degrade_reason_codes,
data_quality=data_quality,
source_diagnostics=source_diagnostics,
contract_version=contract_version,
executor_type=executor_type,
)

View File

@ -151,7 +151,7 @@ class JobService:
state["result"] = result
state["error"] = contract.get("error")
state["contract_version"] = contract.get("contract_version", state.get("contract_version"))
state["degradation_summary"] = self._build_degradation_summary(result)
state["degradation_summary"] = contract.get("degradation") or self._build_degradation_summary(result)
state["data_quality_summary"] = contract.get("data_quality")
state["compat"] = {
"decision": result.get("decision"),
@ -255,6 +255,8 @@ class JobService:
"status": payload["status"],
"created_at": payload.get("created_at"),
"error": payload.get("error"),
"data_quality_summary": payload.get("data_quality_summary"),
"degradation_summary": payload.get("degradation_summary"),
}
if state.get("type") == "portfolio":
summary.update({
@ -310,6 +312,8 @@ class JobService:
normalized.setdefault("result_ref", None)
normalized.setdefault("degradation_summary", None)
normalized.setdefault("data_quality_summary", None)
if "data_quality" in normalized and normalized.get("data_quality_summary") is None:
normalized["data_quality_summary"] = normalized.get("data_quality")
compat = normalized.get("compat")
if not isinstance(compat, dict):
compat = {}

View File

@ -110,3 +110,29 @@ def test_executor_kills_subprocess_on_timeout(monkeypatch):
assert process.kill_called is True
assert process.wait_called is True
def test_executor_marks_degraded_success_when_result_meta_reports_data_quality():
    """RESULT_META data-quality evidence must downgrade the contract to degraded_success."""
    stdout_lines = [
        'SIGNAL_DETAIL:{"quant_signal":"HOLD","llm_signal":"BUY","confidence":0.6}',
        'RESULT_META:{"degrade_reason_codes":["non_trading_day"],"data_quality":{"state":"non_trading_day","requested_date":"2026-04-12"}}',
        "ANALYSIS_COMPLETE:OVERWEIGHT",
    ]
    parsed = LegacySubprocessAnalysisExecutor._parse_output(
        stdout_lines=stdout_lines,
        ticker="AAPL",
        date="2026-04-12",
        contract_version="v1alpha1",
        executor_type="legacy_subprocess",
    )
    contract = parsed.to_result_contract(
        task_id="task-3",
        ticker="AAPL",
        date="2026-04-12",
        created_at="2026-04-12T10:00:00",
        elapsed_seconds=3,
    )
    assert contract["status"] == "degraded_success"
    assert contract["data_quality"]["state"] == "non_trading_day"
    assert contract["degradation"]["reason_codes"] == ["non_trading_day"]

View File

@ -4,6 +4,7 @@ const STATUS_TAG_MAP = {
pending: { text: '等待', bg: 'var(--bg-elevated)', color: 'var(--text-muted)' },
running: { text: '分析中', bg: 'var(--running-dim)', color: 'var(--running)' },
completed: { text: '完成', bg: 'var(--buy-dim)', color: 'var(--buy)' },
degraded_success: { text: '降级完成', bg: 'var(--hold-dim)', color: 'var(--hold)' },
failed: { text: '失败', bg: 'var(--sell-dim)', color: 'var(--sell)' },
}
@ -11,6 +12,8 @@ export function StatusIcon({ status }) {
switch (status) {
case 'completed':
return <CheckCircleOutlined style={{ color: 'var(--buy)', fontSize: 16 }} />
case 'degraded_success':
return <CheckCircleOutlined style={{ color: 'var(--hold)', fontSize: 16 }} />
case 'failed':
return <CloseCircleOutlined style={{ color: 'var(--sell)', fontSize: 16 }} />
case 'running':

View File

@ -3,6 +3,15 @@ import { useSearchParams } from 'react-router-dom'
import { Card, Progress, Badge, Empty, Button, Result, message } from 'antd'
import DecisionBadge from '../components/DecisionBadge'
import { StatusIcon } from '../components/StatusIcon'
import {
getConfidence,
getDecision,
getDisplayDate,
getErrorMessage,
getLlmSignal,
getQuantSignal,
isCompletedLikeStatus,
} from '../utils/contractView'
const ANALYSIS_STAGES = [
{ key: 'analysts', label: '分析师团队' },
@ -20,6 +29,13 @@ export default function AnalysisMonitor() {
const [loading, setLoading] = useState(false)
const [error, setError] = useState(null)
const wsRef = useRef(null)
const decision = getDecision(task)
const llmSignal = getLlmSignal(task)
const quantSignal = getQuantSignal(task)
const confidence = getConfidence(task)
const displayDate = getDisplayDate(task)
const dataQuality = task?.data_quality_summary
const errorMessage = getErrorMessage(task)
const fetchInitialState = useCallback(async () => {
if (!taskId) return
@ -155,23 +171,38 @@ export default function AnalysisMonitor() {
<span style={{ fontFamily: 'var(--font-ui)', fontSize: 28, fontWeight: 600, letterSpacing: 0.196, lineHeight: 1.14 }}>
{task.ticker}
</span>
<DecisionBadge decision={task.decision} />
<DecisionBadge decision={decision} />
</div>
{displayDate && (
<div style={{ marginBottom: 10, fontSize: 'var(--text-sm)', color: 'var(--text-secondary)' }}>
分析日期: {displayDate}
</div>
)}
{/* Signal Detail Row */}
{task.status === 'completed' && (task.llm_signal || task.quant_signal || task.confidence != null) && (
{isCompletedLikeStatus(task.status) && (llmSignal || quantSignal || confidence != null) && (
<div style={{ display: 'flex', gap: 24, marginBottom: 12, fontSize: 'var(--text-sm)', fontFamily: 'var(--font-ui)', color: 'var(--text-secondary)' }}>
{task.llm_signal && (
<span>LLM: <DecisionBadge decision={task.llm_signal} /></span>
{llmSignal && (
<span>LLM: <DecisionBadge decision={llmSignal} /></span>
)}
{task.quant_signal && (
<span>Quant: <DecisionBadge decision={task.quant_signal} /></span>
{quantSignal && (
<span>Quant: <DecisionBadge decision={quantSignal} /></span>
)}
{task.confidence != null && (
<span>置信度: <strong style={{ color: 'var(--text-primary)' }}>{(task.confidence * 100).toFixed(0)}%</strong></span>
{confidence != null && (
<span>置信度: <strong style={{ color: 'var(--text-primary)' }}>{(confidence * 100).toFixed(0)}%</strong></span>
)}
</div>
)}
{dataQuality?.state && (
<div style={{ marginBottom: 12, fontSize: 'var(--text-sm)', color: 'var(--hold)' }}>
数据质量: <strong>{dataQuality.state}</strong>
</div>
)}
{errorMessage && (
<div style={{ marginBottom: 12, fontSize: 'var(--text-sm)', color: 'var(--sell)' }}>
错误: {errorMessage}
</div>
)}
{/* Progress */}
<div style={{ display: 'flex', alignItems: 'center', gap: 16, marginBottom: 16 }}>

View File

@ -3,6 +3,7 @@ import { Table, Button, Progress, Result, Card, message, Popconfirm, Tooltip } f
import { DeleteOutlined, CopyOutlined, SyncOutlined } from '@ant-design/icons'
import DecisionBadge from '../components/DecisionBadge'
import { StatusIcon, StatusTag } from '../components/StatusIcon'
import { getDecision, getErrorMessage } from '../utils/contractView'
export default function BatchManager() {
const [tasks, setTasks] = useState([])
@ -105,10 +106,9 @@ export default function BatchManager() {
},
{
title: '决策',
dataIndex: 'decision',
key: 'decision',
width: 80,
render: (decision) => <DecisionBadge decision={decision} />,
render: (_, record) => <DecisionBadge decision={getDecision(record)} />,
},
{
title: '任务ID',
@ -132,16 +132,17 @@ export default function BatchManager() {
},
{
title: '错误',
dataIndex: 'error',
key: 'error',
width: 180,
ellipsis: { showTitle: false },
render: (error) =>
error ? (
render: (_, record) => {
const error = getErrorMessage(record)
return error ? (
<Tooltip title={error} placement="topLeft">
<span style={{ color: 'var(--sell)', fontSize: 12, display: 'block' }}>{error}</span>
</Tooltip>
) : null,
) : null
},
},
{
title: '操作',
@ -174,7 +175,7 @@ export default function BatchManager() {
const stats = useMemo(() => ({
pending: tasks.filter(t => t.status === 'pending').length,
running: tasks.filter(t => t.status === 'running').length,
completed: tasks.filter(t => t.status === 'completed').length,
completed: tasks.filter(t => t.status === 'completed' || t.status === 'degraded_success').length,
failed: tasks.filter(t => t.status === 'failed').length,
}), [tasks])

View File

@ -9,6 +9,7 @@ import {
} from '@ant-design/icons'
import { portfolioApi } from '../services/portfolioApi'
import DecisionBadge from '../components/DecisionBadge'
import { getDecision, getDisplayDate, isCompletedLikeStatus } from '../utils/contractView'
const { Text } = Typography
@ -316,7 +317,7 @@ function RecommendationsTab() {
const res = await portfolioApi.getRecommendations(date)
setData(res.recommendations || [])
if (!date) {
const d = [...new Set((res.recommendations || []).map(r => r.analysis_date))].sort().reverse()
const d = [...new Set((res.recommendations || []).map(r => getDisplayDate(r)).filter(Boolean))].sort().reverse()
setDates(d)
}
} catch {
@ -338,7 +339,7 @@ function RecommendationsTab() {
const d = JSON.parse(e.data)
if (d.type === 'progress') {
setProgress(d)
if (d.status === 'completed' || d.status === 'failed') {
if (isCompletedLikeStatus(d.status) || d.status === 'failed') {
setAnalyzing(false)
setTaskId(null)
setProgress(null)
@ -377,10 +378,10 @@ function RecommendationsTab() {
render: t => <span className="text-data">{t}</span> },
{ title: '名称', dataIndex: 'name', key: 'name', render: t => <span style={{ fontWeight: 500 }}>{t}</span> },
{
title: '决策', dataIndex: 'decision', key: 'decision', width: 80,
render: d => <DecisionBadge decision={d} />,
title: '决策', key: 'decision', width: 80,
render: (_, record) => <DecisionBadge decision={getDecision(record)} />,
},
{ title: '分析日期', dataIndex: 'analysis_date', key: 'analysis_date', width: 120 },
{ title: '分析日期', key: 'analysis_date', width: 120, render: (_, record) => getDisplayDate(record) || '—' },
]
return (

View File

@ -0,0 +1,38 @@
// Return the legacy compatibility section of a payload, or an empty object.
export function getCompat(payload) {
  if (payload && payload.compat) return payload.compat
  return {}
}
// Return the contract-first result section of a payload, or an empty object.
export function getResult(payload) {
  const section = payload ? payload.result : undefined
  return section || {}
}
// Prefer the contract-first decision, falling back to the compat section.
export function getDecision(payload) {
  const candidates = [getResult(payload).decision, getCompat(payload).decision]
  const found = candidates.find((value) => value !== null && value !== undefined)
  return found === undefined ? null : found
}
// Prefer the contract-first quant rating, falling back to the compat field.
export function getQuantSignal(payload) {
  const contractRating = getResult(payload).signals?.quant?.rating
  if (contractRating !== null && contractRating !== undefined) return contractRating
  const compatRating = getCompat(payload).quant_signal
  return compatRating === null || compatRating === undefined ? null : compatRating
}
// Prefer the contract-first LLM rating, falling back to the compat field.
export function getLlmSignal(payload) {
  const contractRating = getResult(payload).signals?.llm?.rating
  if (contractRating !== null && contractRating !== undefined) return contractRating
  const compatRating = getCompat(payload).llm_signal
  return compatRating === null || compatRating === undefined ? null : compatRating
}
// Prefer the contract-first confidence, falling back to the compat section.
export function getConfidence(payload) {
  const fromResult = getResult(payload).confidence
  if (fromResult !== null && fromResult !== undefined) return fromResult
  const fromCompat = getCompat(payload).confidence
  return fromCompat === null || fromCompat === undefined ? null : fromCompat
}
// Prefer the payload's own date, falling back to the compat analysis_date.
export function getDisplayDate(payload) {
  const directDate = payload == null ? undefined : payload.date
  if (directDate != null) return directDate
  const compatDate = getCompat(payload).analysis_date
  return compatDate != null ? compatDate : null
}
// Normalize a payload error (string or structured object) into display text.
export function getErrorMessage(payload) {
  const error = payload == null ? null : payload.error
  if (!error) return null
  return typeof error === 'string' ? error : error.message || error.code || null
}
// True for any terminal-success status, including degraded completion.
export function isCompletedLikeStatus(status) {
  return ['completed', 'degraded_success'].includes(status)
}