Carry Phase 4 rollout-readiness work back into the mainline safely

Team execution produced recoverable commits for market-holiday handling, live websocket contracts, regression coverage, and the remaining frontend contract-view polish. Recover those changes into main without waiting for terminal team shutdown, preserving the verified payload semantics while avoiding the worker auto-checkpoint noise.

Constraint: Team workers were still in progress, so recovery had to avoid destructive shutdown and ignore the worker-3 uv.lock churn
Rejected: Wait for terminal shutdown before recovery | unnecessary delay once commits were already recoverable and verified
Rejected: Cherry-pick worker-3 checkpoint wholesale | would import unrelated uv.lock churn into main
Confidence: high
Scope-risk: moderate
Reversibility: clean
Directive: Treat team INTEGRATED mailbox messages as hints only; always inspect snapshot refs/worktrees before claiming the leader actually merged code
Tested: python -m pytest orchestrator/tests/test_market_calendar.py orchestrator/tests/test_quant_runner.py orchestrator/tests/test_application_service.py orchestrator/tests/test_live_mode.py web_dashboard/backend/tests/test_api_smoke.py -q
Tested: python -m compileall orchestrator web_dashboard/backend
Tested: npm run build (web_dashboard/frontend)
Not-tested: final team terminal completion after recovery
Not-tested: real websocket clients or live provider-backed market holiday sessions
This commit is contained in:
陈少杰 2026-04-14 01:15:18 +08:00
parent 7cd9c4617a
commit 11cbb7ce85
10 changed files with 532 additions and 17 deletions

View File

@ -3,6 +3,9 @@ import logging
from datetime import datetime, timezone
from typing import List, Optional
from orchestrator.contracts.config_schema import CONTRACT_VERSION
from orchestrator.contracts.error_taxonomy import ReasonCode
logger = logging.getLogger(__name__)
@ -15,6 +18,69 @@ class LiveMode:
def __init__(self, orchestrator):
self._orchestrator = orchestrator
@staticmethod
def _serialize_result(signal) -> dict:
return {
"direction": signal.direction,
"confidence": signal.confidence,
"quant_direction": signal.quant_signal.direction if signal.quant_signal else None,
"llm_direction": signal.llm_signal.direction if signal.llm_signal else None,
"timestamp": signal.timestamp.isoformat(),
}
@staticmethod
def _serialize_degradation(signal, data_quality: Optional[dict]) -> dict:
metadata = getattr(signal, "metadata", {}) or {}
return {
"degraded": bool(getattr(signal, "degrade_reason_codes", ())) or bool(data_quality),
"reason_codes": list(getattr(signal, "degrade_reason_codes", ()) or ()),
"source_diagnostics": metadata.get("source_diagnostics") or {},
}
@staticmethod
def _contract_version(signal) -> str:
metadata = getattr(signal, "metadata", {}) or {}
return getattr(signal, "contract_version", None) or metadata.get("contract_version") or CONTRACT_VERSION
def _serialize_signal(self, *, ticker: str, date: str, signal) -> dict:
metadata = getattr(signal, "metadata", {}) or {}
data_quality = metadata.get("data_quality")
degradation = self._serialize_degradation(signal, data_quality)
return {
"contract_version": self._contract_version(signal),
"ticker": ticker,
"date": date,
"status": "degraded_success" if degradation["degraded"] else "completed",
"result": self._serialize_result(signal),
"error": None,
"degradation": degradation,
"data_quality": data_quality,
}
@staticmethod
def _serialize_error(*, ticker: str, date: str, exc: Exception) -> dict:
reason_codes = []
if isinstance(exc, ValueError) and "both quant and llm signals are None" in str(exc):
reason_codes.append(ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value)
return {
"contract_version": CONTRACT_VERSION,
"ticker": ticker,
"date": date,
"status": "failed",
"result": None,
"error": {
"code": "live_signal_failed",
"message": str(exc),
"retryable": False,
},
"degradation": {
"degraded": bool(reason_codes),
"reason_codes": reason_codes,
"source_diagnostics": {},
},
"data_quality": None,
}
async def run_once(self, tickers: List[str], date: Optional[str] = None) -> List[dict]:
"""
Compute combined signals for all tickers on the given date (default: today).
@ -29,20 +95,8 @@ class LiveMode:
sig = await asyncio.to_thread(
self._orchestrator.get_combined_signal, ticker, date
)
results.append({
"ticker": ticker,
"date": date,
"direction": sig.direction,
"confidence": sig.confidence,
"quant_direction": sig.quant_signal.direction if sig.quant_signal else None,
"llm_direction": sig.llm_signal.direction if sig.llm_signal else None,
"timestamp": sig.timestamp.isoformat(),
})
results.append(self._serialize_signal(ticker=ticker, date=date, signal=sig))
except Exception as e:
logger.error("LiveMode: failed for %s %s: %s", ticker, date, e)
results.append({
"ticker": ticker,
"date": date,
"error": str(e),
})
results.append(self._serialize_error(ticker=ticker, date=date, exc=e))
return results

View File

@ -0,0 +1,119 @@
from __future__ import annotations
from datetime import date, timedelta
_A_SHARE_SUFFIXES = {"SH", "SS", "SZ"}
# Mainland exchanges close on weekends plus the annual State Council public-holiday windows.
# Weekend make-up workdays do not become exchange trading days.
_A_SHARE_HOLIDAYS = {
date(2024, 1, 1),
*[date(2024, 2, day) for day in range(10, 18)],
*[date(2024, 4, day) for day in range(4, 7)],
*[date(2024, 5, day) for day in range(1, 6)],
*[date(2024, 6, day) for day in range(8, 11)],
*[date(2024, 9, day) for day in range(15, 18)],
*[date(2024, 10, day) for day in range(1, 8)],
date(2025, 1, 1),
*[date(2025, 1, day) for day in range(28, 32)],
*[date(2025, 2, day) for day in range(1, 5)],
*[date(2025, 4, day) for day in range(4, 7)],
*[date(2025, 5, day) for day in range(1, 6)],
*[date(2025, 5, day) for day in range(31, 32)],
*[date(2025, 6, day) for day in range(1, 3)],
*[date(2025, 10, day) for day in range(1, 9)],
*[date(2026, 1, day) for day in range(1, 4)],
*[date(2026, 2, day) for day in range(15, 24)],
*[date(2026, 4, day) for day in range(4, 7)],
*[date(2026, 5, day) for day in range(1, 6)],
*[date(2026, 6, day) for day in range(19, 22)],
*[date(2026, 9, day) for day in range(25, 28)],
*[date(2026, 10, day) for day in range(1, 8)],
}
def is_non_trading_day(ticker: str, day: date) -> bool:
"""Return whether the requested date is a known non-trading day for the ticker's market."""
if day.weekday() >= 5:
return True
if _is_a_share_ticker(ticker):
return day in _A_SHARE_HOLIDAYS
return _is_nyse_holiday(day)
def _is_a_share_ticker(ticker: str) -> bool:
suffix = ticker.rsplit(".", 1)[-1].upper() if "." in ticker else ""
return suffix in _A_SHARE_SUFFIXES
def _is_nyse_holiday(day: date) -> bool:
observed_new_year = _observed_fixed_holiday(day.year, 1, 1)
observed_juneteenth = _observed_fixed_holiday(day.year, 6, 19)
observed_independence_day = _observed_fixed_holiday(day.year, 7, 4)
observed_christmas = _observed_fixed_holiday(day.year, 12, 25)
holidays = {
observed_new_year,
_nth_weekday(day.year, 1, 0, 3), # Martin Luther King, Jr. Day
_nth_weekday(day.year, 2, 0, 3), # Washington's Birthday
_easter(day.year) - timedelta(days=2), # Good Friday
_last_weekday(day.year, 5, 0), # Memorial Day
observed_independence_day,
_nth_weekday(day.year, 9, 0, 1), # Labor Day
_nth_weekday(day.year, 11, 3, 4), # Thanksgiving Day
observed_christmas,
}
if day.year >= 2022:
holidays.add(observed_juneteenth)
# When Jan 1 falls on Saturday, NYSE observes New Year's Day on the prior Friday.
if day.month == 12 and day.day == 31:
next_new_year = _observed_fixed_holiday(day.year + 1, 1, 1)
if next_new_year.year == day.year:
holidays.add(next_new_year)
return day in holidays
def _observed_fixed_holiday(year: int, month: int, day: int) -> date:
holiday = date(year, month, day)
if holiday.weekday() == 5:
return holiday - timedelta(days=1)
if holiday.weekday() == 6:
return holiday + timedelta(days=1)
return holiday
def _nth_weekday(year: int, month: int, weekday: int, occurrence: int) -> date:
first = date(year, month, 1)
delta = (weekday - first.weekday()) % 7
return first + timedelta(days=delta + 7 * (occurrence - 1))
def _last_weekday(year: int, month: int, weekday: int) -> date:
if month == 12:
cursor = date(year + 1, 1, 1) - timedelta(days=1)
else:
cursor = date(year, month + 1, 1) - timedelta(days=1)
while cursor.weekday() != weekday:
cursor -= timedelta(days=1)
return cursor
def _easter(year: int) -> date:
"""Anonymous Gregorian algorithm."""
a = year % 19
b = year // 100
c = year % 100
d = b // 4
e = b % 4
f = (b + 8) // 25
g = (b - f + 1) // 3
h = (19 * a + b - d - g + 15) % 30
i = c // 4
k = c % 4
l = (32 + 2 * e + 2 * i - h - k) % 7
m = (a + 11 * h + 22 * l) // 451
month = (h + l - 7 * m + 114) // 31
day = ((h + l - 7 * m + 114) % 31) + 1
return date(year, month, day)

View File

@ -11,6 +11,7 @@ import yfinance as yf
from orchestrator.config import OrchestratorConfig
from orchestrator.contracts.error_taxonomy import ReasonCode
from orchestrator.contracts.result_contract import Signal, build_error_signal
from orchestrator.market_calendar import is_non_trading_day
logger = logging.getLogger(__name__)
@ -50,7 +51,7 @@ class QuantRunner:
df = yf.download(ticker, start=start_str, end=end_exclusive, progress=False, auto_adjust=True)
if df.empty:
logger.warning("No price data for %s between %s and %s", ticker, start_str, date)
if end_dt.weekday() >= 5:
if is_non_trading_day(ticker, end_dt.date()):
return build_error_signal(
ticker=ticker,
source="quant",
@ -107,7 +108,7 @@ class QuantRunner:
if hasattr(last_available_ts, "strftime")
else str(last_available_ts)
)
if end_dt.weekday() >= 5:
if is_non_trading_day(ticker, end_dt.date()):
return build_error_signal(
ticker=ticker,
source="quant",

View File

@ -111,3 +111,54 @@ def test_trading_orchestrator_raises_when_both_sources_degrade(monkeypatch):
orchestrator_module.TradingOrchestrator(
OrchestratorConfig(quant_backtest_path="/tmp/quant")
).get_combined_signal("AAPL", "2026-04-11")
def test_trading_orchestrator_surfaces_provider_mismatch_summary_when_llm_degrades(monkeypatch):
class FakeQuantRunner:
def __init__(self, _config):
pass
def get_signal(self, _ticker, _date):
return _signal("quant", direction=1, confidence=0.8)
class FakeLLMRunner:
def __init__(self, _config):
pass
def get_signal(self, _ticker, _date):
return _signal(
"llm",
direction=0,
confidence=0.0,
metadata={
"error": "provider mismatch",
"data_quality": {
"state": "provider_mismatch",
"provider": "anthropic",
"backend_url": "https://api.openai.com/v1",
},
},
reason_code=ReasonCode.PROVIDER_MISMATCH.value,
)
monkeypatch.setattr(orchestrator_module, "QuantRunner", FakeQuantRunner)
monkeypatch.setattr(orchestrator_module, "LLMRunner", FakeLLMRunner)
result = orchestrator_module.TradingOrchestrator(
OrchestratorConfig(quant_backtest_path="/tmp/quant")
).get_combined_signal("AAPL", "2026-04-11")
assert result.direction == 1
assert result.quant_signal is not None
assert result.llm_signal is None
assert result.degrade_reason_codes == (ReasonCode.PROVIDER_MISMATCH.value,)
assert result.metadata["data_quality"]["state"] == "provider_mismatch"
assert result.metadata["data_quality"]["source"] == "llm"
assert result.metadata["data_quality"]["issues"] == [
{
"source": "llm",
"state": "provider_mismatch",
"provider": "anthropic",
"backend_url": "https://api.openai.com/v1",
}
]

View File

@ -0,0 +1,112 @@
import asyncio
from datetime import datetime, timezone
from orchestrator.contracts.error_taxonomy import ReasonCode
from orchestrator.contracts.result_contract import FinalSignal, Signal
from orchestrator.live_mode import LiveMode
def _signal(*, source: str, direction: int, confidence: float) -> Signal:
return Signal(
ticker="AAPL",
direction=direction,
confidence=confidence,
source=source,
timestamp=datetime(2026, 4, 11, 12, 0, tzinfo=timezone.utc),
)
class _StubOrchestrator:
def __init__(self, responses):
self._responses = responses
def get_combined_signal(self, ticker: str, date: str):
response = self._responses[(ticker, date)]
if isinstance(response, Exception):
raise response
return response
def test_live_mode_serializes_degraded_contract_shape():
live_mode = LiveMode(
_StubOrchestrator(
{
("AAPL", "2026-04-11"): FinalSignal(
ticker="AAPL",
direction=-1,
confidence=0.42,
quant_signal=None,
llm_signal=_signal(source="llm", direction=-1, confidence=0.6),
timestamp=datetime(2026, 4, 11, 12, 1, tzinfo=timezone.utc),
degrade_reason_codes=(ReasonCode.QUANT_SIGNAL_FAILED.value,),
metadata={
"contract_version": "v1alpha1",
"data_quality": {"state": "stale_data", "source": "quant"},
"source_diagnostics": {
"quant": {"reason_code": ReasonCode.STALE_DATA.value}
},
},
)
}
)
)
results = asyncio.run(live_mode.run_once(["AAPL"], "2026-04-11"))
assert results == [
{
"contract_version": "v1alpha1",
"ticker": "AAPL",
"date": "2026-04-11",
"status": "degraded_success",
"result": {
"direction": -1,
"confidence": 0.42,
"quant_direction": None,
"llm_direction": -1,
"timestamp": "2026-04-11T12:01:00+00:00",
},
"error": None,
"degradation": {
"degraded": True,
"reason_codes": [ReasonCode.QUANT_SIGNAL_FAILED.value],
"source_diagnostics": {
"quant": {"reason_code": ReasonCode.STALE_DATA.value}
},
},
"data_quality": {"state": "stale_data", "source": "quant"},
}
]
def test_live_mode_serializes_failure_contract_shape():
live_mode = LiveMode(
_StubOrchestrator(
{
("AAPL", "2026-04-11"): ValueError("both quant and llm signals are None")
}
)
)
results = asyncio.run(live_mode.run_once(["AAPL"], "2026-04-11"))
assert results == [
{
"contract_version": "v1alpha1",
"ticker": "AAPL",
"date": "2026-04-11",
"status": "failed",
"result": None,
"error": {
"code": "live_signal_failed",
"message": "both quant and llm signals are None",
"retryable": False,
},
"degradation": {
"degraded": True,
"reason_codes": [ReasonCode.BOTH_SIGNALS_UNAVAILABLE.value],
"source_diagnostics": {},
},
"data_quality": None,
}
]

View File

@ -0,0 +1,15 @@
from datetime import date
from orchestrator.market_calendar import is_non_trading_day
def test_is_non_trading_day_marks_a_share_holiday():
assert is_non_trading_day('600519.SS', date(2024, 10, 2)) is True
def test_is_non_trading_day_marks_nyse_holiday():
assert is_non_trading_day('AAPL', date(2024, 3, 29)) is True
def test_is_non_trading_day_leaves_regular_weekday_open():
assert is_non_trading_day('AAPL', date(2024, 3, 28)) is False

View File

@ -77,6 +77,32 @@ def test_get_signal_returns_reason_code_when_no_data(runner, monkeypatch):
assert signal.reason_code == ReasonCode.QUANT_NO_DATA.value
def test_get_signal_marks_non_trading_day_on_a_share_holiday(runner, monkeypatch):
monkeypatch.setattr(
"orchestrator.quant_runner.yf.download",
lambda *args, **kwargs: pd.DataFrame(),
)
signal = runner.get_signal("600519.SS", "2024-10-02")
assert signal.degraded is True
assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value
assert signal.metadata["data_quality"]["state"] == "non_trading_day"
def test_get_signal_marks_non_trading_day_on_market_holiday(runner, monkeypatch):
monkeypatch.setattr(
"orchestrator.quant_runner.yf.download",
lambda *args, **kwargs: pd.DataFrame(),
)
signal = runner.get_signal("AAPL", "2024-03-29")
assert signal.degraded is True
assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value
assert signal.metadata["data_quality"]["state"] == "non_trading_day"
def test_get_signal_marks_non_trading_day_on_weekend(runner, monkeypatch):
monkeypatch.setattr(
"orchestrator.quant_runner.yf.download",
@ -90,6 +116,30 @@ def test_get_signal_marks_non_trading_day_on_weekend(runner, monkeypatch):
assert signal.metadata["data_quality"]["state"] == "non_trading_day"
def test_get_signal_marks_non_trading_day_on_market_holiday(runner, monkeypatch):
holiday_frame = pd.DataFrame(
{
"Open": [10.0],
"High": [11.0],
"Low": [9.0],
"Close": [10.5],
"Volume": [1000],
},
index=pd.to_datetime(["2024-07-03"]),
)
monkeypatch.setattr(
"orchestrator.quant_runner.yf.download",
lambda *args, **kwargs: holiday_frame,
)
signal = runner.get_signal("AAPL", "2024-07-04")
assert signal.degraded is True
assert signal.reason_code == ReasonCode.NON_TRADING_DAY.value
assert signal.metadata["data_quality"]["state"] == "non_trading_day"
assert signal.metadata["data_quality"]["last_available_date"] == "2024-07-03"
def test_get_signal_marks_stale_data_when_requested_day_missing(runner, monkeypatch):
stale_frame = pd.DataFrame(
{

View File

@ -2,7 +2,9 @@ import importlib
import sys
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
from starlette.websockets import WebSocketDisconnect
def _load_main_module(monkeypatch):
@ -178,3 +180,82 @@ def test_analysis_websocket_progress_is_contract_first(monkeypatch):
assert message["request_id"] == "req-task-ws"
assert message["compat"]["decision"] == "HOLD"
assert "decision" not in message
def test_orchestrator_websocket_smoke_is_contract_first(monkeypatch):
monkeypatch.delenv("DASHBOARD_API_KEY", raising=False)
monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
main = _load_main_module(monkeypatch)
import orchestrator.config as config_module
import orchestrator.live_mode as live_mode_module
import orchestrator.orchestrator as orchestrator_module
class DummyConfig:
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class DummyOrchestrator:
def __init__(self, config):
self.config = config
class DummyLiveMode:
def __init__(self, orchestrator):
self.orchestrator = orchestrator
async def run_once(self, tickers, date=None):
assert tickers == ["AAPL"]
assert date == "2026-04-11"
return [
{
"contract_version": "v1alpha1",
"ticker": "AAPL",
"date": "2026-04-11",
"status": "degraded_success",
"result": {
"direction": 1,
"confidence": 0.55,
"quant_direction": None,
"llm_direction": 1,
"timestamp": "2026-04-11T12:00:00+00:00",
},
"error": None,
"degradation": {
"degraded": True,
"reason_codes": ["quant_signal_failed"],
"source_diagnostics": {"quant": {"reason_code": "quant_signal_failed"}},
},
"data_quality": {"state": "partial_data", "source": "quant"},
}
]
monkeypatch.setattr(config_module, "OrchestratorConfig", DummyConfig)
monkeypatch.setattr(orchestrator_module, "TradingOrchestrator", DummyOrchestrator)
monkeypatch.setattr(live_mode_module, "LiveMode", DummyLiveMode)
with TestClient(main.app) as client:
with client.websocket_connect("/ws/orchestrator?api_key=test-key") as websocket:
websocket.send_json({"tickers": ["AAPL"], "date": "2026-04-11"})
message = websocket.receive_json()
assert message["contract_version"] == "v1alpha1"
assert message["signals"][0]["contract_version"] == "v1alpha1"
assert message["signals"][0]["status"] == "degraded_success"
assert message["signals"][0]["degradation"]["reason_codes"] == ["quant_signal_failed"]
assert message["signals"][0]["data_quality"]["state"] == "partial_data"
def test_orchestrator_websocket_rejects_unauthorized(monkeypatch):
monkeypatch.delenv("DASHBOARD_API_KEY", raising=False)
monkeypatch.setenv("ANTHROPIC_API_KEY", "test-key")
main = _load_main_module(monkeypatch)
with TestClient(main.app) as client:
with pytest.raises(WebSocketDisconnect) as exc_info:
with client.websocket_connect("/ws/orchestrator"):
pass
assert exc_info.value.code == 4401

View File

@ -5,6 +5,8 @@ import DecisionBadge from '../components/DecisionBadge'
import { StatusIcon } from '../components/StatusIcon'
import {
getConfidence,
getDataQualitySummary,
getDegradationSummary,
getDecision,
getDisplayDate,
getErrorMessage,
@ -34,7 +36,8 @@ export default function AnalysisMonitor() {
const quantSignal = getQuantSignal(task)
const confidence = getConfidence(task)
const displayDate = getDisplayDate(task)
const dataQuality = task?.data_quality_summary
const dataQuality = getDataQualitySummary(task)
const degradation = getDegradationSummary(task)
const errorMessage = getErrorMessage(task)
const fetchInitialState = useCallback(async () => {
@ -198,6 +201,11 @@ export default function AnalysisMonitor() {
数据质量: <strong>{dataQuality.state}</strong>
</div>
)}
{degradation?.degraded && degradation?.reason_codes?.length > 0 && (
<div style={{ marginBottom: 12, fontSize: 'var(--text-sm)', color: 'var(--hold)' }}>
降级原因: <strong>{degradation.reason_codes.join(', ')}</strong>
</div>
)}
{errorMessage && (
<div style={{ marginBottom: 12, fontSize: 'var(--text-sm)', color: 'var(--sell)' }}>
错误: {errorMessage}

View File

@ -6,6 +6,26 @@ export function getResult(payload) {
return payload?.result || {}
}
export function getDegradationSummary(payload) {
if (payload?.degradation_summary) return payload.degradation_summary
if (payload?.degradation) return payload.degradation
const result = getResult(payload)
if (typeof result.degraded === 'boolean') {
return {
degraded: result.degraded,
reason_codes: [],
report_available: Boolean(result.report?.available),
}
}
return null
}
export function getDataQualitySummary(payload) {
return payload?.data_quality_summary ?? payload?.data_quality ?? null
}
export function getDecision(payload) {
return getResult(payload).decision ?? getCompat(payload).decision ?? null
}
@ -26,6 +46,10 @@ export function getDisplayDate(payload) {
return payload?.date ?? getCompat(payload).analysis_date ?? null
}
export function isDegradedPayload(payload) {
return Boolean(getDegradationSummary(payload)?.degraded)
}
export function getErrorMessage(payload) {
const error = payload?.error
if (!error) return null