Harden executor configuration and failure contracts before further rollout
The rollout-ready branch still conflated dashboard auth with provider credentials, discarded diagnostics when both signal lanes degraded, and treated RESULT_META as optional even though downstream contracts now depend on it. This change separates provider runtime settings from request auth, preserves source diagnostics/data quality in full-failure contracts, requires RESULT_META in the subprocess protocol, and moves A-share holidays into an updateable calendar data source. Constraint: No external market-calendar dependency is available in env312 and dependency policy forbids adding one casually Rejected: Keep reading provider keys from request headers | couples dashboard auth to execution and breaks non-anthropic providers Rejected: Leave both-signals-unavailable as a bare ValueError | loses diagnostics before live/backend contracts can serialize them Rejected: Keep A-share holidays embedded in Python constants | requires code edits every year and preserves the stopgap design Confidence: high Scope-risk: moderate Reversibility: clean Directive: Keep subprocess protocol fields explicit and fail closed when RESULT_META is missing; do not route provider credentials through dashboard auth again Tested: python -m pytest web_dashboard/backend/tests/test_executors.py web_dashboard/backend/tests/test_services_migration.py web_dashboard/backend/tests/test_api_smoke.py orchestrator/tests/test_market_calendar.py orchestrator/tests/test_live_mode.py orchestrator/tests/test_application_service.py orchestrator/tests/test_quant_runner.py orchestrator/tests/test_llm_runner.py -q Tested: python -m compileall orchestrator web_dashboard/backend Not-tested: real provider-backed execution across openai/google providers Not-tested: browser/manual verification beyond existing frontend contract consumers
This commit is contained in:
parent
a70b485d6f
commit
63858bf98b
|
|
@ -10,6 +10,7 @@ from orchestrator.contracts.config_schema import (
|
|||
)
|
||||
from orchestrator.contracts.error_taxonomy import ReasonCode
|
||||
from orchestrator.contracts.result_contract import (
|
||||
CombinedSignalFailure,
|
||||
FinalSignal,
|
||||
Signal,
|
||||
build_error_signal,
|
||||
|
|
@ -18,6 +19,7 @@ from orchestrator.contracts.result_contract import (
|
|||
|
||||
__all__ = [
|
||||
"CONTRACT_VERSION",
|
||||
"CombinedSignalFailure",
|
||||
"FinalSignal",
|
||||
"OrchestratorConfigSchema",
|
||||
"ReasonCode",
|
||||
|
|
|
|||
|
|
@ -97,3 +97,20 @@ def signal_reason_code(signal: Optional[Signal]) -> Optional[str]:
|
|||
if signal is None:
|
||||
return None
|
||||
return signal.reason_code or signal.metadata.get("reason_code")
|
||||
|
||||
|
||||
class CombinedSignalFailure(ValueError):
|
||||
"""Structured failure for cases where no merged signal can be produced."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
*,
|
||||
reason_codes: tuple[str, ...] = (),
|
||||
source_diagnostics: Optional[dict[str, Any]] = None,
|
||||
data_quality: Optional[dict[str, Any]] = None,
|
||||
) -> None:
|
||||
super().__init__(message)
|
||||
self.reason_codes = tuple(reason_codes)
|
||||
self.source_diagnostics = dict(source_diagnostics or {})
|
||||
self.data_quality = dict(data_quality) if data_quality is not None else None
|
||||
|
|
|
|||
|
|
@ -0,0 +1,101 @@
|
|||
{
|
||||
"a_share": {
|
||||
"2024": [
|
||||
"2024-01-01",
|
||||
"2024-02-10",
|
||||
"2024-02-11",
|
||||
"2024-02-12",
|
||||
"2024-02-13",
|
||||
"2024-02-14",
|
||||
"2024-02-15",
|
||||
"2024-02-16",
|
||||
"2024-02-17",
|
||||
"2024-04-04",
|
||||
"2024-04-05",
|
||||
"2024-04-06",
|
||||
"2024-05-01",
|
||||
"2024-05-02",
|
||||
"2024-05-03",
|
||||
"2024-05-04",
|
||||
"2024-05-05",
|
||||
"2024-06-08",
|
||||
"2024-06-09",
|
||||
"2024-06-10",
|
||||
"2024-09-15",
|
||||
"2024-09-16",
|
||||
"2024-09-17",
|
||||
"2024-10-01",
|
||||
"2024-10-02",
|
||||
"2024-10-03",
|
||||
"2024-10-04",
|
||||
"2024-10-05",
|
||||
"2024-10-06",
|
||||
"2024-10-07"
|
||||
],
|
||||
"2025": [
|
||||
"2025-01-01",
|
||||
"2025-01-28",
|
||||
"2025-01-29",
|
||||
"2025-01-30",
|
||||
"2025-01-31",
|
||||
"2025-02-01",
|
||||
"2025-02-02",
|
||||
"2025-02-03",
|
||||
"2025-02-04",
|
||||
"2025-04-04",
|
||||
"2025-04-05",
|
||||
"2025-04-06",
|
||||
"2025-05-01",
|
||||
"2025-05-02",
|
||||
"2025-05-03",
|
||||
"2025-05-04",
|
||||
"2025-05-05",
|
||||
"2025-05-31",
|
||||
"2025-06-01",
|
||||
"2025-06-02",
|
||||
"2025-10-01",
|
||||
"2025-10-02",
|
||||
"2025-10-03",
|
||||
"2025-10-04",
|
||||
"2025-10-05",
|
||||
"2025-10-06",
|
||||
"2025-10-07",
|
||||
"2025-10-08"
|
||||
],
|
||||
"2026": [
|
||||
"2026-01-01",
|
||||
"2026-01-02",
|
||||
"2026-01-03",
|
||||
"2026-02-15",
|
||||
"2026-02-16",
|
||||
"2026-02-17",
|
||||
"2026-02-18",
|
||||
"2026-02-19",
|
||||
"2026-02-20",
|
||||
"2026-02-21",
|
||||
"2026-02-22",
|
||||
"2026-02-23",
|
||||
"2026-04-04",
|
||||
"2026-04-05",
|
||||
"2026-04-06",
|
||||
"2026-05-01",
|
||||
"2026-05-02",
|
||||
"2026-05-03",
|
||||
"2026-05-04",
|
||||
"2026-05-05",
|
||||
"2026-06-19",
|
||||
"2026-06-20",
|
||||
"2026-06-21",
|
||||
"2026-09-25",
|
||||
"2026-09-26",
|
||||
"2026-09-27",
|
||||
"2026-10-01",
|
||||
"2026-10-02",
|
||||
"2026-10-03",
|
||||
"2026-10-04",
|
||||
"2026-10-05",
|
||||
"2026-10-06",
|
||||
"2026-10-07"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
@ -1,49 +1,80 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from datetime import date, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
_A_SHARE_SUFFIXES = {"SH", "SS", "SZ"}
|
||||
|
||||
# Mainland exchanges close on weekends plus the annual State Council public-holiday windows.
|
||||
# Weekend make-up workdays do not become exchange trading days.
|
||||
_A_SHARE_HOLIDAYS = {
|
||||
date(2024, 1, 1),
|
||||
*[date(2024, 2, day) for day in range(10, 18)],
|
||||
*[date(2024, 4, day) for day in range(4, 7)],
|
||||
*[date(2024, 5, day) for day in range(1, 6)],
|
||||
*[date(2024, 6, day) for day in range(8, 11)],
|
||||
*[date(2024, 9, day) for day in range(15, 18)],
|
||||
*[date(2024, 10, day) for day in range(1, 8)],
|
||||
date(2025, 1, 1),
|
||||
*[date(2025, 1, day) for day in range(28, 32)],
|
||||
*[date(2025, 2, day) for day in range(1, 5)],
|
||||
*[date(2025, 4, day) for day in range(4, 7)],
|
||||
*[date(2025, 5, day) for day in range(1, 6)],
|
||||
*[date(2025, 5, day) for day in range(31, 32)],
|
||||
*[date(2025, 6, day) for day in range(1, 3)],
|
||||
*[date(2025, 10, day) for day in range(1, 9)],
|
||||
*[date(2026, 1, day) for day in range(1, 4)],
|
||||
*[date(2026, 2, day) for day in range(15, 24)],
|
||||
*[date(2026, 4, day) for day in range(4, 7)],
|
||||
*[date(2026, 5, day) for day in range(1, 6)],
|
||||
*[date(2026, 6, day) for day in range(19, 22)],
|
||||
*[date(2026, 9, day) for day in range(25, 28)],
|
||||
*[date(2026, 10, day) for day in range(1, 8)],
|
||||
}
|
||||
_DEFAULT_MARKET_HOLIDAYS_PATH = Path(__file__).with_name("data") / "market_holidays.json"
|
||||
|
||||
|
||||
def is_non_trading_day(ticker: str, day: date) -> bool:
|
||||
def is_non_trading_day(ticker: str, day: date, *, data_path: Path | None = None) -> bool:
|
||||
"""Return whether the requested date is a known non-trading day for the ticker's market."""
|
||||
if day.weekday() >= 5:
|
||||
return True
|
||||
if _is_a_share_ticker(ticker):
|
||||
return day in _A_SHARE_HOLIDAYS
|
||||
return _is_nyse_holiday(day)
|
||||
market = market_for_ticker(ticker)
|
||||
if market == "a_share":
|
||||
return day in get_market_holidays(market, day.year, data_path=data_path)
|
||||
if market == "nyse":
|
||||
return _is_nyse_holiday(day)
|
||||
return False
|
||||
|
||||
|
||||
def _is_a_share_ticker(ticker: str) -> bool:
|
||||
def market_for_ticker(ticker: str) -> str:
|
||||
suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else ""
|
||||
return suffix in _A_SHARE_SUFFIXES
|
||||
if suffix in _A_SHARE_SUFFIXES:
|
||||
return "a_share"
|
||||
return "nyse"
|
||||
|
||||
|
||||
def get_market_holidays(market: str, year: int, *, data_path: Path | None = None) -> set[date]:
|
||||
holidays_by_market = load_market_holidays(data_path=data_path)
|
||||
market_data = holidays_by_market.get(market, {})
|
||||
values = market_data.get(str(year), [])
|
||||
return {date.fromisoformat(raw) for raw in values}
|
||||
|
||||
|
||||
def load_market_holidays(*, data_path: Path | None = None) -> dict[str, dict[str, list[str]]]:
|
||||
path = _resolve_market_holidays_path(data_path)
|
||||
if not path.exists():
|
||||
return {}
|
||||
payload = json.loads(path.read_text())
|
||||
return {
|
||||
str(market): {str(year): list(days) for year, days in years.items()}
|
||||
for market, years in payload.items()
|
||||
}
|
||||
|
||||
|
||||
def update_market_holidays(
|
||||
*,
|
||||
market: str,
|
||||
year: int,
|
||||
holiday_dates: list[date | str],
|
||||
data_path: Path | None = None,
|
||||
) -> Path:
|
||||
path = _resolve_market_holidays_path(data_path)
|
||||
payload = load_market_holidays(data_path=path)
|
||||
payload.setdefault(market, {})
|
||||
normalized_days = sorted(
|
||||
{
|
||||
item.isoformat() if isinstance(item, date) else date.fromisoformat(item).isoformat()
|
||||
for item in holiday_dates
|
||||
}
|
||||
)
|
||||
payload[market][str(year)] = normalized_days
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
|
||||
return path
|
||||
|
||||
|
||||
def _resolve_market_holidays_path(data_path: Path | None = None) -> Path:
|
||||
if data_path is not None:
|
||||
return data_path
|
||||
env_path = os.environ.get("TRADINGAGENTS_MARKET_HOLIDAYS_PATH")
|
||||
if env_path:
|
||||
return Path(env_path)
|
||||
return _DEFAULT_MARKET_HOLIDAYS_PATH
|
||||
|
||||
|
||||
def _is_nyse_holiday(day: date) -> bool:
|
||||
|
|
@ -66,7 +97,6 @@ def _is_nyse_holiday(day: date) -> bool:
|
|||
if day.year >= 2022:
|
||||
holidays.add(observed_juneteenth)
|
||||
|
||||
# When Jan 1 falls on Saturday, NYSE observes New Year's Day on the prior Friday.
|
||||
if day.month == 12 and day.day == 31:
|
||||
next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1)
|
||||
if next_new_year.year == day.year:
|
||||
|
|
|
|||
|
|
@ -3,8 +3,8 @@ from typing import Optional
|
|||
|
||||
from orchestrator.config import OrchestratorConfig
|
||||
from orchestrator.contracts.error_taxonomy import ReasonCode
|
||||
from orchestrator.contracts.result_contract import FinalSignal, Signal, signal_reason_code
|
||||
from orchestrator.signals import Signal, FinalSignal, SignalMerger
|
||||
from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal, signal_reason_code
|
||||
from orchestrator.signals import SignalMerger
|
||||
from orchestrator.quant_runner import QuantRunner
|
||||
from orchestrator.llm_runner import LLMRunner
|
||||
|
||||
|
|
@ -92,9 +92,16 @@ class TradingOrchestrator:
|
|||
source_diagnostics["llm"] = {"reason_code": ReasonCode.LLM_SIGNAL_FAILED.value}
|
||||
llm_sig = None
|
||||
|
||||
# merge raises ValueError if both None
|
||||
# Preserve diagnostics even when both lanes degrade and no FinalSignal can be produced.
|
||||
if quant_sig is None and llm_sig is None:
|
||||
degradation_reasons.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value)
|
||||
raise CombinedSignalFailure(
|
||||
"both quant and llm signals are None",
|
||||
reason_codes=tuple(dict.fromkeys(degradation_reasons)),
|
||||
source_diagnostics=source_diagnostics,
|
||||
data_quality=self._summarize_data_quality(source_diagnostics),
|
||||
)
|
||||
|
||||
final_signal = self._merger.merge(
|
||||
quant_sig,
|
||||
llm_sig,
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import pytest
|
|||
import orchestrator.orchestrator as orchestrator_module
|
||||
from orchestrator.config import OrchestratorConfig
|
||||
from orchestrator.contracts.error_taxonomy import ReasonCode
|
||||
from orchestrator.contracts.result_contract import CombinedSignalFailure
|
||||
from orchestrator.signals import Signal
|
||||
|
||||
|
||||
|
|
@ -107,11 +108,16 @@ def test_trading_orchestrator_raises_when_both_sources_degrade(monkeypatch):
|
|||
monkeypatch.setattr(orchestrator_module, "QuantRunner", FakeQuantRunner)
|
||||
monkeypatch.setattr(orchestrator_module, "LLMRunner", FakeLLMRunner)
|
||||
|
||||
with pytest.raises(ValueError, match="both quant and llm signals are None"):
|
||||
with pytest.raises(CombinedSignalFailure) as exc_info:
|
||||
orchestrator_module.TradingOrchestrator(
|
||||
OrchestratorConfig(quant_backtest_path="/tmp/quant")
|
||||
).get_combined_signal("AAPL", "2026-04-11")
|
||||
|
||||
assert str(exc_info.value) == "both quant and llm signals are None"
|
||||
assert exc_info.value.reason_codes[0] == ReasonCode.QUANT_NO_DATA.value
|
||||
assert exc_info.value.reason_codes[-1] == ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value
|
||||
assert exc_info.value.source_diagnostics["quant"]["reason_code"] == ReasonCode.QUANT_NO_DATA.value
|
||||
|
||||
|
||||
def test_trading_orchestrator_surfaces_provider_mismatch_summary_when_llm_degrades(monkeypatch):
|
||||
class FakeQuantRunner:
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ import asyncio
|
|||
from datetime import datetime, timezone
|
||||
|
||||
from orchestrator.contracts.error_taxonomy import ReasonCode
|
||||
from orchestrator.contracts.result_contract import FinalSignal, Signal
|
||||
from orchestrator.contracts.result_contract import CombinedSignalFailure, FinalSignal, Signal
|
||||
from orchestrator.live_mode import LiveMode
|
||||
|
||||
|
||||
|
|
@ -83,7 +83,12 @@ def test_live_mode_serializes_failure_contract_shape():
|
|||
live_mode = LiveMode(
|
||||
_StubOrchestrator(
|
||||
{
|
||||
("AAPL", "2026-04-11"): ValueError("both quant and llm signals are None")
|
||||
("AAPL", "2026-04-11"): CombinedSignalFailure(
|
||||
"both quant and llm signals are None",
|
||||
reason_codes=(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value, ReasonCode.PROVIDER_MISMATCH.value),
|
||||
source_diagnostics={"llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value}},
|
||||
data_quality={"state": "provider_mismatch", "source": "llm"},
|
||||
)
|
||||
}
|
||||
)
|
||||
)
|
||||
|
|
@ -104,9 +109,14 @@ def test_live_mode_serializes_failure_contract_shape():
|
|||
},
|
||||
"degradation": {
|
||||
"degraded": True,
|
||||
"reason_codes": [ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value],
|
||||
"source_diagnostics": {},
|
||||
"reason_codes": [
|
||||
ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value,
|
||||
ReasonCode.PROVIDER_MISMATCH.value,
|
||||
],
|
||||
"source_diagnostics": {
|
||||
"llm": {"reason_code": ReasonCode.PROVIDER_MISMATCH.value},
|
||||
},
|
||||
},
|
||||
"data_quality": None,
|
||||
"data_quality": {"state": "provider_mismatch", "source": "llm"},
|
||||
}
|
||||
]
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import json
|
||||
from datetime import date
|
||||
|
||||
from orchestrator.market_calendar import is_non_trading_day
|
||||
from orchestrator.market_calendar import get_market_holidays, is_non_trading_day, update_market_holidays
|
||||
|
||||
|
||||
def test_is_non_trading_day_marks_a_share_holiday():
|
||||
|
|
@ -13,3 +14,22 @@ def test_is_non_trading_day_marks_nyse_holiday():
|
|||
|
||||
def test_is_non_trading_day_leaves_regular_weekday_open():
|
||||
assert is_non_trading_day('AAPL', date(2024, 3, 28)) is False
|
||||
|
||||
|
||||
def test_update_market_holidays_creates_maintainable_future_year_entry(tmp_path):
|
||||
data_path = tmp_path / "market_holidays.json"
|
||||
data_path.write_text(json.dumps({"a_share": {}}))
|
||||
|
||||
update_market_holidays(
|
||||
market="a_share",
|
||||
year=2027,
|
||||
holiday_dates=["2027-02-10", "2027-02-11"],
|
||||
data_path=data_path,
|
||||
)
|
||||
|
||||
assert get_market_holidays("a_share", 2027, data_path=data_path) == {
|
||||
date(2027, 2, 10),
|
||||
date(2027, 2, 11),
|
||||
}
|
||||
assert is_non_trading_day("600519.SS", date(2027, 2, 10)) is False
|
||||
assert is_non_trading_day("600519.SS", date(2027, 2, 10), data_path=data_path) is True
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ from fastapi.middleware.cors import CORSMiddleware
|
|||
from fastapi.responses import Response, FileResponse
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from pydantic import BaseModel
|
||||
from tradingagents.default_config import get_default_config
|
||||
|
||||
from services import AnalysisService, JobService, ResultStore, build_request_context, load_migration_flags
|
||||
from services.executor import LegacySubprocessAnalysisExecutor
|
||||
|
|
@ -55,7 +56,7 @@ async def lifespan(app: FastAPI):
|
|||
executor=LegacySubprocessAnalysisExecutor(
|
||||
analysis_python=ANALYSIS_PYTHON,
|
||||
repo_root=REPO_ROOT,
|
||||
api_key_resolver=_get_analysis_api_key,
|
||||
api_key_resolver=_get_analysis_provider_api_key,
|
||||
process_registry=app.state.job_service.register_process,
|
||||
),
|
||||
result_store=app.state.result_store,
|
||||
|
|
@ -103,10 +104,8 @@ class ScreenRequest(BaseModel):
|
|||
|
||||
@app.get("/api/config/check")
|
||||
async def check_config():
|
||||
"""Check if the app is configured (API key is set).
|
||||
The FastAPI backend receives ANTHROPIC_API_KEY as an env var when spawned by Tauri.
|
||||
"""
|
||||
configured = bool(_get_analysis_api_key())
|
||||
"""Check if the analysis provider is configured with a callable API key."""
|
||||
configured = bool(_resolve_analysis_runtime_settings().get("provider_api_key"))
|
||||
return {"configured": configured}
|
||||
|
||||
|
||||
|
|
@ -151,7 +150,7 @@ _api_key: Optional[str] = None
|
|||
def _get_api_key() -> Optional[str]:
|
||||
global _api_key
|
||||
if _api_key is None:
|
||||
_api_key = os.environ.get("DASHBOARD_API_KEY") or os.environ.get("ANTHROPIC_API_KEY")
|
||||
_api_key = os.environ.get("DASHBOARD_API_KEY")
|
||||
return _api_key
|
||||
|
||||
def _check_api_key(api_key: Optional[str]) -> bool:
|
||||
|
|
@ -181,15 +180,73 @@ def _persist_analysis_api_key(api_key_value: str):
|
|||
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
CONFIG_PATH.write_text(json.dumps({"api_key": api_key_value}, ensure_ascii=False))
|
||||
os.chmod(CONFIG_PATH, 0o600)
|
||||
os.environ["ANTHROPIC_API_KEY"] = api_key_value
|
||||
_api_key = None
|
||||
|
||||
|
||||
def _get_analysis_api_key() -> Optional[str]:
|
||||
return (
|
||||
os.environ.get("ANTHROPIC_API_KEY")
|
||||
or os.environ.get("MINIMAX_API_KEY")
|
||||
or _load_saved_config().get("api_key")
|
||||
def _get_analysis_provider_api_key(provider: str, saved_api_key: Optional[str] = None) -> Optional[str]:
|
||||
env_names = {
|
||||
"anthropic": ("ANTHROPIC_API_KEY", "MINIMAX_API_KEY"),
|
||||
"openai": ("OPENAI_API_KEY",),
|
||||
"openrouter": ("OPENROUTER_API_KEY",),
|
||||
"xai": ("XAI_API_KEY",),
|
||||
"google": ("GOOGLE_API_KEY",),
|
||||
"ollama": tuple(),
|
||||
}.get(provider.lower(), tuple())
|
||||
for env_name in env_names:
|
||||
value = os.environ.get(env_name)
|
||||
if value:
|
||||
return value
|
||||
return saved_api_key
|
||||
|
||||
|
||||
def _resolve_analysis_runtime_settings() -> dict:
|
||||
saved = _load_saved_config()
|
||||
defaults = get_default_config()
|
||||
|
||||
provider = os.environ.get("TRADINGAGENTS_LLM_PROVIDER")
|
||||
if not provider:
|
||||
if os.environ.get("ANTHROPIC_BASE_URL"):
|
||||
provider = "anthropic"
|
||||
elif os.environ.get("OPENAI_BASE_URL"):
|
||||
provider = "openai"
|
||||
else:
|
||||
provider = defaults.get("llm_provider", "anthropic")
|
||||
|
||||
backend_url = (
|
||||
os.environ.get("TRADINGAGENTS_BACKEND_URL")
|
||||
or os.environ.get("ANTHROPIC_BASE_URL")
|
||||
or os.environ.get("OPENAI_BASE_URL")
|
||||
or defaults.get("backend_url")
|
||||
)
|
||||
deep_model = (
|
||||
os.environ.get("TRADINGAGENTS_DEEP_MODEL")
|
||||
or os.environ.get("TRADINGAGENTS_MODEL")
|
||||
or defaults.get("deep_think_llm")
|
||||
)
|
||||
quick_model = (
|
||||
os.environ.get("TRADINGAGENTS_QUICK_MODEL")
|
||||
or os.environ.get("TRADINGAGENTS_MODEL")
|
||||
or defaults.get("quick_think_llm")
|
||||
)
|
||||
return {
|
||||
"llm_provider": provider,
|
||||
"backend_url": backend_url,
|
||||
"deep_think_llm": deep_model,
|
||||
"quick_think_llm": quick_model,
|
||||
"provider_api_key": _get_analysis_provider_api_key(provider, saved.get("api_key")),
|
||||
}
|
||||
|
||||
|
||||
def _build_analysis_request_context(request: Request, auth_key: Optional[str]):
|
||||
settings = _resolve_analysis_runtime_settings()
|
||||
return build_request_context(
|
||||
request,
|
||||
auth_key=auth_key,
|
||||
provider_api_key=settings["provider_api_key"],
|
||||
llm_provider=settings["llm_provider"],
|
||||
backend_url=settings["backend_url"],
|
||||
deep_think_llm=settings["deep_think_llm"],
|
||||
quick_think_llm=settings["quick_think_llm"],
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -279,7 +336,7 @@ async def start_analysis(
|
|||
|
||||
task_id = f"{payload.ticker}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
|
||||
date = payload.date or datetime.now().strftime("%Y-%m-%d")
|
||||
request_context = build_request_context(http_request, api_key=api_key)
|
||||
request_context = _build_analysis_request_context(http_request, api_key)
|
||||
|
||||
try:
|
||||
return await app.state.analysis_service.start_analysis(
|
||||
|
|
@ -838,7 +895,7 @@ async def start_portfolio_analysis(
|
|||
|
||||
date = datetime.now().strftime("%Y-%m-%d")
|
||||
task_id = f"port_{date}_{uuid.uuid4().hex[:6]}"
|
||||
request_context = build_request_context(http_request, api_key=api_key)
|
||||
request_context = _build_analysis_request_context(http_request, api_key)
|
||||
|
||||
try:
|
||||
return await app.state.analysis_service.start_portfolio_analysis(
|
||||
|
|
|
|||
|
|
@ -154,6 +154,12 @@ class AnalysisService:
|
|||
started_at=start_time,
|
||||
code=exc.code,
|
||||
retryable=exc.retryable,
|
||||
degradation={
|
||||
"degraded": bool(exc.degrade_reason_codes) or bool(exc.data_quality),
|
||||
"reason_codes": list(exc.degrade_reason_codes),
|
||||
"source_diagnostics": exc.source_diagnostics or {},
|
||||
} if (exc.degrade_reason_codes or exc.data_quality or exc.source_diagnostics) else None,
|
||||
data_quality=exc.data_quality,
|
||||
)
|
||||
except Exception as exc:
|
||||
self._fail_analysis_state(
|
||||
|
|
@ -162,6 +168,8 @@ class AnalysisService:
|
|||
started_at=start_time,
|
||||
code="analysis_failed",
|
||||
retryable=False,
|
||||
degradation=None,
|
||||
data_quality=None,
|
||||
)
|
||||
|
||||
await broadcast_progress(task_id, self.job_service.task_results[task_id])
|
||||
|
|
@ -279,12 +287,16 @@ class AnalysisService:
|
|||
started_at: float,
|
||||
code: str,
|
||||
retryable: bool,
|
||||
degradation: Optional[dict],
|
||||
data_quality: Optional[dict],
|
||||
) -> None:
|
||||
state = self.job_service.task_results[task_id]
|
||||
state["status"] = "failed"
|
||||
state["elapsed_seconds"] = int(time.monotonic() - started_at)
|
||||
state["elapsed"] = state["elapsed_seconds"]
|
||||
state["result"] = None
|
||||
state["degradation_summary"] = degradation
|
||||
state["data_quality_summary"] = data_quality
|
||||
state["error"] = {
|
||||
"code": code,
|
||||
"message": message,
|
||||
|
|
|
|||
|
|
@ -75,7 +75,13 @@ print("STAGE:trading", flush=True)
|
|||
|
||||
try:
|
||||
result = orchestrator.get_combined_signal(ticker, date)
|
||||
except ValueError as exc:
|
||||
except Exception as exc:
|
||||
result_meta = {
|
||||
"degrade_reason_codes": list(getattr(exc, "reason_codes", ()) or ()),
|
||||
"data_quality": getattr(exc, "data_quality", None),
|
||||
"source_diagnostics": getattr(exc, "source_diagnostics", None),
|
||||
}
|
||||
print("RESULT_META:" + json.dumps(result_meta), file=sys.stderr, flush=True)
|
||||
print("ANALYSIS_ERROR:" + str(exc), file=sys.stderr, flush=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
|
@ -214,10 +220,22 @@ class AnalysisExecutionOutput:
|
|||
|
||||
|
||||
class AnalysisExecutorError(RuntimeError):
|
||||
def __init__(self, message: str, *, code: str = "analysis_failed", retryable: bool = False):
|
||||
def __init__(
|
||||
self,
|
||||
message: str,
|
||||
*,
|
||||
code: str = "analysis_failed",
|
||||
retryable: bool = False,
|
||||
degrade_reason_codes: tuple[str, ...] = (),
|
||||
data_quality: Optional[dict] = None,
|
||||
source_diagnostics: Optional[dict] = None,
|
||||
):
|
||||
super().__init__(message)
|
||||
self.code = code
|
||||
self.retryable = retryable
|
||||
self.degrade_reason_codes = degrade_reason_codes
|
||||
self.data_quality = data_quality
|
||||
self.source_diagnostics = source_diagnostics
|
||||
|
||||
|
||||
class AnalysisExecutor(Protocol):
|
||||
|
|
@ -240,7 +258,7 @@ class LegacySubprocessAnalysisExecutor:
|
|||
*,
|
||||
analysis_python: Path,
|
||||
repo_root: Path,
|
||||
api_key_resolver: Callable[[], Optional[str]],
|
||||
api_key_resolver: Callable[..., Optional[str]],
|
||||
process_registry: Optional[ProcessRegistry] = None,
|
||||
script_template: str = LEGACY_ANALYSIS_SCRIPT_TEMPLATE,
|
||||
stdout_timeout_secs: float = 300.0,
|
||||
|
|
@ -261,9 +279,10 @@ class LegacySubprocessAnalysisExecutor:
|
|||
request_context: RequestContext,
|
||||
on_stage: Optional[StageCallback] = None,
|
||||
) -> AnalysisExecutionOutput:
|
||||
analysis_api_key = request_context.api_key or self.api_key_resolver()
|
||||
if not analysis_api_key:
|
||||
raise RuntimeError("ANTHROPIC_API_KEY environment variable not set")
|
||||
llm_provider = (request_context.llm_provider or "anthropic").lower()
|
||||
analysis_api_key = request_context.provider_api_key or self._resolve_provider_api_key(llm_provider)
|
||||
if llm_provider != "ollama" and not analysis_api_key:
|
||||
raise RuntimeError(f"{llm_provider} provider API key not configured")
|
||||
|
||||
script_path: Optional[Path] = None
|
||||
proc: asyncio.subprocess.Process | None = None
|
||||
|
|
@ -279,7 +298,16 @@ class LegacySubprocessAnalysisExecutor:
|
|||
for key, value in os.environ.items()
|
||||
if not key.startswith(("PYTHON", "CONDA", "VIRTUAL"))
|
||||
}
|
||||
clean_env["ANTHROPIC_API_KEY"] = analysis_api_key
|
||||
clean_env["TRADINGAGENTS_LLM_PROVIDER"] = llm_provider
|
||||
if request_context.backend_url:
|
||||
clean_env["TRADINGAGENTS_BACKEND_URL"] = request_context.backend_url
|
||||
if request_context.deep_think_llm:
|
||||
clean_env["TRADINGAGENTS_DEEP_MODEL"] = request_context.deep_think_llm
|
||||
if request_context.quick_think_llm:
|
||||
clean_env["TRADINGAGENTS_QUICK_MODEL"] = request_context.quick_think_llm
|
||||
for env_name in self._provider_api_env_names(llm_provider):
|
||||
if analysis_api_key:
|
||||
clean_env[env_name] = analysis_api_key
|
||||
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
str(self.analysis_python),
|
||||
|
|
@ -317,9 +345,22 @@ class LegacySubprocessAnalysisExecutor:
|
|||
|
||||
await proc.wait()
|
||||
stderr_bytes = await proc.stderr.read() if proc.stderr is not None else b""
|
||||
stderr_lines = stderr_bytes.decode(errors="replace").splitlines() if stderr_bytes else []
|
||||
if proc.returncode != 0:
|
||||
message = stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}"
|
||||
raise AnalysisExecutorError(message)
|
||||
failure_meta = self._parse_failure_metadata(stdout_lines, stderr_lines)
|
||||
message = self._extract_error_message(stderr_lines) or (stderr_bytes.decode(errors="replace")[-1000:] if stderr_bytes else f"exit {proc.returncode}")
|
||||
if failure_meta is None:
|
||||
raise AnalysisExecutorError(
|
||||
"analysis subprocess failed without required markers: RESULT_META",
|
||||
code="analysis_protocol_failed",
|
||||
)
|
||||
raise AnalysisExecutorError(
|
||||
message,
|
||||
code="analysis_failed",
|
||||
degrade_reason_codes=failure_meta["degrade_reason_codes"],
|
||||
data_quality=failure_meta["data_quality"],
|
||||
source_diagnostics=failure_meta["source_diagnostics"],
|
||||
)
|
||||
|
||||
return self._parse_output(
|
||||
stdout_lines=stdout_lines,
|
||||
|
|
@ -347,6 +388,48 @@ class LegacySubprocessAnalysisExecutor:
|
|||
return
|
||||
await proc.wait()
|
||||
|
||||
def _resolve_provider_api_key(self, provider: str) -> Optional[str]:
|
||||
try:
|
||||
return self.api_key_resolver(provider) # type: ignore[misc]
|
||||
except TypeError:
|
||||
return self.api_key_resolver()
|
||||
|
||||
@staticmethod
|
||||
def _provider_api_env_names(provider: str) -> tuple[str, ...]:
|
||||
return {
|
||||
"anthropic": ("ANTHROPIC_API_KEY",),
|
||||
"openai": ("OPENAI_API_KEY",),
|
||||
"openrouter": ("OPENROUTER_API_KEY",),
|
||||
"xai": ("XAI_API_KEY",),
|
||||
"google": ("GOOGLE_API_KEY",),
|
||||
"ollama": tuple(),
|
||||
}.get(provider, tuple())
|
||||
|
||||
@staticmethod
|
||||
def _parse_failure_metadata(stdout_lines: list[str], stderr_lines: list[str]) -> Optional[dict]:
|
||||
for line in [*stdout_lines, *stderr_lines]:
|
||||
if line.startswith("RESULT_META:"):
|
||||
try:
|
||||
detail = json.loads(line.split(":", 1)[1].strip())
|
||||
except Exception as exc:
|
||||
raise AnalysisExecutorError(
|
||||
"failed to parse RESULT_META payload",
|
||||
code="analysis_protocol_failed",
|
||||
) from exc
|
||||
return {
|
||||
"degrade_reason_codes": tuple(detail.get("degrade_reason_codes") or ()),
|
||||
"data_quality": detail.get("data_quality"),
|
||||
"source_diagnostics": detail.get("source_diagnostics"),
|
||||
}
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _extract_error_message(stderr_lines: list[str]) -> Optional[str]:
|
||||
for line in stderr_lines:
|
||||
if line.startswith("ANALYSIS_ERROR:"):
|
||||
return line.split(":", 1)[1].strip()
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_output(
|
||||
*,
|
||||
|
|
@ -393,6 +476,8 @@ class LegacySubprocessAnalysisExecutor:
|
|||
missing_markers = []
|
||||
if not seen_signal_detail:
|
||||
missing_markers.append("SIGNAL_DETAIL")
|
||||
if not seen_result_meta:
|
||||
missing_markers.append("RESULT_META")
|
||||
if not seen_complete:
|
||||
missing_markers.append("ANALYSIS_COMPLETE")
|
||||
if missing_markers:
|
||||
|
|
|
|||
|
|
@ -18,7 +18,12 @@ class RequestContext:
|
|||
request_id: str
|
||||
contract_version: str = CONTRACT_VERSION
|
||||
executor_type: str = DEFAULT_EXECUTOR_TYPE
|
||||
api_key: Optional[str] = None
|
||||
auth_key: Optional[str] = None
|
||||
provider_api_key: Optional[str] = None
|
||||
llm_provider: Optional[str] = None
|
||||
backend_url: Optional[str] = None
|
||||
deep_think_llm: Optional[str] = None
|
||||
quick_think_llm: Optional[str] = None
|
||||
client_host: Optional[str] = None
|
||||
is_local: bool = False
|
||||
metadata: dict[str, str] = field(default_factory=dict)
|
||||
|
|
@ -27,7 +32,12 @@ class RequestContext:
|
|||
def build_request_context(
|
||||
request: Optional[Request] = None,
|
||||
*,
|
||||
api_key: Optional[str] = None,
|
||||
auth_key: Optional[str] = None,
|
||||
provider_api_key: Optional[str] = None,
|
||||
llm_provider: Optional[str] = None,
|
||||
backend_url: Optional[str] = None,
|
||||
deep_think_llm: Optional[str] = None,
|
||||
quick_think_llm: Optional[str] = None,
|
||||
request_id: Optional[str] = None,
|
||||
contract_version: str = CONTRACT_VERSION,
|
||||
executor_type: str = DEFAULT_EXECUTOR_TYPE,
|
||||
|
|
@ -40,7 +50,12 @@ def build_request_context(
|
|||
request_id=request_id or uuid4().hex,
|
||||
contract_version=contract_version,
|
||||
executor_type=executor_type,
|
||||
api_key=api_key,
|
||||
auth_key=auth_key,
|
||||
provider_api_key=provider_api_key,
|
||||
llm_provider=llm_provider,
|
||||
backend_url=backend_url,
|
||||
deep_think_llm=deep_think_llm,
|
||||
quick_think_llm=quick_think_llm,
|
||||
client_host=client_host,
|
||||
is_local=is_local,
|
||||
metadata=dict(metadata or {}),
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ def test_portfolio_analyze_route_uses_analysis_service_smoke(monkeypatch):
|
|||
assert response.json()["status"] == "running"
|
||||
assert str(captured["task_id"]).startswith("port_")
|
||||
assert isinstance(captured["date"], str)
|
||||
assert captured["request_context"].api_key == "service-key"
|
||||
assert captured["request_context"].auth_key == "service-key"
|
||||
assert callable(captured["broadcast_progress"])
|
||||
|
||||
|
||||
|
|
@ -248,7 +248,7 @@ def test_orchestrator_websocket_smoke_is_contract_first(monkeypatch):
|
|||
|
||||
|
||||
def test_orchestrator_websocket_rejects_unauthorized(monkeypatch):
|
||||
monkeypatch.delenv("DASHBOARD_API_KEY", raising=False)
|
||||
monkeypatch.setenv("DASHBOARD_API_KEY", "dashboard-secret")
|
||||
monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
|
||||
|
||||
main = _load_main_module(monkeypatch)
|
||||
|
|
|
|||
|
|
@ -71,12 +71,16 @@ def test_executor_raises_when_required_markers_missing(monkeypatch):
|
|||
)
|
||||
|
||||
async def scenario():
|
||||
with pytest.raises(AnalysisExecutorError, match="required markers: ANALYSIS_COMPLETE"):
|
||||
with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META, ANALYSIS_COMPLETE"):
|
||||
await executor.execute(
|
||||
task_id="task-1",
|
||||
ticker="AAPL",
|
||||
date="2026-04-13",
|
||||
request_context=build_request_context(api_key="ctx-key"),
|
||||
request_context=build_request_context(
|
||||
provider_api_key="ctx-key",
|
||||
llm_provider="anthropic",
|
||||
backend_url="https://api.minimaxi.com/anthropic",
|
||||
),
|
||||
)
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
|
@ -103,7 +107,11 @@ def test_executor_kills_subprocess_on_timeout(monkeypatch):
|
|||
task_id="task-2",
|
||||
ticker="AAPL",
|
||||
date="2026-04-13",
|
||||
request_context=build_request_context(api_key="ctx-key"),
|
||||
request_context=build_request_context(
|
||||
provider_api_key="ctx-key",
|
||||
llm_provider="anthropic",
|
||||
backend_url="https://api.minimaxi.com/anthropic",
|
||||
),
|
||||
)
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
|
@ -136,3 +144,99 @@ def test_executor_marks_degraded_success_when_result_meta_reports_data_quality()
|
|||
assert contract["status"] == "degraded_success"
|
||||
assert contract["data_quality"]["state"] == "non_trading_day"
|
||||
assert contract["degradation"]["reason_codes"] == ["non_trading_day"]
|
||||
|
||||
|
||||
def test_executor_requires_result_meta_on_success():
|
||||
with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"):
|
||||
LegacySubprocessAnalysisExecutor._parse_output(
|
||||
stdout_lines=[
|
||||
'SIGNAL_DETAIL:{"quant_signal":"HOLD","llm_signal":"BUY","confidence":0.6}',
|
||||
"ANALYSIS_COMPLETE:OVERWEIGHT",
|
||||
],
|
||||
ticker="AAPL",
|
||||
date="2026-04-12",
|
||||
contract_version="v1alpha1",
|
||||
executor_type="legacy_subprocess",
|
||||
)
|
||||
|
||||
|
||||
def test_executor_injects_provider_specific_env(monkeypatch):
|
||||
captured = {}
|
||||
process = _FakeProcess(
|
||||
_FakeStdout(
|
||||
[
|
||||
b'SIGNAL_DETAIL:{"quant_signal":"BUY","llm_signal":"BUY","confidence":0.8}\n',
|
||||
b'RESULT_META:{"degrade_reason_codes":[],"data_quality":{"state":"ok"}}\n',
|
||||
b"ANALYSIS_COMPLETE:BUY\n",
|
||||
]
|
||||
),
|
||||
returncode=0,
|
||||
)
|
||||
|
||||
async def fake_create_subprocess_exec(*args, **kwargs):
|
||||
captured["env"] = kwargs["env"]
|
||||
return process
|
||||
|
||||
monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
|
||||
|
||||
executor = LegacySubprocessAnalysisExecutor(
|
||||
analysis_python=Path("/usr/bin/python3"),
|
||||
repo_root=Path("."),
|
||||
api_key_resolver=lambda provider="openai": "fallback-key",
|
||||
)
|
||||
|
||||
async def scenario():
|
||||
await executor.execute(
|
||||
task_id="task-provider",
|
||||
ticker="AAPL",
|
||||
date="2026-04-13",
|
||||
request_context=build_request_context(
|
||||
auth_key="dashboard-key",
|
||||
provider_api_key="provider-key",
|
||||
llm_provider="openai",
|
||||
backend_url="https://api.openai.com/v1",
|
||||
deep_think_llm="gpt-5.4",
|
||||
quick_think_llm="gpt-5.4-mini",
|
||||
),
|
||||
)
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
||||
assert captured["env"]["TRADINGAGENTS_LLM_PROVIDER"] == "openai"
|
||||
assert captured["env"]["TRADINGAGENTS_BACKEND_URL"] == "https://api.openai.com/v1"
|
||||
assert captured["env"]["OPENAI_API_KEY"] == "provider-key"
|
||||
assert "ANTHROPIC_API_KEY" not in captured["env"]
|
||||
|
||||
|
||||
def test_executor_requires_result_meta_on_failure(monkeypatch):
|
||||
process = _FakeProcess(
|
||||
_FakeStdout([]),
|
||||
stderr=b"ANALYSIS_ERROR:boom\n",
|
||||
returncode=1,
|
||||
)
|
||||
|
||||
async def fake_create_subprocess_exec(*args, **kwargs):
|
||||
return process
|
||||
|
||||
monkeypatch.setattr(asyncio, "create_subprocess_exec", fake_create_subprocess_exec)
|
||||
|
||||
executor = LegacySubprocessAnalysisExecutor(
|
||||
analysis_python=Path("/usr/bin/python3"),
|
||||
repo_root=Path("."),
|
||||
api_key_resolver=lambda: "env-key",
|
||||
)
|
||||
|
||||
async def scenario():
|
||||
with pytest.raises(AnalysisExecutorError, match="required markers: RESULT_META"):
|
||||
await executor.execute(
|
||||
task_id="task-failure",
|
||||
ticker="AAPL",
|
||||
date="2026-04-13",
|
||||
request_context=build_request_context(
|
||||
provider_api_key="ctx-key",
|
||||
llm_provider="anthropic",
|
||||
backend_url="https://api.minimaxi.com/anthropic",
|
||||
),
|
||||
)
|
||||
|
||||
asyncio.run(scenario())
|
||||
|
|
|
|||
|
|
@ -45,9 +45,20 @@ def test_load_migration_flags_from_env(monkeypatch):
|
|||
|
||||
|
||||
def test_build_request_context_defaults():
|
||||
context = build_request_context(api_key="secret", metadata={"source": "test"})
|
||||
context = build_request_context(
|
||||
auth_key="dashboard-secret",
|
||||
provider_api_key="provider-secret",
|
||||
llm_provider="anthropic",
|
||||
backend_url="https://api.minimaxi.com/anthropic",
|
||||
deep_think_llm="MiniMax-M2.7-highspeed",
|
||||
quick_think_llm="MiniMax-M2.7-highspeed",
|
||||
metadata={"source": "test"},
|
||||
)
|
||||
|
||||
assert context.api_key == "secret"
|
||||
assert context.auth_key == "dashboard-secret"
|
||||
assert context.provider_api_key == "provider-secret"
|
||||
assert context.llm_provider == "anthropic"
|
||||
assert context.backend_url == "https://api.minimaxi.com/anthropic"
|
||||
assert context.request_id
|
||||
assert context.contract_version == "v1alpha1"
|
||||
assert context.executor_type == "legacy_subprocess"
|
||||
|
|
@ -209,7 +220,12 @@ def test_analysis_service_start_analysis_uses_executor(tmp_path):
|
|||
task_id="task-1",
|
||||
ticker="AAPL",
|
||||
date="2026-04-13",
|
||||
request_context=build_request_context(api_key="secret"),
|
||||
request_context=build_request_context(
|
||||
auth_key="dashboard-secret",
|
||||
provider_api_key="provider-secret",
|
||||
llm_provider="anthropic",
|
||||
backend_url="https://api.minimaxi.com/anthropic",
|
||||
),
|
||||
broadcast_progress=_broadcast,
|
||||
)
|
||||
await analysis_tasks["task-1"]
|
||||
|
|
|
|||
Loading…
Reference in New Issue