From d86b805c12ccdd927fa02e9d2c1132e321bcfbcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=B0=91=E6=9D=B0?= Date: Tue, 14 Apr 2026 00:26:28 +0800 Subject: [PATCH] Make backend task and recommendation APIs contract-first by default Phase 2 moves the dashboard off raw task-state leakage and onto stable public projections. Task status, task listings, progress websocket events, and portfolio recommendation reads now load persisted contracts when available, expose a contract-first envelope, and keep legacy fields inside a compat block instead of smearing them across top-level payloads. Constraint: existing task-status JSON and recommendation files must continue to read successfully during migration Rejected: return raw task_results directly from API and websocket | keeps legacy fields as the public contract and blocks cutover Rejected: rewrite stored recommendation files in-place | adds risky migration work before rollout gates exist Confidence: high Scope-risk: moderate Reversibility: clean Directive: keep public payload shaping in job/result-store projections, not in ad-hoc route logic Tested: python -m pytest web_dashboard/backend/tests/test_executors.py web_dashboard/backend/tests/test_services_migration.py web_dashboard/backend/tests/test_api_smoke.py web_dashboard/backend/tests/test_main_api.py web_dashboard/backend/tests/test_portfolio_api.py -q Tested: python -m pytest orchestrator/tests/test_application_service.py orchestrator/tests/test_trading_graph_config.py -q Tested: python -m compileall orchestrator tradingagents web_dashboard/backend Not-tested: legacy frontend rendering against new compat-wrapped task payloads Not-tested: real websocket clients and provider-backed end-to-end analysis --- web_dashboard/backend/api/portfolio.py | 71 ++++++++- web_dashboard/backend/main.py | 70 +++++---- .../backend/services/analysis_service.py | 56 ++++++- web_dashboard/backend/services/job_service.py | 139 ++++++++++++++++-- .../backend/services/migration_flags.py | 45 +++++- .../backend/services/result_store.py | 29 +++- web_dashboard/backend/tests/test_api_smoke.py | 55 +++++++ .../backend/tests/test_portfolio_api.py | 38 +++++ .../backend/tests/test_services_migration.py | 29 ++-- 9 files changed, 466 insertions(+), 66 deletions(-) diff --git a/web_dashboard/backend/api/portfolio.py b/web_dashboard/backend/api/portfolio.py index a2d1cfe2..05d2797c 100644 --- a/web_dashboard/backend/api/portfolio.py +++ b/web_dashboard/backend/api/portfolio.py @@ -284,6 +284,65 @@ DEFAULT_PAGE_SIZE = 50 MAX_PAGE_SIZE = 500 +def _rating_to_direction(rating: Optional[str]) -> int: + if rating in {"BUY", "OVERWEIGHT"}: + return 1 + if rating in {"SELL", "UNDERWEIGHT"}: + return -1 + return 0 + + +def _normalize_recommendation_record(record: dict, *, date: Optional[str] = None, ticker: Optional[str] = None) -> dict: + normalized = dict(record) + if "result" in normalized and "contract_version" in normalized: + normalized.setdefault("ticker", ticker or normalized.get("ticker")) + normalized.setdefault("date", date or normalized.get("date") or normalized.get("analysis_date")) + return normalized + + decision = normalized.get("decision", "HOLD") + quant_signal = normalized.get("quant_signal") + llm_signal = normalized.get("llm_signal") + confidence = normalized.get("confidence") + date_value = date or normalized.get("date") or normalized.get("analysis_date") + ticker_value = ticker or normalized.get("ticker") + return { + "contract_version": "v1alpha1", + "ticker": ticker_value, + "name": normalized.get("name", ticker_value), + "date": date_value, + "status": normalized.get("status", "completed"), + "created_at": normalized.get("created_at"), + "result": { + "decision": decision, + "confidence": confidence, + "signals": { + "merged": { + "direction": _rating_to_direction(decision), + "rating": decision, + }, + "quant": { + "direction": _rating_to_direction(quant_signal), + "rating": quant_signal, + "available": quant_signal is not None, + }, + "llm": { + "direction": _rating_to_direction(llm_signal), + "rating": llm_signal, + "available": llm_signal is not None, + }, + }, + "degraded": quant_signal is None or llm_signal is None, + }, + "compat": { + "analysis_date": date_value, + "decision": decision, + "quant_signal": quant_signal, + "llm_signal": llm_signal, + "confidence": confidence, + }, + } + + def get_recommendations(date: Optional[str] = None, limit: int = DEFAULT_PAGE_SIZE, offset: int = 0) -> dict: """List recommendations, optionally filtered by date. Returns paginated results.""" RECOMMENDATIONS_DIR.mkdir(parents=True, exist_ok=True) @@ -293,7 +352,7 @@ def get_recommendations(date: Optional[str] = None, limit: int = DEFAULT_PAGE_SI date_dir = RECOMMENDATIONS_DIR / date if date_dir.exists(): all_recs = [ - json.loads(f.read_text()) + _normalize_recommendation_record(json.loads(f.read_text()), date=date_dir.name) for f in sorted(date_dir.glob("*.json"), reverse=True) if f.suffix == ".json" ] @@ -302,10 +361,16 @@ def get_recommendations(date: Optional[str] = None, limit: int = DEFAULT_PAGE_SI if date_dir.is_dir() and date_dir.name.startswith("20"): for f in sorted(date_dir.glob("*.json"), reverse=True): if f.suffix == ".json": - all_recs.append(json.loads(f.read_text())) + all_recs.append( + _normalize_recommendation_record( + json.loads(f.read_text()), + date=date_dir.name, + ) + ) total = len(all_recs) return { + "contract_version": "v1alpha1", "recommendations": all_recs[offset : offset + limit], "total": total, "limit": limit, @@ -327,7 +392,7 @@ def get_recommendation(date: str, ticker: str) -> Optional[dict]: path.resolve().relative_to(RECOMMENDATIONS_DIR.resolve()) except ValueError: return None - return json.loads(path.read_text()) + return _normalize_recommendation_record(json.loads(path.read_text()), date=date, ticker=ticker) def save_recommendation(date: str, ticker: str, data: dict): diff --git a/web_dashboard/backend/main.py b/web_dashboard/backend/main.py index 36f7a023..4c26840e 100644 --- a/web_dashboard/backend/main.py +++ b/web_dashboard/backend/main.py @@ -302,7 +302,7 @@ async def get_task_status(task_id: str, api_key: Optional[str] = Header(None)): _auth_error() if task_id not in app.state.task_results: raise HTTPException(status_code=404, detail="Task not found") - return app.state.task_results[task_id] + return _public_task_payload(task_id) @app.get("/api/analysis/tasks") @@ -310,21 +310,10 @@ async def list_tasks(api_key: Optional[str] = Header(None)): """List all tasks (active and recent)""" if not _check_api_key(api_key): _auth_error() - tasks = [] - for task_id, state in app.state.task_results.items(): - tasks.append({ - "task_id": task_id, - "ticker": state.get("ticker"), - "date": state.get("date"), - "status": state.get("status"), - "progress": state.get("progress", 0), - "decision": state.get("decision"), - "error": state.get("error"), - "created_at": state.get("created_at"), - }) + tasks = [_public_task_summary(task_id) for task_id in app.state.task_results] # Sort by created_at descending (most recent first) tasks.sort(key=lambda x: x.get("created_at") or "", reverse=True) - return {"tasks": tasks, "total": len(tasks)} + return {"contract_version": "v1alpha1", "tasks": tasks, "total": len(tasks)} @app.delete("/api/analysis/cancel/{task_id}") @@ -346,11 +335,16 @@ async def cancel_task(task_id: str, api_key: Optional[str] = Header(None)): if task: task.cancel() - state = app.state.task_results[task_id] - state["status"] = "cancelled" - state["error"] = "用户取消" - app.state.result_store.save_task_status(task_id, state) - await broadcast_progress(task_id, state) + state = app.state.job_service.cancel_job(task_id, error="用户取消") + if state is not None: + state["status"] = "cancelled" + state["error"] = { + "code": "cancelled", + "message": "用户取消", + "retryable": False, + } + app.state.result_store.save_task_status(task_id, state) + await broadcast_progress(task_id, state) app.state.result_store.delete_task_status(task_id) return {"contract_version": "v1alpha1", "task_id": task_id, "status": "cancelled"} @@ -376,7 +370,7 @@ async def websocket_analysis(websocket: WebSocket, task_id: str): if task_id in app.state.task_results: await websocket.send_text(json.dumps({ "type": "progress", - **app.state.task_results[task_id] + **_public_task_payload(task_id) })) try: @@ -395,7 +389,8 @@ async def broadcast_progress(task_id: str, progress: dict): if task_id not in app.state.active_connections: return - message = json.dumps({"type": "progress", **progress}) + payload = _public_task_payload(task_id, state_override=progress) + message = json.dumps({"type": "progress", **payload}) dead = [] for connection in app.state.active_connections[task_id]: @@ -408,6 +403,28 @@ async def broadcast_progress(task_id: str, progress: dict): app.state.active_connections[task_id].remove(conn) +def _load_task_contract(task_id: str, state: Optional[dict] = None) -> Optional[dict]: + current_state = state or app.state.task_results.get(task_id) + if current_state is None: + return None + return app.state.result_store.load_result_contract( + result_ref=current_state.get("result_ref"), + task_id=task_id, + ) + + +def _public_task_payload(task_id: str, state_override: Optional[dict] = None) -> dict: + state = state_override or app.state.task_results[task_id] + contract = _load_task_contract(task_id, state) + return app.state.job_service.to_public_task_payload(task_id, contract=contract) + + +def _public_task_summary(task_id: str, state_override: Optional[dict] = None) -> dict: + state = state_override or app.state.task_results[task_id] + contract = _load_task_contract(task_id, state) + return app.state.job_service.to_task_summary(task_id, contract=contract) + + # ============== Reports ============== def get_results_dir() -> Path: @@ -664,8 +681,6 @@ from api.portfolio import ( get_watchlist, add_to_watchlist, remove_from_watchlist, get_positions, add_position, remove_position, get_accounts, create_account, delete_account, - get_recommendations, get_recommendation, save_recommendation, - RECOMMENDATIONS_DIR, ) @@ -795,14 +810,14 @@ async def list_recommendations( ): if not _check_api_key(api_key): _auth_error() - return get_recommendations(date, limit, offset) + return app.state.result_store.get_recommendations(date, limit, offset) @app.get("/api/portfolio/recommendations/{date}/{ticker}") async def get_recommendation_endpoint(date: str, ticker: str, api_key: Optional[str] = Header(None)): if not _check_api_key(api_key): _auth_error() - rec = get_recommendation(date, ticker) + rec = app.state.result_store.get_recommendation(date, ticker) if not rec: raise HTTPException(status_code=404, detail="Recommendation not found") return rec @@ -877,7 +892,10 @@ async def ws_orchestrator(websocket: WebSocket, api_key: Optional[str] = None): date = payload.get("date") results = await live.run_once(tickers, date) - await websocket.send_text(json.dumps({"signals": results})) + await websocket.send_text(json.dumps({ + "contract_version": "v1alpha1", + "signals": results, + })) except WebSocketDisconnect: pass except Exception as e: diff --git a/web_dashboard/backend/services/analysis_service.py b/web_dashboard/backend/services/analysis_service.py index 9118e7d7..4caff065 100644 --- a/web_dashboard/backend/services/analysis_service.py +++ b/web_dashboard/backend/services/analysis_service.py @@ -152,12 +152,16 @@ class AnalysisService: task_id=task_id, message=str(exc), started_at=start_time, + code=exc.code, + retryable=exc.retryable, ) except Exception as exc: self._fail_analysis_state( task_id=task_id, message=str(exc), started_at=start_time, + code="analysis_failed", + retryable=False, ) await broadcast_progress(task_id, self.job_service.task_results[task_id]) @@ -267,13 +271,25 @@ class AnalysisService: self.job_service.task_results[task_id]["last_error"] = last_error return False, None - def _fail_analysis_state(self, *, task_id: str, message: str, started_at: float) -> None: + def _fail_analysis_state( + self, + *, + task_id: str, + message: str, + started_at: float, + code: str, + retryable: bool, + ) -> None: state = self.job_service.task_results[task_id] state["status"] = "failed" state["elapsed_seconds"] = int(time.monotonic() - started_at) state["elapsed"] = state["elapsed_seconds"] state["result"] = None - state["error"] = message + state["error"] = { + "code": code, + "message": message, + "retryable": retryable, + } self.result_store.save_task_status(task_id, state) @staticmethod @@ -308,12 +324,38 @@ class AnalysisService: decision = line.split(":", 1)[1].strip() return { + "contract_version": "v1alpha1", "ticker": ticker, "name": stock.get("name", ticker), - "analysis_date": date, - "decision": decision, - "quant_signal": quant_signal, - "llm_signal": llm_signal, - "confidence": confidence, + "date": date, + "status": "completed", "created_at": datetime.now().isoformat(), + "result": { + "decision": decision, + "confidence": confidence, + "signals": { + "merged": { + "direction": 1 if decision in {"BUY", "OVERWEIGHT"} else -1 if decision in {"SELL", "UNDERWEIGHT"} else 0, + "rating": decision, + }, + "quant": { + "direction": 1 if quant_signal in {"BUY", "OVERWEIGHT"} else -1 if quant_signal in {"SELL", "UNDERWEIGHT"} else 0, + "rating": quant_signal, + "available": quant_signal is not None, + }, + "llm": { + "direction": 1 if llm_signal in {"BUY", "OVERWEIGHT"} else -1 if llm_signal in {"SELL", "UNDERWEIGHT"} else 0, + "rating": llm_signal, + "available": llm_signal is not None, + }, + }, + "degraded": quant_signal is None or llm_signal is None, + }, + "compat": { + "analysis_date": date, + "decision": decision, + "quant_signal": quant_signal, + "llm_signal": llm_signal, + "confidence": confidence, + }, } diff --git a/web_dashboard/backend/services/job_service.py b/web_dashboard/backend/services/job_service.py index 7fb55003..0ba5e0e4 100644 --- a/web_dashboard/backend/services/job_service.py +++ b/web_dashboard/backend/services/job_service.py @@ -10,7 +10,7 @@ DEFAULT_EXECUTOR_TYPE = "legacy_subprocess" class JobService: - """Application-layer job state orchestrator with legacy-compatible payloads.""" + """Application-layer job state orchestrator with contract-first public projections.""" def __init__( self, @@ -67,16 +67,15 @@ class JobService: ) ], "logs": [], - "decision": None, - "quant_signal": None, - "llm_signal": None, - "confidence": None, "result": None, "error": None, "request_id": request_id, "executor_type": executor_type, "contract_version": contract_version, "result_ref": result_ref, + "degradation_summary": None, + "data_quality_summary": None, + "compat": {}, }) self.task_results[task_id] = state self.processes.setdefault(task_id, None) @@ -107,6 +106,9 @@ class JobService: "executor_type": executor_type, "contract_version": contract_version, "result_ref": result_ref, + "degradation_summary": None, + "data_quality_summary": None, + "compat": {}, }) self.task_results[task_id] = state self.processes.setdefault(task_id, None) @@ -146,13 +148,17 @@ class JobService: state["current_stage"] = contract.get("current_stage", state.get("current_stage")) state["elapsed_seconds"] = contract.get("elapsed_seconds", state.get("elapsed_seconds", 0)) state["elapsed"] = contract.get("elapsed", state["elapsed_seconds"]) - state["decision"] = result.get("decision") - state["quant_signal"] = quant.get("rating") - state["llm_signal"] = llm.get("rating") - state["confidence"] = result.get("confidence") state["result"] = result state["error"] = contract.get("error") state["contract_version"] = contract.get("contract_version", state.get("contract_version")) + state["degradation_summary"] = self._build_degradation_summary(result) + state["data_quality_summary"] = contract.get("data_quality") + state["compat"] = { + "decision": result.get("decision"), + "quant_signal": quant.get("rating"), + "llm_signal": llm.get("rating"), + "confidence": result.get("confidence"), + } self.attach_result_contract( task_id, result_ref=result_ref, @@ -194,6 +200,89 @@ class JobService: self.persist_task(task_id, state) return state + def to_public_task_payload(self, task_id: str, *, contract: dict | None = None) -> dict: + state = self.task_results[task_id] + payload = { + "contract_version": state.get("contract_version", CONTRACT_VERSION), + "task_id": task_id, + "request_id": state.get("request_id"), + "executor_type": state.get("executor_type", DEFAULT_EXECUTOR_TYPE), + "result_ref": state.get("result_ref"), + "status": state.get("status"), + "created_at": state.get("created_at"), + "degradation_summary": state.get("degradation_summary"), + "data_quality_summary": state.get("data_quality_summary"), + "error": self._public_error(contract, state), + } + if state.get("type") == "portfolio": + payload.update({ + "type": "portfolio", + "total": state.get("total", 0), + "completed": state.get("completed", 0), + "failed": state.get("failed", 0), + "current_ticker": state.get("current_ticker"), + "results": state.get("results", []), + }) + else: + payload.update({ + "ticker": state.get("ticker"), + "date": state.get("date"), + "progress": state.get("progress", 0), + "current_stage": state.get("current_stage"), + "elapsed_seconds": state.get("elapsed_seconds", 0), + "stages": state.get("stages", []), + "result": self._public_result(contract, state), + }) + + compat = { + key: value + for key, value in (state.get("compat") or {}).items() + if value is not None + } + if compat: + payload["compat"] = compat + return payload + + def to_task_summary(self, task_id: str, *, contract: dict | None = None) -> dict: + state = self.task_results[task_id] + payload = self.to_public_task_payload(task_id, contract=contract) + summary = { + "task_id": payload["task_id"], + "contract_version": payload["contract_version"], + "request_id": payload.get("request_id"), + "executor_type": payload.get("executor_type"), + "result_ref": payload.get("result_ref"), + "status": payload["status"], + "created_at": payload.get("created_at"), + "error": payload.get("error"), + } + if state.get("type") == "portfolio": + summary.update({ + "type": "portfolio", + "total": payload.get("total", 0), + "completed": payload.get("completed", 0), + "failed": payload.get("failed", 0), + "current_ticker": payload.get("current_ticker"), + }) + return summary + + result = payload.get("result") or {} + summary.update({ + "ticker": payload.get("ticker"), + "date": payload.get("date"), + "progress": payload.get("progress", 0), + "current_stage": payload.get("current_stage"), + "summary": { + "decision": result.get("decision"), + "confidence": result.get("confidence"), + "degraded": result.get("degraded", False), + }, + }) + compat = payload.get("compat") + if compat: + summary["compat"] = compat + return summary + def register_background_task(self, task_id: str, task: asyncio.Task) -> None: self.analysis_tasks[task_id] = task @@ -219,4 +308,36 @@ class JobService: normalized.setdefault("executor_type", DEFAULT_EXECUTOR_TYPE) normalized.setdefault("contract_version", CONTRACT_VERSION) normalized.setdefault("result_ref", None) + normalized.setdefault("degradation_summary", None) + normalized.setdefault("data_quality_summary", None) + compat = normalized.get("compat") + if not isinstance(compat, dict): + compat = {} + for key in ("decision", "quant_signal", "llm_signal", "confidence"): + if key in normalized and key not in compat: + compat[key] = normalized.get(key) + normalized["compat"] = compat return normalized + + @staticmethod + def _build_degradation_summary(result: dict) -> dict | None: + if not result: + return None + degraded = bool(result.get("degraded")) + report = result.get("report") or {} + return { + "degraded": degraded, + "report_available": bool(report.get("available")), + } + + @staticmethod + def _public_result(contract: dict | None, state: dict) -> dict | None: + if contract is not None: + return contract.get("result") + return state.get("result") + + @staticmethod + def _public_error(contract: dict | None, state: dict) -> dict | str | None: + if contract is not None and "error" in contract: + return contract.get("error") + return state.get("error") diff --git a/web_dashboard/backend/services/migration_flags.py b/web_dashboard/backend/services/migration_flags.py index f1d13694..10f00b03 100644 --- a/web_dashboard/backend/services/migration_flags.py +++ b/web_dashboard/backend/services/migration_flags.py @@ -13,17 +13,46 @@ def _env_flag(name: str, default: bool = False) -> bool: @dataclass(frozen=True) class MigrationFlags: - """Feature flags for backend application-service migration.""" + """Migration modes for contract-first backend rollout.""" - use_application_services: bool = False - use_result_store: bool = False - use_request_context: bool = True + executor_mode: str = "legacy" + response_mode: str = "contract_first" + write_mode: str = "dual_write" + read_mode: str = "dual_read" + request_context_enabled: bool = True + + @property + def use_application_services(self) -> bool: + return self.executor_mode in {"legacy", "direct", "auto"} + + @property + def use_result_store(self) -> bool: + return self.read_mode in {"dual_read", "contract_only"} + + @property + def use_request_context(self) -> bool: + return self.request_context_enabled def load_migration_flags() -> MigrationFlags: - """Load service migration flags from the environment.""" + """Load service migration modes from the environment with boolean compatibility.""" + executor_mode = os.environ.get("TRADINGAGENTS_EXECUTOR_MODE") + if executor_mode is None: + executor_mode = "legacy" if _env_flag("TRADINGAGENTS_USE_APPLICATION_SERVICES", default=False) else "legacy" + + response_mode = os.environ.get("TRADINGAGENTS_RESPONSE_MODE", "contract_first") + write_mode = os.environ.get("TRADINGAGENTS_WRITE_MODE") + if write_mode is None: + write_mode = "dual_write" if _env_flag("TRADINGAGENTS_USE_RESULT_STORE", default=False) else "dual_write" + + read_mode = os.environ.get("TRADINGAGENTS_READ_MODE") + if read_mode is None: + read_mode = "dual_read" if _env_flag("TRADINGAGENTS_USE_RESULT_STORE", default=False) else "legacy_only" + return MigrationFlags( - use_application_services=_env_flag("TRADINGAGENTS_USE_APPLICATION_SERVICES", default=False), - use_result_store=_env_flag("TRADINGAGENTS_USE_RESULT_STORE", default=False), - use_request_context=_env_flag("TRADINGAGENTS_USE_REQUEST_CONTEXT", default=True), + executor_mode=executor_mode, + response_mode=response_mode, + write_mode=write_mode, + read_mode=read_mode, + request_context_enabled=_env_flag("TRADINGAGENTS_USE_REQUEST_CONTEXT", default=True), ) diff --git a/web_dashboard/backend/services/result_store.py b/web_dashboard/backend/services/result_store.py index 6f4dcf71..7ef8b54a 100644 --- a/web_dashboard/backend/services/result_store.py +++ b/web_dashboard/backend/services/result_store.py @@ -13,7 +13,8 @@ class ResultStore: def __init__(self, task_status_dir: Path, portfolio_gateway): self.task_status_dir = task_status_dir - self.result_contract_dir = self.task_status_dir / "result_contracts" + self.result_contract_dir = self.task_status_dir / "results" + self.legacy_result_contract_dir = self.task_status_dir / "result_contracts" self.portfolio_gateway = portfolio_gateway def restore_task_results(self) -> dict[str, dict]: @@ -34,14 +35,36 @@ class ResultStore: (self.task_status_dir / f"{task_id}.json").write_text(json.dumps(data, ensure_ascii=False)) def save_result_contract(self, task_id: str, contract: dict) -> str: - self.result_contract_dir.mkdir(parents=True, exist_ok=True) + target_dir = self.result_contract_dir / task_id + target_dir.mkdir(parents=True, exist_ok=True) payload = dict(contract) payload.setdefault("task_id", task_id) payload.setdefault("contract_version", CONTRACT_VERSION) - file_path = self.result_contract_dir / f"{task_id}.json" + file_path = target_dir / "result.v1alpha1.json" file_path.write_text(json.dumps(payload, ensure_ascii=False)) return file_path.relative_to(self.task_status_dir).as_posix() + def load_result_contract( + self, + *, + result_ref: str | None = None, + task_id: str | None = None, + ) -> dict | None: + candidates: list[Path] = [] + if result_ref: + candidates.append(self.task_status_dir / result_ref) + if task_id: + candidates.append(self.result_contract_dir / task_id / "result.v1alpha1.json") + candidates.append(self.legacy_result_contract_dir / f"{task_id}.json") + for path in candidates: + if not path.exists(): + continue + try: + return json.loads(path.read_text()) + except Exception: + continue + return None + def delete_task_status(self, task_id: str) -> None: (self.task_status_dir / f"{task_id}.json").unlink(missing_ok=True) diff --git a/web_dashboard/backend/tests/test_api_smoke.py b/web_dashboard/backend/tests/test_api_smoke.py index 137b5765..6824ad26 100644 --- a/web_dashboard/backend/tests/test_api_smoke.py +++ b/web_dashboard/backend/tests/test_api_smoke.py @@ -32,11 +32,24 @@ def test_analysis_task_routes_smoke(monkeypatch): main = _load_main_module(monkeypatch) seeded_task = { + "contract_version": "v1alpha1", "task_id": "task-smoke", + "request_id": "req-task-smoke", + "executor_type": "legacy_subprocess", + "result_ref": None, "ticker": "AAPL", "date": "2026-04-11", "status": "running", + "progress": 10, + "current_stage": "analysts", "created_at": "2026-04-11T10:00:00", + "elapsed_seconds": 1, + "stages": [], + "result": None, + "error": None, + "degradation_summary": None, + "data_quality_summary": None, + "compat": {}, } with TestClient(main.app) as client: @@ -53,6 +66,9 @@ def test_analysis_task_routes_smoke(monkeypatch): assert any(task["task_id"] == "task-smoke" for task in tasks_response.json()["tasks"]) assert status_response.status_code == 200 assert status_response.json()["task_id"] == "task-smoke" + assert status_response.json()["contract_version"] == "v1alpha1" + assert status_response.json()["request_id"] == "req-task-smoke" + assert status_response.json()["result"] is None def test_analysis_start_route_uses_analysis_service(monkeypatch): @@ -96,6 +112,7 @@ def test_analysis_start_route_uses_analysis_service(monkeypatch): assert main.app.state.task_results[task_id]["request_id"] assert main.app.state.task_results[task_id]["executor_type"] == "legacy_subprocess" assert main.app.state.task_results[task_id]["result_ref"] is None + assert main.app.state.task_results[task_id]["compat"] == {} def test_portfolio_analyze_route_uses_analysis_service_smoke(monkeypatch): @@ -123,3 +140,41 @@ def test_portfolio_analyze_route_uses_analysis_service_smoke(monkeypatch): assert isinstance(captured["date"], str) assert captured["request_context"].api_key == "service-key" assert callable(captured["broadcast_progress"]) + + +def test_analysis_websocket_progress_is_contract_first(monkeypatch): + monkeypatch.delenv("DASHBOARD_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") + + main = _load_main_module(monkeypatch) + + with TestClient(main.app) as client: + main.app.state.task_results["task-ws"] = { + "contract_version": "v1alpha1", + "task_id": "task-ws", + "request_id": "req-task-ws", + "executor_type": "legacy_subprocess", + "result_ref": None, + "ticker": "AAPL", + "date": "2026-04-11", + "status": "running", + "progress": 50, + "current_stage": "research", + "created_at": "2026-04-11T10:00:00", + "elapsed_seconds": 3, + "stages": [], + "result": None, + "error": None, + "degradation_summary": None, + "data_quality_summary": None, + "compat": {"decision": "HOLD"}, + } + with client.websocket_connect("/ws/analysis/task-ws?api_key=test-key") as websocket: + message = websocket.receive_json() + + assert message["type"] == "progress" + assert message["contract_version"] == "v1alpha1" + assert message["task_id"] == "task-ws" + assert message["request_id"] == "req-task-ws" + assert message["compat"]["decision"] == "HOLD" + assert "decision" not in message diff --git a/web_dashboard/backend/tests/test_portfolio_api.py b/web_dashboard/backend/tests/test_portfolio_api.py index 1ca7d2c6..e6c00d22 100644 --- a/web_dashboard/backend/tests/test_portfolio_api.py +++ b/web_dashboard/backend/tests/test_portfolio_api.py @@ -193,6 +193,8 @@ class TestGetRecommendationsPagination: result = get_recommendations(limit=10, offset=0) assert result["total"] == 5 assert len(result["recommendations"]) == 5 + assert result["recommendations"][0]["contract_version"] == "v1alpha1" + assert result["recommendations"][0]["result"]["decision"] == "BUY" result = get_recommendations(limit=2, offset=0) assert result["total"] == 5 @@ -204,6 +206,42 @@ class TestGetRecommendationsPagination: assert result["offset"] == 2 assert result["limit"] == 2 + def test_single_recommendation_is_normalized_contract(self, tmp_path, monkeypatch): + data_dir = tmp_path / "data" + data_dir.mkdir() + rec_dir = data_dir / "recommendations" / "2026-01-01" + rec_dir.mkdir(parents=True) + + import fcntl + monkeypatch.setattr(fcntl, "flock", lambda *args: None) + + monkeypatch.setattr("api.portfolio.DATA_DIR", data_dir) + monkeypatch.setattr("api.portfolio.RECOMMENDATIONS_DIR", data_dir / "recommendations") + monkeypatch.setattr("api.portfolio.WATCHLIST_FILE", data_dir / "watchlist.json") + monkeypatch.setattr("api.portfolio.POSITIONS_FILE", data_dir / "positions.json") + monkeypatch.setattr("api.portfolio.WATCHLIST_LOCK", data_dir / "watchlist.lock") + monkeypatch.setattr("api.portfolio.POSITIONS_LOCK", data_dir / "positions.lock") + + (rec_dir / "AAPL.json").write_text(json.dumps({ + "ticker": "AAPL", + "name": "Apple", + "analysis_date": "2026-01-01", + "decision": "OVERWEIGHT", + "quant_signal": "BUY", + "llm_signal": "HOLD", + "confidence": 0.75, + })) + + from api.portfolio import get_recommendation + + result = get_recommendation("2026-01-01", "AAPL") + + assert result["contract_version"] == "v1alpha1" + assert result["date"] == "2026-01-01" + assert result["result"]["decision"] == "OVERWEIGHT" + assert result["result"]["signals"]["quant"]["rating"] == "BUY" + assert result["compat"]["confidence"] == 0.75 + class TestConstants: """Verify named constants are defined instead of magic numbers.""" diff --git a/web_dashboard/backend/tests/test_services_migration.py b/web_dashboard/backend/tests/test_services_migration.py index 7bf419c5..2253e9e0 100644 --- a/web_dashboard/backend/tests/test_services_migration.py +++ b/web_dashboard/backend/tests/test_services_migration.py @@ -78,11 +78,14 @@ def test_result_store_saves_result_contract(tmp_path): saved = json.loads((tmp_path / "task_status" / result_ref).read_text()) - assert result_ref == "result_contracts/task-2.json" + assert result_ref == "results/task-2/result.v1alpha1.json" assert saved["task_id"] == "task-2" assert saved["contract_version"] == "v1alpha1" assert saved["result"]["decision"] == "BUY" + loaded = store.load_result_contract(result_ref=result_ref, task_id="task-2") + assert loaded == saved + def test_job_service_create_and_fail_job(): task_results = {} @@ -110,12 +113,13 @@ def test_job_service_create_and_fail_job(): assert state["executor_type"] == "analysis_executor" assert state["contract_version"] == "v1alpha1" assert state["result_ref"] is None + assert state["compat"] == {} attached = service.attach_result_contract( "port_1", - result_ref="result_contracts/port_1.json", + result_ref="results/port_1/result.v1alpha1.json", ) - assert attached["result_ref"] == "result_contracts/port_1.json" + assert attached["result_ref"] == "results/port_1/result.v1alpha1.json" failed = service.fail_job("port_1", "boom") assert failed["status"] == "failed" @@ -138,6 +142,7 @@ def test_job_service_restores_legacy_tasks_with_contract_metadata(): assert restored["executor_type"] == "legacy_subprocess" assert restored["contract_version"] == "v1alpha1" assert restored["result_ref"] is None + assert restored["compat"] == {} def test_analysis_service_build_recommendation_record(): @@ -152,10 +157,11 @@ def test_analysis_service_build_recommendation_record(): ) assert rec["ticker"] == "AAPL" - assert rec["decision"] == "OVERWEIGHT" - assert rec["quant_signal"] == "BUY" - assert rec["llm_signal"] == "HOLD" - assert rec["confidence"] == 0.75 + assert rec["contract_version"] == "v1alpha1" + assert rec["result"]["decision"] == "OVERWEIGHT" + assert rec["result"]["signals"]["quant"]["rating"] == "BUY" + assert rec["result"]["signals"]["llm"]["rating"] == "HOLD" + assert rec["compat"]["confidence"] == 0.75 class FakeExecutor: @@ -219,10 +225,13 @@ def test_analysis_service_start_analysis_uses_executor(tmp_path): "status": "running", } assert task_results["task-1"]["status"] == "completed" - assert task_results["task-1"]["decision"] == "BUY" - assert task_results["task-1"]["result_ref"] == "result_contracts/task-1.json" + assert task_results["task-1"]["compat"]["decision"] == "BUY" + assert task_results["task-1"]["result_ref"] == "results/task-1/result.v1alpha1.json" assert task_results["task-1"]["result"]["signals"]["llm"]["rating"] == "BUY" - saved_contract = json.loads((tmp_path / "task_status" / "result_contracts" / "task-1.json").read_text()) + public_payload = service.to_public_task_payload("task-1", contract=store.load_result_contract(task_id="task-1")) + assert public_payload["result_ref"] == "results/task-1/result.v1alpha1.json" + assert public_payload["compat"]["decision"] == "BUY" + saved_contract = json.loads((tmp_path / "task_status" / "results" / "task-1" / "result.v1alpha1.json").read_text()) assert saved_contract["status"] == "completed" assert saved_contract["result"]["signals"]["merged"]["rating"] == "BUY" assert broadcasts[0] == ("task-1", "running", "analysts")