From 4cb9d98544756e251c3552661de1a18901919183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=B0=91=E6=9D=B0?= Date: Tue, 14 Apr 2026 00:37:35 +0800 Subject: [PATCH] Expose data-quality semantics before rolling contract-first further Phase 3 adds concrete data-quality states to the contract surface so weekend runs, stale market data, partial payloads, and provider/config mismatches stop collapsing into generic success or failure. The backend now carries those diagnostics from quant/llm runners through the legacy executor contract, while the frontend reads decision/confidence fields from result or compat instead of assuming legacy top-level payloads. Constraint: existing recommendation/task files and current dashboard routes must remain readable during migration Rejected: infer data quality only in the service layer | loses source-specific evidence and violates the executor/orchestrator boundary Rejected: leave frontend on top-level decision fields | breaks as soon as contract-first payloads become the default Confidence: high Scope-risk: moderate Reversibility: clean Directive: keep new data-quality states explicit in contract metadata and route all UI reads through result/compat helpers Tested: python -m pytest orchestrator/tests/test_quant_runner.py orchestrator/tests/test_llm_runner.py orchestrator/tests/test_signals.py orchestrator/tests/test_application_service.py orchestrator/tests/test_trading_graph_config.py web_dashboard/backend/tests/test_executors.py web_dashboard/backend/tests/test_services_migration.py web_dashboard/backend/tests/test_api_smoke.py web_dashboard/backend/tests/test_main_api.py web_dashboard/backend/tests/test_portfolio_api.py -q Tested: python -m compileall orchestrator tradingagents web_dashboard/backend Tested: npm run build (web_dashboard/frontend) Not-tested: real exchange holiday calendars beyond weekend detection Not-tested: real provider-backed end-to-end runs for provider_mismatch and stale-data scenarios --- orchestrator/contracts/error_taxonomy.py | 4 + orchestrator/llm_runner.py | 52 ++++++++- orchestrator/orchestrator.py | 63 ++++++++++- orchestrator/quant_runner.py | 101 +++++++++++++++++- orchestrator/tests/test_llm_runner.py | 17 +++ orchestrator/tests/test_quant_runner.py | 59 ++++++++++ web_dashboard/backend/api/portfolio.py | 5 + .../backend/services/analysis_service.py | 11 +- web_dashboard/backend/services/executor.py | 55 +++++++++- web_dashboard/backend/services/job_service.py | 6 +- web_dashboard/backend/tests/test_executors.py | 26 +++++ .../frontend/src/components/StatusIcon.jsx | 3 + .../frontend/src/pages/AnalysisMonitor.jsx | 47 ++++++-- .../frontend/src/pages/BatchManager.jsx | 15 +-- .../frontend/src/pages/PortfolioPanel.jsx | 11 +- .../frontend/src/utils/contractView.js | 38 +++++++ 16 files changed, 482 insertions(+), 31 deletions(-) create mode 100644 web_dashboard/frontend/src/utils/contractView.js diff --git a/orchestrator/contracts/error_taxonomy.py b/orchestrator/contracts/error_taxonomy.py index 81bff597..d6f1fc3d 100644 --- a/orchestrator/contracts/error_taxonomy.py +++ b/orchestrator/contracts/error_taxonomy.py @@ -7,9 +7,13 @@ class ReasonCode(str, Enum): QUANT_INIT_FAILED = "quant_init_failed" QUANT_SIGNAL_FAILED = "quant_signal_failed" QUANT_NO_DATA = "quant_no_data" + NON_TRADING_DAY = "non_trading_day" + PARTIAL_DATA = "partial_data" + STALE_DATA = "stale_data" LLM_INIT_FAILED = "llm_init_failed" LLM_SIGNAL_FAILED = "llm_signal_failed" LLM_UNKNOWN_RATING = "llm_unknown_rating" + PROVIDER_MISMATCH = "provider_mismatch" BOTH_SIGNALS_UNAVAILABLE = "both_signals_unavailable" diff --git a/orchestrator/llm_runner.py b/orchestrator/llm_runner.py index 8b23afe3..9c5b3988 100644 --- a/orchestrator/llm_runner.py +++ b/orchestrator/llm_runner.py @@ -10,6 +10,12 @@ from orchestrator.contracts.result_contract import Signal, build_error_signal logger = logging.getLogger(__name__) +def _build_data_quality(state: str, **details): + payload = {"state": state} + payload.update({key: value for key, value in details.items() if value is not None}) + return payload + + class LLMRunner: def __init__(self, config: OrchestratorConfig): self._config = config @@ -28,6 +34,24 @@ class LLMRunner: self._graph = TradingAgentsGraph(**graph_kwargs) return self._graph + def _detect_provider_mismatch(self): + trading_cfg = self._config.trading_agents_config or {} + provider = str(trading_cfg.get("llm_provider", "")).lower() + base_url = str(trading_cfg.get("backend_url", "") or "").lower() + if not provider or not base_url: + return None + if provider == "anthropic" and "/anthropic" not in base_url: + return { + "provider": provider, + "backend_url": trading_cfg.get("backend_url"), + } + if provider in {"openai", "openrouter", "ollama", "xai"} and "/anthropic" in base_url: + return { + "provider": provider, + "backend_url": trading_cfg.get("backend_url"), + } + return None + def get_signal(self, ticker: str, date: str) -> Signal: """获取指定股票在指定日期的 LLM 信号,带缓存。""" safe_ticker = ticker.replace("/", "_") # sanitize for filesystem (e.g. BRK/B) @@ -47,6 +71,21 @@ class LLMRunner: metadata=data, ) + mismatch = self._detect_provider_mismatch() + if mismatch is not None: + return build_error_signal( + ticker=ticker, + source="llm", + reason_code=ReasonCode.PROVIDER_MISMATCH.value, + message=( + f"provider '{mismatch['provider']}' does not match backend_url " + f"'{mismatch['backend_url']}'" + ), + metadata={ + "data_quality": _build_data_quality("provider_mismatch", **mismatch), + }, + ) + try: _final_state, processed_signal = self._get_graph().propagate(ticker, date) rating = processed_signal if isinstance(processed_signal, str) else str(processed_signal) @@ -60,6 +99,7 @@ class LLMRunner: "timestamp": now.isoformat(), "ticker": ticker, "date": date, + "data_quality": _build_data_quality("ok"), } with open(cache_path, "w", encoding="utf-8") as f: json.dump(cache_data, f, ensure_ascii=False, indent=2) @@ -74,11 +114,21 @@ class LLMRunner: ) except Exception as e: logger.error("LLMRunner: propagate failed for %s %s: %s", ticker, date, e) + reason_code = ReasonCode.LLM_SIGNAL_FAILED.value + if "Unsupported LLM provider" in str(e): + reason_code = ReasonCode.PROVIDER_MISMATCH.value return build_error_signal( ticker=ticker, source="llm", - reason_code=ReasonCode.LLM_SIGNAL_FAILED.value, + reason_code=reason_code, message=str(e), + metadata={ + "data_quality": _build_data_quality( + "provider_mismatch" if reason_code == ReasonCode.PROVIDER_MISMATCH.value else "unknown", + provider=(self._config.trading_agents_config or {}).get("llm_provider"), + backend_url=(self._config.trading_agents_config or {}).get("backend_url"), + ), + }, ) def _map_rating(self, rating: str) -> tuple[int, float]: diff --git a/orchestrator/orchestrator.py b/orchestrator/orchestrator.py index 9bc98f8b..f27c1b0a 100644 --- a/orchestrator/orchestrator.py +++ b/orchestrator/orchestrator.py @@ -47,25 +47,31 @@ class TradingOrchestrator: quant_sig: Optional[Signal] = None llm_sig: Optional[Signal] = None degradation_reasons: list[str] = [] + source_diagnostics: dict[str, dict] = {} if self._quant is None and self._quant_unavailable_reason: degradation_reasons.append(self._quant_unavailable_reason) + source_diagnostics["quant"] = {"reason_code": self._quant_unavailable_reason} if self._llm is None and self._llm_unavailable_reason: degradation_reasons.append(self._llm_unavailable_reason) + source_diagnostics["llm"] = {"reason_code": self._llm_unavailable_reason} # Get quant signal if self._quant is not None: try: quant_sig = self._quant.get_signal(ticker, date) if quant_sig.degraded: + reason_code = signal_reason_code(quant_sig) or ReasonCode.QUANT_SIGNAL_FAILED.value degradation_reasons.append( - signal_reason_code(quant_sig) or ReasonCode.QUANT_SIGNAL_FAILED.value + reason_code ) + source_diagnostics["quant"] = self._build_source_diagnostic(quant_sig, reason_code) logger.warning("TradingOrchestrator: quant signal degraded for %s %s", ticker, date) quant_sig = None except Exception as e: logger.error("TradingOrchestrator: quant get_signal failed: %s", e) degradation_reasons.append(ReasonCode.QUANT_SIGNAL_FAILED.value) + source_diagnostics["quant"] = {"reason_code": ReasonCode.QUANT_SIGNAL_FAILED.value} quant_sig = None # Get llm signal @@ -73,21 +79,72 @@ class TradingOrchestrator: try: llm_sig = self._llm.get_signal(ticker, date) if llm_sig.degraded: + reason_code = signal_reason_code(llm_sig) or ReasonCode.LLM_SIGNAL_FAILED.value degradation_reasons.append( - signal_reason_code(llm_sig) or ReasonCode.LLM_SIGNAL_FAILED.value + reason_code ) + source_diagnostics["llm"] = self._build_source_diagnostic(llm_sig, reason_code) logger.warning("TradingOrchestrator: llm signal degraded for %s %s", ticker, date) llm_sig = None except Exception as e: logger.error("TradingOrchestrator: llm get_signal failed: %s", e) degradation_reasons.append(ReasonCode.LLM_SIGNAL_FAILED.value) + source_diagnostics["llm"] = {"reason_code": ReasonCode.LLM_SIGNAL_FAILED.value} llm_sig = None # merge raises ValueError if both None if quant_sig is None and llm_sig is None: degradation_reasons.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value) - return self._merger.merge( + final_signal = self._merger.merge( quant_sig, llm_sig, degradation_reasons=degradation_reasons, ) + data_quality = self._summarize_data_quality(source_diagnostics) + metadata = dict(final_signal.metadata) + if source_diagnostics: + metadata["source_diagnostics"] = source_diagnostics + if data_quality: + metadata["data_quality"] = data_quality + final_signal.metadata = metadata + return final_signal + + @staticmethod + def _build_source_diagnostic(signal: Signal, reason_code: str) -> dict: + diagnostic = {"reason_code": reason_code} + data_quality = signal.metadata.get("data_quality") + if data_quality is not None: + diagnostic["data_quality"] = data_quality + error = signal.metadata.get("error") + if error: + diagnostic["error"] = error + return diagnostic + + @staticmethod + def _summarize_data_quality(source_diagnostics: dict[str, dict]) -> Optional[dict]: + states: list[tuple[str, dict]] = [] + for source, diagnostic in source_diagnostics.items(): + data_quality = diagnostic.get("data_quality") + if isinstance(data_quality, dict) and data_quality.get("state"): + states.append((source, data_quality)) + + if not states: + return None + + priority = { + "provider_mismatch": 0, + "non_trading_day": 1, + "stale_data": 2, + "partial_data": 3, + } + source, selected = sorted( + states, + key=lambda item: priority.get(item[1].get("state"), 999), + )[0] + summary = dict(selected) + summary["source"] = source + summary["issues"] = [ + {"source": issue_source, **issue_quality} + for issue_source, issue_quality in states + ] + return summary diff --git a/orchestrator/quant_runner.py b/orchestrator/quant_runner.py index 5a55efe0..e3ba3877 100644 --- a/orchestrator/quant_runner.py +++ b/orchestrator/quant_runner.py @@ -5,6 +5,7 @@ import sys from datetime import datetime, timezone, timedelta from typing import Any +import pandas as pd import yfinance as yf from orchestrator.config import OrchestratorConfig @@ -14,6 +15,12 @@ from orchestrator.contracts.result_contract import Signal, build_error_signal logger = logging.getLogger(__name__) +def _build_data_quality(state: str, **details: Any) -> dict[str, Any]: + payload = {"state": state} + payload.update({key: value for key, value in details.items() if value is not None}) + return payload + + class QuantRunner: def __init__(self, config: OrchestratorConfig): if not config.quant_backtest_path: @@ -39,20 +46,99 @@ class QuantRunner: start_dt = end_dt - timedelta(days=60) start_str = start_dt.strftime("%Y-%m-%d") - df = yf.download(ticker, start=start_str, end=date, progress=False, auto_adjust=True) + end_exclusive = (end_dt + timedelta(days=1)).strftime("%Y-%m-%d") + df = yf.download(ticker, start=start_str, end=end_exclusive, progress=False, auto_adjust=True) if df.empty: logger.warning("No price data for %s between %s and %s", ticker, start_str, date) + if end_dt.weekday() >= 5: + return build_error_signal( + ticker=ticker, + source="quant", + reason_code=ReasonCode.NON_TRADING_DAY.value, + message=f"{date} is not a trading day", + metadata={ + "start_date": start_str, + "end_date": date, + "data_quality": _build_data_quality( + "non_trading_day", + requested_date=date, + ), + }, + ) return build_error_signal( ticker=ticker, source="quant", reason_code=ReasonCode.QUANT_NO_DATA.value, message=f"no price data between {start_str} and {date}", - metadata={"start_date": start_str, "end_date": date}, + metadata={ + "start_date": start_str, + "end_date": date, + }, ) # 标准化列名为小写 df.columns = [c[0].lower() if isinstance(c, tuple) else c.lower() for c in df.columns] + required_columns = {"open", "high", "low", "close"} + missing_columns = sorted(required_columns - set(df.columns)) + if missing_columns: + return build_error_signal( + ticker=ticker, + source="quant", + reason_code=ReasonCode.PARTIAL_DATA.value, + message=f"missing price columns: {', '.join(missing_columns)}", + metadata={ + "start_date": start_str, + "end_date": date, + "data_quality": _build_data_quality( + "partial_data", + missing_fields=missing_columns, + ), + }, + ) + + df.index = pd.to_datetime(df.index) + available_dates = df.index.normalize() + requested_date = pd.Timestamp(end_dt.date()) + if requested_date not in available_dates: + last_available_ts = df.index.max() + last_available_date = ( + last_available_ts.strftime("%Y-%m-%d") + if hasattr(last_available_ts, "strftime") + else str(last_available_ts) + ) + if end_dt.weekday() >= 5: + return build_error_signal( + ticker=ticker, + source="quant", + reason_code=ReasonCode.NON_TRADING_DAY.value, + message=f"{date} is not a trading day", + metadata={ + "start_date": start_str, + "end_date": date, + "data_quality": _build_data_quality( + "non_trading_day", + requested_date=date, + last_available_date=last_available_date, + ), + }, + ) + return build_error_signal( + ticker=ticker, + source="quant", + reason_code=ReasonCode.STALE_DATA.value, + message=f"latest price data stops at {last_available_date}", + metadata={ + "start_date": start_str, + "end_date": date, + "data_quality": _build_data_quality( + "stale_data", + requested_date=date, + last_available_date=last_available_date, + ), + }, + ) + # 用最佳参数创建 BollingerStrategy 实例 # Lazy import: requires quant_backtest_path to be in sys.path (set in __init__) from strategies.momentum import BollingerStrategy @@ -117,7 +203,16 @@ class QuantRunner: confidence=confidence, source="quant", timestamp=datetime.now(timezone.utc), - metadata={"params": params, "sharpe_ratio": sharpe, "max_sharpe": max_sharpe}, + metadata={ + "params": params, + "sharpe_ratio": sharpe, + "max_sharpe": max_sharpe, + "data_quality": _build_data_quality( + "ok", + requested_date=date, + last_available_date=date, + ), + }, ) def _load_best_params(self) -> dict: diff --git a/orchestrator/tests/test_llm_runner.py b/orchestrator/tests/test_llm_runner.py index 578584f2..7cfa0f27 100644 --- a/orchestrator/tests/test_llm_runner.py +++ b/orchestrator/tests/test_llm_runner.py @@ -82,3 +82,20 @@ def test_get_signal_returns_reason_code_on_propagate_failure(monkeypatch, tmp_pa assert signal.degraded is True assert signal.reason_code == ReasonCode.LLM_SIGNAL_FAILED.value assert signal.metadata["error"] == "graph unavailable" + + +def test_get_signal_returns_provider_mismatch_before_graph_init(tmp_path): + cfg = OrchestratorConfig( + cache_dir=str(tmp_path), + trading_agents_config={ + "llm_provider": "anthropic", + "backend_url": "https://api.openai.com/v1", + }, + ) + runner = LLMRunner(cfg) + + signal = runner.get_signal("AAPL", "2024-01-02") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.PROVIDER_MISMATCH.value + assert signal.metadata["data_quality"]["state"] == "provider_mismatch" diff --git a/orchestrator/tests/test_quant_runner.py b/orchestrator/tests/test_quant_runner.py index da45c500..f04ebe10 100644 --- a/orchestrator/tests/test_quant_runner.py +++ b/orchestrator/tests/test_quant_runner.py @@ -1,6 +1,7 @@ """Tests for QuantRunner._calc_confidence().""" import json import sqlite3 +import pandas as pd import pytest from orchestrator.config import OrchestratorConfig @@ -74,3 +75,61 @@ def test_get_signal_returns_reason_code_when_no_data(runner, monkeypatch): assert signal.degraded is True assert signal.reason_code == ReasonCode.QUANT_NO_DATA.value + + +def test_get_signal_marks_non_trading_day_on_weekend(runner, monkeypatch): + monkeypatch.setattr( + "orchestrator.quant_runner.yf.download", + lambda *args, **kwargs: pd.DataFrame(), + ) + + signal = runner.get_signal("AAPL", "2024-01-06") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value + assert signal.metadata["data_quality"]["state"] == "non_trading_day" + + +def test_get_signal_marks_stale_data_when_requested_day_missing(runner, monkeypatch): + stale_frame = pd.DataFrame( + { + "Open": [10.0], + "High": [11.0], + "Low": [9.0], + "Close": [10.5], + "Volume": [1000], + }, + index=pd.to_datetime(["2024-01-01"]), + ) + monkeypatch.setattr( + "orchestrator.quant_runner.yf.download", + lambda *args, **kwargs: stale_frame, + ) + + signal = runner.get_signal("AAPL", "2024-01-02") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.STALE_DATA.value + assert signal.metadata["data_quality"]["state"] == "stale_data" + + +def test_get_signal_marks_partial_data_when_required_columns_missing(runner, monkeypatch): + partial_frame = pd.DataFrame( + { + "Open": [10.0], + "Low": [9.0], + "Close": [10.5], + "Volume": [1000], + }, + index=pd.to_datetime(["2024-01-02"]), + ) + monkeypatch.setattr( + "orchestrator.quant_runner.yf.download", + lambda *args, **kwargs: partial_frame, + ) + + signal = runner.get_signal("AAPL", "2024-01-02") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.PARTIAL_DATA.value + assert signal.metadata["data_quality"]["state"] == "partial_data" diff --git a/web_dashboard/backend/api/portfolio.py b/web_dashboard/backend/api/portfolio.py index 05d2797c..25594686 100644 --- a/web_dashboard/backend/api/portfolio.py +++ b/web_dashboard/backend/api/portfolio.py @@ -333,6 +333,11 @@ def _normalize_recommendation_record(record: dict, *, date: Optional[str] = None }, "degraded": quant_signal is None or llm_signal is None, }, + "degradation": normalized.get("degradation") or { + "degraded": quant_signal is None or llm_signal is None, + "reason_codes": [], + }, + "data_quality": normalized.get("data_quality"), "compat": { "analysis_date": date_value, "decision": decision, diff --git a/web_dashboard/backend/services/analysis_service.py b/web_dashboard/backend/services/analysis_service.py index 4caff065..1ea37d3c 100644 --- a/web_dashboard/backend/services/analysis_service.py +++ b/web_dashboard/backend/services/analysis_service.py @@ -306,11 +306,15 @@ class AnalysisService: quant_signal = output.quant_signal llm_signal = output.llm_signal confidence = output.confidence + data_quality = output.data_quality + degrade_reason_codes = list(output.degrade_reason_codes) else: decision = "HOLD" quant_signal = None llm_signal = None confidence = None + data_quality = None + degrade_reason_codes = [] for line in (stdout or "").splitlines(): if line.startswith("SIGNAL_DETAIL:"): try: @@ -328,7 +332,7 @@ class AnalysisService: "ticker": ticker, "name": stock.get("name", ticker), "date": date, - "status": "completed", + "status": "degraded_success" if (degrade_reason_codes or data_quality or quant_signal is None or llm_signal is None) else "completed", "created_at": datetime.now().isoformat(), "result": { "decision": decision, @@ -351,6 +355,11 @@ class AnalysisService: }, "degraded": quant_signal is None or llm_signal is None, }, + "degradation": { + "degraded": bool(degrade_reason_codes) or quant_signal is None or llm_signal is None, + "reason_codes": degrade_reason_codes, + }, + "data_quality": data_quality, "compat": { "analysis_date": date, "decision": decision, diff --git a/web_dashboard/backend/services/executor.py b/web_dashboard/backend/services/executor.py index 18844d6d..69514c67 100644 --- a/web_dashboard/backend/services/executor.py +++ b/web_dashboard/backend/services/executor.py @@ -41,6 +41,25 @@ trading_config["project_dir"] = os.path.join(repo_root, "tradingagents") trading_config["results_dir"] = os.path.join(repo_root, "results") trading_config["max_debate_rounds"] = 1 trading_config["max_risk_discuss_rounds"] = 1 +if os.environ.get("TRADINGAGENTS_LLM_PROVIDER"): + trading_config["llm_provider"] = os.environ["TRADINGAGENTS_LLM_PROVIDER"] +elif os.environ.get("ANTHROPIC_BASE_URL"): + trading_config["llm_provider"] = "anthropic" +elif os.environ.get("OPENAI_BASE_URL"): + trading_config["llm_provider"] = "openai" +if os.environ.get("TRADINGAGENTS_BACKEND_URL"): + trading_config["backend_url"] = os.environ["TRADINGAGENTS_BACKEND_URL"] +elif os.environ.get("ANTHROPIC_BASE_URL"): + trading_config["backend_url"] = os.environ["ANTHROPIC_BASE_URL"] +elif os.environ.get("OPENAI_BASE_URL"): + trading_config["backend_url"] = os.environ["OPENAI_BASE_URL"] +if os.environ.get("TRADINGAGENTS_MODEL"): + trading_config["deep_think_llm"] = os.environ["TRADINGAGENTS_MODEL"] + trading_config["quick_think_llm"] = os.environ["TRADINGAGENTS_MODEL"] +if os.environ.get("TRADINGAGENTS_DEEP_MODEL"): + trading_config["deep_think_llm"] = os.environ["TRADINGAGENTS_DEEP_MODEL"] +if os.environ.get("TRADINGAGENTS_QUICK_MODEL"): + trading_config["quick_think_llm"] = os.environ["TRADINGAGENTS_QUICK_MODEL"] print("STAGE:analysts", flush=True) print("STAGE:research", flush=True) @@ -105,7 +124,13 @@ report_path.write_text(report_content) print("STAGE:portfolio", flush=True) signal_detail = json.dumps({"llm_signal": llm_signal, "quant_signal": quant_signal, "confidence": confidence}) +result_meta = json.dumps({ + "degrade_reason_codes": list(getattr(result, "degrade_reason_codes", ())), + "data_quality": (result.metadata or {}).get("data_quality"), + "source_diagnostics": (result.metadata or {}).get("source_diagnostics"), +}) print("SIGNAL_DETAIL:" + signal_detail, flush=True) +print("RESULT_META:" + result_meta, flush=True) print("ANALYSIS_COMPLETE:" + signal, flush=True) """ @@ -125,6 +150,9 @@ class AnalysisExecutionOutput: llm_signal: Optional[str] confidence: Optional[float] report_path: Optional[str] = None + degrade_reason_codes: tuple[str, ...] = () + data_quality: Optional[dict] = None + source_diagnostics: Optional[dict] = None contract_version: str = CONTRACT_VERSION executor_type: str = DEFAULT_EXECUTOR_TYPE @@ -138,17 +166,24 @@ class AnalysisExecutionOutput: elapsed_seconds: int, current_stage: str = "portfolio", ) -> dict: + degraded = bool(self.degrade_reason_codes) or bool(self.data_quality) or self.quant_signal is None or self.llm_signal is None return { "contract_version": self.contract_version, "task_id": task_id, "ticker": ticker, "date": date, - "status": "completed", + "status": "degraded_success" if degraded else "completed", "progress": 100, "current_stage": current_stage, "created_at": created_at, "elapsed_seconds": elapsed_seconds, "elapsed": elapsed_seconds, + "degradation": { + "degraded": degraded, + "reason_codes": list(self.degrade_reason_codes), + "source_diagnostics": self.source_diagnostics or {}, + }, + "data_quality": self.data_quality, "result": { "decision": self.decision, "confidence": self.confidence, @@ -168,7 +203,7 @@ class AnalysisExecutionOutput: "available": self.llm_signal is not None, }, }, - "degraded": self.quant_signal is None or self.llm_signal is None, + "degraded": degraded, "report": { "path": self.report_path, "available": bool(self.report_path), @@ -325,7 +360,11 @@ class LegacySubprocessAnalysisExecutor: quant_signal = None llm_signal = None confidence = None + degrade_reason_codes: tuple[str, ...] = () + data_quality = None + source_diagnostics = None seen_signal_detail = False + seen_result_meta = False seen_complete = False for line in stdout_lines: @@ -338,6 +377,15 @@ class LegacySubprocessAnalysisExecutor: quant_signal = detail.get("quant_signal") llm_signal = detail.get("llm_signal") confidence = detail.get("confidence") + elif line.startswith("RESULT_META:"): + seen_result_meta = True + try: + detail = json.loads(line.split(":", 1)[1].strip()) + except Exception as exc: + raise AnalysisExecutorError("failed to parse RESULT_META payload") from exc + degrade_reason_codes = tuple(detail.get("degrade_reason_codes") or ()) + data_quality = detail.get("data_quality") + source_diagnostics = detail.get("source_diagnostics") elif line.startswith("ANALYSIS_COMPLETE:"): seen_complete = True decision = line.split(":", 1)[1].strip() @@ -360,6 +408,9 @@ class LegacySubprocessAnalysisExecutor: llm_signal=llm_signal, confidence=confidence, report_path=report_path, + degrade_reason_codes=degrade_reason_codes, + data_quality=data_quality, + source_diagnostics=source_diagnostics, contract_version=contract_version, executor_type=executor_type, ) diff --git a/web_dashboard/backend/services/job_service.py b/web_dashboard/backend/services/job_service.py index 0ba5e0e4..64ffff88 100644 --- a/web_dashboard/backend/services/job_service.py +++ b/web_dashboard/backend/services/job_service.py @@ -151,7 +151,7 @@ class JobService: state["result"] = result state["error"] = contract.get("error") state["contract_version"] = contract.get("contract_version", state.get("contract_version")) - state["degradation_summary"] = self._build_degradation_summary(result) + state["degradation_summary"] = contract.get("degradation") or self._build_degradation_summary(result) state["data_quality_summary"] = contract.get("data_quality") state["compat"] = { "decision": result.get("decision"), @@ -255,6 +255,8 @@ class JobService: "status": payload["status"], "created_at": payload.get("created_at"), "error": payload.get("error"), + "data_quality_summary": payload.get("data_quality_summary"), + "degradation_summary": payload.get("degradation_summary"), } if state.get("type") == "portfolio": summary.update({ @@ -310,6 +312,8 @@ class JobService: normalized.setdefault("result_ref", None) normalized.setdefault("degradation_summary", None) normalized.setdefault("data_quality_summary", None) + if "data_quality" in normalized and normalized.get("data_quality_summary") is None: + normalized["data_quality_summary"] = normalized.get("data_quality") compat = normalized.get("compat") if not isinstance(compat, dict): compat = {} diff --git a/web_dashboard/backend/tests/test_executors.py b/web_dashboard/backend/tests/test_executors.py index dcbe5b62..ff861e9a 100644 --- a/web_dashboard/backend/tests/test_executors.py +++ b/web_dashboard/backend/tests/test_executors.py @@ -110,3 +110,29 @@ def test_executor_kills_subprocess_on_timeout(monkeypatch): assert process.kill_called is True assert process.wait_called is True + + +def test_executor_marks_degraded_success_when_result_meta_reports_data_quality(): + output = LegacySubprocessAnalysisExecutor._parse_output( + stdout_lines=[ + 'SIGNAL_DETAIL:{"quant_signal":"HOLD","llm_signal":"BUY","confidence":0.6}', + 'RESULT_META:{"degrade_reason_codes":["non_trading_day"],"data_quality":{"state":"non_trading_day","requested_date":"2026-04-12"}}', + "ANALYSIS_COMPLETE:OVERWEIGHT", + ], + ticker="AAPL", + date="2026-04-12", + contract_version="v1alpha1", + executor_type="legacy_subprocess", + ) + + contract = output.to_result_contract( + task_id="task-3", + ticker="AAPL", + date="2026-04-12", + created_at="2026-04-12T10:00:00", + elapsed_seconds=3, + ) + + assert contract["status"] == "degraded_success" + assert contract["data_quality"]["state"] == "non_trading_day" + assert contract["degradation"]["reason_codes"] == ["non_trading_day"] diff --git a/web_dashboard/frontend/src/components/StatusIcon.jsx b/web_dashboard/frontend/src/components/StatusIcon.jsx index 696056af..c0bb0681 100644 --- a/web_dashboard/frontend/src/components/StatusIcon.jsx +++ b/web_dashboard/frontend/src/components/StatusIcon.jsx @@ -4,6 +4,7 @@ const STATUS_TAG_MAP = { pending: { text: '等待', bg: 'var(--bg-elevated)', color: 'var(--text-muted)' }, running: { text: '分析中', bg: 'var(--running-dim)', color: 'var(--running)' }, completed: { text: '完成', bg: 'var(--buy-dim)', color: 'var(--buy)' }, + degraded_success: { text: '降级完成', bg: 'var(--hold-dim)', color: 'var(--hold)' }, failed: { text: '失败', bg: 'var(--sell-dim)', color: 'var(--sell)' }, } @@ -11,6 +12,8 @@ export function StatusIcon({ status }) { switch (status) { case 'completed': return + case 'degraded_success': + return case 'failed': return case 'running': diff --git a/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx b/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx index 4aeff2b3..6c7a6109 100644 --- a/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx +++ b/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx @@ -3,6 +3,15 @@ import { useSearchParams } from 'react-router-dom' import { Card, Progress, Badge, Empty, Button, Result, message } from 'antd' import DecisionBadge from '../components/DecisionBadge' import { StatusIcon } from '../components/StatusIcon' +import { + getConfidence, + getDecision, + getDisplayDate, + getErrorMessage, + getLlmSignal, + getQuantSignal, + isCompletedLikeStatus, +} from '../utils/contractView' const ANALYSIS_STAGES = [ { key: 'analysts', label: '分析师团队' }, @@ -20,6 +29,13 @@ export default function AnalysisMonitor() { const [loading, setLoading] = useState(false) const [error, setError] = useState(null) const wsRef = useRef(null) + const decision = getDecision(task) + const llmSignal = getLlmSignal(task) + const quantSignal = getQuantSignal(task) + const confidence = getConfidence(task) + const displayDate = getDisplayDate(task) + const dataQuality = task?.data_quality_summary + const errorMessage = getErrorMessage(task) const fetchInitialState = useCallback(async () => { if (!taskId) return @@ -155,23 +171,38 @@ export default function AnalysisMonitor() { {task.ticker} - + + {displayDate && ( +
+ 分析日期: {displayDate} +
+ )} {/* Signal Detail Row */} - {task.status === 'completed' && (task.llm_signal || task.quant_signal || task.confidence != null) && ( + {isCompletedLikeStatus(task.status) && (llmSignal || quantSignal || confidence != null) && (
- {task.llm_signal && ( - LLM: + {llmSignal && ( + LLM: )} - {task.quant_signal && ( - Quant: + {quantSignal && ( + Quant: )} - {task.confidence != null && ( - 置信度: {(task.confidence * 100).toFixed(0)}% + {confidence != null && ( + 置信度: {(confidence * 100).toFixed(0)}% )}
)} + {dataQuality?.state && ( +
+ 数据质量: {dataQuality.state} +
+ )} + {errorMessage && ( +
+ 错误: {errorMessage} +
+ )} {/* Progress */}
diff --git a/web_dashboard/frontend/src/pages/BatchManager.jsx b/web_dashboard/frontend/src/pages/BatchManager.jsx index 22098670..12d1b27d 100644 --- a/web_dashboard/frontend/src/pages/BatchManager.jsx +++ b/web_dashboard/frontend/src/pages/BatchManager.jsx @@ -3,6 +3,7 @@ import { Table, Button, Progress, Result, Card, message, Popconfirm, Tooltip } f import { DeleteOutlined, CopyOutlined, SyncOutlined } from '@ant-design/icons' import DecisionBadge from '../components/DecisionBadge' import { StatusIcon, StatusTag } from '../components/StatusIcon' +import { getDecision, getErrorMessage } from '../utils/contractView' export default function BatchManager() { const [tasks, setTasks] = useState([]) @@ -105,10 +106,9 @@ export default function BatchManager() { }, { title: '决策', - dataIndex: 'decision', key: 'decision', width: 80, - render: (decision) => , + render: (_, record) => , }, { title: '任务ID', @@ -132,16 +132,17 @@ export default function BatchManager() { }, { title: '错误', - dataIndex: 'error', key: 'error', width: 180, ellipsis: { showTitle: false }, - render: (error) => - error ? ( + render: (_, record) => { + const error = getErrorMessage(record) + return error ? ( {error} - ) : null, + ) : null + }, }, { title: '操作', @@ -174,7 +175,7 @@ export default function BatchManager() { const stats = useMemo(() => ({ pending: tasks.filter(t => t.status === 'pending').length, running: tasks.filter(t => t.status === 'running').length, - completed: tasks.filter(t => t.status === 'completed').length, + completed: tasks.filter(t => t.status === 'completed' || t.status === 'degraded_success').length, failed: tasks.filter(t => t.status === 'failed').length, }), [tasks]) diff --git a/web_dashboard/frontend/src/pages/PortfolioPanel.jsx b/web_dashboard/frontend/src/pages/PortfolioPanel.jsx index 98ba6383..08d49a9c 100644 --- a/web_dashboard/frontend/src/pages/PortfolioPanel.jsx +++ b/web_dashboard/frontend/src/pages/PortfolioPanel.jsx @@ -9,6 +9,7 @@ import { } from '@ant-design/icons' import { portfolioApi } from '../services/portfolioApi' import DecisionBadge from '../components/DecisionBadge' +import { getDecision, getDisplayDate, isCompletedLikeStatus } from '../utils/contractView' const { Text } = Typography @@ -316,7 +317,7 @@ function RecommendationsTab() { const res = await portfolioApi.getRecommendations(date) setData(res.recommendations || []) if (!date) { - const d = [...new Set((res.recommendations || []).map(r => r.analysis_date))].sort().reverse() + const d = [...new Set((res.recommendations || []).map(r => getDisplayDate(r)).filter(Boolean))].sort().reverse() setDates(d) } } catch { @@ -338,7 +339,7 @@ function RecommendationsTab() { const d = JSON.parse(e.data) if (d.type === 'progress') { setProgress(d) - if (d.status === 'completed' || d.status === 'failed') { + if (isCompletedLikeStatus(d.status) || d.status === 'failed') { setAnalyzing(false) setTaskId(null) setProgress(null) @@ -377,10 +378,10 @@ function RecommendationsTab() { render: t => {t} }, { title: '名称', dataIndex: 'name', key: 'name', render: t => {t} }, { - title: '决策', dataIndex: 'decision', key: 'decision', width: 80, - render: d => , + title: '决策', key: 'decision', width: 80, + render: (_, record) => , }, - { title: '分析日期', dataIndex: 'analysis_date', key: 'analysis_date', width: 120 }, + { title: '分析日期', key: 'analysis_date', width: 120, render: (_, record) => getDisplayDate(record) || '—' }, ] return ( diff --git a/web_dashboard/frontend/src/utils/contractView.js b/web_dashboard/frontend/src/utils/contractView.js new file mode 100644 index 00000000..02f30555 --- /dev/null +++ b/web_dashboard/frontend/src/utils/contractView.js @@ -0,0 +1,38 @@ +export function getCompat(payload) { + return payload?.compat || {} +} + +export function getResult(payload) { + return payload?.result || {} +} + +export function getDecision(payload) { + return getResult(payload).decision ?? getCompat(payload).decision ?? null +} + +export function getQuantSignal(payload) { + return getResult(payload).signals?.quant?.rating ?? getCompat(payload).quant_signal ?? null +} + +export function getLlmSignal(payload) { + return getResult(payload).signals?.llm?.rating ?? getCompat(payload).llm_signal ?? null +} + +export function getConfidence(payload) { + return getResult(payload).confidence ?? getCompat(payload).confidence ?? null +} + +export function getDisplayDate(payload) { + return payload?.date ?? getCompat(payload).analysis_date ?? null +} + +export function getErrorMessage(payload) { + const error = payload?.error + if (!error) return null + if (typeof error === 'string') return error + return error.message || error.code || null +} + +export function isCompletedLikeStatus(status) { + return status === 'completed' || status === 'degraded_success' +}