diff --git a/orchestrator/contracts/__init__.py b/orchestrator/contracts/__init__.py index 150b1a5d..11ee1b8a 100644 --- a/orchestrator/contracts/__init__.py +++ b/orchestrator/contracts/__init__.py @@ -10,6 +10,7 @@ from orchestrator.contracts.config_schema import ( ) from orchestrator.contracts.error_taxonomy import ReasonCode from orchestrator.contracts.result_contract import ( + CombinedSignalFailure, FinalSignal, Signal, build_error_signal, @@ -18,6 +19,7 @@ from orchestrator.contracts.result_contract import ( __all__ = [ "CONTRACT_VERSION", + "CombinedSignalFailure", "FinalSignal", "OrchestratorConfigSchema", "ReasonCode", diff --git a/orchestrator/contracts/result_contract.py b/orchestrator/contracts/result_contract.py index 9221476c..402303d9 100644 --- a/orchestrator/contracts/result_contract.py +++ b/orchestrator/contracts/result_contract.py @@ -97,3 +97,20 @@ def signal_reason_code(signal: Optional[Signal]) -> Optional[str]: if signal is None: return None return signal.reason_code or signal.metadata.get("reason_code") + + +class CombinedSignalFailure(ValueError): + """Structured failure for cases where no merged signal can be produced.""" + + def __init__( + self, + message: str, + *, + reason_codes: tuple[str, ...] = (), + source_diagnostics: Optional[dict[str, Any]] = None, + data_quality: Optional[dict[str, Any]] = None, + ) -> None: + super().__init__(message) + self.reason_codes = tuple(reason_codes) + self.source_diagnostics = dict(source_diagnostics or {}) + self.data_quality = dict(data_quality) if data_quality is not None else None diff --git a/orchestrator/data/market_holidays.json b/orchestrator/data/market_holidays.json new file mode 100644 index 00000000..3ede5fab --- /dev/null +++ b/orchestrator/data/market_holidays.json @@ -0,0 +1,101 @@ +{ + "a_share": { + "2024": [ + "2024-01-01", + "2024-02-10", + "2024-02-11", + "2024-02-12", + "2024-02-13", + "2024-02-14", + "2024-02-15", + "2024-02-16", + "2024-02-17", + "2024-04-04", + "2024-04-05", + "2024-04-06", + "2024-05-01", + "2024-05-02", + "2024-05-03", + "2024-05-04", + "2024-05-05", + "2024-06-08", + "2024-06-09", + "2024-06-10", + "2024-09-15", + "2024-09-16", + "2024-09-17", + "2024-10-01", + "2024-10-02", + "2024-10-03", + "2024-10-04", + "2024-10-05", + "2024-10-06", + "2024-10-07" + ], + "2025": [ + "2025-01-01", + "2025-01-28", + "2025-01-29", + "2025-01-30", + "2025-01-31", + "2025-02-01", + "2025-02-02", + "2025-02-03", + "2025-02-04", + "2025-04-04", + "2025-04-05", + "2025-04-06", + "2025-05-01", + "2025-05-02", + "2025-05-03", + "2025-05-04", + "2025-05-05", + "2025-05-31", + "2025-06-01", + "2025-06-02", + "2025-10-01", + "2025-10-02", + "2025-10-03", + "2025-10-04", + "2025-10-05", + "2025-10-06", + "2025-10-07", + "2025-10-08" + ], + "2026": [ + "2026-01-01", + "2026-01-02", + "2026-01-03", + "2026-02-15", + "2026-02-16", + "2026-02-17", + "2026-02-18", + "2026-02-19", + "2026-02-20", + "2026-02-21", + "2026-02-22", + "2026-02-23", + "2026-04-04", + "2026-04-05", + "2026-04-06", + "2026-05-01", + "2026-05-02", + "2026-05-03", + "2026-05-04", + "2026-05-05", + "2026-06-19", + "2026-06-20", + "2026-06-21", + "2026-09-25", + "2026-09-26", + "2026-09-27", + "2026-10-01", + "2026-10-02", + "2026-10-03", + "2026-10-04", + "2026-10-05", + "2026-10-06", + "2026-10-07" + ] + } +} diff --git a/orchestrator/market_calendar.py b/orchestrator/market_calendar.py index 6a5d6cde..5c75d5da 100644 --- a/orchestrator/market_calendar.py +++ b/orchestrator/market_calendar.py @@ -1,49 +1,80 @@ from __future__ import annotations +import json +import os from datetime import date, timedelta +from pathlib import Path _A_SHARE_SUFFIXES = {"SH", "SS", "SZ"} - -# Mainland exchanges close on weekends plus the annual State Council public-holiday windows. -# Weekend make-up workdays do not become exchange trading days. -_A_SHARE_HOLIDAYS = { - date(2024, 1, 1), - *[date(2024, 2, day) for day in range(10, 18)], - *[date(2024, 4, day) for day in range(4, 7)], - *[date(2024, 5, day) for day in range(1, 6)], - *[date(2024, 6, day) for day in range(8, 11)], - *[date(2024, 9, day) for day in range(15, 18)], - *[date(2024, 10, day) for day in range(1, 8)], - date(2025, 1, 1), - *[date(2025, 1, day) for day in range(28, 32)], - *[date(2025, 2, day) for day in range(1, 5)], - *[date(2025, 4, day) for day in range(4, 7)], - *[date(2025, 5, day) for day in range(1, 6)], - *[date(2025, 5, day) for day in range(31, 32)], - *[date(2025, 6, day) for day in range(1, 3)], - *[date(2025, 10, day) for day in range(1, 9)], - *[date(2026, 1, day) for day in range(1, 4)], - *[date(2026, 2, day) for day in range(15, 24)], - *[date(2026, 4, day) for day in range(4, 7)], - *[date(2026, 5, day) for day in range(1, 6)], - *[date(2026, 6, day) for day in range(19, 22)], - *[date(2026, 9, day) for day in range(25, 28)], - *[date(2026, 10, day) for day in range(1, 8)], -} +_DEFAULT_MARKET_HOLIDAYS_PATH = Path(__file__).with_name("data") / "market_holidays.json" -def is_non_trading_day(ticker: str, day: date) -> bool: +def is_non_trading_day(ticker: str, day: date, *, data_path: Path | None = None) -> bool: """Return whether the requested date is a known non-trading day for the ticker's market.""" if day.weekday() >= 5: return True - if _is_a_share_ticker(ticker): - return day in _A_SHARE_HOLIDAYS - return _is_nyse_holiday(day) + market = market_for_ticker(ticker) + if market == "a_share": + return day in get_market_holidays(market, day.year, data_path=data_path) + if market == "nyse": + return _is_nyse_holiday(day) + return False -def _is_a_share_ticker(ticker: str) -> bool: +def market_for_ticker(ticker: str) -> str: suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else "" - return suffix in _A_SHARE_SUFFIXES + if suffix in _A_SHARE_SUFFIXES: + return "a_share" + return "nyse" + + +def get_market_holidays(market: str, year: int, *, data_path: Path | None = None) -> set[date]: + holidays_by_market = load_market_holidays(data_path=data_path) + market_data = holidays_by_market.get(market, {}) + values = market_data.get(str(year), []) + return {date.fromisoformat(raw) for raw in values} + + +def load_market_holidays(*, data_path: Path | None = None) -> dict[str, dict[str, list[str]]]: + path = _resolve_market_holidays_path(data_path) + if not path.exists(): + return {} + payload = json.loads(path.read_text()) + return { + str(market): {str(year): list(days) for year, days in years.items()} + for market, years in payload.items() + } + + +def update_market_holidays( + *, + market: str, + year: int, + holiday_dates: list[date | str], + data_path: Path | None = None, +) -> Path: + path = _resolve_market_holidays_path(data_path) + payload = load_market_holidays(data_path=path) + payload.setdefault(market, {}) + normalized_days = sorted( + { + item.isoformat() if isinstance(item, date) else date.fromisoformat(item).isoformat() + for item in holiday_dates + } + ) + payload[market][str(year)] = normalized_days + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)) + return path + + +def _resolve_market_holidays_path(data_path: Path | None = None) -> Path: + if data_path is not None: + return data_path + env_path = os.environ.get("TRADINGAGENTS_MARKET_HOLIDAYS_PATH") + if env_path: + return Path(env_path) + return _DEFAULT_MARKET_HOLIDAYS_PATH def _is_nyse_holiday(day: date) -> bool: @@ -66,7 +97,6 @@ def _is_nyse_holiday(day: date) -> bool: if day.year >= 2022: holidays.add(observed_juneteenth) - # When Jan 1 falls on Saturday, NYSE observes New Year's Day on the prior Friday. if day.month == 12 and day.day == 31: next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1) if next_new_year.year == day.year: diff --git a/orchestrator/orchestrator.py b/orchestrator/orchestrator.py index f27c1b0a..e78e22c6 100644 --- a/orchestrator/orchestrator.py +++ b/orchestrator/orchestrator.py @@ -3,8 +3,8 @@ from typing import Optional from orchestrator.config import OrchestratorConfig from orchestrator.contracts.error_taxonomy import ReasonCode -from orchestrator.contracts.result_contract import FinalSignal, Signal, signal_reason_code -from orchestrator.signals import Signal, FinalSignal, SignalMerger +from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal, signal_reason_code +from orchestrator.signals import SignalMerger from orchestrator.quant_runner import QuantRunner from orchestrator.llm_runner import LLMRunner @@ -92,9 +92,16 @@ class TradingOrchestrator: source_diagnostics["llm"] = {"reason_code": ReasonCode.LLM_SIGNAL_FAILED.value} llm_sig = None - # merge raises ValueError if both None + # Preserve diagnostics even when both lanes degrade and no FinalSignal can be produced. if quant_sig is None and llm_sig is None: degradation_reasons.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value) + raise CombinedSignalFailure( + "both quant and llm signals are None", + reason_codes=tuple(dict.fromkeys(degradation_reasons)), + source_diagnostics=source_diagnostics, + data_quality=self._summarize_data_quality(source_diagnostics), + ) + final_signal = self._merger.merge( quant_sig, llm_sig, diff --git a/orchestrator/tests/test_application_service.py b/orchestrator/tests/test_application_service.py index 0b0c2d5f..c6e7f74c 100644 --- a/orchestrator/tests/test_application_service.py +++ b/orchestrator/tests/test_application_service.py @@ -5,6 +5,7 @@ import pytest import orchestrator.orchestrator as orchestrator_module from orchestrator.config import OrchestratorConfig from orchestrator.contracts.error_taxonomy import ReasonCode +from orchestrator.contracts.result_contract import CombinedSignalFailure from orchestrator.signals import Signal @@ -107,11 +108,16 @@ def test_trading_orchestrator_raises_when_both_sources_degrade(monkeypatch): monkeypatch.setattr(orchestrator_module, "QuantRunner", FakeQuantRunner) monkeypatch.setattr(orchestrator_module, "LLMRunner", FakeLLMRunner) - with pytest.raises(ValueError, match="both quant and llm signals are None"): + with pytest.raises(CombinedSignalFailure) as exc_info: orchestrator_module.TradingOrchestrator( OrchestratorConfig(quant_backtest_path="/tmp/quant") ).get_combined_signal("AAPL", "2026-04-11") + assert str(exc_info.value) == "both quant and llm signals are None" + assert exc_info.value.reason_codes[0] == ReasonCode.QUANT_NO_DATA.value + assert exc_info.value.reason_codes[-1] == ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value + assert exc_info.value.source_diagnostics["quant"]["reason_code"] == ReasonCode.QUANT_NO_DATA.value + def test_trading_orchestrator_surfaces_provider_mismatch_summary_when_llm_degrades(monkeypatch): class FakeQuantRunner: diff --git a/orchestrator/tests/test_live_mode.py b/orchestrator/tests/test_live_mode.py index fd555910..d1baa2d7 100644 --- a/orchestrator/tests/test_live_mode.py +++ b/orchestrator/tests/test_live_mode.py @@ -2,7 +2,7 @@ import asyncio from datetime import datetime, timezone from orchestrator.contracts.error_taxonomy import ReasonCode -from orchestrator.contracts.result_contract import FinalSignal, Signal +from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal from orchestrator.live_mode import LiveMode @@ -83,7 +83,12 @@ def test_live_mode_serializes_failure_contract_shape(): live_mode = LiveMode( _StubOrchestrator( { - ("AAPL", "2026-04-11"): ValueError("both quant and llm signals are None") + ("AAPL", "2026-04-11"): CombinedSignalFailure( + "both quant and llm signals are None", + reason_codes=(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value, ReasonCode.PROVIDER_MISMATCH.value), + source_diagnostics={"llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value}}, + data_quality={"state": "provider_mismatch", "source": "llm"}, + ) } ) ) @@ -104,9 +109,14 @@ def test_live_mode_serializes_failure_contract_shape(): }, "degradation": { "degraded": True, - "reason_codes": [ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value], - "source_diagnostics": {}, + "reason_codes": [ + ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value, + ReasonCode.PROVIDER_MISMATCH.value, + ], + "source_diagnostics": { + "llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value}, + }, }, - "data_quality": None, + "data_quality": {"state": "provider_mismatch", "source": "llm"}, } ] diff --git a/orchestrator/tests/test_market_calendar.py b/orchestrator/tests/test_market_calendar.py index 6b2ac65e..77810227 100644 --- a/orchestrator/tests/test_market_calendar.py +++ b/orchestrator/tests/test_market_calendar.py @@ -1,6 +1,7 @@ +import json from datetime import date -from orchestrator.market_calendar import is_non_trading_day +from orchestrator.market_calendar import get_market_holidays, is_non_trading_day, update_market_holidays def test_is_non_trading_day_marks_a_share_holiday(): @@ -13,3 +14,22 @@ def test_is_non_trading_day_marks_nyse_holiday(): def test_is_non_trading_day_leaves_regular_weekday_open(): assert is_non_trading_day('AAPL', date(2024, 3, 28)) is False + + +def test_update_market_holidays_creates_maintainable_future_year_entry(tmp_path): + data_path = tmp_path / "market_holidays.json" + data_path.write_text(json.dumps({"a_share": {}})) + + update_market_holidays( + market="a_share", + year=2027, + holiday_dates=["2027-02-10", "2027-02-11"], + data_path=data_path, + ) + + assert get_market_holidays("a_share", 2027, data_path=data_path) == { + date(2027, 2, 10), + date(2027, 2, 11), + } + assert is_non_trading_day("600519.SS", date(2027, 2, 10)) is False + assert is_non_trading_day("600519.SS", date(2027, 2, 10), data_path=data_path) is True diff --git a/web_dashboard/backend/main.py b/web_dashboard/backend/main.py index 4c26840e..a741b77e 100644 --- a/web_dashboard/backend/main.py +++ b/web_dashboard/backend/main.py @@ -18,6 +18,7 @@ from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import Response, FileResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel +from tradingagents.default_config import get_default_config from services import AnalysisService, JobService, ResultStore, build_request_context, load_migration_flags from services.executor import LegacySubprocessAnalysisExecutor @@ -55,7 +56,7 @@ async def lifespan(app: FastAPI): executor=LegacySubprocessAnalysisExecutor( analysis_python=ANALYSIS_PYTHON, repo_root=REPO_ROOT, - api_key_resolver=_get_analysis_api_key, + api_key_resolver=_get_analysis_provider_api_key, process_registry=app.state.job_service.register_process, ), result_store=app.state.result_store, @@ -103,10 +104,8 @@ class ScreenRequest(BaseModel): @app.get("/api/config/check") async def check_config(): - """Check if the app is configured (API key is set). - The FastAPI backend receives ANTHROPIC_API_KEY as an env var when spawned by Tauri. - """ - configured = bool(_get_analysis_api_key()) + """Check if the analysis provider is configured with a callable API key.""" + configured = bool(_resolve_analysis_runtime_settings().get("provider_api_key")) return {"configured": configured} @@ -151,7 +150,7 @@ _api_key: Optional[str] = None def _get_api_key() -> Optional[str]: global _api_key if _api_key is None: - _api_key = os.environ.get("DASHBOARD_API_KEY") or os.environ.get("ANTHROPIC_API_KEY") + _api_key = os.environ.get("DASHBOARD_API_KEY") return _api_key def _check_api_key(api_key: Optional[str]) -> bool: @@ -181,15 +180,73 @@ def _persist_analysis_api_key(api_key_value: str): CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True) CONFIG_PATH.write_text(json.dumps({"api_key": api_key_value}, ensure_ascii=False)) os.chmod(CONFIG_PATH, 0o600) - os.environ["ANTHROPIC_API_KEY"] = api_key_value _api_key = None -def _get_analysis_api_key() -> Optional[str]: - return ( - os.environ.get("ANTHROPIC_API_KEY") - or os.environ.get("MINIMAX_API_KEY") - or _load_saved_config().get("api_key") +def _get_analysis_provider_api_key(provider: str, saved_api_key: Optional[str] = None) -> Optional[str]: + env_names = { + "anthropic": ("ANTHROPIC_API_KEY", "MINIMAX_API_KEY"), + "openai": ("OPENAI_API_KEY",), + "openrouter": ("OPENROUTER_API_KEY",), + "xai": ("XAI_API_KEY",), + "google": ("GOOGLE_API_KEY",), + "ollama": tuple(), + }.get(provider.lower(), tuple()) + for env_name in env_names: + value = os.environ.get(env_name) + if value: + return value + return saved_api_key + + +def _resolve_analysis_runtime_settings() -> dict: + saved = _load_saved_config() + defaults = get_default_config() + + provider = os.environ.get("TRADINGAGENTS_LLM_PROVIDER") + if not provider: + if os.environ.get("ANTHROPIC_BASE_URL"): + provider = "anthropic" + elif os.environ.get("OPENAI_BASE_URL"): + provider = "openai" + else: + provider = defaults.get("llm_provider", "anthropic") + + backend_url = ( + os.environ.get("TRADINGAGENTS_BACKEND_URL") + or os.environ.get("ANTHROPIC_BASE_URL") + or os.environ.get("OPENAI_BASE_URL") + or defaults.get("backend_url") + ) + deep_model = ( + os.environ.get("TRADINGAGENTS_DEEP_MODEL") + or os.environ.get("TRADINGAGENTS_MODEL") + or defaults.get("deep_think_llm") + ) + quick_model = ( + os.environ.get("TRADINGAGENTS_QUICK_MODEL") + or os.environ.get("TRADINGAGENTS_MODEL") + or defaults.get("quick_think_llm") + ) + return { + "llm_provider": provider, + "backend_url": backend_url, + "deep_think_llm": deep_model, + "quick_think_llm": quick_model, + "provider_api_key": _get_analysis_provider_api_key(provider, saved.get("api_key")), + } + + +def _build_analysis_request_context(request: Request, auth_key: Optional[str]): + settings = _resolve_analysis_runtime_settings() + return build_request_context( + request, + auth_key=auth_key, + provider_api_key=settings["provider_api_key"], + llm_provider=settings["llm_provider"], + backend_url=settings["backend_url"], + deep_think_llm=settings["deep_think_llm"], + quick_think_llm=settings["quick_think_llm"], ) @@ -279,7 +336,7 @@ async def start_analysis( task_id = f"{payload.ticker}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}" date = payload.date or datetime.now().strftime("%Y-%m-%d") - request_context = build_request_context(http_request, api_key=api_key) + request_context = _build_analysis_request_context(http_request, api_key) try: return await app.state.analysis_service.start_analysis( @@ -838,7 +895,7 @@ async def start_portfolio_analysis( date = datetime.now().strftime("%Y-%m-%d") task_id = f"port_{date}_{uuid.uuid4().hex[:6]}" - request_context = build_request_context(http_request, api_key=api_key) + request_context = _build_analysis_request_context(http_request, api_key) try: return await app.state.analysis_service.start_portfolio_analysis( diff --git a/web_dashboard/backend/services/analysis_service.py b/web_dashboard/backend/services/analysis_service.py index 1ea37d3c..3346403b 100644 --- a/web_dashboard/backend/services/analysis_service.py +++ b/web_dashboard/backend/services/analysis_service.py @@ -154,6 +154,12 @@ class AnalysisService: started_at=start_time, code=exc.code, retryable=exc.retryable, + degradation={ + "degraded": bool(exc.degrade_reason_codes) or bool(exc.data_quality), + "reason_codes": list(exc.degrade_reason_codes), + "source_diagnostics": exc.source_diagnostics or {}, + } if (exc.degrade_reason_codes or exc.data_quality or exc.source_diagnostics) else None, + data_quality=exc.data_quality, ) except Exception as exc: self._fail_analysis_state( @@ -162,6 +168,8 @@ class AnalysisService: started_at=start_time, code="analysis_failed", retryable=False, + degradation=None, + data_quality=None, ) await broadcast_progress(task_id, self.job_service.task_results[task_id]) @@ -279,12 +287,16 @@ class AnalysisService: started_at: float, code: str, retryable: bool, + degradation: Optional[dict], + data_quality: Optional[dict], ) -> None: state = self.job_service.task_results[task_id] state["status"] = "failed" state["elapsed_seconds"] = int(time.monotonic() - started_at) state["elapsed"] = state["elapsed_seconds"] state["result"] = None + state["degradation_summary"] = degradation + state["data_quality_summary"] = data_quality state["error"] = { "code": code, "message": message, diff --git a/web_dashboard/backend/services/executor.py b/web_dashboard/backend/services/executor.py index 69514c67..84431cbf 100644 --- a/web_dashboard/backend/services/executor.py +++ b/web_dashboard/backend/services/executor.py @@ -75,7 +75,13 @@ print("STAGE:trading", flush=True) try: result = orchestrator.get_combined_signal(ticker, date) -except ValueError as exc: +except Exception as exc: + result_meta = { + "degrade_reason_codes": list(getattr(exc, "reason_codes", ()) or ()), + "data_quality": getattr(exc, "data_quality", None), + "source_diagnostics": getattr(exc, "source_diagnostics", None), + } + print("RESULT_META:" + json.dumps(result_meta), file=sys.stderr, flush=True) print("ANALYSIS_ERROR:" + str(exc), file=sys.stderr, flush=True) sys.exit(1) @@ -214,10 +220,22 @@ class AnalysisExecutionOutput: class AnalysisExecutorError(RuntimeError): - def __init__(self, message: str, *, code: str = "analysis_failed", retryable: bool = False): + def __init__( + self, + message: str, + *, + code: str = "analysis_failed", + retryable: bool = False, + degrade_reason_codes: tuple[str, ...] = (), + data_quality: Optional[dict] = None, + source_diagnostics: Optional[dict] = None, + ): super().__init__(message) self.code = code self.retryable = retryable + self.degrade_reason_codes = degrade_reason_codes + self.data_quality = data_quality + self.source_diagnostics = source_diagnostics class AnalysisExecutor(Protocol): @@ -240,7 +258,7 @@ class LegacySubprocessAnalysisExecutor: *, analysis_python: Path, repo_root: Path, - api_key_resolver: Callable[[], Optional[str]], + api_key_resolver: Callable[..., Optional[str]], process_registry: Optional[ProcessRegistry] = None, script_template: str = LEGACY_ANALYSIS_SCRIPT_TEMPLATE, stdout_timeout_secs: float = 300.0, @@ -261,9 +279,10 @@ class LegacySubprocessAnalysisExecutor: request_context: RequestContext, on_stage: Optional[StageCallback] = None, ) -> AnalysisExecutionOutput: - analysis_api_key = request_context.api_key or self.api_key_resolver() - if not analysis_api_key: - raise RuntimeError("ANTHROPIC_API_KEY environment variable not set") + llm_provider = (request_context.llm_provider or "anthropic").lower() + analysis_api_key = request_context.provider_api_key or self._resolve_provider_api_key(llm_provider) + if llm_provider != "ollama" and not analysis_api_key: + raise RuntimeError(f"{llm_provider} provider API key not configured") script_path: Optional[Path] = None proc: asyncio.subprocess.Process | None = None @@ -279,7 +298,16 @@ class LegacySubprocessAnalysisExecutor: for key, value in os.environ.items() if not key.startswith(("PYTHON", "CONDA", "VIRTUAL")) } - clean_env["ANTHROPIC_API_KEY"] = analysis_api_key + clean_env["TRADINGAGENTS_LLM_PROVIDER"] = llm_provider + if request_context.backend_url: + clean_env["TRADINGAGENTS_BACKEND_URL"] = request_context.backend_url + if request_context.deep_think_llm: + clean_env["TRADINGAGENTS_DEEP_MODEL"] = request_context.deep_think_llm + if request_context.quick_think_llm: + clean_env["TRADINGAGENTS_QUICK_MODEL"] = request_context.quick_think_llm + for env_name in self._provider_api_env_names(llm_provider): + if analysis_api_key: + clean_env[env_name] = analysis_api_key proc = await asyncio.create_subprocess_exec( str(self.analysis_python), @@ -317,9 +345,22 @@ class LegacySubprocessAnalysisExecutor: await proc.wait() stderr_bytes = await proc.stderr.read() if proc.stderr is not None else b"" + stderr_lines = stderr_bytes.decode(errors="replace").splitlines() if stderr_bytes else [] if proc.returncode != 0: - message = stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}" - raise AnalysisExecutorError(message) + failure_meta = self._parse_failure_metadata(stdout_lines, stderr_lines) + message = self._extract_error_message(stderr_lines) or (stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}") + if failure_meta is None: + raise AnalysisExecutorError( + "analysis subprocess failed without required markers: RESULT_META", + code="analysis_protocol_failed", + ) + raise AnalysisExecutorError( + message, + code="analysis_failed", + degrade_reason_codes=failure_meta["degrade_reason_codes"], + data_quality=failure_meta["data_quality"], + source_diagnostics=failure_meta["source_diagnostics"], + ) return self._parse_output( stdout_lines=stdout_lines, @@ -347,6 +388,48 @@ class LegacySubprocessAnalysisExecutor: return await proc.wait() + def _resolve_provider_api_key(self, provider: str) -> Optional[str]: + try: + return self.api_key_resolver(provider) # type: ignore[misc] + except TypeError: + return self.api_key_resolver() + + @staticmethod + def _provider_api_env_names(provider: str) -> tuple[str, ...]: + return { + "anthropic": ("ANTHROPIC_API_KEY",), + "openai": ("OPENAI_API_KEY",), + "openrouter": ("OPENROUTER_API_KEY",), + "xai": ("XAI_API_KEY",), + "google": ("GOOGLE_API_KEY",), + "ollama": tuple(), + }.get(provider, tuple()) + + @staticmethod + def _parse_failure_metadata(stdout_lines: list[str], stderr_lines: list[str]) -> Optional[dict]: + for line in [*stdout_lines, *stderr_lines]: + if line.startswith("RESULT_META:"): + try: + detail = json.loads(line.split(":", 1)[1].strip()) + except Exception as exc: + raise AnalysisExecutorError( + "failed to parse RESULT_META payload", + code="analysis_protocol_failed", + ) from exc + return { + "degrade_reason_codes": tuple(detail.get("degrade_reason_codes") or ()), + "data_quality": detail.get("data_quality"), + "source_diagnostics": detail.get("source_diagnostics"), + } + return None + + @staticmethod + def _extract_error_message(stderr_lines: list[str]) -> Optional[str]: + for line in stderr_lines: + if line.startswith("ANALYSIS_ERROR:"): + return line.split(":", 1)[1].strip() + return None + @staticmethod def _parse_output( *, @@ -393,6 +476,8 @@ class LegacySubprocessAnalysisExecutor: missing_markers = [] if not seen_signal_detail: missing_markers.append("SIGNAL_DETAIL") + if not seen_result_meta: + missing_markers.append("RESULT_META") if not seen_complete: missing_markers.append("ANALYSIS_COMPLETE") if missing_markers: diff --git a/web_dashboard/backend/services/request_context.py b/web_dashboard/backend/services/request_context.py index c88340a0..b3824701 100644 --- a/web_dashboard/backend/services/request_context.py +++ b/web_dashboard/backend/services/request_context.py @@ -18,7 +18,12 @@ class RequestContext: request_id: str contract_version: str = CONTRACT_VERSION executor_type: str = DEFAULT_EXECUTOR_TYPE - api_key: Optional[str] = None + auth_key: Optional[str] = None + provider_api_key: Optional[str] = None + llm_provider: Optional[str] = None + backend_url: Optional[str] = None + deep_think_llm: Optional[str] = None + quick_think_llm: Optional[str] = None client_host: Optional[str] = None is_local: bool = False metadata: dict[str, str] = field(default_factory=dict) @@ -27,7 +32,12 @@ class RequestContext: def build_request_context( request: Optional[Request] = None, *, - api_key: Optional[str] = None, + auth_key: Optional[str] = None, + provider_api_key: Optional[str] = None, + llm_provider: Optional[str] = None, + backend_url: Optional[str] = None, + deep_think_llm: Optional[str] = None, + quick_think_llm: Optional[str] = None, request_id: Optional[str] = None, contract_version: str = CONTRACT_VERSION, executor_type: str = DEFAULT_EXECUTOR_TYPE, @@ -40,7 +50,12 @@ def build_request_context( request_id=request_id or uuid4().hex, contract_version=contract_version, executor_type=executor_type, - api_key=api_key, + auth_key=auth_key, + provider_api_key=provider_api_key, + llm_provider=llm_provider, + backend_url=backend_url, + deep_think_llm=deep_think_llm, + quick_think_llm=quick_think_llm, client_host=client_host, is_local=is_local, metadata=dict(metadata or {}), diff --git a/web_dashboard/backend/tests/test_api_smoke.py b/web_dashboard/backend/tests/test_api_smoke.py index d02924d3..e27ea241 100644 --- a/web_dashboard/backend/tests/test_api_smoke.py +++ b/web_dashboard/backend/tests/test_api_smoke.py @@ -140,7 +140,7 @@ def test_portfolio_analyze_route_uses_analysis_service_smoke(monkeypatch): assert response.json()["status"] == "running" assert str(captured["task_id"]).startswith("port_") assert isinstance(captured["date"], str) - assert captured["request_context"].api_key == "service-key" + assert captured["request_context"].auth_key == "service-key" assert callable(captured["broadcast_progress"]) @@ -248,7 +248,7 @@ def test_orchestrator_websocket_smoke_is_contract_first(monkeypatch): def test_orchestrator_websocket_rejects_unauthorized(monkeypatch): - monkeypatch.delenv("DASHBOARD_API_KEY", raising=False) + monkeypatch.setenv("DASHBOARD_API_KEY", "dashboard-secret") monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") main = _load_main_module(monkeypatch) diff --git a/web_dashboard/backend/tests/test_executors.py b/web_dashboard/backend/tests/test_executors.py index ff861e9a..fe6b4df1 100644 --- a/web_dashboard/backend/tests/test_executors.py +++ b/web_dashboard/backend/tests/test_executors.py @@ -71,12 +71,16 @@ def test_executor_raises_when_required_markers_missing(monkeypatch): ) async def scenario(): - with pytest.raises(AnalysisExecutorError, match="required markers: ANALYSIS_COMPLETE"): + with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META, ANALYSIS_COMPLETE"): await executor.execute( task_id="task-1", ticker="AAPL", date="2026-04-13", - request_context=build_request_context(api_key="ctx-key"), + request_context=build_request_context( + provider_api_key="ctx-key", + llm_provider="anthropic", + backend_url="https://api.minimaxi.com/anthropic", + ), ) asyncio.run(scenario()) @@ -103,7 +107,11 @@ def test_executor_kills_subprocess_on_timeout(monkeypatch): task_id="task-2", ticker="AAPL", date="2026-04-13", - request_context=build_request_context(api_key="ctx-key"), + request_context=build_request_context( + provider_api_key="ctx-key", + llm_provider="anthropic", + backend_url="https://api.minimaxi.com/anthropic", + ), ) asyncio.run(scenario()) @@ -136,3 +144,99 @@ def test_executor_marks_degraded_success_when_result_meta_reports_data_quality() assert contract["status"] == "degraded_success" assert contract["data_quality"]["state"] == "non_trading_day" assert contract["degradation"]["reason_codes"] == ["non_trading_day"] + + +def test_executor_requires_result_meta_on_success(): + with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"): + LegacySubprocessAnalysisExecutor._parse_output( + stdout_lines=[ + 'SIGNAL_DETAIL:{"quant_signal":"HOLD","llm_signal":"BUY","confidence":0.6}', + "ANALYSIS_COMPLETE:OVERWEIGHT", + ], + ticker="AAPL", + date="2026-04-12", + contract_version="v1alpha1", + executor_type="legacy_subprocess", + ) + + +def test_executor_injects_provider_specific_env(monkeypatch): + captured = {} + process = _FakeProcess( + _FakeStdout( + [ + b'SIGNAL_DETAIL:{"quant_signal":"BUY","llm_signal":"BUY","confidence":0.8}\n', + b'RESULT_META:{"degrade_reason_codes":[],"data_quality":{"state":"ok"}}\n', + b"ANALYSIS_COMPLETE:BUY\n", + ] + ), + returncode=0, + ) + + async def fake_create_subprocess_exec(*args, **kwargs): + captured["env"] = kwargs["env"] + return process + + monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec) + + executor = LegacySubprocessAnalysisExecutor( + analysis_python=Path("/usr/bin/python3"), + repo_root=Path("."), + api_key_resolver=lambda provider="openai": "fallback-key", + ) + + async def scenario(): + await executor.execute( + task_id="task-provider", + ticker="AAPL", + date="2026-04-13", + request_context=build_request_context( + auth_key="dashboard-key", + provider_api_key="provider-key", + llm_provider="openai", + backend_url="https://api.openai.com/v1", + deep_think_llm="gpt-5.4", + quick_think_llm="gpt-5.4-mini", + ), + ) + + asyncio.run(scenario()) + + assert captured["env"]["TRADINGAGENTS_LLM_PROVIDER"] == "openai" + assert captured["env"]["TRADINGAGENTS_BACKEND_URL"] == "https://api.openai.com/v1" + assert captured["env"]["OPENAI_API_KEY"] == "provider-key" + assert "ANTHROPIC_API_KEY" not in captured["env"] + + +def test_executor_requires_result_meta_on_failure(monkeypatch): + process = _FakeProcess( + _FakeStdout([]), + stderr=b"ANALYSIS_ERROR:boom\n", + returncode=1, + ) + + async def fake_create_subprocess_exec(*args, **kwargs): + return process + + monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec) + + executor = LegacySubprocessAnalysisExecutor( + analysis_python=Path("/usr/bin/python3"), + repo_root=Path("."), + api_key_resolver=lambda: "env-key", + ) + + async def scenario(): + with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"): + await executor.execute( + task_id="task-failure", + ticker="AAPL", + date="2026-04-13", + request_context=build_request_context( + provider_api_key="ctx-key", + llm_provider="anthropic", + backend_url="https://api.minimaxi.com/anthropic", + ), + ) + + asyncio.run(scenario()) diff --git a/web_dashboard/backend/tests/test_services_migration.py b/web_dashboard/backend/tests/test_services_migration.py index 2253e9e0..f2e9df30 100644 --- a/web_dashboard/backend/tests/test_services_migration.py +++ b/web_dashboard/backend/tests/test_services_migration.py @@ -45,9 +45,20 @@ def test_load_migration_flags_from_env(monkeypatch): def test_build_request_context_defaults(): - context = build_request_context(api_key="secret", metadata={"source": "test"}) + context = build_request_context( + auth_key="dashboard-secret", + provider_api_key="provider-secret", + llm_provider="anthropic", + backend_url="https://api.minimaxi.com/anthropic", + deep_think_llm="MiniMax-M2.7-highspeed", + quick_think_llm="MiniMax-M2.7-highspeed", + metadata={"source": "test"}, + ) - assert context.api_key == "secret" + assert context.auth_key == "dashboard-secret" + assert context.provider_api_key == "provider-secret" + assert context.llm_provider == "anthropic" + assert context.backend_url == "https://api.minimaxi.com/anthropic" assert context.request_id assert context.contract_version == "v1alpha1" assert context.executor_type == "legacy_subprocess" @@ -209,7 +220,12 @@ def test_analysis_service_start_analysis_uses_executor(tmp_path): task_id="task-1", ticker="AAPL", date="2026-04-13", - request_context=build_request_context(api_key="secret"), + request_context=build_request_context( + auth_key="dashboard-secret", + provider_api_key="provider-secret", + llm_provider="anthropic", + backend_url="https://api.minimaxi.com/anthropic", + ), broadcast_progress=_broadcast, ) await analysis_tasks["task-1"]