Harden executor configuration and failure contracts before further rollout

The rollout-ready branch still conflated dashboard auth with provider credentials, discarded diagnostics when both signal lanes degraded, and treated RESULT_META as optional even though downstream contracts now depend on it. This change separates provider runtime settings from request auth, preserves source diagnostics/data quality in full-failure contracts, requires RESULT_META in the subprocess protocol, and moves A-share holidays into an updateable calendar data source.

Constraint: No external market-calendar dependency is available in env312 and dependency policy forbids adding one casually
Rejected: Keep reading provider keys from request headers | couples dashboard auth to execution and breaks non-anthropic providers
Rejected: Leave both-signals-unavailable as a bare ValueError | loses diagnostics before live/backend contracts can serialize them
Rejected: Keep A-share holidays embedded in Python constants | requires code edits every year and preserves the stopgap design
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Keep subprocess protocol fields explicit and fail closed when RESULT_META is missing; do not route provider credentials through dashboard auth again
Tested: python -m pytest web_dashboard/backend/tests/test_executors.py web_dashboard/backend/tests/test_services_migration.py web_dashboard/backend/tests/test_api_smoke.py orchestrator/tests/test_market_calendar.py orchestrator/tests/test_live_mode.py orchestrator/tests/test_application_service.py orchestrator/tests/test_quant_runner.py orchestrator/tests/test_llm_runner.py -q
Tested: python -m compileall orchestrator web_dashboard/backend
Not-tested: real provider-backed execution across openai/google providers
Not-tested: browser/manual verification beyond existing frontend contract consumers
This commit is contained in:
陈少杰 2026-04-14 01:54:44 +08:00
parent a70b485d6f
commit 63858bf98b
15 changed files with 560 additions and 78 deletions

View File

@ -10,6 +10,7 @@ from orchestrator.contracts.config_schema import (
)
from orchestrator.contracts.error_taxonomy import ReasonCode
from orchestrator.contracts.result_contract import (
CombinedSignalFailure,
FinalSignal,
Signal,
build_error_signal,
@ -18,6 +19,7 @@ from orchestrator.contracts.result_contract import (
__all__ = [
"CONTRACT_VERSION",
"CombinedSignalFailure",
"FinalSignal",
"OrchestratorConfigSchema",
"ReasonCode",

View File

@ -97,3 +97,20 @@ def signal_reason_code(signal: Optional[Signal]) -> Optional[str]:
if signal is None:
return None
return signal.reason_code or signal.metadata.get("reason_code")
class CombinedSignalFailure(ValueError):
"""Structured failure for cases where no merged signal can be produced."""
def __init__(
self,
message: str,
*,
reason_codes: tuple[str, ...] = (),
source_diagnostics: Optional[dict[str, Any]] = None,
data_quality: Optional[dict[str, Any]] = None,
) -> None:
super().__init__(message)
self.reason_codes = tuple(reason_codes)
self.source_diagnostics = dict(source_diagnostics or {})
self.data_quality = dict(data_quality) if data_quality is not None else None

View File

@ -0,0 +1,101 @@
{
"a_share": {
"2024": [
"2024-01-01",
"2024-02-10",
"2024-02-11",
"2024-02-12",
"2024-02-13",
"2024-02-14",
"2024-02-15",
"2024-02-16",
"2024-02-17",
"2024-04-04",
"2024-04-05",
"2024-04-06",
"2024-05-01",
"2024-05-02",
"2024-05-03",
"2024-05-04",
"2024-05-05",
"2024-06-08",
"2024-06-09",
"2024-06-10",
"2024-09-15",
"2024-09-16",
"2024-09-17",
"2024-10-01",
"2024-10-02",
"2024-10-03",
"2024-10-04",
"2024-10-05",
"2024-10-06",
"2024-10-07"
],
"2025": [
"2025-01-01",
"2025-01-28",
"2025-01-29",
"2025-01-30",
"2025-01-31",
"2025-02-01",
"2025-02-02",
"2025-02-03",
"2025-02-04",
"2025-04-04",
"2025-04-05",
"2025-04-06",
"2025-05-01",
"2025-05-02",
"2025-05-03",
"2025-05-04",
"2025-05-05",
"2025-05-31",
"2025-06-01",
"2025-06-02",
"2025-10-01",
"2025-10-02",
"2025-10-03",
"2025-10-04",
"2025-10-05",
"2025-10-06",
"2025-10-07",
"2025-10-08"
],
"2026": [
"2026-01-01",
"2026-01-02",
"2026-01-03",
"2026-02-15",
"2026-02-16",
"2026-02-17",
"2026-02-18",
"2026-02-19",
"2026-02-20",
"2026-02-21",
"2026-02-22",
"2026-02-23",
"2026-04-04",
"2026-04-05",
"2026-04-06",
"2026-05-01",
"2026-05-02",
"2026-05-03",
"2026-05-04",
"2026-05-05",
"2026-06-19",
"2026-06-20",
"2026-06-21",
"2026-09-25",
"2026-09-26",
"2026-09-27",
"2026-10-01",
"2026-10-02",
"2026-10-03",
"2026-10-04",
"2026-10-05",
"2026-10-06",
"2026-10-07"
]
}
}

View File

@ -1,49 +1,80 @@
from __future__ import annotations
import json
import os
from datetime import date, timedelta
from pathlib import Path
_A_SHARE_SUFFIXES = {"SH", "SS", "SZ"}
# Mainland exchanges close on weekends plus the annual State Council public-holiday windows.
# Weekend make-up workdays do not become exchange trading days.
_A_SHARE_HOLIDAYS = {
date(2024, 1, 1),
*[date(2024, 2, day) for day in range(10, 18)],
*[date(2024, 4, day) for day in range(4, 7)],
*[date(2024, 5, day) for day in range(1, 6)],
*[date(2024, 6, day) for day in range(8, 11)],
*[date(2024, 9, day) for day in range(15, 18)],
*[date(2024, 10, day) for day in range(1, 8)],
date(2025, 1, 1),
*[date(2025, 1, day) for day in range(28, 32)],
*[date(2025, 2, day) for day in range(1, 5)],
*[date(2025, 4, day) for day in range(4, 7)],
*[date(2025, 5, day) for day in range(1, 6)],
*[date(2025, 5, day) for day in range(31, 32)],
*[date(2025, 6, day) for day in range(1, 3)],
*[date(2025, 10, day) for day in range(1, 9)],
*[date(2026, 1, day) for day in range(1, 4)],
*[date(2026, 2, day) for day in range(15, 24)],
*[date(2026, 4, day) for day in range(4, 7)],
*[date(2026, 5, day) for day in range(1, 6)],
*[date(2026, 6, day) for day in range(19, 22)],
*[date(2026, 9, day) for day in range(25, 28)],
*[date(2026, 10, day) for day in range(1, 8)],
}
_DEFAULT_MARKET_HOLIDAYS_PATH = Path(__file__).with_name("data") / "market_holidays.json"
def is_non_trading_day(ticker: str, day: date) -> bool:
def is_non_trading_day(ticker: str, day: date, *, data_path: Path | None = None) -> bool:
"""Return whether the requested date is a known non-trading day for the ticker's market."""
if day.weekday() >= 5:
return True
if _is_a_share_ticker(ticker):
return day in _A_SHARE_HOLIDAYS
return _is_nyse_holiday(day)
market = market_for_ticker(ticker)
if market == "a_share":
return day in get_market_holidays(market, day.year, data_path=data_path)
if market == "nyse":
return _is_nyse_holiday(day)
return False
def _is_a_share_ticker(ticker: str) -> bool:
def market_for_ticker(ticker: str) -> str:
suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else ""
return suffix in _A_SHARE_SUFFIXES
if suffix in _A_SHARE_SUFFIXES:
return "a_share"
return "nyse"
def get_market_holidays(market: str, year: int, *, data_path: Path | None = None) -> set[date]:
holidays_by_market = load_market_holidays(data_path=data_path)
market_data = holidays_by_market.get(market, {})
values = market_data.get(str(year), [])
return {date.fromisoformat(raw) for raw in values}
def load_market_holidays(*, data_path: Path | None = None) -> dict[str, dict[str, list[str]]]:
path = _resolve_market_holidays_path(data_path)
if not path.exists():
return {}
payload = json.loads(path.read_text())
return {
str(market): {str(year): list(days) for year, days in years.items()}
for market, years in payload.items()
}
def update_market_holidays(
*,
market: str,
year: int,
holiday_dates: list[date | str],
data_path: Path | None = None,
) -> Path:
path = _resolve_market_holidays_path(data_path)
payload = load_market_holidays(data_path=path)
payload.setdefault(market, {})
normalized_days = sorted(
{
item.isoformat() if isinstance(item, date) else date.fromisoformat(item).isoformat()
for item in holiday_dates
}
)
payload[market][str(year)] = normalized_days
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
return path
def _resolve_market_holidays_path(data_path: Path | None = None) -> Path:
if data_path is not None:
return data_path
env_path = os.environ.get("TRADINGAGENTS_MARKET_HOLIDAYS_PATH")
if env_path:
return Path(env_path)
return _DEFAULT_MARKET_HOLIDAYS_PATH
def _is_nyse_holiday(day: date) -> bool:
@ -66,7 +97,6 @@ def _is_nyse_holiday(day: date) -> bool:
if day.year >= 2022:
holidays.add(observed_juneteenth)
# When Jan 1 falls on Saturday, NYSE observes New Year's Day on the prior Friday.
if day.month == 12 and day.day == 31:
next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1)
if next_new_year.year == day.year:

View File

@ -3,8 +3,8 @@ from typing import Optional
from orchestrator.config import OrchestratorConfig
from orchestrator.contracts.error_taxonomy import ReasonCode
from orchestrator.contracts.result_contract import FinalSignal, Signal, signal_reason_code
from orchestrator.signals import Signal, FinalSignal, SignalMerger
from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal, signal_reason_code
from orchestrator.signals import SignalMerger
from orchestrator.quant_runner import QuantRunner
from orchestrator.llm_runner import LLMRunner
@ -92,9 +92,16 @@ class TradingOrchestrator:
source_diagnostics["llm"] = {"reason_code": ReasonCode.LLM_SIGNAL_FAILED.value}
llm_sig = None
# merge raises ValueError if both None
# Preserve diagnostics even when both lanes degrade and no FinalSignal can be produced.
if quant_sig is None and llm_sig is None:
degradation_reasons.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value)
raise CombinedSignalFailure(
"both quant and llm signals are None",
reason_codes=tuple(dict.fromkeys(degradation_reasons)),
source_diagnostics=source_diagnostics,
data_quality=self._summarize_data_quality(source_diagnostics),
)
final_signal = self._merger.merge(
quant_sig,
llm_sig,

View File

@ -5,6 +5,7 @@ import pytest
import orchestrator.orchestrator as orchestrator_module
from orchestrator.config import OrchestratorConfig
from orchestrator.contracts.error_taxonomy import ReasonCode
from orchestrator.contracts.result_contract import CombinedSignalFailure
from orchestrator.signals import Signal
@ -107,11 +108,16 @@ def test_trading_orchestrator_raises_when_both_sources_degrade(monkeypatch):
monkeypatch.setattr(orchestrator_module, "QuantRunner", FakeQuantRunner)
monkeypatch.setattr(orchestrator_module, "LLMRunner", FakeLLMRunner)
with pytest.raises(ValueError, match="both quant and llm signals are None"):
with pytest.raises(CombinedSignalFailure) as exc_info:
orchestrator_module.TradingOrchestrator(
OrchestratorConfig(quant_backtest_path="/tmp/quant")
).get_combined_signal("AAPL", "2026-04-11")
assert str(exc_info.value) == "both quant and llm signals are None"
assert exc_info.value.reason_codes[0] == ReasonCode.QUANT_NO_DATA.value
assert exc_info.value.reason_codes[-1] == ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value
assert exc_info.value.source_diagnostics["quant"]["reason_code"] == ReasonCode.QUANT_NO_DATA.value
def test_trading_orchestrator_surfaces_provider_mismatch_summary_when_llm_degrades(monkeypatch):
class FakeQuantRunner:

View File

@ -2,7 +2,7 @@ import asyncio
from datetime import datetime, timezone
from orchestrator.contracts.error_taxonomy import ReasonCode
from orchestrator.contracts.result_contract import FinalSignal, Signal
from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal
from orchestrator.live_mode import LiveMode
@ -83,7 +83,12 @@ def test_live_mode_serializes_failure_contract_shape():
live_mode = LiveMode(
_StubOrchestrator(
{
("AAPL", "2026-04-11"): ValueError("both quant and llm signals are None")
("AAPL", "2026-04-11"): CombinedSignalFailure(
"both quant and llm signals are None",
reason_codes=(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value, ReasonCode.PROVIDER_MISMATCH.value),
source_diagnostics={"llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value}},
data_quality={"state": "provider_mismatch", "source": "llm"},
)
}
)
)
@ -104,9 +109,14 @@ def test_live_mode_serializes_failure_contract_shape():
},
"degradation": {
"degraded": True,
"reason_codes": [ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value],
"source_diagnostics": {},
"reason_codes": [
ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value,
ReasonCode.PROVIDER_MISMATCH.value,
],
"source_diagnostics": {
"llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value},
},
},
"data_quality": None,
"data_quality": {"state": "provider_mismatch", "source": "llm"},
}
]

View File

@ -1,6 +1,7 @@
import json
from datetime import date
from orchestrator.market_calendar import is_non_trading_day
from orchestrator.market_calendar import get_market_holidays, is_non_trading_day, update_market_holidays
def test_is_non_trading_day_marks_a_share_holiday():
@ -13,3 +14,22 @@ def test_is_non_trading_day_marks_nyse_holiday():
def test_is_non_trading_day_leaves_regular_weekday_open():
assert is_non_trading_day('AAPL', date(2024, 3, 28)) is False
def test_update_market_holidays_creates_maintainable_future_year_entry(tmp_path):
data_path = tmp_path / "market_holidays.json"
data_path.write_text(json.dumps({"a_share": {}}))
update_market_holidays(
market="a_share",
year=2027,
holiday_dates=["2027-02-10", "2027-02-11"],
data_path=data_path,
)
assert get_market_holidays("a_share", 2027, data_path=data_path) == {
date(2027, 2, 10),
date(2027, 2, 11),
}
assert is_non_trading_day("600519.SS", date(2027, 2, 10)) is False
assert is_non_trading_day("600519.SS", date(2027, 2, 10), data_path=data_path) is True

View File

@ -18,6 +18,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from tradingagents.default_config import get_default_config
from services import AnalysisService, JobService, ResultStore, build_request_context, load_migration_flags
from services.executor import LegacySubprocessAnalysisExecutor
@ -55,7 +56,7 @@ async def lifespan(app: FastAPI):
executor=LegacySubprocessAnalysisExecutor(
analysis_python=ANALYSIS_PYTHON,
repo_root=REPO_ROOT,
api_key_resolver=_get_analysis_api_key,
api_key_resolver=_get_analysis_provider_api_key,
process_registry=app.state.job_service.register_process,
),
result_store=app.state.result_store,
@ -103,10 +104,8 @@ class ScreenRequest(BaseModel):
@app.get("/api/config/check")
async def check_config():
"""Check if the app is configured (API key is set).
The FastAPI backend receives ANTHROPIC_API_KEY as an env var when spawned by Tauri.
"""
configured = bool(_get_analysis_api_key())
"""Check if the analysis provider is configured with a callable API key."""
configured = bool(_resolve_analysis_runtime_settings().get("provider_api_key"))
return {"configured": configured}
@ -151,7 +150,7 @@ _api_key: Optional[str] = None
def _get_api_key() -> Optional[str]:
global _api_key
if _api_key is None:
_api_key = os.environ.get("DASHBOARD_API_KEY") or os.environ.get("ANTHROPIC_API_KEY")
_api_key = os.environ.get("DASHBOARD_API_KEY")
return _api_key
def _check_api_key(api_key: Optional[str]) -> bool:
@ -181,15 +180,73 @@ def _persist_analysis_api_key(api_key_value: str):
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps({"api_key": api_key_value}, ensure_ascii=False))
os.chmod(CONFIG_PATH, 0o600)
os.environ["ANTHROPIC_API_KEY"] = api_key_value
_api_key = None
def _get_analysis_api_key() -> Optional[str]:
return (
os.environ.get("ANTHROPIC_API_KEY")
or os.environ.get("MINIMAX_API_KEY")
or _load_saved_config().get("api_key")
def _get_analysis_provider_api_key(provider: str, saved_api_key: Optional[str] = None) -> Optional[str]:
env_names = {
"anthropic": ("ANTHROPIC_API_KEY", "MINIMAX_API_KEY"),
"openai": ("OPENAI_API_KEY",),
"openrouter": ("OPENROUTER_API_KEY",),
"xai": ("XAI_API_KEY",),
"google": ("GOOGLE_API_KEY",),
"ollama": tuple(),
}.get(provider.lower(), tuple())
for env_name in env_names:
value = os.environ.get(env_name)
if value:
return value
return saved_api_key
def _resolve_analysis_runtime_settings() -> dict:
saved = _load_saved_config()
defaults = get_default_config()
provider = os.environ.get("TRADINGAGENTS_LLM_PROVIDER")
if not provider:
if os.environ.get("ANTHROPIC_BASE_URL"):
provider = "anthropic"
elif os.environ.get("OPENAI_BASE_URL"):
provider = "openai"
else:
provider = defaults.get("llm_provider", "anthropic")
backend_url = (
os.environ.get("TRADINGAGENTS_BACKEND_URL")
or os.environ.get("ANTHROPIC_BASE_URL")
or os.environ.get("OPENAI_BASE_URL")
or defaults.get("backend_url")
)
deep_model = (
os.environ.get("TRADINGAGENTS_DEEP_MODEL")
or os.environ.get("TRADINGAGENTS_MODEL")
or defaults.get("deep_think_llm")
)
quick_model = (
os.environ.get("TRADINGAGENTS_QUICK_MODEL")
or os.environ.get("TRADINGAGENTS_MODEL")
or defaults.get("quick_think_llm")
)
return {
"llm_provider": provider,
"backend_url": backend_url,
"deep_think_llm": deep_model,
"quick_think_llm": quick_model,
"provider_api_key": _get_analysis_provider_api_key(provider, saved.get("api_key")),
}
def _build_analysis_request_context(request: Request, auth_key: Optional[str]):
settings = _resolve_analysis_runtime_settings()
return build_request_context(
request,
auth_key=auth_key,
provider_api_key=settings["provider_api_key"],
llm_provider=settings["llm_provider"],
backend_url=settings["backend_url"],
deep_think_llm=settings["deep_think_llm"],
quick_think_llm=settings["quick_think_llm"],
)
@ -279,7 +336,7 @@ async def start_analysis(
task_id = f"{payload.ticker}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
date = payload.date or datetime.now().strftime("%Y-%m-%d")
request_context = build_request_context(http_request, api_key=api_key)
request_context = _build_analysis_request_context(http_request, api_key)
try:
return await app.state.analysis_service.start_analysis(
@ -838,7 +895,7 @@ async def start_portfolio_analysis(
date = datetime.now().strftime("%Y-%m-%d")
task_id = f"port_{date}_{uuid.uuid4().hex[:6]}"
request_context = build_request_context(http_request, api_key=api_key)
request_context = _build_analysis_request_context(http_request, api_key)
try:
return await app.state.analysis_service.start_portfolio_analysis(

View File

@ -154,6 +154,12 @@ class AnalysisService:
started_at=start_time,
code=exc.code,
retryable=exc.retryable,
degradation={
"degraded": bool(exc.degrade_reason_codes) or bool(exc.data_quality),
"reason_codes": list(exc.degrade_reason_codes),
"source_diagnostics": exc.source_diagnostics or {},
} if (exc.degrade_reason_codes or exc.data_quality or exc.source_diagnostics) else None,
data_quality=exc.data_quality,
)
except Exception as exc:
self._fail_analysis_state(
@ -162,6 +168,8 @@ class AnalysisService:
started_at=start_time,
code="analysis_failed",
retryable=False,
degradation=None,
data_quality=None,
)
await broadcast_progress(task_id, self.job_service.task_results[task_id])
@ -279,12 +287,16 @@ class AnalysisService:
started_at: float,
code: str,
retryable: bool,
degradation: Optional[dict],
data_quality: Optional[dict],
) -> None:
state = self.job_service.task_results[task_id]
state["status"] = "failed"
state["elapsed_seconds"] = int(time.monotonic() - started_at)
state["elapsed"] = state["elapsed_seconds"]
state["result"] = None
state["degradation_summary"] = degradation
state["data_quality_summary"] = data_quality
state["error"] = {
"code": code,
"message": message,

View File

@ -75,7 +75,13 @@ print("STAGE:trading", flush=True)
try:
result = orchestrator.get_combined_signal(ticker, date)
except ValueError as exc:
except Exception as exc:
result_meta = {
"degrade_reason_codes": list(getattr(exc, "reason_codes", ()) or ()),
"data_quality": getattr(exc, "data_quality", None),
"source_diagnostics": getattr(exc, "source_diagnostics", None),
}
print("RESULT_META:" + json.dumps(result_meta), file=sys.stderr, flush=True)
print("ANALYSIS_ERROR:" + str(exc), file=sys.stderr, flush=True)
sys.exit(1)
@ -214,10 +220,22 @@ class AnalysisExecutionOutput:
class AnalysisExecutorError(RuntimeError):
def __init__(self, message: str, *, code: str = "analysis_failed", retryable: bool = False):
def __init__(
self,
message: str,
*,
code: str = "analysis_failed",
retryable: bool = False,
degrade_reason_codes: tuple[str, ...] = (),
data_quality: Optional[dict] = None,
source_diagnostics: Optional[dict] = None,
):
super().__init__(message)
self.code = code
self.retryable = retryable
self.degrade_reason_codes = degrade_reason_codes
self.data_quality = data_quality
self.source_diagnostics = source_diagnostics
class AnalysisExecutor(Protocol):
@ -240,7 +258,7 @@ class LegacySubprocessAnalysisExecutor:
*,
analysis_python: Path,
repo_root: Path,
api_key_resolver: Callable[[], Optional[str]],
api_key_resolver: Callable[..., Optional[str]],
process_registry: Optional[ProcessRegistry] = None,
script_template: str = LEGACY_ANALYSIS_SCRIPT_TEMPLATE,
stdout_timeout_secs: float = 300.0,
@ -261,9 +279,10 @@ class LegacySubprocessAnalysisExecutor:
request_context: RequestContext,
on_stage: Optional[StageCallback] = None,
) -> AnalysisExecutionOutput:
analysis_api_key = request_context.api_key or self.api_key_resolver()
if not analysis_api_key:
raise RuntimeError("ANTHROPIC_API_KEY environment variable not set")
llm_provider = (request_context.llm_provider or "anthropic").lower()
analysis_api_key = request_context.provider_api_key or self._resolve_provider_api_key(llm_provider)
if llm_provider != "ollama" and not analysis_api_key:
raise RuntimeError(f"{llm_provider} provider API key not configured")
script_path: Optional[Path] = None
proc: asyncio.subprocess.Process | None = None
@ -279,7 +298,16 @@ class LegacySubprocessAnalysisExecutor:
for key, value in os.environ.items()
if not key.startswith(("PYTHON", "CONDA", "VIRTUAL"))
}
clean_env["ANTHROPIC_API_KEY"] = analysis_api_key
clean_env["TRADINGAGENTS_LLM_PROVIDER"] = llm_provider
if request_context.backend_url:
clean_env["TRADINGAGENTS_BACKEND_URL"] = request_context.backend_url
if request_context.deep_think_llm:
clean_env["TRADINGAGENTS_DEEP_MODEL"] = request_context.deep_think_llm
if request_context.quick_think_llm:
clean_env["TRADINGAGENTS_QUICK_MODEL"] = request_context.quick_think_llm
for env_name in self._provider_api_env_names(llm_provider):
if analysis_api_key:
clean_env[env_name] = analysis_api_key
proc = await asyncio.create_subprocess_exec(
str(self.analysis_python),
@ -317,9 +345,22 @@ class LegacySubprocessAnalysisExecutor:
await proc.wait()
stderr_bytes = await proc.stderr.read() if proc.stderr is not None else b""
stderr_lines = stderr_bytes.decode(errors="replace").splitlines() if stderr_bytes else []
if proc.returncode != 0:
message = stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}"
raise AnalysisExecutorError(message)
failure_meta = self._parse_failure_metadata(stdout_lines, stderr_lines)
message = self._extract_error_message(stderr_lines) or (stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}")
if failure_meta is None:
raise AnalysisExecutorError(
"analysis subprocess failed without required markers: RESULT_META",
code="analysis_protocol_failed",
)
raise AnalysisExecutorError(
message,
code="analysis_failed",
degrade_reason_codes=failure_meta["degrade_reason_codes"],
data_quality=failure_meta["data_quality"],
source_diagnostics=failure_meta["source_diagnostics"],
)
return self._parse_output(
stdout_lines=stdout_lines,
@ -347,6 +388,48 @@ class LegacySubprocessAnalysisExecutor:
return
await proc.wait()
def _resolve_provider_api_key(self, provider: str) -> Optional[str]:
try:
return self.api_key_resolver(provider) # type: ignore[misc]
except TypeError:
return self.api_key_resolver()
@staticmethod
def _provider_api_env_names(provider: str) -> tuple[str, ...]:
return {
"anthropic": ("ANTHROPIC_API_KEY",),
"openai": ("OPENAI_API_KEY",),
"openrouter": ("OPENROUTER_API_KEY",),
"xai": ("XAI_API_KEY",),
"google": ("GOOGLE_API_KEY",),
"ollama": tuple(),
}.get(provider, tuple())
@staticmethod
def _parse_failure_metadata(stdout_lines: list[str], stderr_lines: list[str]) -> Optional[dict]:
for line in [*stdout_lines, *stderr_lines]:
if line.startswith("RESULT_META:"):
try:
detail = json.loads(line.split(":", 1)[1].strip())
except Exception as exc:
raise AnalysisExecutorError(
"failed to parse RESULT_META payload",
code="analysis_protocol_failed",
) from exc
return {
"degrade_reason_codes": tuple(detail.get("degrade_reason_codes") or ()),
"data_quality": detail.get("data_quality"),
"source_diagnostics": detail.get("source_diagnostics"),
}
return None
@staticmethod
def _extract_error_message(stderr_lines: list[str]) -> Optional[str]:
for line in stderr_lines:
if line.startswith("ANALYSIS_ERROR:"):
return line.split(":", 1)[1].strip()
return None
@staticmethod
def _parse_output(
*,
@ -393,6 +476,8 @@ class LegacySubprocessAnalysisExecutor:
missing_markers = []
if not seen_signal_detail:
missing_markers.append("SIGNAL_DETAIL")
if not seen_result_meta:
missing_markers.append("RESULT_META")
if not seen_complete:
missing_markers.append("ANALYSIS_COMPLETE")
if missing_markers:

View File

@ -18,7 +18,12 @@ class RequestContext:
request_id: str
contract_version: str = CONTRACT_VERSION
executor_type: str = DEFAULT_EXECUTOR_TYPE
api_key: Optional[str] = None
auth_key: Optional[str] = None
provider_api_key: Optional[str] = None
llm_provider: Optional[str] = None
backend_url: Optional[str] = None
deep_think_llm: Optional[str] = None
quick_think_llm: Optional[str] = None
client_host: Optional[str] = None
is_local: bool = False
metadata: dict[str, str] = field(default_factory=dict)
@ -27,7 +32,12 @@ class RequestContext:
def build_request_context(
request: Optional[Request] = None,
*,
api_key: Optional[str] = None,
auth_key: Optional[str] = None,
provider_api_key: Optional[str] = None,
llm_provider: Optional[str] = None,
backend_url: Optional[str] = None,
deep_think_llm: Optional[str] = None,
quick_think_llm: Optional[str] = None,
request_id: Optional[str] = None,
contract_version: str = CONTRACT_VERSION,
executor_type: str = DEFAULT_EXECUTOR_TYPE,
@ -40,7 +50,12 @@ def build_request_context(
request_id=request_id or uuid4().hex,
contract_version=contract_version,
executor_type=executor_type,
api_key=api_key,
auth_key=auth_key,
provider_api_key=provider_api_key,
llm_provider=llm_provider,
backend_url=backend_url,
deep_think_llm=deep_think_llm,
quick_think_llm=quick_think_llm,
client_host=client_host,
is_local=is_local,
metadata=dict(metadata or {}),

View File

@ -140,7 +140,7 @@ def test_portfolio_analyze_route_uses_analysis_service_smoke(monkeypatch):
assert response.json()["status"] == "running"
assert str(captured["task_id"]).startswith("port_")
assert isinstance(captured["date"], str)
assert captured["request_context"].api_key == "service-key"
assert captured["request_context"].auth_key == "service-key"
assert callable(captured["broadcast_progress"])
@ -248,7 +248,7 @@ def test_orchestrator_websocket_smoke_is_contract_first(monkeypatch):
def test_orchestrator_websocket_rejects_unauthorized(monkeypatch):
monkeypatch.delenv("DASHBOARD_API_KEY", raising=False)
monkeypatch.setenv("DASHBOARD_API_KEY", "dashboard-secret")
monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
main = _load_main_module(monkeypatch)

View File

@ -71,12 +71,16 @@ def test_executor_raises_when_required_markers_missing(monkeypatch):
)
async def scenario():
with pytest.raises(AnalysisExecutorError, match="required markers: ANALYSIS_COMPLETE"):
with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META, ANALYSIS_COMPLETE"):
await executor.execute(
task_id="task-1",
ticker="AAPL",
date="2026-04-13",
request_context=build_request_context(api_key="ctx-key"),
request_context=build_request_context(
provider_api_key="ctx-key",
llm_provider="anthropic",
backend_url="https://api.minimaxi.com/anthropic",
),
)
asyncio.run(scenario())
@ -103,7 +107,11 @@ def test_executor_kills_subprocess_on_timeout(monkeypatch):
task_id="task-2",
ticker="AAPL",
date="2026-04-13",
request_context=build_request_context(api_key="ctx-key"),
request_context=build_request_context(
provider_api_key="ctx-key",
llm_provider="anthropic",
backend_url="https://api.minimaxi.com/anthropic",
),
)
asyncio.run(scenario())
@ -136,3 +144,99 @@ def test_executor_marks_degraded_success_when_result_meta_reports_data_quality()
assert contract["status"] == "degraded_success"
assert contract["data_quality"]["state"] == "non_trading_day"
assert contract["degradation"]["reason_codes"] == ["non_trading_day"]
def test_executor_requires_result_meta_on_success():
with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"):
LegacySubprocessAnalysisExecutor._parse_output(
stdout_lines=[
'SIGNAL_DETAIL:{"quant_signal":"HOLD","llm_signal":"BUY","confidence":0.6}',
"ANALYSIS_COMPLETE:OVERWEIGHT",
],
ticker="AAPL",
date="2026-04-12",
contract_version="v1alpha1",
executor_type="legacy_subprocess",
)
def test_executor_injects_provider_specific_env(monkeypatch):
captured = {}
process = _FakeProcess(
_FakeStdout(
[
b'SIGNAL_DETAIL:{"quant_signal":"BUY","llm_signal":"BUY","confidence":0.8}\n',
b'RESULT_META:{"degrade_reason_codes":[],"data_quality":{"state":"ok"}}\n',
b"ANALYSIS_COMPLETE:BUY\n",
]
),
returncode=0,
)
async def fake_create_subprocess_exec(*args, **kwargs):
captured["env"] = kwargs["env"]
return process
monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
executor = LegacySubprocessAnalysisExecutor(
analysis_python=Path("/usr/bin/python3"),
repo_root=Path("."),
api_key_resolver=lambda provider="openai": "fallback-key",
)
async def scenario():
await executor.execute(
task_id="task-provider",
ticker="AAPL",
date="2026-04-13",
request_context=build_request_context(
auth_key="dashboard-key",
provider_api_key="provider-key",
llm_provider="openai",
backend_url="https://api.openai.com/v1",
deep_think_llm="gpt-5.4",
quick_think_llm="gpt-5.4-mini",
),
)
asyncio.run(scenario())
assert captured["env"]["TRADINGAGENTS_LLM_PROVIDER"] == "openai"
assert captured["env"]["TRADINGAGENTS_BACKEND_URL"] == "https://api.openai.com/v1"
assert captured["env"]["OPENAI_API_KEY"] == "provider-key"
assert "ANTHROPIC_API_KEY" not in captured["env"]
def test_executor_requires_result_meta_on_failure(monkeypatch):
process = _FakeProcess(
_FakeStdout([]),
stderr=b"ANALYSIS_ERROR:boom\n",
returncode=1,
)
async def fake_create_subprocess_exec(*args, **kwargs):
return process
monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
executor = LegacySubprocessAnalysisExecutor(
analysis_python=Path("/usr/bin/python3"),
repo_root=Path("."),
api_key_resolver=lambda: "env-key",
)
async def scenario():
with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"):
await executor.execute(
task_id="task-failure",
ticker="AAPL",
date="2026-04-13",
request_context=build_request_context(
provider_api_key="ctx-key",
llm_provider="anthropic",
backend_url="https://api.minimaxi.com/anthropic",
),
)
asyncio.run(scenario())

View File

@ -45,9 +45,20 @@ def test_load_migration_flags_from_env(monkeypatch):
def test_build_request_context_defaults():
context = build_request_context(api_key="secret", metadata={"source": "test"})
context = build_request_context(
auth_key="dashboard-secret",
provider_api_key="provider-secret",
llm_provider="anthropic",
backend_url="https://api.minimaxi.com/anthropic",
deep_think_llm="MiniMax-M2.7-highspeed",
quick_think_llm="MiniMax-M2.7-highspeed",
metadata={"source": "test"},
)
assert context.api_key == "secret"
assert context.auth_key == "dashboard-secret"
assert context.provider_api_key == "provider-secret"
assert context.llm_provider == "anthropic"
assert context.backend_url == "https://api.minimaxi.com/anthropic"
assert context.request_id
assert context.contract_version == "v1alpha1"
assert context.executor_type == "legacy_subprocess"
@ -209,7 +220,12 @@ def test_analysis_service_start_analysis_uses_executor(tmp_path):
task_id="task-1",
ticker="AAPL",
date="2026-04-13",
request_context=build_request_context(api_key="secret"),
request_context=build_request_context(
auth_key="dashboard-secret",
provider_api_key="provider-secret",
llm_provider="anthropic",
backend_url="https://api.minimaxi.com/anthropic",
),
broadcast_progress=_broadcast,
)
await analysis_tasks["task-1"]