diff --git a/orchestrator/live_mode.py b/orchestrator/live_mode.py index 76c04c51..3d6d8480 100644 --- a/orchestrator/live_mode.py +++ b/orchestrator/live_mode.py @@ -3,6 +3,9 @@ import logging from datetime import datetime, timezone from typing import List, Optional +from orchestrator.contracts.config_schema import CONTRACT_VERSION +from orchestrator.contracts.error_taxonomy import ReasonCode + logger = logging.getLogger(__name__) @@ -15,6 +18,69 @@ class LiveMode: def __init__(self, orchestrator): self._orchestrator = orchestrator + @staticmethod + def _serialize_result(signal) -> dict: + return { + "direction": signal.direction, + "confidence": signal.confidence, + "quant_direction": signal.quant_signal.direction if signal.quant_signal else None, + "llm_direction": signal.llm_signal.direction if signal.llm_signal else None, + "timestamp": signal.timestamp.isoformat(), + } + + @staticmethod + def _serialize_degradation(signal, data_quality: Optional[dict]) -> dict: + metadata = getattr(signal, "metadata", {}) or {} + return { + "degraded": bool(getattr(signal, "degrade_reason_codes", ())) or bool(data_quality), + "reason_codes": list(getattr(signal, "degrade_reason_codes", ()) or ()), + "source_diagnostics": metadata.get("source_diagnostics") or {}, + } + + @staticmethod + def _contract_version(signal) -> str: + metadata = getattr(signal, "metadata", {}) or {} + return getattr(signal, "contract_version", None) or metadata.get("contract_version") or CONTRACT_VERSION + + def _serialize_signal(self, *, ticker: str, date: str, signal) -> dict: + metadata = getattr(signal, "metadata", {}) or {} + data_quality = metadata.get("data_quality") + degradation = self._serialize_degradation(signal, data_quality) + return { + "contract_version": self._contract_version(signal), + "ticker": ticker, + "date": date, + "status": "degraded_success" if degradation["degraded"] else "completed", + "result": self._serialize_result(signal), + "error": None, + "degradation": degradation, + "data_quality": data_quality, + } + + @staticmethod + def _serialize_error(*, ticker: str, date: str, exc: Exception) -> dict: + reason_codes = [] + if isinstance(exc, ValueError) and "both quant and llm signals are None" in str(exc): + reason_codes.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value) + return { + "contract_version": CONTRACT_VERSION, + "ticker": ticker, + "date": date, + "status": "failed", + "result": None, + "error": { + "code": "live_signal_failed", + "message": str(exc), + "retryable": False, + }, + "degradation": { + "degraded": bool(reason_codes), + "reason_codes": reason_codes, + "source_diagnostics": {}, + }, + "data_quality": None, + } + async def run_once(self, tickers: List[str], date: Optional[str] = None) -> List[dict]: """ Compute combined signals for all tickers on the given date (default: today). @@ -29,20 +95,8 @@ class LiveMode: sig = await asyncio.to_thread( self._orchestrator.get_combined_signal, ticker, date ) - results.append({ - "ticker": ticker, - "date": date, - "direction": sig.direction, - "confidence": sig.confidence, - "quant_direction": sig.quant_signal.direction if sig.quant_signal else None, - "llm_direction": sig.llm_signal.direction if sig.llm_signal else None, - "timestamp": sig.timestamp.isoformat(), - }) + results.append(self._serialize_signal(ticker=ticker, date=date, signal=sig)) except Exception as e: logger.error("LiveMode: failed for %s %s: %s", ticker, date, e) - results.append({ - "ticker": ticker, - "date": date, - "error": str(e), - }) + results.append(self._serialize_error(ticker=ticker, date=date, exc=e)) return results diff --git a/orchestrator/market_calendar.py b/orchestrator/market_calendar.py new file mode 100644 index 00000000..6a5d6cde --- /dev/null +++ b/orchestrator/market_calendar.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +from datetime import date, timedelta + +_A_SHARE_SUFFIXES = {"SH", "SS", "SZ"} + +# Mainland exchanges close on weekends plus the annual State Council public-holiday windows. +# Weekend make-up workdays do not become exchange trading days. +_A_SHARE_HOLIDAYS = { + date(2024, 1, 1), + *[date(2024, 2, day) for day in range(10, 18)], + *[date(2024, 4, day) for day in range(4, 7)], + *[date(2024, 5, day) for day in range(1, 6)], + *[date(2024, 6, day) for day in range(8, 11)], + *[date(2024, 9, day) for day in range(15, 18)], + *[date(2024, 10, day) for day in range(1, 8)], + date(2025, 1, 1), + *[date(2025, 1, day) for day in range(28, 32)], + *[date(2025, 2, day) for day in range(1, 5)], + *[date(2025, 4, day) for day in range(4, 7)], + *[date(2025, 5, day) for day in range(1, 6)], + *[date(2025, 5, day) for day in range(31, 32)], + *[date(2025, 6, day) for day in range(1, 3)], + *[date(2025, 10, day) for day in range(1, 9)], + *[date(2026, 1, day) for day in range(1, 4)], + *[date(2026, 2, day) for day in range(15, 24)], + *[date(2026, 4, day) for day in range(4, 7)], + *[date(2026, 5, day) for day in range(1, 6)], + *[date(2026, 6, day) for day in range(19, 22)], + *[date(2026, 9, day) for day in range(25, 28)], + *[date(2026, 10, day) for day in range(1, 8)], +} + + +def is_non_trading_day(ticker: str, day: date) -> bool: + """Return whether the requested date is a known non-trading day for the ticker's market.""" + if day.weekday() >= 5: + return True + if _is_a_share_ticker(ticker): + return day in _A_SHARE_HOLIDAYS + return _is_nyse_holiday(day) + + +def _is_a_share_ticker(ticker: str) -> bool: + suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else "" + return suffix in _A_SHARE_SUFFIXES + + +def _is_nyse_holiday(day: date) -> bool: + observed_new_year = _observed_fixed_holiday(day.year, 1, 1) + observed_juneteenth = _observed_fixed_holiday(day.year, 6, 19) + observed_independence_day = _observed_fixed_holiday(day.year, 7, 4) + observed_christmas = _observed_fixed_holiday(day.year, 12, 25) + + holidays = { + observed_new_year, + _nth_weekday(day.year, 1, 0, 3), # Martin Luther King, Jr. Day + _nth_weekday(day.year, 2, 0, 3), # Washington's Birthday + _easter(day.year) - timedelta(days=2), # Good Friday + _last_weekday(day.year, 5, 0), # Memorial Day + observed_independence_day, + _nth_weekday(day.year, 9, 0, 1), # Labor Day + _nth_weekday(day.year, 11, 3, 4), # Thanksgiving Day + observed_christmas, + } + if day.year >= 2022: + holidays.add(observed_juneteenth) + + # When Jan 1 falls on Saturday, NYSE observes New Year's Day on the prior Friday. + if day.month == 12 and day.day == 31: + next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1) + if next_new_year.year == day.year: + holidays.add(next_new_year) + + return day in holidays + + +def _observed_fixed_holiday(year: int, month: int, day: int) -> date: + holiday = date(year, month, day) + if holiday.weekday() == 5: + return holiday - timedelta(days=1) + if holiday.weekday() == 6: + return holiday + timedelta(days=1) + return holiday + + +def _nth_weekday(year: int, month: int, weekday: int, occurrence: int) -> date: + first = date(year, month, 1) + delta = (weekday - first.weekday()) % 7 + return first + timedelta(days=delta + 7 * (occurrence - 1)) + + +def _last_weekday(year: int, month: int, weekday: int) -> date: + if month == 12: + cursor = date(year + 1, 1, 1) - timedelta(days=1) + else: + cursor = date(year, month + 1, 1) - timedelta(days=1) + while cursor.weekday() != weekday: + cursor -= timedelta(days=1) + return cursor + + +def _easter(year: int) -> date: + """Anonymous Gregorian algorithm.""" + a = year % 19 + b = year // 100 + c = year % 100 + d = b // 4 + e = b % 4 + f = (b + 8) // 25 + g = (b - f + 1) // 3 + h = (19 * a + b - d - g + 15) % 30 + i = c // 4 + k = c % 4 + l = (32 + 2 * e + 2 * i - h - k) % 7 + m = (a + 11 * h + 22 * l) // 451 + month = (h + l - 7 * m + 114) // 31 + day = ((h + l - 7 * m + 114) % 31) + 1 + return date(year, month, day) diff --git a/orchestrator/quant_runner.py b/orchestrator/quant_runner.py index e3ba3877..c7a0a02b 100644 --- a/orchestrator/quant_runner.py +++ b/orchestrator/quant_runner.py @@ -11,6 +11,7 @@ import yfinance as yf from orchestrator.config import OrchestratorConfig from orchestrator.contracts.error_taxonomy import ReasonCode from orchestrator.contracts.result_contract import Signal, build_error_signal +from orchestrator.market_calendar import is_non_trading_day logger = logging.getLogger(__name__) @@ -50,7 +51,7 @@ class QuantRunner: df = yf.download(ticker, start=start_str, end=end_exclusive, progress=False, auto_adjust=True) if df.empty: logger.warning("No price data for %s between %s and %s", ticker, start_str, date) - if end_dt.weekday() >= 5: + if is_non_trading_day(ticker, end_dt.date()): return build_error_signal( ticker=ticker, source="quant", @@ -107,7 +108,7 @@ class QuantRunner: if hasattr(last_available_ts, "strftime") else str(last_available_ts) ) - if end_dt.weekday() >= 5: + if is_non_trading_day(ticker, end_dt.date()): return build_error_signal( ticker=ticker, source="quant", diff --git a/orchestrator/tests/test_application_service.py b/orchestrator/tests/test_application_service.py index 33ede5ca..0b0c2d5f 100644 --- a/orchestrator/tests/test_application_service.py +++ b/orchestrator/tests/test_application_service.py @@ -111,3 +111,54 @@ def test_trading_orchestrator_raises_when_both_sources_degrade(monkeypatch): orchestrator_module.TradingOrchestrator( OrchestratorConfig(quant_backtest_path="/tmp/quant") ).get_combined_signal("AAPL", "2026-04-11") + + +def test_trading_orchestrator_surfaces_provider_mismatch_summary_when_llm_degrades(monkeypatch): + class FakeQuantRunner: + def __init__(self, _config): + pass + + def get_signal(self, _ticker, _date): + return _signal("quant", direction=1, confidence=0.8) + + class FakeLLMRunner: + def __init__(self, _config): + pass + + def get_signal(self, _ticker, _date): + return _signal( + "llm", + direction=0, + confidence=0.0, + metadata={ + "error": "provider mismatch", + "data_quality": { + "state": "provider_mismatch", + "provider": "anthropic", + "backend_url": "https://api.openai.com/v1", + }, + }, + reason_code=ReasonCode.PROVIDER_MISMATCH.value, + ) + + monkeypatch.setattr(orchestrator_module, "QuantRunner", FakeQuantRunner) + monkeypatch.setattr(orchestrator_module, "LLMRunner", FakeLLMRunner) + + result = orchestrator_module.TradingOrchestrator( + OrchestratorConfig(quant_backtest_path="/tmp/quant") + ).get_combined_signal("AAPL", "2026-04-11") + + assert result.direction == 1 + assert result.quant_signal is not None + assert result.llm_signal is None + assert result.degrade_reason_codes == (ReasonCode.PROVIDER_MISMATCH.value,) + assert result.metadata["data_quality"]["state"] == "provider_mismatch" + assert result.metadata["data_quality"]["source"] == "llm" + assert result.metadata["data_quality"]["issues"] == [ + { + "source": "llm", + "state": "provider_mismatch", + "provider": "anthropic", + "backend_url": "https://api.openai.com/v1", + } + ] diff --git a/orchestrator/tests/test_live_mode.py b/orchestrator/tests/test_live_mode.py new file mode 100644 index 00000000..fd555910 --- /dev/null +++ b/orchestrator/tests/test_live_mode.py @@ -0,0 +1,112 @@ +import asyncio +from datetime import datetime, timezone + +from orchestrator.contracts.error_taxonomy import ReasonCode +from orchestrator.contracts.result_contract import FinalSignal, Signal +from orchestrator.live_mode import LiveMode + + +def _signal(*, source: str, direction: int, confidence: float) -> Signal: + return Signal( + ticker="AAPL", + direction=direction, + confidence=confidence, + source=source, + timestamp=datetime(2026, 4, 11, 12, 0, tzinfo=timezone.utc), + ) + + +class _StubOrchestrator: + def __init__(self, responses): + self._responses = responses + + def get_combined_signal(self, ticker: str, date: str): + response = self._responses[(ticker, date)] + if isinstance(response, Exception): + raise response + return response + + +def test_live_mode_serializes_degraded_contract_shape(): + live_mode = LiveMode( + _StubOrchestrator( + { + ("AAPL", "2026-04-11"): FinalSignal( + ticker="AAPL", + direction=-1, + confidence=0.42, + quant_signal=None, + llm_signal=_signal(source="llm", direction=-1, confidence=0.6), + timestamp=datetime(2026, 4, 11, 12, 1, tzinfo=timezone.utc), + degrade_reason_codes=(ReasonCode.QUANT_SIGNAL_FAILED.value,), + metadata={ + "contract_version": "v1alpha1", + "data_quality": {"state": "stale_data", "source": "quant"}, + "source_diagnostics": { + "quant": {"reason_code": ReasonCode.STALE_DATA.value} + }, + }, + ) + } + ) + ) + + results = asyncio.run(live_mode.run_once(["AAPL"], "2026-04-11")) + + assert results == [ + { + "contract_version": "v1alpha1", + "ticker": "AAPL", + "date": "2026-04-11", + "status": "degraded_success", + "result": { + "direction": -1, + "confidence": 0.42, + "quant_direction": None, + "llm_direction": -1, + "timestamp": "2026-04-11T12:01:00+00:00", + }, + "error": None, + "degradation": { + "degraded": True, + "reason_codes": [ReasonCode.QUANT_SIGNAL_FAILED.value], + "source_diagnostics": { + "quant": {"reason_code": ReasonCode.STALE_DATA.value} + }, + }, + "data_quality": {"state": "stale_data", "source": "quant"}, + } + ] + + +def test_live_mode_serializes_failure_contract_shape(): + live_mode = LiveMode( + _StubOrchestrator( + { + ("AAPL", "2026-04-11"): ValueError("both quant and llm signals are None") + } + ) + ) + + results = asyncio.run(live_mode.run_once(["AAPL"], "2026-04-11")) + + assert results == [ + { + "contract_version": "v1alpha1", + "ticker": "AAPL", + "date": "2026-04-11", + "status": "failed", + "result": None, + "error": { + "code": "live_signal_failed", + "message": "both quant and llm signals are None", + "retryable": False, + }, + "degradation": { + "degraded": True, + "reason_codes": [ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value], + "source_diagnostics": {}, + }, + "data_quality": None, + } + ] diff --git a/orchestrator/tests/test_market_calendar.py b/orchestrator/tests/test_market_calendar.py new file mode 100644 index 00000000..6b2ac65e --- /dev/null +++ b/orchestrator/tests/test_market_calendar.py @@ -0,0 +1,15 @@ +from datetime import date + +from orchestrator.market_calendar import is_non_trading_day + + +def test_is_non_trading_day_marks_a_share_holiday(): + assert is_non_trading_day('600519.SS', date(2024, 10, 2)) is True + + +def test_is_non_trading_day_marks_nyse_holiday(): + assert is_non_trading_day('AAPL', date(2024, 3, 29)) is True + + +def test_is_non_trading_day_leaves_regular_weekday_open(): + assert is_non_trading_day('AAPL', date(2024, 3, 28)) is False diff --git a/orchestrator/tests/test_quant_runner.py b/orchestrator/tests/test_quant_runner.py index f04ebe10..a6f26551 100644 --- a/orchestrator/tests/test_quant_runner.py +++ b/orchestrator/tests/test_quant_runner.py @@ -77,6 +77,32 @@ def test_get_signal_returns_reason_code_when_no_data(runner, monkeypatch): assert signal.reason_code == ReasonCode.QUANT_NO_DATA.value +def test_get_signal_marks_non_trading_day_on_a_share_holiday(runner, monkeypatch): + monkeypatch.setattr( + "orchestrator.quant_runner.yf.download", + lambda *args, **kwargs: pd.DataFrame(), + ) + + signal = runner.get_signal("600519.SS", "2024-10-02") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value + assert signal.metadata["data_quality"]["state"] == "non_trading_day" + + +def test_get_signal_marks_non_trading_day_on_market_holiday(runner, monkeypatch): + monkeypatch.setattr( + "orchestrator.quant_runner.yf.download", + lambda *args, **kwargs: pd.DataFrame(), + ) + + signal = runner.get_signal("AAPL", "2024-03-29") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value + assert signal.metadata["data_quality"]["state"] == "non_trading_day" + + def test_get_signal_marks_non_trading_day_on_weekend(runner, monkeypatch): monkeypatch.setattr( "orchestrator.quant_runner.yf.download", @@ -90,6 +116,30 @@ def test_get_signal_marks_non_trading_day_on_weekend(runner, monkeypatch): assert signal.metadata["data_quality"]["state"] == "non_trading_day" +def test_get_signal_marks_non_trading_day_on_market_holiday(runner, monkeypatch): + holiday_frame = pd.DataFrame( + { + "Open": [10.0], + "High": [11.0], + "Low": [9.0], + "Close": [10.5], + "Volume": [1000], + }, + index=pd.to_datetime(["2024-07-03"]), + ) + monkeypatch.setattr( + "orchestrator.quant_runner.yf.download", + lambda *args, **kwargs: holiday_frame, + ) + + signal = runner.get_signal("AAPL", "2024-07-04") + + assert signal.degraded is True + assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value + assert signal.metadata["data_quality"]["state"] == "non_trading_day" + assert signal.metadata["data_quality"]["last_available_date"] == "2024-07-03" + + def test_get_signal_marks_stale_data_when_requested_day_missing(runner, monkeypatch): stale_frame = pd.DataFrame( { diff --git a/web_dashboard/backend/tests/test_api_smoke.py b/web_dashboard/backend/tests/test_api_smoke.py index 6824ad26..d02924d3 100644 --- a/web_dashboard/backend/tests/test_api_smoke.py +++ b/web_dashboard/backend/tests/test_api_smoke.py @@ -2,7 +2,9 @@ import importlib import sys from pathlib import Path +import pytest from fastapi.testclient import TestClient +from starlette.websockets import WebSocketDisconnect def _load_main_module(monkeypatch): @@ -178,3 +180,82 @@ def test_analysis_websocket_progress_is_contract_first(monkeypatch): assert message["request_id"] == "req-task-ws" assert message["compat"]["decision"] == "HOLD" assert "decision" not in message + + +def test_orchestrator_websocket_smoke_is_contract_first(monkeypatch): + monkeypatch.delenv("DASHBOARD_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") + + main = _load_main_module(monkeypatch) + + import orchestrator.config as config_module + import orchestrator.live_mode as live_mode_module + import orchestrator.orchestrator as orchestrator_module + + class DummyConfig: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + class DummyOrchestrator: + def __init__(self, config): + self.config = config + + class DummyLiveMode: + def __init__(self, orchestrator): + self.orchestrator = orchestrator + + async def run_once(self, tickers, date=None): + assert tickers == ["AAPL"] + assert date == "2026-04-11" + return [ + { + "contract_version": "v1alpha1", + "ticker": "AAPL", + "date": "2026-04-11", + "status": "degraded_success", + "result": { + "direction": 1, + "confidence": 0.55, + "quant_direction": None, + "llm_direction": 1, + "timestamp": "2026-04-11T12:00:00+00:00", + }, + "error": None, + "degradation": { + "degraded": True, + "reason_codes": ["quant_signal_failed"], + "source_diagnostics": {"quant": {"reason_code": "quant_signal_failed"}}, + }, + "data_quality": {"state": "partial_data", "source": "quant"}, + } + ] + + monkeypatch.setattr(config_module, "OrchestratorConfig", DummyConfig) + monkeypatch.setattr(orchestrator_module, "TradingOrchestrator", DummyOrchestrator) + monkeypatch.setattr(live_mode_module, "LiveMode", DummyLiveMode) + + with TestClient(main.app) as client: + with client.websocket_connect("/ws/orchestrator?api_key=test-key") as websocket: + websocket.send_json({"tickers": ["AAPL"], "date": "2026-04-11"}) + message = websocket.receive_json() + + assert message["contract_version"] == "v1alpha1" + assert message["signals"][0]["contract_version"] == "v1alpha1" + assert message["signals"][0]["status"] == "degraded_success" + assert message["signals"][0]["degradation"]["reason_codes"] == ["quant_signal_failed"] + assert message["signals"][0]["data_quality"]["state"] == "partial_data" + + +def test_orchestrator_websocket_rejects_unauthorized(monkeypatch): + monkeypatch.delenv("DASHBOARD_API_KEY", raising=False) + monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key") + + main = _load_main_module(monkeypatch) + + with TestClient(main.app) as client: + with pytest.raises(WebSocketDisconnect) as exc_info: + with client.websocket_connect("/ws/orchestrator"): + pass + + assert exc_info.value.code == 4401 diff --git a/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx b/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx index 6c7a6109..0487a28a 100644 --- a/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx +++ b/web_dashboard/frontend/src/pages/AnalysisMonitor.jsx @@ -5,6 +5,8 @@ import DecisionBadge from '../components/DecisionBadge' import { StatusIcon } from '../components/StatusIcon' import { getConfidence, + getDataQualitySummary, + getDegradationSummary, getDecision, getDisplayDate, getErrorMessage, @@ -34,7 +36,8 @@ export default function AnalysisMonitor() { const quantSignal = getQuantSignal(task) const confidence = getConfidence(task) const displayDate = getDisplayDate(task) - const dataQuality = task?.data_quality_summary + const dataQuality = getDataQualitySummary(task) + const degradation = getDegradationSummary(task) const errorMessage = getErrorMessage(task) const fetchInitialState = useCallback(async () => { @@ -198,6 +201,11 @@ export default function AnalysisMonitor() { 数据质量: {dataQuality.state} )} + {degradation?.degraded && degradation?.reason_codes?.length > 0 && ( +
+ 降级原因: {degradation.reason_codes.join(', ')} +
+ )} {errorMessage && (
错误: {errorMessage} diff --git a/web_dashboard/frontend/src/utils/contractView.js b/web_dashboard/frontend/src/utils/contractView.js index 02f30555..066fa18a 100644 --- a/web_dashboard/frontend/src/utils/contractView.js +++ b/web_dashboard/frontend/src/utils/contractView.js @@ -6,6 +6,26 @@ export function getResult(payload) { return payload?.result || {} } +export function getDegradationSummary(payload) { + if (payload?.degradation_summary) return payload.degradation_summary + if (payload?.degradation) return payload.degradation + + const result = getResult(payload) + if (typeof result.degraded === 'boolean') { + return { + degraded: result.degraded, + reason_codes: [], + report_available: Boolean(result.report?.available), + } + } + + return null +} + +export function getDataQualitySummary(payload) { + return payload?.data_quality_summary ?? payload?.data_quality ?? null +} + export function getDecision(payload) { return getResult(payload).decision ?? getCompat(payload).decision ?? null } @@ -26,6 +46,10 @@ export function getDisplayDate(payload) { return payload?.date ?? getCompat(payload).analysis_date ?? null } +export function isDegradedPayload(payload) { + return Boolean(getDegradationSummary(payload)?.degraded) +} + export function getErrorMessage(payload) { const error = payload?.error if (!error) return null