omx(team): auto-checkpoint worker-2 [unknown]

This commit is contained in:
陈少杰 2026-04-14 04:45:44 +08:00
parent addc4a1e9c
commit 909519ff17
2 changed files with 245 additions and 0 deletions

View File

@ -0,0 +1,163 @@
import json
from datetime import datetime as real_datetime, timezone
from pathlib import Path
import pytest
import orchestrator.profile_stage_chain as profile_stage_chain
class _FakeGraphStream:
def __init__(self, events):
self._events = events
def stream(self, state, stream_mode, config):
assert state["company_of_interest"] == "AAPL"
assert state["trade_date"] == "2026-04-11"
assert stream_mode == "updates"
assert config == {"recursion_limit": 100, "max_concurrency": 1}
for event in self._events:
yield event
class _FakeTradingAgentsGraph:
    """Fake TradingAgentsGraph that validates its config and replays events.

    Exposes a ``graph`` attribute whose stream emits two degraded research
    updates (bull researcher, then research manager), mimicking a run in
    which both research nodes timed out.
    """

    def __init__(self, *, selected_analysts, config):
        # The analyst selection must be threaded through both the keyword
        # argument and the config dict; prompt style must survive as well.
        assert selected_analysts == ["market", "social"]
        assert config["selected_analysts"] == ["market", "social"]
        assert config["analysis_prompt_style"] == "balanced"
        bull_event = {
            "Bull Researcher": {
                "investment_debate_state": {
                    "research_status": "degraded",
                    "degraded_reason": "bull_researcher_timeout",
                    "history": "Bull Analyst: case",
                    "current_response": "Bull Analyst: case",
                }
            }
        }
        manager_event = {
            "Research Manager": {
                "investment_debate_state": {
                    "research_status": "degraded",
                    "degraded_reason": "research_manager_timeout",
                    "history": "Bull Analyst: case\nRecommendation: HOLD",
                    "current_response": "Recommendation: HOLD",
                }
            }
        }
        self.graph = _FakeGraphStream([bull_event, manager_event])
class _FakePropagator:
def create_initial_state(self, ticker, date):
return {
"company_of_interest": ticker,
"trade_date": date,
"investment_debate_state": {},
}
class _FixedDateTime:
@staticmethod
def now(tz=None):
return real_datetime(2026, 4, 14, 0, 0, tzinfo=timezone.utc)
# Parametrize cases: (event, expected) pairs for _extract_research_state.
_EMPTY_EVENT_CASE = ({}, (None, None, 0, 0))
_DEGRADED_EVENT_CASE = (
    {
        "Bull Researcher": {
            "investment_debate_state": {
                "research_status": "degraded",
                "degraded_reason": "bull_researcher_timeout",
                "history": "abc",
                "current_response": "xy",
            }
        }
    },
    ("degraded", "bull_researcher_timeout", 3, 2),
)


@pytest.mark.parametrize(
    ("event", "expected"),
    [_EMPTY_EVENT_CASE, _DEGRADED_EVENT_CASE],
)
def test_extract_research_state_captures_trace_fields(event, expected):
    """Empty and degraded events both map to the expected trace tuple."""
    assert profile_stage_chain._extract_research_state(event) == expected
def test_main_writes_trace_payload_with_research_provenance(monkeypatch, tmp_path, capsys):
    """End-to-end main() run: the trace JSON carries research provenance.

    The graph, propagator, monotonic clock, signal handling, and wall clock
    are all faked so main() deterministically profiles two degraded research
    events and the emitted payload (stdout and the dump file) can be checked
    exactly.
    """
    # Fixed monotonic readings: run start, end of event 1, end of event 2.
    clock_readings = iter([100.0, 100.4, 101.0])
    monkeypatch.setattr(profile_stage_chain, "TradingAgentsGraph", _FakeTradingAgentsGraph)
    monkeypatch.setattr(profile_stage_chain, "Propagator", _FakePropagator)
    monkeypatch.setattr(profile_stage_chain.time, "monotonic", lambda: next(clock_readings))
    # Neutralize the watchdog so the test never arms a real SIGALRM.
    monkeypatch.setattr(profile_stage_chain.signal, "signal", lambda *args, **kwargs: None)
    monkeypatch.setattr(profile_stage_chain.signal, "alarm", lambda *args, **kwargs: None)
    monkeypatch.setattr(profile_stage_chain, "datetime", _FixedDateTime)
    cli_args = [
        "profile_stage_chain.py",
        "--ticker",
        "AAPL",
        "--date",
        "2026-04-11",
        "--selected-analysts",
        "market,social",
        "--analysis-prompt-style",
        "balanced",
        "--dump-dir",
        str(tmp_path),
    ]
    monkeypatch.setattr("sys.argv", cli_args)

    profile_stage_chain.main()

    output = json.loads(capsys.readouterr().out)
    assert output["status"] == "ok"
    assert output["ticker"] == "AAPL"
    assert output["date"] == "2026-04-11"
    assert output["selected_analysts"] == ["market", "social"]
    assert output["analysis_prompt_style"] == "balanced"
    assert output["phase_totals_seconds"] == {"research": 1.0}
    assert output["raw_events"] == []
    # Fields shared by every node-timing record in this run.
    common = {
        "run_id": "20260414T000000Z",
        "phases": ["research"],
        "selected_analysts": ["market", "social"],
        "analysis_prompt_style": "balanced",
        "research_status": "degraded",
    }
    bull_timing = dict(
        common,
        nodes=["Bull Researcher"],
        llm_kinds=["quick"],
        start_at=0.0,
        end_at=0.4,
        elapsed_ms=400,
        degraded_reason="bull_researcher_timeout",
        history_len=len("Bull Analyst: case"),
        response_len=len("Bull Analyst: case"),
    )
    manager_timing = dict(
        common,
        nodes=["Research Manager"],
        llm_kinds=["deep"],
        start_at=0.4,
        end_at=1.0,
        elapsed_ms=600,
        degraded_reason="research_manager_timeout",
        history_len=len("Bull Analyst: case\nRecommendation: HOLD"),
        response_len=len("Recommendation: HOLD"),
    )
    assert output["node_timings"] == [bull_timing, manager_timing]
    dump_path = Path(output["dump_path"])
    assert dump_path.exists()
    # The dump file must mirror the stdout payload exactly.
    assert json.loads(dump_path.read_text()) == output

View File

@ -89,6 +89,88 @@ def test_bull_guard_success_records_coverage():
assert debate["covered_dimensions"] == ["bull"]
def test_manager_success_sets_confidence_without_changing_shape():
    """A successful manager pass records coverage and full confidence.

    _apply_research_success must merge the manager result into the debate
    state, append "manager" to covered_dimensions, pin manager_confidence
    to 1.0, and leave the remaining tracked fields untouched.
    """
    setup = _setup()
    debate_before = {
        "history": "Bull Analyst: case\nBear Analyst: counter",
        "bull_history": "Bull Analyst: case",
        "bear_history": "Bear Analyst: counter",
        "current_response": "Bear Analyst: counter",
        "judge_decision": "",
        "count": 2,
        "research_status": "full",
        "research_mode": "debate",
        "timed_out_nodes": [],
        "degraded_reason": None,
        "covered_dimensions": ["bull", "bear"],
        "manager_confidence": None,
    }
    state = {"investment_debate_state": debate_before}
    result = {
        "investment_debate_state": {
            "history": "Bull Analyst: case\nBear Analyst: counter",
            "bull_history": "Bull Analyst: case",
            "bear_history": "Bear Analyst: counter",
            "current_response": "Recommendation: BUY",
            "judge_decision": "Recommendation: BUY",
            "count": 2,
        },
        "investment_plan": "Recommendation: BUY",
    }

    updated = setup._apply_research_success(state, result, dimension="manager")

    debate = updated["investment_debate_state"]
    assert updated["investment_plan"] == "Recommendation: BUY"
    assert debate["judge_decision"] == "Recommendation: BUY"
    assert debate["research_status"] == "full"
    assert debate["research_mode"] == "debate"
    assert debate["covered_dimensions"] == ["bull", "bear", "manager"]
    assert debate["manager_confidence"] == 1.0
def test_bear_guard_exception_returns_degraded_argument(monkeypatch):
    """A crashing bear researcher yields a degraded synthetic argument.

    When the wrapped node raises, the guard must mark the debate state
    degraded, record a reason derived from the exception type, advance the
    turn count, and splice a [DEGRADED] bear argument into the histories.
    """
    def broken_bear(_llm, _memory):
        # Node factory whose node always fails with a connection error.
        def node(_state):
            raise ConnectionError("downstream unavailable")
        return node

    monkeypatch.setattr(graph_setup_module, "create_bear_researcher", broken_bear)
    setup = _setup()
    wrapped = setup._guard_research_node("Bear Researcher", None, None)
    debate_in = {
        "history": "Bull Analyst: case",
        "bull_history": "Bull Analyst: case",
        "bear_history": "",
        "current_response": "Bull Analyst: case",
        "judge_decision": "",
        "count": 1,
        "research_status": "full",
        "research_mode": "debate",
        "timed_out_nodes": [],
        "degraded_reason": None,
        "covered_dimensions": ["bull"],
        "manager_confidence": None,
    }

    result = wrapped({"investment_debate_state": debate_in})

    debate = result["investment_debate_state"]
    assert debate["research_status"] == "degraded"
    assert debate["research_mode"] == "degraded_synthesis"
    assert debate["degraded_reason"] == "bear_researcher_connectionerror"
    assert debate["timed_out_nodes"] == []
    assert debate["count"] == 2
    expected_prefix = (
        "Bear Analyst: [DEGRADED] Bear Researcher unavailable (bear_researcher_connectionerror)."
    )
    assert debate["current_response"].startswith(expected_prefix)
    assert debate["history"].startswith("Bull Analyst: case\nBear Analyst: [DEGRADED]")
    assert debate["bear_history"].startswith("\nBear Analyst: [DEGRADED]")
def test_guard_timeout_returns_without_waiting_for_node_completion(monkeypatch):
def slow_bull(_llm, _memory):
def node(_state):