Harden executor configuration and failure contracts before further rollout

The rollout-ready branch still conflated dashboard auth with provider credentials, discarded diagnostics when both signal lanes degraded, and treated RESULT_META as optional even though downstream contracts now depend on it. This change separates provider runtime settings from request auth, preserves source diagnostics/data quality in full-failure contracts, requires RESULT_META in the subprocess protocol, and moves A-share holidays into an updateable calendar data source.

Constraint: No external market-calendar dependency is available in env312, and dependency policy forbids adding one casually
Rejected: Keep reading provider keys from request headers | couples dashboard auth to execution and breaks non-anthropic providers
Rejected: Leave both-signals-unavailable as a bare ValueError | loses diagnostics before live/backend contracts can serialize them
Rejected: Keep A-share holidays embedded in Python constants | requires code edits every year and preserves the stopgap design
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Keep subprocess protocol fields explicit and fail closed when RESULT_META is missing; do not route provider credentials through dashboard auth again
Tested: python -m pytest web_dashboard/backend/tests/test_executors.py web_dashboard/backend/tests/test_services_migration.py web_dashboard/backend/tests/test_api_smoke.py orchestrator/tests/test_market_calendar.py orchestrator/tests/test_live_mode.py orchestrator/tests/test_application_service.py orchestrator/tests/test_quant_runner.py orchestrator/tests/test_llm_runner.py -q
Tested: python -m compileall orchestrator web_dashboard/backend
Not-tested: real provider-backed execution across openai/google providers
Not-tested: browser/manual verification beyond existing frontend contract consumers
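
Protocol sketch (reviewer note, not shipped code): the subprocess speaks a line-oriented marker protocol, and the executor fails closed when the required RESULT_META marker never appears. The helper name and payload shapes below are illustrative; the marker prefixes match this change.

    import json
    from typing import Optional

    RESULT_META = "RESULT_META:"        # structured diagnostics, required on every exit path
    ANALYSIS_ERROR = "ANALYSIS_ERROR:"  # human-readable message, optional

    def classify_failure(stderr_lines: list[str]) -> dict:
        # Illustrative consumer: refuse to guess when diagnostics were lost.
        meta: Optional[dict] = None
        message = "unknown failure"
        for line in stderr_lines:
            if line.startswith(RESULT_META):
                meta = json.loads(line[len(RESULT_META):])
            elif line.startswith(ANALYSIS_ERROR):
                message = line[len(ANALYSIS_ERROR):].strip()
        if meta is None:
            raise RuntimeError("analysis subprocess failed without required markers: RESULT_META")
        return {"message": message, **meta}
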
陈少杰 2026-04-14 01:54:44 +08:00
parent a70b485d6f
commit 63858bf98b
15 changed files with 560 additions and 78 deletions

View File

@@ -10,6 +10,7 @@ from orchestrator.contracts.config_schema import (
 )
 from orchestrator.contracts.error_taxonomy import ReasonCode
 from orchestrator.contracts.result_contract import (
+    CombinedSignalFailure,
     FinalSignal,
     Signal,
     build_error_signal,
@@ -18,6 +19,7 @@ from orchestrator.contracts.result_contract import (
 __all__ = [
     "CONTRACT_VERSION",
+    "CombinedSignalFailure",
     "FinalSignal",
     "OrchestratorConfigSchema",
     "ReasonCode",

View File

@@ -97,3 +97,20 @@ def signal_reason_code(signal: Optional[Signal]) -> Optional[str]:
     if signal is None:
         return None
     return signal.reason_code or signal.metadata.get("reason_code")
+
+
+class CombinedSignalFailure(ValueError):
+    """Structured failure for cases where no merged signal can be produced."""
+
+    def __init__(
+        self,
+        message: str,
+        *,
+        reason_codes: tuple[str, ...] = (),
+        source_diagnostics: Optional[dict[str, Any]] = None,
+        data_quality: Optional[dict[str, Any]] = None,
+    ) -> None:
+        super().__init__(message)
+        self.reason_codes = tuple(reason_codes)
+        self.source_diagnostics = dict(source_diagnostics or {})
+        self.data_quality = dict(data_quality) if data_quality is not None else None
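
Usage sketch for the class above: because it subclasses ValueError, legacy handlers still fire, while structured consumers keep the diagnostics instead of dropping them. The reason-code strings here are illustrative.

    try:
        raise CombinedSignalFailure(
            "both quant and llm signals are None",
            reason_codes=("quant_no_data", "both_signals_unavailable"),
            source_diagnostics={"quant": {"reason_code": "quant_no_data"}},
        )
    except CombinedSignalFailure as exc:
        # New callers serialize the attached diagnostics rather than losing
        # them to a bare ValueError.
        payload = {
            "message": str(exc),
            "reason_codes": list(exc.reason_codes),
            "source_diagnostics": exc.source_diagnostics,
            "data_quality": exc.data_quality,  # None when no summary was available
        }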

View File

@@ -0,0 +1,101 @@
{
"a_share": {
"2024": [
"2024-01-01",
"2024-02-10",
"2024-02-11",
"2024-02-12",
"2024-02-13",
"2024-02-14",
"2024-02-15",
"2024-02-16",
"2024-02-17",
"2024-04-04",
"2024-04-05",
"2024-04-06",
"2024-05-01",
"2024-05-02",
"2024-05-03",
"2024-05-04",
"2024-05-05",
"2024-06-08",
"2024-06-09",
"2024-06-10",
"2024-09-15",
"2024-09-16",
"2024-09-17",
"2024-10-01",
"2024-10-02",
"2024-10-03",
"2024-10-04",
"2024-10-05",
"2024-10-06",
"2024-10-07"
],
"2025": [
"2025-01-01",
"2025-01-28",
"2025-01-29",
"2025-01-30",
"2025-01-31",
"2025-02-01",
"2025-02-02",
"2025-02-03",
"2025-02-04",
"2025-04-04",
"2025-04-05",
"2025-04-06",
"2025-05-01",
"2025-05-02",
"2025-05-03",
"2025-05-04",
"2025-05-05",
"2025-05-31",
"2025-06-01",
"2025-06-02",
"2025-10-01",
"2025-10-02",
"2025-10-03",
"2025-10-04",
"2025-10-05",
"2025-10-06",
"2025-10-07",
"2025-10-08"
],
"2026": [
"2026-01-01",
"2026-01-02",
"2026-01-03",
"2026-02-15",
"2026-02-16",
"2026-02-17",
"2026-02-18",
"2026-02-19",
"2026-02-20",
"2026-02-21",
"2026-02-22",
"2026-02-23",
"2026-04-04",
"2026-04-05",
"2026-04-06",
"2026-05-01",
"2026-05-02",
"2026-05-03",
"2026-05-04",
"2026-05-05",
"2026-06-19",
"2026-06-20",
"2026-06-21",
"2026-09-25",
"2026-09-26",
"2026-09-27",
"2026-10-01",
"2026-10-02",
"2026-10-03",
"2026-10-04",
"2026-10-05",
"2026-10-06",
"2026-10-07"
]
}
}

View File

@ -1,49 +1,80 @@
from __future__ import annotations from __future__ import annotations
import json
import os
from datetime import date, timedelta from datetime import date, timedelta
from pathlib import Path
_A_SHARE_SUFFIXES = {"SH", "SS", "SZ"} _A_SHARE_SUFFIXES = {"SH", "SS", "SZ"}
_DEFAULT_MARKET_HOLIDAYS_PATH = Path(__file__).with_name("data") / "market_holidays.json"
# Mainland exchanges close on weekends plus the annual State Council public-holiday windows.
# Weekend make-up workdays do not become exchange trading days.
_A_SHARE_HOLIDAYS = {
date(2024, 1, 1),
*[date(2024, 2, day) for day in range(10, 18)],
*[date(2024, 4, day) for day in range(4, 7)],
*[date(2024, 5, day) for day in range(1, 6)],
*[date(2024, 6, day) for day in range(8, 11)],
*[date(2024, 9, day) for day in range(15, 18)],
*[date(2024, 10, day) for day in range(1, 8)],
date(2025, 1, 1),
*[date(2025, 1, day) for day in range(28, 32)],
*[date(2025, 2, day) for day in range(1, 5)],
*[date(2025, 4, day) for day in range(4, 7)],
*[date(2025, 5, day) for day in range(1, 6)],
*[date(2025, 5, day) for day in range(31, 32)],
*[date(2025, 6, day) for day in range(1, 3)],
*[date(2025, 10, day) for day in range(1, 9)],
*[date(2026, 1, day) for day in range(1, 4)],
*[date(2026, 2, day) for day in range(15, 24)],
*[date(2026, 4, day) for day in range(4, 7)],
*[date(2026, 5, day) for day in range(1, 6)],
*[date(2026, 6, day) for day in range(19, 22)],
*[date(2026, 9, day) for day in range(25, 28)],
*[date(2026, 10, day) for day in range(1, 8)],
}
def is_non_trading_day(ticker: str, day: date) -> bool: def is_non_trading_day(ticker: str, day: date, *, data_path: Path | None = None) -> bool:
"""Return whether the requested date is a known non-trading day for the ticker's market.""" """Return whether the requested date is a known non-trading day for the ticker's market."""
if day.weekday() >= 5: if day.weekday() >= 5:
return True return True
if _is_a_share_ticker(ticker): market = market_for_ticker(ticker)
return day in _A_SHARE_HOLIDAYS if market == "a_share":
return _is_nyse_holiday(day) return day in get_market_holidays(market, day.year, data_path=data_path)
if market == "nyse":
return _is_nyse_holiday(day)
return False
def _is_a_share_ticker(ticker: str) -> bool: def market_for_ticker(ticker: str) -> str:
suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else "" suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else ""
return suffix in _A_SHARE_SUFFIXES if suffix in _A_SHARE_SUFFIXES:
return "a_share"
return "nyse"
def get_market_holidays(market: str, year: int, *, data_path: Path | None = None) -> set[date]:
holidays_by_market = load_market_holidays(data_path=data_path)
market_data = holidays_by_market.get(market, {})
values = market_data.get(str(year), [])
return {date.fromisoformat(raw) for raw in values}
def load_market_holidays(*, data_path: Path | None = None) -> dict[str, dict[str, list[str]]]:
path = _resolve_market_holidays_path(data_path)
if not path.exists():
return {}
payload = json.loads(path.read_text())
return {
str(market): {str(year): list(days) for year, days in years.items()}
for market, years in payload.items()
}
def update_market_holidays(
*,
market: str,
year: int,
holiday_dates: list[date | str],
data_path: Path | None = None,
) -> Path:
path = _resolve_market_holidays_path(data_path)
payload = load_market_holidays(data_path=path)
payload.setdefault(market, {})
normalized_days = sorted(
{
item.isoformat() if isinstance(item, date) else date.fromisoformat(item).isoformat()
for item in holiday_dates
}
)
payload[market][str(year)] = normalized_days
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
return path
def _resolve_market_holidays_path(data_path: Path | None = None) -> Path:
if data_path is not None:
return data_path
env_path = os.environ.get("TRADINGAGENTS_MARKET_HOLIDAYS_PATH")
if env_path:
return Path(env_path)
return _DEFAULT_MARKET_HOLIDAYS_PATH
def _is_nyse_holiday(day: date) -> bool: def _is_nyse_holiday(day: date) -> bool:
@ -66,7 +97,6 @@ def _is_nyse_holiday(day: date) -> bool:
if day.year >= 2022: if day.year >= 2022:
holidays.add(observed_juneteenth) holidays.add(observed_juneteenth)
# When Jan 1 falls on Saturday, NYSE observes New Year's Day on the prior Friday.
if day.month == 12 and day.day == 31: if day.month == 12 and day.day == 31:
next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1) next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1)
if next_new_year.year == day.year: if next_new_year.year == day.year:
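
Maintenance sketch for the calendar API above: next year's entries land in the JSON data file through update_market_holidays, with no code edit. The 2027 dates below are placeholders, not announced holidays.

    from datetime import date
    from orchestrator.market_calendar import get_market_holidays, update_market_holidays

    # Placeholder dates; real entries come from the annual State Council announcement.
    update_market_holidays(
        market="a_share",
        year=2027,
        holiday_dates=[date(2027, 1, 1), "2027-02-10"],  # date objects and ISO strings both accepted
    )
    assert date(2027, 1, 1) in get_market_holidays("a_share", 2027)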

View File

@@ -3,8 +3,8 @@ from typing import Optional

 from orchestrator.config import OrchestratorConfig
 from orchestrator.contracts.error_taxonomy import ReasonCode
-from orchestrator.contracts.result_contract import FinalSignal, Signal, signal_reason_code
-from orchestrator.signals import Signal, FinalSignal, SignalMerger
+from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal, signal_reason_code
+from orchestrator.signals import SignalMerger
 from orchestrator.quant_runner import QuantRunner
 from orchestrator.llm_runner import LLMRunner
@@ -92,9 +92,16 @@ class TradingOrchestrator:
                 source_diagnostics["llm"] = {"reason_code": ReasonCode.LLM_SIGNAL_FAILED.value}
                 llm_sig = None

-        # merge raises ValueError if both None
+        # Preserve diagnostics even when both lanes degrade and no FinalSignal can be produced.
         if quant_sig is None and llm_sig is None:
             degradation_reasons.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value)
+            raise CombinedSignalFailure(
+                "both quant and llm signals are None",
+                reason_codes=tuple(dict.fromkeys(degradation_reasons)),
+                source_diagnostics=source_diagnostics,
+                data_quality=self._summarize_data_quality(source_diagnostics),
+            )

         final_signal = self._merger.merge(
             quant_sig,
             llm_sig,

View File

@@ -5,6 +5,7 @@ import pytest

 import orchestrator.orchestrator as orchestrator_module
 from orchestrator.config import OrchestratorConfig
 from orchestrator.contracts.error_taxonomy import ReasonCode
+from orchestrator.contracts.result_contract import CombinedSignalFailure
 from orchestrator.signals import Signal
@@ -107,11 +108,16 @@ def test_trading_orchestrator_raises_when_both_sources_degrade(monkeypatch):
     monkeypatch.setattr(orchestrator_module, "QuantRunner", FakeQuantRunner)
     monkeypatch.setattr(orchestrator_module, "LLMRunner", FakeLLMRunner)

-    with pytest.raises(ValueError, match="both quant and llm signals are None"):
+    with pytest.raises(CombinedSignalFailure) as exc_info:
         orchestrator_module.TradingOrchestrator(
             OrchestratorConfig(quant_backtest_path="/tmp/quant")
         ).get_combined_signal("AAPL", "2026-04-11")

+    assert str(exc_info.value) == "both quant and llm signals are None"
+    assert exc_info.value.reason_codes[0] == ReasonCode.QUANT_NO_DATA.value
+    assert exc_info.value.reason_codes[-1] == ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value
+    assert exc_info.value.source_diagnostics["quant"]["reason_code"] == ReasonCode.QUANT_NO_DATA.value
+

 def test_trading_orchestrator_surfaces_provider_mismatch_summary_when_llm_degrades(monkeypatch):
     class FakeQuantRunner:

View File

@@ -2,7 +2,7 @@ import asyncio
 from datetime import datetime, timezone

 from orchestrator.contracts.error_taxonomy import ReasonCode
-from orchestrator.contracts.result_contract import FinalSignal, Signal
+from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal
 from orchestrator.live_mode import LiveMode
@@ -83,7 +83,12 @@ def test_live_mode_serializes_failure_contract_shape():
     live_mode = LiveMode(
         _StubOrchestrator(
             {
-                ("AAPL", "2026-04-11"): ValueError("both quant and llm signals are None")
+                ("AAPL", "2026-04-11"): CombinedSignalFailure(
+                    "both quant and llm signals are None",
+                    reason_codes=(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value, ReasonCode.PROVIDER_MISMATCH.value),
+                    source_diagnostics={"llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value}},
+                    data_quality={"state": "provider_mismatch", "source": "llm"},
+                )
             }
         )
     )
@@ -104,9 +109,14 @@ def test_live_mode_serializes_failure_contract_shape():
             },
             "degradation": {
                 "degraded": True,
-                "reason_codes": [ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value],
-                "source_diagnostics": {},
+                "reason_codes": [
+                    ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value,
+                    ReasonCode.PROVIDER_MISMATCH.value,
+                ],
+                "source_diagnostics": {
+                    "llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value},
+                },
             },
-            "data_quality": None,
+            "data_quality": {"state": "provider_mismatch", "source": "llm"},
         }
     ]

View File

@@ -1,6 +1,7 @@
+import json
 from datetime import date

-from orchestrator.market_calendar import is_non_trading_day
+from orchestrator.market_calendar import get_market_holidays, is_non_trading_day, update_market_holidays


 def test_is_non_trading_day_marks_a_share_holiday():
@@ -13,3 +14,22 @@ def test_is_non_trading_day_marks_nyse_holiday():

 def test_is_non_trading_day_leaves_regular_weekday_open():
     assert is_non_trading_day('AAPL', date(2024, 3, 28)) is False
+
+
+def test_update_market_holidays_creates_maintainable_future_year_entry(tmp_path):
+    data_path = tmp_path / "market_holidays.json"
+    data_path.write_text(json.dumps({"a_share": {}}))
+
+    update_market_holidays(
+        market="a_share",
+        year=2027,
+        holiday_dates=["2027-02-10", "2027-02-11"],
+        data_path=data_path,
+    )
+
+    assert get_market_holidays("a_share", 2027, data_path=data_path) == {
+        date(2027, 2, 10),
+        date(2027, 2, 11),
+    }
+    assert is_non_trading_day("600519.SS", date(2027, 2, 10)) is False
+    assert is_non_trading_day("600519.SS", date(2027, 2, 10), data_path=data_path) is True

View File

@@ -18,6 +18,7 @@ from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import Response, FileResponse
 from fastapi.staticfiles import StaticFiles
 from pydantic import BaseModel
+from tradingagents.default_config import get_default_config

 from services import AnalysisService, JobService, ResultStore, build_request_context, load_migration_flags
 from services.executor import LegacySubprocessAnalysisExecutor
@@ -55,7 +56,7 @@ async def lifespan(app: FastAPI):
         executor=LegacySubprocessAnalysisExecutor(
             analysis_python=ANALYSIS_PYTHON,
             repo_root=REPO_ROOT,
-            api_key_resolver=_get_analysis_api_key,
+            api_key_resolver=_get_analysis_provider_api_key,
             process_registry=app.state.job_service.register_process,
         ),
         result_store=app.state.result_store,
@@ -103,10 +104,8 @@ class ScreenRequest(BaseModel):

 @app.get("/api/config/check")
 async def check_config():
-    """Check if the app is configured (API key is set).
-
-    The FastAPI backend receives ANTHROPIC_API_KEY as an env var when spawned by Tauri.
-    """
-    configured = bool(_get_analysis_api_key())
+    """Check if the analysis provider is configured with a callable API key."""
+    configured = bool(_resolve_analysis_runtime_settings().get("provider_api_key"))
     return {"configured": configured}
@@ -151,7 +150,7 @@ _api_key: Optional[str] = None

 def _get_api_key() -> Optional[str]:
     global _api_key
     if _api_key is None:
-        _api_key = os.environ.get("DASHBOARD_API_KEY") or os.environ.get("ANTHROPIC_API_KEY")
+        _api_key = os.environ.get("DASHBOARD_API_KEY")
     return _api_key


 def _check_api_key(api_key: Optional[str]) -> bool:
@@ -181,15 +180,73 @@ def _persist_analysis_api_key(api_key_value: str):
     CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
     CONFIG_PATH.write_text(json.dumps({"api_key": api_key_value}, ensure_ascii=False))
     os.chmod(CONFIG_PATH, 0o600)
-    os.environ["ANTHROPIC_API_KEY"] = api_key_value
     _api_key = None


-def _get_analysis_api_key() -> Optional[str]:
-    return (
-        os.environ.get("ANTHROPIC_API_KEY")
-        or os.environ.get("MINIMAX_API_KEY")
-        or _load_saved_config().get("api_key")
-    )
+def _get_analysis_provider_api_key(provider: str, saved_api_key: Optional[str] = None) -> Optional[str]:
+    env_names = {
+        "anthropic": ("ANTHROPIC_API_KEY", "MINIMAX_API_KEY"),
+        "openai": ("OPENAI_API_KEY",),
+        "openrouter": ("OPENROUTER_API_KEY",),
+        "xai": ("XAI_API_KEY",),
+        "google": ("GOOGLE_API_KEY",),
+        "ollama": tuple(),
+    }.get(provider.lower(), tuple())
+    for env_name in env_names:
+        value = os.environ.get(env_name)
+        if value:
+            return value
+    return saved_api_key
+
+
+def _resolve_analysis_runtime_settings() -> dict:
+    saved = _load_saved_config()
+    defaults = get_default_config()
+    provider = os.environ.get("TRADINGAGENTS_LLM_PROVIDER")
+    if not provider:
+        if os.environ.get("ANTHROPIC_BASE_URL"):
+            provider = "anthropic"
+        elif os.environ.get("OPENAI_BASE_URL"):
+            provider = "openai"
+        else:
+            provider = defaults.get("llm_provider", "anthropic")
+    backend_url = (
+        os.environ.get("TRADINGAGENTS_BACKEND_URL")
+        or os.environ.get("ANTHROPIC_BASE_URL")
+        or os.environ.get("OPENAI_BASE_URL")
+        or defaults.get("backend_url")
+    )
+    deep_model = (
+        os.environ.get("TRADINGAGENTS_DEEP_MODEL")
+        or os.environ.get("TRADINGAGENTS_MODEL")
+        or defaults.get("deep_think_llm")
+    )
+    quick_model = (
+        os.environ.get("TRADINGAGENTS_QUICK_MODEL")
+        or os.environ.get("TRADINGAGENTS_MODEL")
+        or defaults.get("quick_think_llm")
+    )
+    return {
+        "llm_provider": provider,
+        "backend_url": backend_url,
+        "deep_think_llm": deep_model,
+        "quick_think_llm": quick_model,
+        "provider_api_key": _get_analysis_provider_api_key(provider, saved.get("api_key")),
+    }
+
+
+def _build_analysis_request_context(request: Request, auth_key: Optional[str]):
+    settings = _resolve_analysis_runtime_settings()
+    return build_request_context(
+        request,
+        auth_key=auth_key,
+        provider_api_key=settings["provider_api_key"],
+        llm_provider=settings["llm_provider"],
+        backend_url=settings["backend_url"],
+        deep_think_llm=settings["deep_think_llm"],
+        quick_think_llm=settings["quick_think_llm"],
+    )
@@ -279,7 +336,7 @@ async def start_analysis(
     task_id = f"{payload.ticker}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
     date = payload.date or datetime.now().strftime("%Y-%m-%d")

-    request_context = build_request_context(http_request, api_key=api_key)
+    request_context = _build_analysis_request_context(http_request, api_key)

     try:
         return await app.state.analysis_service.start_analysis(
@@ -838,7 +895,7 @@ async def start_portfolio_analysis(
     date = datetime.now().strftime("%Y-%m-%d")
     task_id = f"port_{date}_{uuid.uuid4().hex[:6]}"

-    request_context = build_request_context(http_request, api_key=api_key)
+    request_context = _build_analysis_request_context(http_request, api_key)

     try:
         return await app.state.analysis_service.start_portfolio_analysis(
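
Resolution-precedence sketch for _resolve_analysis_runtime_settings, assuming TRADINGAGENTS_LLM_PROVIDER and ANTHROPIC_BASE_URL are unset; the values are examples only.

    import os

    os.environ["OPENAI_BASE_URL"] = "https://api.openai.com/v1"  # infers provider="openai"
    os.environ["TRADINGAGENTS_MODEL"] = "gpt-5.4"                # shared fallback for deep + quick

    settings = _resolve_analysis_runtime_settings()
    assert settings["llm_provider"] == "openai"
    assert settings["backend_url"] == "https://api.openai.com/v1"
    assert settings["deep_think_llm"] == settings["quick_think_llm"] == "gpt-5.4"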

View File

@@ -154,6 +154,12 @@ class AnalysisService:
                 started_at=start_time,
                 code=exc.code,
                 retryable=exc.retryable,
+                degradation={
+                    "degraded": bool(exc.degrade_reason_codes) or bool(exc.data_quality),
+                    "reason_codes": list(exc.degrade_reason_codes),
+                    "source_diagnostics": exc.source_diagnostics or {},
+                } if (exc.degrade_reason_codes or exc.data_quality or exc.source_diagnostics) else None,
+                data_quality=exc.data_quality,
             )
         except Exception as exc:
             self._fail_analysis_state(
@@ -162,6 +168,8 @@ class AnalysisService:
                 started_at=start_time,
                 code="analysis_failed",
                 retryable=False,
+                degradation=None,
+                data_quality=None,
             )

         await broadcast_progress(task_id, self.job_service.task_results[task_id])
@@ -279,12 +287,16 @@ class AnalysisService:
         started_at: float,
         code: str,
         retryable: bool,
+        degradation: Optional[dict],
+        data_quality: Optional[dict],
     ) -> None:
         state = self.job_service.task_results[task_id]
         state["status"] = "failed"
         state["elapsed_seconds"] = int(time.monotonic() - started_at)
         state["elapsed"] = state["elapsed_seconds"]
         state["result"] = None
+        state["degradation_summary"] = degradation
+        state["data_quality_summary"] = data_quality
         state["error"] = {
             "code": code,
             "message": message,

View File

@@ -75,7 +75,13 @@ print("STAGE:trading", flush=True)

 try:
     result = orchestrator.get_combined_signal(ticker, date)
-except ValueError as exc:
+except Exception as exc:
+    result_meta = {
+        "degrade_reason_codes": list(getattr(exc, "reason_codes", ()) or ()),
+        "data_quality": getattr(exc, "data_quality", None),
+        "source_diagnostics": getattr(exc, "source_diagnostics", None),
+    }
+    print("RESULT_META:" + json.dumps(result_meta), file=sys.stderr, flush=True)
     print("ANALYSIS_ERROR:" + str(exc), file=sys.stderr, flush=True)
     sys.exit(1)
@@ -214,10 +220,22 @@ class AnalysisExecutionOutput:

 class AnalysisExecutorError(RuntimeError):
-    def __init__(self, message: str, *, code: str = "analysis_failed", retryable: bool = False):
+    def __init__(
+        self,
+        message: str,
+        *,
+        code: str = "analysis_failed",
+        retryable: bool = False,
+        degrade_reason_codes: tuple[str, ...] = (),
+        data_quality: Optional[dict] = None,
+        source_diagnostics: Optional[dict] = None,
+    ):
         super().__init__(message)
         self.code = code
         self.retryable = retryable
+        self.degrade_reason_codes = degrade_reason_codes
+        self.data_quality = data_quality
+        self.source_diagnostics = source_diagnostics


 class AnalysisExecutor(Protocol):
@@ -240,7 +258,7 @@ class LegacySubprocessAnalysisExecutor:
         *,
         analysis_python: Path,
         repo_root: Path,
-        api_key_resolver: Callable[[], Optional[str]],
+        api_key_resolver: Callable[..., Optional[str]],
         process_registry: Optional[ProcessRegistry] = None,
         script_template: str = LEGACY_ANALYSIS_SCRIPT_TEMPLATE,
         stdout_timeout_secs: float = 300.0,
@@ -261,9 +279,10 @@ class LegacySubprocessAnalysisExecutor:
         request_context: RequestContext,
         on_stage: Optional[StageCallback] = None,
     ) -> AnalysisExecutionOutput:
-        analysis_api_key = request_context.api_key or self.api_key_resolver()
-        if not analysis_api_key:
-            raise RuntimeError("ANTHROPIC_API_KEY environment variable not set")
+        llm_provider = (request_context.llm_provider or "anthropic").lower()
+        analysis_api_key = request_context.provider_api_key or self._resolve_provider_api_key(llm_provider)
+        if llm_provider != "ollama" and not analysis_api_key:
+            raise RuntimeError(f"{llm_provider} provider API key not configured")

         script_path: Optional[Path] = None
         proc: asyncio.subprocess.Process | None = None
@@ -279,7 +298,16 @@ class LegacySubprocessAnalysisExecutor:
                 for key, value in os.environ.items()
                 if not key.startswith(("PYTHON", "CONDA", "VIRTUAL"))
             }
-            clean_env["ANTHROPIC_API_KEY"] = analysis_api_key
+            clean_env["TRADINGAGENTS_LLM_PROVIDER"] = llm_provider
+            if request_context.backend_url:
+                clean_env["TRADINGAGENTS_BACKEND_URL"] = request_context.backend_url
+            if request_context.deep_think_llm:
+                clean_env["TRADINGAGENTS_DEEP_MODEL"] = request_context.deep_think_llm
+            if request_context.quick_think_llm:
+                clean_env["TRADINGAGENTS_QUICK_MODEL"] = request_context.quick_think_llm
+            for env_name in self._provider_api_env_names(llm_provider):
+                if analysis_api_key:
+                    clean_env[env_name] = analysis_api_key

             proc = await asyncio.create_subprocess_exec(
                 str(self.analysis_python),
@@ -317,9 +345,22 @@ class LegacySubprocessAnalysisExecutor:
             await proc.wait()

             stderr_bytes = await proc.stderr.read() if proc.stderr is not None else b""
+            stderr_lines = stderr_bytes.decode(errors="replace").splitlines() if stderr_bytes else []
             if proc.returncode != 0:
-                message = stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}"
-                raise AnalysisExecutorError(message)
+                failure_meta = self._parse_failure_metadata(stdout_lines, stderr_lines)
+                message = self._extract_error_message(stderr_lines) or (stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}")
+                if failure_meta is None:
+                    raise AnalysisExecutorError(
+                        "analysis subprocess failed without required markers: RESULT_META",
+                        code="analysis_protocol_failed",
+                    )
+                raise AnalysisExecutorError(
+                    message,
+                    code="analysis_failed",
+                    degrade_reason_codes=failure_meta["degrade_reason_codes"],
+                    data_quality=failure_meta["data_quality"],
+                    source_diagnostics=failure_meta["source_diagnostics"],
+                )

             return self._parse_output(
                 stdout_lines=stdout_lines,
@@ -347,6 +388,48 @@ class LegacySubprocessAnalysisExecutor:
                 return
             await proc.wait()

+    def _resolve_provider_api_key(self, provider: str) -> Optional[str]:
+        try:
+            return self.api_key_resolver(provider)  # type: ignore[misc]
+        except TypeError:
+            return self.api_key_resolver()
+
+    @staticmethod
+    def _provider_api_env_names(provider: str) -> tuple[str, ...]:
+        return {
+            "anthropic": ("ANTHROPIC_API_KEY",),
+            "openai": ("OPENAI_API_KEY",),
+            "openrouter": ("OPENROUTER_API_KEY",),
+            "xai": ("XAI_API_KEY",),
+            "google": ("GOOGLE_API_KEY",),
+            "ollama": tuple(),
+        }.get(provider, tuple())
+
+    @staticmethod
+    def _parse_failure_metadata(stdout_lines: list[str], stderr_lines: list[str]) -> Optional[dict]:
+        for line in [*stdout_lines, *stderr_lines]:
+            if line.startswith("RESULT_META:"):
+                try:
+                    detail = json.loads(line.split(":", 1)[1].strip())
+                except Exception as exc:
+                    raise AnalysisExecutorError(
+                        "failed to parse RESULT_META payload",
+                        code="analysis_protocol_failed",
+                    ) from exc
+                return {
+                    "degrade_reason_codes": tuple(detail.get("degrade_reason_codes") or ()),
+                    "data_quality": detail.get("data_quality"),
+                    "source_diagnostics": detail.get("source_diagnostics"),
+                }
+        return None
+
+    @staticmethod
+    def _extract_error_message(stderr_lines: list[str]) -> Optional[str]:
+        for line in stderr_lines:
+            if line.startswith("ANALYSIS_ERROR:"):
+                return line.split(":", 1)[1].strip()
+        return None
+
     @staticmethod
     def _parse_output(
         *,
@@ -393,6 +476,8 @@ class LegacySubprocessAnalysisExecutor:
         missing_markers = []
         if not seen_signal_detail:
             missing_markers.append("SIGNAL_DETAIL")
+        if not seen_result_meta:
+            missing_markers.append("RESULT_META")
         if not seen_complete:
             missing_markers.append("ANALYSIS_COMPLETE")
         if missing_markers:

View File

@@ -18,7 +18,12 @@ class RequestContext:
     request_id: str
     contract_version: str = CONTRACT_VERSION
     executor_type: str = DEFAULT_EXECUTOR_TYPE
-    api_key: Optional[str] = None
+    auth_key: Optional[str] = None
+    provider_api_key: Optional[str] = None
+    llm_provider: Optional[str] = None
+    backend_url: Optional[str] = None
+    deep_think_llm: Optional[str] = None
+    quick_think_llm: Optional[str] = None
     client_host: Optional[str] = None
     is_local: bool = False
     metadata: dict[str, str] = field(default_factory=dict)
@@ -27,7 +32,12 @@ class RequestContext:
 def build_request_context(
     request: Optional[Request] = None,
     *,
-    api_key: Optional[str] = None,
+    auth_key: Optional[str] = None,
+    provider_api_key: Optional[str] = None,
+    llm_provider: Optional[str] = None,
+    backend_url: Optional[str] = None,
+    deep_think_llm: Optional[str] = None,
+    quick_think_llm: Optional[str] = None,
     request_id: Optional[str] = None,
     contract_version: str = CONTRACT_VERSION,
     executor_type: str = DEFAULT_EXECUTOR_TYPE,
@@ -40,7 +50,12 @@ def build_request_context(
         request_id=request_id or uuid4().hex,
         contract_version=contract_version,
         executor_type=executor_type,
-        api_key=api_key,
+        auth_key=auth_key,
+        provider_api_key=provider_api_key,
+        llm_provider=llm_provider,
+        backend_url=backend_url,
+        deep_think_llm=deep_think_llm,
+        quick_think_llm=quick_think_llm,
         client_host=client_host,
         is_local=is_local,
         metadata=dict(metadata or {}),

View File

@@ -140,7 +140,7 @@ def test_portfolio_analyze_route_uses_analysis_service_smoke(monkeypatch):
     assert response.json()["status"] == "running"
     assert str(captured["task_id"]).startswith("port_")
     assert isinstance(captured["date"], str)
-    assert captured["request_context"].api_key == "service-key"
+    assert captured["request_context"].auth_key == "service-key"
     assert callable(captured["broadcast_progress"])
@@ -248,7 +248,7 @@ def test_orchestrator_websocket_smoke_is_contract_first(monkeypatch):

 def test_orchestrator_websocket_rejects_unauthorized(monkeypatch):
-    monkeypatch.delenv("DASHBOARD_API_KEY", raising=False)
+    monkeypatch.setenv("DASHBOARD_API_KEY", "dashboard-secret")
     monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
     main = _load_main_module(monkeypatch)

View File

@@ -71,12 +71,16 @@ def test_executor_raises_when_required_markers_missing(monkeypatch):
     )

     async def scenario():
-        with pytest.raises(AnalysisExecutorError, match="required markers: ANALYSIS_COMPLETE"):
+        with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META, ANALYSIS_COMPLETE"):
             await executor.execute(
                 task_id="task-1",
                 ticker="AAPL",
                 date="2026-04-13",
-                request_context=build_request_context(api_key="ctx-key"),
+                request_context=build_request_context(
+                    provider_api_key="ctx-key",
+                    llm_provider="anthropic",
+                    backend_url="https://api.minimaxi.com/anthropic",
+                ),
             )

     asyncio.run(scenario())
@@ -103,7 +107,11 @@ def test_executor_kills_subprocess_on_timeout(monkeypatch):
                 task_id="task-2",
                 ticker="AAPL",
                 date="2026-04-13",
-                request_context=build_request_context(api_key="ctx-key"),
+                request_context=build_request_context(
+                    provider_api_key="ctx-key",
+                    llm_provider="anthropic",
+                    backend_url="https://api.minimaxi.com/anthropic",
+                ),
             )

     asyncio.run(scenario())
@@ -136,3 +144,99 @@ def test_executor_marks_degraded_success_when_result_meta_reports_data_quality():
     assert contract["status"] == "degraded_success"
     assert contract["data_quality"]["state"] == "non_trading_day"
     assert contract["degradation"]["reason_codes"] == ["non_trading_day"]
+
+
+def test_executor_requires_result_meta_on_success():
+    with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"):
+        LegacySubprocessAnalysisExecutor._parse_output(
+            stdout_lines=[
+                'SIGNAL_DETAIL:{"quant_signal":"HOLD","llm_signal":"BUY","confidence":0.6}',
+                "ANALYSIS_COMPLETE:OVERWEIGHT",
+            ],
+            ticker="AAPL",
+            date="2026-04-12",
+            contract_version="v1alpha1",
+            executor_type="legacy_subprocess",
+        )
+
+
+def test_executor_injects_provider_specific_env(monkeypatch):
+    captured = {}
+    process = _FakeProcess(
+        _FakeStdout(
+            [
+                b'SIGNAL_DETAIL:{"quant_signal":"BUY","llm_signal":"BUY","confidence":0.8}\n',
+                b'RESULT_META:{"degrade_reason_codes":[],"data_quality":{"state":"ok"}}\n',
+                b"ANALYSIS_COMPLETE:BUY\n",
+            ]
+        ),
+        returncode=0,
+    )

+    async def fake_create_subprocess_exec(*args, **kwargs):
+        captured["env"] = kwargs["env"]
+        return process
+
+    monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
+
+    executor = LegacySubprocessAnalysisExecutor(
+        analysis_python=Path("/usr/bin/python3"),
+        repo_root=Path("."),
+        api_key_resolver=lambda provider="openai": "fallback-key",
+    )
+
+    async def scenario():
+        await executor.execute(
+            task_id="task-provider",
+            ticker="AAPL",
+            date="2026-04-13",
+            request_context=build_request_context(
+                auth_key="dashboard-key",
+                provider_api_key="provider-key",
+                llm_provider="openai",
+                backend_url="https://api.openai.com/v1",
+                deep_think_llm="gpt-5.4",
+                quick_think_llm="gpt-5.4-mini",
+            ),
+        )
+
+    asyncio.run(scenario())
+
+    assert captured["env"]["TRADINGAGENTS_LLM_PROVIDER"] == "openai"
+    assert captured["env"]["TRADINGAGENTS_BACKEND_URL"] == "https://api.openai.com/v1"
+    assert captured["env"]["OPENAI_API_KEY"] == "provider-key"
+    assert "ANTHROPIC_API_KEY" not in captured["env"]
+
+
+def test_executor_requires_result_meta_on_failure(monkeypatch):
+    process = _FakeProcess(
+        _FakeStdout([]),
+        stderr=b"ANALYSIS_ERROR:boom\n",
+        returncode=1,
+    )
+
+    async def fake_create_subprocess_exec(*args, **kwargs):
+        return process
+
+    monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
+
+    executor = LegacySubprocessAnalysisExecutor(
+        analysis_python=Path("/usr/bin/python3"),
+        repo_root=Path("."),
+        api_key_resolver=lambda: "env-key",
+    )
+
+    async def scenario():
+        with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"):
+            await executor.execute(
+                task_id="task-failure",
+                ticker="AAPL",
+                date="2026-04-13",
+                request_context=build_request_context(
+                    provider_api_key="ctx-key",
+                    llm_provider="anthropic",
+                    backend_url="https://api.minimaxi.com/anthropic",
+                ),
+            )
+
+    asyncio.run(scenario())

View File

@@ -45,9 +45,20 @@ def test_load_migration_flags_from_env(monkeypatch):

 def test_build_request_context_defaults():
-    context = build_request_context(api_key="secret", metadata={"source": "test"})
+    context = build_request_context(
+        auth_key="dashboard-secret",
+        provider_api_key="provider-secret",
+        llm_provider="anthropic",
+        backend_url="https://api.minimaxi.com/anthropic",
+        deep_think_llm="MiniMax-M2.7-highspeed",
+        quick_think_llm="MiniMax-M2.7-highspeed",
+        metadata={"source": "test"},
+    )

-    assert context.api_key == "secret"
+    assert context.auth_key == "dashboard-secret"
+    assert context.provider_api_key == "provider-secret"
+    assert context.llm_provider == "anthropic"
+    assert context.backend_url == "https://api.minimaxi.com/anthropic"
     assert context.request_id
     assert context.contract_version == "v1alpha1"
     assert context.executor_type == "legacy_subprocess"
@@ -209,7 +220,12 @@ def test_analysis_service_start_analysis_uses_executor(tmp_path):
             task_id="task-1",
             ticker="AAPL",
             date="2026-04-13",
-            request_context=build_request_context(api_key="secret"),
+            request_context=build_request_context(
+                auth_key="dashboard-secret",
+                provider_api_key="provider-secret",
+                llm_provider="anthropic",
+                backend_url="https://api.minimaxi.com/anthropic",
+            ),
             broadcast_progress=_broadcast,
         )
         await analysis_tasks["task-1"]