From 728ae69eab8f2191fb04ae479e298800594f8611 Mon Sep 17 00:00:00 2001 From: ahmet guzererler Date: Thu, 26 Mar 2026 12:55:24 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20PM=20brain=20upgrade=20=E2=80=94=20macr?= =?UTF-8?q?o/micro=20agents=20&=20memory=20split=20(#123)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: introduce flow_id with timestamp-based report versioning Replace run_id with flow_id as the primary grouping concept (one flow = one user analysis intent spanning scan + pipeline + portfolio). Reports are now written as {timestamp}_{name}.json so load methods always return the latest version by lexicographic sort, eliminating the latest.json pointer pattern for new flows. Key changes: - report_paths.py: add generate_flow_id(), ts_now() (ms precision), flow_id kwarg on all path helpers; keep run_id / pointer helpers for backward compatibility - ReportStore: dual-mode save/load — flow_id uses timestamped layout, run_id uses legacy runs/{id}/ layout with latest.json - MongoReportStore: add flow_id field and index; run_id stays for compat - DualReportStore: expose flow_id property - store_factory: accept flow_id as primary param, run_id as alias - runs.py / langgraph_engine.py: generate and thread flow_id through all trigger endpoints and run methods - Tests: add flow_id coverage for all layers; 905 tests pass Co-Authored-By: Claude Sonnet 4.6 * feat: PM brain upgrade — macro/micro summary agents, memory split, forensic dashboard Replaces the PM's raw-JSON context (~6,800 tokens on deep_think) with a MAP-REDUCE compression layer using two parallel mid_think summary agents, achieving ~70% cost reduction at the PM tier. Architecture: - MacroMemory: new regime-level memory class (MongoDB/JSON, separate from per-ticker reflexion memory) with record_macro_state/build_macro_context - ReflexionMemory: extended with collection_name param to isolate micro_reflexion from the pipeline reflexion collection (with distinct local JSON fallback path to prevent file collision) - Macro_Summary_Agent (mid_think): compresses scan_summary into a 1-page regime brief with memory injection; sentinel guard prevents LLM call on empty/error scan data ("NO DATA AVAILABLE - ABORT MACRO") - Micro_Summary_Agent (mid_think): compresses holding_reviews + candidates into a markdown table brief with per-ticker memory injection - Portfolio graph: parallel fan-out (prioritize_candidates → macro_summary ‖ micro_summary → make_pm_decision) using _last_value reducers for safe concurrent state writes (ADR-005 pattern) - PM refactor: Pydantic PMDecisionSchema enforces Forensic Execution Dashboard output (macro_regime, forensic_report, per-trade macro_alignment/memory_note/position_sizing_logic); with_structured_output as primary path, extract_json fallback for non-conforming providers - PM sentinel handling: "NO DATA AVAILABLE" in macro_brief substituted with actionable conservative guidance before LLM sees it 62 new unit tests across 4 test files covering all new components. Co-Authored-By: Claude Sonnet 4.6 * fix: address code review — relaxed error guard, ticker_analyses, PM memory wiring 1. macro_summary_agent: relaxed error guard to only abort when scan_summary's sole key is "error" (partial failures with real data are now processed) 2. micro_summary_agent: now reads ticker_analyses from state and enriches the per-ticker table with trading graph analysis data 3. portfolio_graph: wires macro_memory and micro_memory to PM factory call 4. test_empty_state: updated test for new partial-failure behavior Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Sonnet 4.6 --- tests/unit/test_empty_state.py | 120 +++++++ tests/unit/test_macro_memory.py | 188 ++++++++++ tests/unit/test_pydantic_schema.py | 272 ++++++++++++++ tests/unit/test_summary_agents.py | 339 ++++++++++++++++++ tradingagents/agents/portfolio/__init__.py | 4 + .../agents/portfolio/macro_summary_agent.py | 200 +++++++++++ .../agents/portfolio/micro_summary_agent.py | 185 ++++++++++ .../agents/portfolio/pm_decision_agent.py | 205 ++++++++--- tradingagents/graph/portfolio_graph.py | 32 +- tradingagents/graph/portfolio_setup.py | 48 ++- tradingagents/memory/__init__.py | 5 + tradingagents/memory/macro_memory.py | 281 +++++++++++++++ tradingagents/memory/reflexion.py | 7 +- tradingagents/portfolio/dual_report_store.py | 5 + tradingagents/portfolio/mongo_report_store.py | 5 + tradingagents/portfolio/portfolio_states.py | 9 + 16 files changed, 1834 insertions(+), 71 deletions(-) create mode 100644 tests/unit/test_empty_state.py create mode 100644 tests/unit/test_macro_memory.py create mode 100644 tests/unit/test_pydantic_schema.py create mode 100644 tests/unit/test_summary_agents.py create mode 100644 tradingagents/agents/portfolio/macro_summary_agent.py create mode 100644 tradingagents/agents/portfolio/micro_summary_agent.py create mode 100644 tradingagents/memory/macro_memory.py diff --git a/tests/unit/test_empty_state.py b/tests/unit/test_empty_state.py new file mode 100644 index 00000000..9ac450ca --- /dev/null +++ b/tests/unit/test_empty_state.py @@ -0,0 +1,120 @@ +"""Tests for empty/error state handling across agents. + +Validates that agents handle missing/empty/error data gracefully without +hallucinating — particularly the NO-DATA guard in MacroSummaryAgent that +must short-circuit before invoking the LLM. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from tradingagents.agents.portfolio.macro_summary_agent import ( + create_macro_summary_agent, +) + + +class TestEmptyStateGuards: + """Validate that agents handle missing/empty data gracefully without hallucinating.""" + + def test_macro_agent_empty_dict(self): + """Empty scan_summary dict triggers NO DATA sentinel; LLM not invoked.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({"scan_summary": {}, "messages": [], "analysis_date": ""}) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + # LLM must NOT be invoked + mock_llm.invoke.assert_not_called() + mock_llm.with_structured_output.assert_not_called() + + def test_macro_agent_none_scan(self): + """None scan_summary triggers NO DATA sentinel.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({"scan_summary": None, "messages": [], "analysis_date": ""}) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + def test_macro_agent_error_key(self): + """scan_summary with 'error' key triggers NO DATA sentinel.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({ + "scan_summary": {"error": "rate limit exceeded"}, + "messages": [], + "analysis_date": "", + }) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + def test_macro_agent_missing_scan_key(self): + """State dict with no scan_summary key at all triggers NO DATA sentinel. + + state.get('scan_summary') returns None → should trigger guard. + """ + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({"messages": [], "analysis_date": ""}) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + def test_macro_agent_no_data_path_does_not_invoke_llm(self): + """All NO-DATA guard paths must leave the LLM untouched.""" + no_data_states = [ + {"scan_summary": {}, "messages": [], "analysis_date": ""}, + {"scan_summary": None, "messages": [], "analysis_date": ""}, + {"scan_summary": {"error": "timeout"}, "messages": [], "analysis_date": ""}, + {"messages": [], "analysis_date": ""}, + ] + for state in no_data_states: + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + agent(state) + mock_llm.invoke.assert_not_called() + mock_llm.__ror__.assert_not_called() + + def test_macro_agent_no_data_returns_correct_sender(self): + """Sender is always 'macro_summary_agent' even on the NO-DATA path.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({"scan_summary": {}, "messages": [], "analysis_date": ""}) + assert result["sender"] == "macro_summary_agent" + + def test_macro_agent_no_data_macro_memory_context_empty_string(self): + """macro_memory_context is an empty string on the NO-DATA path.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({"scan_summary": {}, "messages": [], "analysis_date": ""}) + assert result["macro_memory_context"] == "" + + def test_macro_agent_error_only_key_triggers_sentinel(self): + """scan_summary that ONLY contains 'error' (no other keys) triggers guard.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({ + "scan_summary": {"error": "vendor offline"}, + "messages": [], + "analysis_date": "2026-03-26", + }) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + def test_macro_agent_scan_with_data_and_error_key_proceeds(self): + """scan_summary with real data AND an 'error' key is NOT discarded. + + Only scan_summary whose *only* key is 'error' triggers the guard. + Partial failures with usable data should still be compressed. + """ + from langchain_core.messages import AIMessage + from langchain_core.runnables import RunnableLambda + + mock_llm = RunnableLambda(lambda _: AIMessage(content="MACRO REGIME: neutral\nPartial data processed")) + agent = create_macro_summary_agent(mock_llm) + result = agent({ + "scan_summary": { + "executive_summary": "Partial data", + "error": "partial failure", + }, + "messages": [], + "analysis_date": "2026-03-26", + }) + # Should NOT be sentinel — the LLM was invoked + assert result["macro_brief"] != "NO DATA AVAILABLE - ABORT MACRO" diff --git a/tests/unit/test_macro_memory.py b/tests/unit/test_macro_memory.py new file mode 100644 index 00000000..11a014b7 --- /dev/null +++ b/tests/unit/test_macro_memory.py @@ -0,0 +1,188 @@ +"""Tests for MacroMemory — regime-level learning memory. + +Covers: +- record_macro_state + get_recent round-trip (local JSON fallback) +- build_macro_context formatting +- record_outcome feedback loop +- Ordering guarantees (newest-first) +- Persistence across instances +""" + +from __future__ import annotations + +import json + +import pytest + +from tradingagents.memory.macro_memory import MacroMemory + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def mem(tmp_path): + """Return a MacroMemory using local JSON fallback in a temp directory.""" + return MacroMemory(fallback_path=tmp_path / "macro.json") + + +# --------------------------------------------------------------------------- +# record_macro_state + get_recent +# --------------------------------------------------------------------------- + + +class TestMacroMemoryLocalFallback: + """Tests using local JSON fallback (no MongoDB needed).""" + + def test_record_and_retrieve(self, tmp_path): + """record_macro_state() stores and get_recent() retrieves.""" + m = MacroMemory(fallback_path=tmp_path / "macro.json") + m.record_macro_state( + date="2026-03-26", + vix_level=25.3, + macro_call="risk-off", + sector_thesis="Energy under pressure", + key_themes=["rate hikes", "oil volatility"], + ) + records = m.get_recent(limit=5) + assert len(records) == 1 + assert records[0]["macro_call"] == "risk-off" + assert records[0]["vix_level"] == 25.3 + + def test_build_macro_context_no_history(self, tmp_path): + """build_macro_context() returns a message when no history.""" + m = MacroMemory(fallback_path=tmp_path / "macro.json") + ctx = m.build_macro_context() + assert isinstance(ctx, str) + assert len(ctx) > 0 + + def test_build_macro_context_with_history(self, tmp_path): + """build_macro_context() includes date, macro_call, vix.""" + m = MacroMemory(fallback_path=tmp_path / "macro.json") + m.record_macro_state("2026-03-20", 28.0, "risk-off", "hawkish Fed", ["rates"]) + ctx = m.build_macro_context(limit=1) + assert "2026-03-20" in ctx + assert "risk-off" in ctx or "28" in ctx # either VIX or call shows up + + def test_record_outcome(self, tmp_path): + """record_outcome() attaches an outcome dict to the matching record.""" + m = MacroMemory(fallback_path=tmp_path / "macro.json") + m.record_macro_state("2026-03-20", 25.0, "neutral", "mixed signals", []) + ok = m.record_outcome("2026-03-20", {"correct": True, "note": "regime held"}) + assert ok is True + records = m.get_recent() + assert records[0]["outcome"] is not None + + def test_get_recent_newest_first(self, tmp_path): + """get_recent() returns records sorted newest-first.""" + m = MacroMemory(fallback_path=tmp_path / "macro.json") + m.record_macro_state("2026-03-01", 20.0, "risk-on", "", []) + m.record_macro_state("2026-03-26", 25.0, "risk-off", "", []) + records = m.get_recent(limit=2) + assert records[0]["regime_date"] == "2026-03-26" + assert records[1]["regime_date"] == "2026-03-01" + + +# --------------------------------------------------------------------------- +# Additional coverage +# --------------------------------------------------------------------------- + + +def test_macro_call_normalized_to_lowercase(mem): + """macro_call is stored in lowercase regardless of input casing.""" + mem.record_macro_state("2026-03-26", 20.0, "Risk-Off", "mixed", []) + records = mem.get_recent() + assert records[0]["macro_call"] == "risk-off" + + +def test_vix_stored_as_float(mem): + """vix_level is always stored as a float.""" + mem.record_macro_state("2026-03-26", 22, "neutral", "flat market", []) + records = mem.get_recent() + assert isinstance(records[0]["vix_level"], float) + + +def test_key_themes_stored_as_list(mem): + """key_themes is persisted as a list.""" + themes = ["inflation", "rate hikes"] + mem.record_macro_state("2026-03-26", 20.0, "risk-off", "Fed hawkish", themes) + records = mem.get_recent() + assert records[0]["key_themes"] == themes + + +def test_get_recent_limit_respected(mem): + """get_recent() returns at most *limit* records.""" + for i in range(5): + mem.record_macro_state(f"2026-03-{i + 1:02d}", float(i), "neutral", "", []) + records = mem.get_recent(limit=3) + assert len(records) == 3 + + +def test_record_outcome_returns_false_for_unknown_date(mem): + """record_outcome() returns False when no matching date exists.""" + result = mem.record_outcome("9999-01-01", {"correct": True}) + assert result is False + + +def test_record_outcome_only_fills_null_outcome(mem): + """record_outcome() will not overwrite a record that already has an outcome.""" + mem.record_macro_state("2026-03-26", 25.0, "risk-off", "test", []) + mem.record_outcome("2026-03-26", {"correct": True}) + + # Second call should return False — outcome already set + result = mem.record_outcome("2026-03-26", {"correct": False}) + assert result is False + + records = mem.get_recent() + assert records[0]["outcome"]["correct"] is True + + +def test_build_macro_context_no_prior_history_message(mem): + """build_macro_context() returns informative text when no records exist.""" + ctx = mem.build_macro_context() + assert "No prior" in ctx + + +def test_build_macro_context_shows_outcome_pending(mem): + """build_macro_context() shows 'pending' for records with no outcome.""" + mem.record_macro_state("2026-03-26", 25.0, "risk-off", "test", []) + ctx = mem.build_macro_context() + assert "pending" in ctx + + +def test_build_macro_context_shows_outcome_confirmed(mem): + """build_macro_context() shows outcome notes when outcome is set.""" + mem.record_macro_state("2026-03-26", 25.0, "risk-off", "test", []) + mem.record_outcome( + "2026-03-26", + {"regime_confirmed": True, "notes": "Bear market held"}, + ) + ctx = mem.build_macro_context() + assert "Bear market held" in ctx + + +def test_persistence_across_instances(tmp_path): + """Records written by one MacroMemory instance are visible to another.""" + fb = tmp_path / "macro.json" + + m1 = MacroMemory(fallback_path=fb) + m1.record_macro_state("2026-03-26", 25.0, "risk-off", "thesis", ["theme"]) + + m2 = MacroMemory(fallback_path=fb) + records = m2.get_recent() + assert len(records) == 1 + + +def test_local_file_created_on_first_write(tmp_path): + """The fallback JSON file is created automatically on first write.""" + fb = tmp_path / "subdir" / "macro.json" + assert not fb.exists() + + m = MacroMemory(fallback_path=fb) + m.record_macro_state("2026-03-26", 20.0, "neutral", "test", []) + + assert fb.exists() + data = json.loads(fb.read_text()) + assert len(data) == 1 diff --git a/tests/unit/test_pydantic_schema.py b/tests/unit/test_pydantic_schema.py new file mode 100644 index 00000000..e06ad7d8 --- /dev/null +++ b/tests/unit/test_pydantic_schema.py @@ -0,0 +1,272 @@ +"""Tests for PMDecisionSchema Pydantic structured output model. + +Covers: +- Valid payload parses correctly +- Invalid enum values raise ValidationError +- Required fields enforce presence +- JSON round-trip fidelity +- Type coercion behaviour (SellOrder.macro_driven bool coercion) +""" + +from __future__ import annotations + +import json + +import pytest +from pydantic import ValidationError + +from tradingagents.agents.portfolio.pm_decision_agent import ( + BuyOrder, + ForensicReport, + HoldOrder, + PMDecisionSchema, + SellOrder, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _valid_payload() -> dict: + """Return a fully valid PMDecisionSchema payload.""" + return { + "macro_regime": "risk-off", + "regime_alignment_note": "Elevated VIX supports defensive posture", + "sells": [ + { + "ticker": "AAPL", + "shares": 10.0, + "rationale": "Overvalued", + "macro_driven": True, + } + ], + "buys": [ + { + "ticker": "XOM", + "shares": 5.0, + "price_target": 120.0, + "stop_loss": 108.0, + "take_profit": 138.0, + "sector": "Energy", + "rationale": "Energy tailwind", + "thesis": "Oil cycle upswing", + "macro_alignment": "Fits risk-off energy play", + "memory_note": "XOM held well in last risk-off", + "position_sizing_logic": "2% position; below 15% cap", + } + ], + "holds": [{"ticker": "MSFT", "rationale": "Thesis intact"}], + "cash_reserve_pct": 0.1, + "portfolio_thesis": "Defensive tilt with energy exposure", + "risk_summary": "Moderate risk; elevated VIX", + "forensic_report": { + "regime_alignment": "risk-off favours energy and cash", + "key_risks": ["oil demand drop", "rate surprise"], + "decision_confidence": "high", + "position_sizing_rationale": "All positions within 15% cap", + }, + } + + +# --------------------------------------------------------------------------- +# TestPMDecisionSchema — valid payloads +# --------------------------------------------------------------------------- + + +class TestPMDecisionSchema: + def test_valid_payload_parses(self): + """A valid payload produces a PMDecisionSchema instance.""" + obj = PMDecisionSchema(**_valid_payload()) + assert obj.macro_regime == "risk-off" + assert len(obj.buys) == 1 + assert obj.forensic_report.decision_confidence == "high" + + def test_macro_regime_all_valid_values(self): + """Each of the four valid macro_regime values parses correctly.""" + for regime in ("risk-on", "risk-off", "neutral", "transition"): + payload = _valid_payload() + payload["macro_regime"] = regime + obj = PMDecisionSchema(**payload) + assert obj.macro_regime == regime + + def test_invalid_macro_regime_raises(self): + """Invalid macro_regime value raises ValidationError.""" + payload = _valid_payload() + payload["macro_regime"] = "unknown" + with pytest.raises(ValidationError): + PMDecisionSchema(**payload) + + def test_empty_string_macro_regime_raises(self): + """Empty string macro_regime raises ValidationError.""" + payload = _valid_payload() + payload["macro_regime"] = "" + with pytest.raises(ValidationError): + PMDecisionSchema(**payload) + + def test_invalid_decision_confidence_raises(self): + """Invalid decision_confidence in forensic_report raises ValidationError.""" + payload = _valid_payload() + payload["forensic_report"]["decision_confidence"] = "very high" + with pytest.raises(ValidationError): + PMDecisionSchema(**payload) + + def test_decision_confidence_all_valid_values(self): + """Each of the three valid decision_confidence values parses correctly.""" + for level in ("high", "medium", "low"): + payload = _valid_payload() + payload["forensic_report"]["decision_confidence"] = level + obj = PMDecisionSchema(**payload) + assert obj.forensic_report.decision_confidence == level + + def test_missing_forensic_report_raises(self): + """Missing forensic_report field raises ValidationError.""" + payload = _valid_payload() + del payload["forensic_report"] + with pytest.raises(ValidationError): + PMDecisionSchema(**payload) + + def test_missing_macro_regime_raises(self): + """Missing macro_regime field raises ValidationError.""" + payload = _valid_payload() + del payload["macro_regime"] + with pytest.raises(ValidationError): + PMDecisionSchema(**payload) + + def test_model_dump_json_roundtrip(self): + """model_dump_json() produces valid JSON that round-trips back.""" + obj = PMDecisionSchema(**_valid_payload()) + json_str = obj.model_dump_json() + data = json.loads(json_str) + assert data["macro_regime"] == "risk-off" + assert data["forensic_report"]["decision_confidence"] == "high" + + def test_sell_order_macro_driven_bool(self): + """SellOrder.macro_driven must be a boolean (Pydantic v2 coerces str).""" + payload = _valid_payload() + payload["sells"][0]["macro_driven"] = "yes" # string — Pydantic v2 coerces + obj = PMDecisionSchema(**payload) + assert isinstance(obj.sells[0].macro_driven, bool) + + def test_empty_sells_buys_holds_allowed(self): + """Sells, buys, and holds can all be empty lists.""" + payload = _valid_payload() + payload["sells"] = [] + payload["buys"] = [] + payload["holds"] = [] + obj = PMDecisionSchema(**payload) + assert obj.sells == [] + assert obj.buys == [] + assert obj.holds == [] + + def test_multiple_buys_parsed(self): + """Multiple BuyOrder entries in buys list all parse correctly.""" + payload = _valid_payload() + extra_buy = { + "ticker": "CVX", + "shares": 3.0, + "price_target": 160.0, + "stop_loss": 144.0, + "take_profit": 184.0, + "sector": "Energy", + "rationale": "CVX undervalued", + "thesis": "Same oil cycle thesis", + "macro_alignment": "Energy fits risk-off", + "memory_note": "CVX volatile in past cycles", + "position_sizing_logic": "1.5% position", + } + payload["buys"].append(extra_buy) + obj = PMDecisionSchema(**payload) + assert len(obj.buys) == 2 + assert obj.buys[1].ticker == "CVX" + + def test_cash_reserve_pct_stored_as_float(self): + """cash_reserve_pct is preserved as a float.""" + payload = _valid_payload() + payload["cash_reserve_pct"] = 0.15 + obj = PMDecisionSchema(**payload) + assert obj.cash_reserve_pct == 0.15 + + +# --------------------------------------------------------------------------- +# TestForensicReport +# --------------------------------------------------------------------------- + + +class TestForensicReport: + def test_valid_forensic_report(self): + """ForensicReport validates correctly with all required fields.""" + report = ForensicReport( + regime_alignment="risk-off supports cash", + key_risks=["rate spike", "credit crunch"], + decision_confidence="medium", + position_sizing_rationale="All within 10% cap", + ) + assert report.decision_confidence == "medium" + assert len(report.key_risks) == 2 + + def test_key_risks_can_be_empty(self): + """key_risks list can be empty.""" + report = ForensicReport( + regime_alignment="aligned", + key_risks=[], + decision_confidence="low", + position_sizing_rationale="cautious", + ) + assert report.key_risks == [] + + +# --------------------------------------------------------------------------- +# TestBuyOrder +# --------------------------------------------------------------------------- + + +class TestBuyOrder: + def test_valid_buy_order(self): + """BuyOrder validates with all required fields.""" + order = BuyOrder( + ticker="NVDA", + shares=2.0, + price_target=900.0, + stop_loss=810.0, + take_profit=1080.0, + sector="Technology", + rationale="AI demand surge", + thesis="GPU dominance continues", + macro_alignment="Neutral regime allows tech exposure", + memory_note="NVDA strong in prior neutral regimes", + position_sizing_logic="1% position", + ) + assert order.ticker == "NVDA" + assert order.price_target == 900.0 + + +# --------------------------------------------------------------------------- +# TestSellOrder +# --------------------------------------------------------------------------- + + +class TestSellOrder: + def test_valid_sell_order(self): + """SellOrder validates with all required fields.""" + order = SellOrder( + ticker="TSLA", + shares=5.0, + rationale="Overextended rally", + macro_driven=False, + ) + assert order.ticker == "TSLA" + assert order.macro_driven is False + + +# --------------------------------------------------------------------------- +# TestHoldOrder +# --------------------------------------------------------------------------- + + +class TestHoldOrder: + def test_valid_hold_order(self): + """HoldOrder validates with required fields.""" + order = HoldOrder(ticker="AMZN", rationale="Cloud thesis intact") + assert order.ticker == "AMZN" diff --git a/tests/unit/test_summary_agents.py b/tests/unit/test_summary_agents.py new file mode 100644 index 00000000..b8a4ca27 --- /dev/null +++ b/tests/unit/test_summary_agents.py @@ -0,0 +1,339 @@ +"""Tests for Macro_Summary_Agent and Micro_Summary_Agent. + +Strategy: +- Empty/error state paths skip the LLM entirely — test those directly. +- LLM-invoked paths require the mock to be a proper LangChain Runnable so that + ``prompt | llm`` creates a working RunnableSequence. LangChain's pipe operator + calls through its own Runnable machinery — a plain MagicMock is NOT invoked via + Python's raw ``__call__``. We use ``RunnableLambda`` to wrap a lambda that + returns a fixed AIMessage, making it fully compatible with the chain. +""" + +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest +from langchain_core.messages import AIMessage +from langchain_core.runnables import RunnableLambda + +from tradingagents.agents.portfolio.macro_summary_agent import ( + create_macro_summary_agent, +) +from tradingagents.agents.portfolio.micro_summary_agent import ( + create_micro_summary_agent, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_runnable_llm(content: str = "MACRO REGIME: risk-off\nKEY NUMBERS: VIX=25"): + """Build a LangChain-compatible LLM stub via RunnableLambda. + + ``ChatPromptTemplate | llm`` creates a ``RunnableSequence``. LangChain + dispatches through its own Runnable protocol — the LLM must implement + ``.invoke()`` as a Runnable, not just as a Python callable. + ``RunnableLambda`` satisfies that contract. + + Returns: + A ``RunnableLambda`` that always returns ``AIMessage(content=content)``. + """ + ai_msg = AIMessage(content=content) + return RunnableLambda(lambda _: ai_msg) + + +# Keep backward-compatible alias used by some tests that destructure a tuple +def _make_chain_mock(content: str = "MACRO REGIME: risk-off\nKEY NUMBERS: VIX=25"): + """Return (llm_runnable, None) — second element kept for API compatibility.""" + return _make_runnable_llm(content), None + + +# --------------------------------------------------------------------------- +# MacroSummaryAgent — NO-DATA guard paths (LLM never called) +# --------------------------------------------------------------------------- + + +class TestMacroSummaryAgentNoDataGuard: + """Verify the abort-early guard fires and LLM is not invoked.""" + + def test_empty_scan_summary_returns_sentinel(self): + """Empty scan_summary dict triggers NO DATA sentinel without LLM call.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + state = {"scan_summary": {}, "messages": [], "analysis_date": "2026-03-26"} + result = agent(state) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + mock_llm.invoke.assert_not_called() + + def test_none_scan_summary_returns_sentinel(self): + """None scan_summary triggers NO DATA sentinel.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + state = {"scan_summary": None, "messages": [], "analysis_date": "2026-03-26"} + result = agent(state) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + def test_error_key_in_scan_returns_sentinel(self): + """scan_summary with 'error' key triggers NO DATA sentinel.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + state = { + "scan_summary": {"error": "vendor timeout"}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + def test_missing_scan_key_returns_sentinel(self): + """State dict with no scan_summary key at all triggers NO DATA sentinel.""" + mock_llm = MagicMock() + agent = create_macro_summary_agent(mock_llm) + result = agent({"messages": [], "analysis_date": "2026-03-26"}) + assert result["macro_brief"] == "NO DATA AVAILABLE - ABORT MACRO" + + +# --------------------------------------------------------------------------- +# MacroSummaryAgent — required state keys returned +# --------------------------------------------------------------------------- + + +class TestMacroSummaryAgentReturnShape: + """Verify that every execution path returns the expected state keys.""" + + def test_no_data_path_returns_required_keys(self): + """NO-DATA guard path returns all required state keys.""" + agent = create_macro_summary_agent(MagicMock()) + result = agent({"scan_summary": {}, "messages": [], "analysis_date": ""}) + assert "macro_brief" in result + assert "macro_memory_context" in result + assert "sender" in result + assert result["sender"] == "macro_summary_agent" + + def test_no_data_path_messages_is_list(self): + """NO-DATA guard path returns messages as a list.""" + agent = create_macro_summary_agent(MagicMock()) + result = agent({"scan_summary": {}, "messages": [], "analysis_date": ""}) + assert isinstance(result["messages"], list) + + def test_llm_path_returns_required_keys(self): + """LLM-invoked path returns all required state keys.""" + llm_mock, _ = _make_chain_mock("MACRO REGIME: neutral\nKEY NUMBERS: VIX=18") + agent = create_macro_summary_agent(llm_mock) + state = { + "scan_summary": {"executive_summary": "Flat markets"}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert "macro_brief" in result + assert "macro_memory_context" in result + assert "sender" in result + assert result["sender"] == "macro_summary_agent" + + def test_llm_path_macro_brief_contains_llm_output(self): + """macro_brief contains the LLM's returned content.""" + content = "MACRO REGIME: risk-on\nKEY NUMBERS: VIX=12" + llm_mock, _ = _make_chain_mock(content) + agent = create_macro_summary_agent(llm_mock) + state = { + "scan_summary": {"executive_summary": "Bull run"}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert result["macro_brief"] == content + + +# --------------------------------------------------------------------------- +# MacroSummaryAgent — macro_memory integration +# --------------------------------------------------------------------------- + + +class TestMacroSummaryAgentMemory: + """Verify macro_memory interaction without hitting MongoDB.""" + + def test_no_memory_context_is_empty_string_on_no_data_path(self): + """NO-DATA path returns empty string for macro_memory_context.""" + agent = create_macro_summary_agent(MagicMock()) + result = agent({"scan_summary": {}, "messages": [], "analysis_date": ""}) + assert result["macro_memory_context"] == "" + + def test_memory_context_injected_into_result(self, tmp_path): + """When macro_memory is provided, macro_memory_context is populated.""" + from tradingagents.memory.macro_memory import MacroMemory + + mem = MacroMemory(fallback_path=tmp_path / "macro.json") + mem.record_macro_state("2026-03-20", 25.0, "risk-off", "hawkish", ["rates"]) + + llm_mock, _ = _make_chain_mock("MACRO REGIME: risk-off\nKEY NUMBERS: VIX=25") + agent = create_macro_summary_agent(llm_mock, macro_memory=mem) + state = { + "scan_summary": {"executive_summary": "Risk-off conditions persist"}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + # Past context built from the single recorded state should reference date + assert "2026-03-20" in result["macro_memory_context"] + + +# --------------------------------------------------------------------------- +# MicroSummaryAgent — return shape +# --------------------------------------------------------------------------- + + +class TestMicroSummaryAgentReturnShape: + """Verify the micro summary agent returns all required state keys.""" + + def test_result_has_required_keys(self): + """Agent returns all required state keys.""" + llm_mock, _ = _make_chain_mock("HOLDINGS TABLE:\n| TICKER | ACTION |") + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": "{}", + "prioritized_candidates": "[]", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert "micro_brief" in result + assert "micro_memory_context" in result + assert "sender" in result + assert result["sender"] == "micro_summary_agent" + + def test_micro_brief_contains_llm_output(self): + """micro_brief contains the LLM's returned content.""" + content = "HOLDINGS TABLE:\n| AAPL | HOLD | 180 | green | good |" + llm_mock, _ = _make_chain_mock(content) + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": '{"AAPL": {"recommendation": "HOLD", "confidence": "high"}}', + "prioritized_candidates": "[]", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert result["micro_brief"] == content + + def test_sender_always_set(self): + """sender key is always 'micro_summary_agent'.""" + llm_mock, _ = _make_chain_mock("brief output") + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": "{}", + "prioritized_candidates": "[]", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "", + } + result = agent(state) + assert result["sender"] == "micro_summary_agent" + + +# --------------------------------------------------------------------------- +# MicroSummaryAgent — malformed input handling +# --------------------------------------------------------------------------- + + +class TestMicroSummaryAgentMalformedInput: + """Verify that malformed JSON in state fields does not raise exceptions.""" + + def test_invalid_holding_reviews_json_handled_gracefully(self): + """Malformed JSON in holding_reviews does not raise.""" + llm_mock, _ = _make_chain_mock("brief") + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": "not valid json{{", + "prioritized_candidates": "[]", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert "micro_brief" in result + + def test_invalid_candidates_json_handled_gracefully(self): + """Malformed JSON in prioritized_candidates does not raise.""" + llm_mock, _ = _make_chain_mock("brief") + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": "{}", + "prioritized_candidates": "also broken", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert "micro_brief" in result + + def test_both_inputs_malformed_does_not_raise(self): + """Both holding_reviews and prioritized_candidates malformed — no raise.""" + llm_mock, _ = _make_chain_mock("brief") + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": "not valid json{{", + "prioritized_candidates": "also broken", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert "micro_brief" in result + + def test_none_holding_reviews_handled(self): + """None holding_reviews falls back gracefully.""" + llm_mock, _ = _make_chain_mock("brief") + agent = create_micro_summary_agent(llm_mock) + state = { + "holding_reviews": None, + "prioritized_candidates": None, + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + assert "micro_brief" in result + + def test_missing_state_keys_handled(self): + """Missing optional keys in state do not cause a KeyError.""" + llm_mock, _ = _make_chain_mock("brief") + agent = create_micro_summary_agent(llm_mock) + # Minimal state — only messages is truly required by the chain call + state = {"messages": [], "analysis_date": "2026-03-26"} + result = agent(state) + assert "micro_brief" in result + + +# --------------------------------------------------------------------------- +# MicroSummaryAgent — memory integration +# --------------------------------------------------------------------------- + + +class TestMicroSummaryAgentMemory: + """Verify micro_memory interaction.""" + + def test_micro_memory_context_includes_ticker_history(self, tmp_path): + """When micro_memory is provided with history, context string includes it.""" + from tradingagents.memory.reflexion import ReflexionMemory + + mem = ReflexionMemory(fallback_path=tmp_path / "reflexion.json") + mem.record_decision("AAPL", "2026-03-20", "BUY", "Strong momentum", "high") + + llm_mock, _ = _make_chain_mock("brief") + agent = create_micro_summary_agent(llm_mock, micro_memory=mem) + state = { + "holding_reviews": '{"AAPL": {"recommendation": "HOLD", "confidence": "high"}}', + "prioritized_candidates": "[]", + "ticker_analyses": {}, + "messages": [], + "analysis_date": "2026-03-26", + } + result = agent(state) + # micro_memory_context is JSON-serialised dict — AAPL should appear + assert "AAPL" in result["micro_memory_context"] diff --git a/tradingagents/agents/portfolio/__init__.py b/tradingagents/agents/portfolio/__init__.py index 9d238253..5acabf80 100644 --- a/tradingagents/agents/portfolio/__init__.py +++ b/tradingagents/agents/portfolio/__init__.py @@ -4,8 +4,12 @@ from __future__ import annotations from tradingagents.agents.portfolio.holding_reviewer import create_holding_reviewer from tradingagents.agents.portfolio.pm_decision_agent import create_pm_decision_agent +from tradingagents.agents.portfolio.macro_summary_agent import create_macro_summary_agent +from tradingagents.agents.portfolio.micro_summary_agent import create_micro_summary_agent __all__ = [ "create_holding_reviewer", "create_pm_decision_agent", + "create_macro_summary_agent", + "create_micro_summary_agent", ] diff --git a/tradingagents/agents/portfolio/macro_summary_agent.py b/tradingagents/agents/portfolio/macro_summary_agent.py new file mode 100644 index 00000000..af942da3 --- /dev/null +++ b/tradingagents/agents/portfolio/macro_summary_agent.py @@ -0,0 +1,200 @@ +"""Macro Summary Agent. + +Pure-reasoning LLM node (no tools). Reads the macro scan output and compresses +it into a concise 1-page regime brief, injecting past macro regime memory. + +Pattern: ``create_macro_summary_agent(llm, macro_memory)`` → closure +(mirrors macro_synthesis pattern). +""" + +from __future__ import annotations + +import json +import logging +import re + +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder + +from tradingagents.memory.macro_memory import MacroMemory + +logger = logging.getLogger(__name__) + + +def create_macro_summary_agent(llm, macro_memory: MacroMemory | None = None): + """Create a macro summary agent node. + + Args: + llm: A LangChain chat model instance (deep_think recommended). + macro_memory: Optional MacroMemory instance for regime history injection + and post-call persistence. When None, memory features are skipped. + + Returns: + A node function ``macro_summary_node(state)`` compatible with LangGraph. + """ + + def macro_summary_node(state: dict) -> dict: + scan_summary = state.get("scan_summary") or {} + + # Guard: abort early if scan data is absent or *only* contains an error + # (partial failures with real data + an "error" key are still usable) + if not scan_summary or (isinstance(scan_summary, dict) and scan_summary.keys() == {"error"}): + return { + "messages": [], + "macro_brief": "NO DATA AVAILABLE - ABORT MACRO", + "macro_memory_context": "", + "sender": "macro_summary_agent", + } + + # ------------------------------------------------------------------ + # Compress scan data to save tokens + # ------------------------------------------------------------------ + executive_summary: str = scan_summary.get("executive_summary", "Not available") + + macro_context: dict = scan_summary.get("macro_context", {}) + macro_context_str = ( + f"Economic cycle: {macro_context.get('economic_cycle', 'N/A')}\n" + f"Central bank stance: {macro_context.get('central_bank_stance', 'N/A')}\n" + f"Geopolitical risks: {macro_context.get('geopolitical_risks', 'N/A')}" + ) + + key_themes: list = scan_summary.get("key_themes", []) + key_themes_str = "\n".join( + f"- {t.get('theme', '?')} [{t.get('conviction', '?')}] " + f"({t.get('timeframe', '?')}): {t.get('description', '')}" + for t in key_themes + ) or "None" + + # Strip verbose rationale — retain only what the brief needs + ticker_conviction = [ + { + "ticker": t.get("ticker", "?"), + "conviction": t.get("conviction", "?"), + "thesis_angle": t.get("thesis_angle", "?"), + } + for t in scan_summary.get("stocks_to_investigate", []) + ] + ticker_conviction_str = json.dumps(ticker_conviction, indent=2) or "[]" + + risk_factors: list = scan_summary.get("risk_factors", []) + risk_factors_str = "\n".join(f"- {r}" for r in risk_factors) or "None" + + # ------------------------------------------------------------------ + # Past macro regime history + # ------------------------------------------------------------------ + if macro_memory is not None: + past_context = macro_memory.build_macro_context(limit=3) + else: + past_context = "No prior macro regime history available." + + # ------------------------------------------------------------------ + # Build system message + # ------------------------------------------------------------------ + system_message = ( + "You are a macro strategist compressing a scanner report into a concise regime brief.\n\n" + "## Past Macro Regime History\n" + f"{past_context}\n\n" + "## Current Scan Data\n" + "### Executive Summary\n" + f"{executive_summary}\n\n" + "### Macro Context\n" + f"{macro_context_str}\n\n" + "### Key Themes\n" + f"{key_themes_str}\n\n" + "### Candidate Tickers (conviction only)\n" + f"{ticker_conviction_str}\n\n" + "### Risk Factors\n" + f"{risk_factors_str}\n\n" + "Produce a structured macro brief in this exact format:\n\n" + "MACRO REGIME: [risk-on|risk-off|neutral|transition]\n\n" + "KEY NUMBERS: [retain ALL exact numeric values — VIX levels, %, yield values, " + "sector weightings — do not round or omit]\n\n" + "TOP 3 THEMES:\n" + "1. [theme]: [description — retain all numbers]\n" + "2. [theme]: [description — retain all numbers]\n" + "3. [theme]: [description — retain all numbers]\n\n" + "MACRO-ALIGNED TICKERS: [list tickers with high conviction and why they fit the regime]\n\n" + "REGIME MEMORY NOTE: [any relevant lesson from past macro history that applies now]\n\n" + "IMPORTANT: Do NOT restrict yourself to a word count. Retain every numeric value from the " + "scan data. If the scan data is incomplete, note it explicitly — do not guess or extrapolate." + ) + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are a helpful AI assistant, collaborating with other assistants." + " You have access to the following tools: {tool_names}.\n{system_message}" + " For your reference, the current date is {current_date}.", + ), + MessagesPlaceholder(variable_name="messages"), + ] + ) + + prompt = prompt.partial(system_message=system_message) + prompt = prompt.partial(tool_names="none") + prompt = prompt.partial(current_date=state.get("analysis_date", "")) + + chain = prompt | llm + result = chain.invoke(state["messages"]) + + # ------------------------------------------------------------------ + # Persist macro regime call to memory + # ------------------------------------------------------------------ + if macro_memory is not None: + _persist_regime(result.content, scan_summary, macro_memory, state) + + return { + "messages": [result], + "macro_brief": result.content, + "macro_memory_context": past_context, + "sender": "macro_summary_agent", + } + + return macro_summary_node + + +def _persist_regime( + brief: str, + scan_summary: dict, + macro_memory: MacroMemory, + state: dict, +) -> None: + """Extract MACRO REGIME line and persist to MacroMemory. + + Fails silently — memory persistence must never break the pipeline. + """ + try: + macro_call = "neutral" + match = re.search(r"MACRO REGIME:\s*([^\n]+)", brief, re.IGNORECASE) + if match: + raw_call = match.group(1).strip().lower() + # Normalise to one of the four valid values + for valid in ("risk-on", "risk-off", "transition", "neutral"): + if valid in raw_call: + macro_call = valid + break + + # Best-effort VIX extraction — scan data rarely includes a bare float + vix_level = 0.0 + vix_match = re.search(r"VIX[:\s]+([0-9]+(?:\.[0-9]+)?)", brief, re.IGNORECASE) + if vix_match: + try: + vix_level = float(vix_match.group(1)) + except ValueError: + pass + + key_themes = [ + t.get("theme", "") for t in scan_summary.get("key_themes", []) if t.get("theme") + ] + sector_thesis = scan_summary.get("executive_summary", "")[:500] + analysis_date = state.get("analysis_date", "") + + macro_memory.record_macro_state( + date=analysis_date, + vix_level=vix_level, + macro_call=macro_call, + sector_thesis=sector_thesis, + key_themes=key_themes, + ) + except Exception: + logger.warning("macro_summary_agent: failed to persist regime to memory", exc_info=True) diff --git a/tradingagents/agents/portfolio/micro_summary_agent.py b/tradingagents/agents/portfolio/micro_summary_agent.py new file mode 100644 index 00000000..3aa9dd5a --- /dev/null +++ b/tradingagents/agents/portfolio/micro_summary_agent.py @@ -0,0 +1,185 @@ +"""Micro Summary Agent. + +Pure-reasoning LLM node (no tools). Compresses holding reviews and ranked +candidates into a 1-page micro brief, injecting per-ticker reflexion memory. + +Pattern: ``create_micro_summary_agent(llm, micro_memory)`` → closure +(mirrors macro_synthesis pattern). +""" + +from __future__ import annotations + +import json +import logging + +from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder + +from tradingagents.memory.reflexion import ReflexionMemory + +logger = logging.getLogger(__name__) + + +def create_micro_summary_agent(llm, micro_memory: ReflexionMemory | None = None): + """Create a micro summary agent node. + + Args: + llm: A LangChain chat model instance (mid_think or deep_think recommended). + micro_memory: Optional ReflexionMemory instance for per-ticker history + injection. When None, memory features are skipped. + + Returns: + A node function ``micro_summary_node(state)`` compatible with LangGraph. + """ + + def micro_summary_node(state: dict) -> dict: + analysis_date = state.get("analysis_date") or "" + + # ------------------------------------------------------------------ + # Parse inputs — handle missing / malformed gracefully + # ------------------------------------------------------------------ + holding_reviews_raw = state.get("holding_reviews") or "{}" + candidates_raw = state.get("prioritized_candidates") or "[]" + + holding_reviews: dict = _parse_json_safely(holding_reviews_raw, default={}) + candidates: list = _parse_json_safely(candidates_raw, default=[]) + + # Optional: per-ticker trading graph analyses (fundamentals, technicals, etc.) + ticker_analyses: dict = state.get("ticker_analyses") or {} + + # ------------------------------------------------------------------ + # Collect all tickers and retrieve per-ticker memory + # ------------------------------------------------------------------ + holding_tickers = list(holding_reviews.keys()) if isinstance(holding_reviews, dict) else [] + candidate_tickers = [ + c.get("ticker", "") for c in candidates if isinstance(c, dict) and c.get("ticker") + ] + all_tickers = list(dict.fromkeys(holding_tickers + candidate_tickers)) # preserve order, dedupe + + ticker_memory_dict: dict[str, str] = {} + if micro_memory is not None: + for ticker in all_tickers: + ticker_memory_dict[ticker] = micro_memory.build_context(ticker, limit=2) + + ticker_memory_str = json.dumps(ticker_memory_dict) + + # ------------------------------------------------------------------ + # Build concise per-ticker input table + # ------------------------------------------------------------------ + table_rows: list[str] = [] + + for ticker in holding_tickers: + review = holding_reviews.get(ticker, {}) if isinstance(holding_reviews, dict) else {} + if not isinstance(review, dict): + review = {} + rec = review.get("recommendation", "?") + confidence = review.get("confidence", "") + label = f"HOLDING | {rec} | conf:{confidence}" if confidence else f"HOLDING | {rec}" + # Enrich with trading graph analysis if available + analysis = ticker_analyses.get(ticker, {}) if isinstance(ticker_analyses, dict) else {} + key_number = analysis.get("final_trade_decision", "")[:80] if isinstance(analysis, dict) else "" + key_number = key_number or "-" + memory_snippet = (ticker_memory_dict.get(ticker, "")[:100] or "no memory") + table_rows.append(f"{ticker} | {label} | {key_number} | {memory_snippet}") + + for c in candidates: + if not isinstance(c, dict): + continue + ticker = c.get("ticker", "?") + conviction = c.get("conviction", "?") + thesis = c.get("thesis_angle", "?") + score = c.get("score", "") + key_number = f"score:{score}" if score != "" else "-" + label = f"CANDIDATE | {conviction} | {thesis}" + memory_snippet = (ticker_memory_dict.get(ticker, "")[:100] or "no memory") + table_rows.append(f"{ticker} | {label} | {key_number} | {memory_snippet}") + + ticker_table = "\n".join(table_rows) or "No tickers available." + + # Serialise full detail for LLM context + holding_reviews_str = ( + json.dumps(holding_reviews, indent=2) + if holding_reviews + else "No holding reviews available." + ) + candidates_str = ( + json.dumps(candidates, indent=2) + if candidates + else "No candidates available." + ) + + # ------------------------------------------------------------------ + # Build system message + # ------------------------------------------------------------------ + system_message = ( + "You are a micro analyst compressing position-level data into a concise brief " + "for a portfolio manager.\n\n" + "## Per-Ticker Data\n" + f"{ticker_table}\n\n" + "## Holding Reviews (full detail)\n" + f"{holding_reviews_str}\n\n" + "## Prioritized Candidates (full detail)\n" + f"{candidates_str}\n\n" + "Produce a structured micro brief in this exact format:\n\n" + "HOLDINGS TABLE:\n" + "| TICKER | ACTION | KEY NUMBER | FLAG | MEMORY |\n" + "|--------|--------|------------|------|--------|\n" + "[one row per holding — if data is missing, write \"NO DATA\" in KEY NUMBER and FLAG columns]\n\n" + "CANDIDATES TABLE:\n" + "| TICKER | CONVICTION | THESIS ANGLE | KEY NUMBER | FLAG | MEMORY |\n" + "|--------|------------|--------------|------------|------|--------|\n" + "[one row per candidate — if data is missing, write \"NO DATA\"]\n\n" + "RED FLAGS: [list any tickers with accounting anomalies, high debt, or historical losses " + "— cite exact numbers]\n" + "GREEN FLAGS: [list tickers with strong momentum, insider buying, or positive memory " + "— cite exact numbers]\n\n" + "IMPORTANT: Retain exact debt ratios, P/E multiples, EPS values, and unrealized P&L " + "percentages. Never round or omit a numeric value. If a ticker has no data, write " + "\"NO DATA\" — do not guess." + ) + + prompt = ChatPromptTemplate.from_messages( + [ + ( + "system", + "You are a helpful AI assistant, collaborating with other assistants." + " You have access to the following tools: {tool_names}.\n{system_message}" + " For your reference, the current date is {current_date}.", + ), + MessagesPlaceholder(variable_name="messages"), + ] + ) + + prompt = prompt.partial(system_message=system_message) + prompt = prompt.partial(tool_names="none") + prompt = prompt.partial(current_date=analysis_date) + + chain = prompt | llm + result = chain.invoke(state["messages"]) + + return { + "messages": [result], + "micro_brief": result.content, + "micro_memory_context": ticker_memory_str, + "sender": "micro_summary_agent", + } + + return micro_summary_node + + +def _parse_json_safely(raw: str, *, default): + """Parse a JSON string, returning *default* on any parse error. + + Args: + raw: Raw string (may be JSON or empty/malformed). + default: Value to return when parsing fails. + """ + if not raw or not raw.strip(): + return default + try: + return json.loads(raw) + except (json.JSONDecodeError, TypeError): + logger.warning( + "micro_summary_agent: could not parse JSON input (first 100): %s", + raw[:100], + ) + return default diff --git a/tradingagents/agents/portfolio/pm_decision_agent.py b/tradingagents/agents/portfolio/pm_decision_agent.py index 7feb6820..54151965 100644 --- a/tradingagents/agents/portfolio/pm_decision_agent.py +++ b/tradingagents/agents/portfolio/pm_decision_agent.py @@ -1,7 +1,7 @@ """Portfolio Manager Decision Agent. -Pure reasoning LLM agent (no tools). Synthesizes risk metrics, holding -reviews, and prioritized candidates into a structured investment decision. +Pure reasoning LLM agent (no tools). Synthesizes macro and micro briefs into a +fully auditable, structured investment decision via Pydantic-schema-driven output. Pattern: ``create_pm_decision_agent(llm)`` → closure (macro_synthesis pattern). """ @@ -10,20 +10,95 @@ from __future__ import annotations import json import logging +from typing import Literal +from langchain_core.messages import AIMessage from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder +from pydantic import BaseModel from tradingagents.agents.utils.json_utils import extract_json logger = logging.getLogger(__name__) -def create_pm_decision_agent(llm, config: dict | None = None): +# --------------------------------------------------------------------------- +# Pydantic output schema +# --------------------------------------------------------------------------- + + +class ForensicReport(BaseModel): + """Audit trail for the PM's decision confidence and risk posture.""" + + regime_alignment: str + key_risks: list[str] + decision_confidence: Literal["high", "medium", "low"] + position_sizing_rationale: str + + +class BuyOrder(BaseModel): + """A fully justified buy order with risk parameters.""" + + ticker: str + shares: float + price_target: float + stop_loss: float + take_profit: float + sector: str + rationale: str + thesis: str + macro_alignment: str + memory_note: str + position_sizing_logic: str + + +class SellOrder(BaseModel): + """A sell order with macro-driven flag.""" + + ticker: str + shares: float + rationale: str + macro_driven: bool + + +class HoldOrder(BaseModel): + """A hold decision with rationale.""" + + ticker: str + rationale: str + + +class PMDecisionSchema(BaseModel): + """Full PM decision output — structured and auditable.""" + + macro_regime: Literal["risk-on", "risk-off", "neutral", "transition"] + regime_alignment_note: str + sells: list[SellOrder] + buys: list[BuyOrder] + holds: list[HoldOrder] + cash_reserve_pct: float + portfolio_thesis: str + risk_summary: str + forensic_report: ForensicReport + + +# --------------------------------------------------------------------------- +# Factory +# --------------------------------------------------------------------------- + + +def create_pm_decision_agent( + llm, + config: dict | None = None, + macro_memory=None, + micro_memory=None, +): """Create a PM decision agent node. Args: llm: A LangChain chat model instance (deep_think recommended). config: Portfolio configuration dictionary containing constraints. + macro_memory: Reserved for future direct retrieval; briefs come via state. + micro_memory: Reserved for future direct retrieval; briefs come via state. Returns: A node function ``pm_decision_node(state)`` compatible with LangGraph. @@ -38,54 +113,56 @@ def create_pm_decision_agent(llm, config: dict | None = None): def pm_decision_node(state): analysis_date = state.get("analysis_date") or "" + + # Read brief fields written by upstream summary agents + _macro_brief_raw = state.get("macro_brief") or "" + if not _macro_brief_raw or "NO DATA AVAILABLE" in _macro_brief_raw: + # Macro scanner failed — give PM explicit guidance rather than passing sentinel + macro_brief = ( + "MACRO DATA UNAVAILABLE: No scanner output was produced. " + "Proceed with micro brief only. Adopt a conservative posture: " + "hold existing positions and avoid new buys unless micro thesis is very strong." + ) + else: + macro_brief = _macro_brief_raw + micro_brief = state.get("micro_brief") or "No micro brief available." + + # Build compressed portfolio summary — avoid passing the full blob portfolio_data_str = state.get("portfolio_data") or "{}" - risk_metrics_str = state.get("risk_metrics") or "{}" - holding_reviews_str = state.get("holding_reviews") or "{}" - prioritized_candidates_str = state.get("prioritized_candidates") or "[]" + try: + pd_raw = json.loads(portfolio_data_str) + portfolio = pd_raw.get("portfolio") or {} + holdings = pd_raw.get("holdings") or [] + compressed = { + "cash": portfolio.get("cash", 0.0), + "n_positions": len(holdings), + "total_value": portfolio.get("total_value"), + } + compressed_str = json.dumps(compressed) + except Exception: + # Fallback: truncated raw string keeps token count bounded + compressed_str = portfolio_data_str[:200] - context = f"""## Portfolio Constraints -{constraints_str} - -## Portfolio Data -{portfolio_data_str} - -## Risk Metrics -{risk_metrics_str} - -## Holding Reviews -{holding_reviews_str} - -## Prioritized Candidates -{prioritized_candidates_str} -""" + context = ( + f"## Portfolio Constraints\n{constraints_str}\n\n" + f"## Portfolio Summary\n{compressed_str}\n\n" + f"## Input A — Macro Context & Memory\n{macro_brief}\n\n" + f"## Input B — Micro Context & Memory\n{micro_brief}\n" + ) system_message = ( - "You are a portfolio manager making final investment decisions. " - "Given the constraints, risk metrics, holding reviews, and prioritized investment candidates, " - "produce a structured JSON investment decision. " + "You are a portfolio manager making final, risk-adjusted investment decisions. " + "You receive two inputs: (A) a macro regime brief with memory, and (B) a micro brief " + "with per-ticker signals and memory. Synthesize A and B into a Forensic Execution " + "Dashboard — a fully auditable decision plan where every trade is justified by both " + "macro alignment and micro thesis.\n\n" "## CONSTRAINTS COMPLIANCE:\n" - "You MUST ensure your suggested buys and position sizes adhere to the portfolio constraints. " - "If a high-conviction candidate would exceed the max position size or sector limit, " - "YOU MUST adjust the suggested 'shares' downward to fit within the limit. " - "Do not suggest buys that you know will be rejected by the risk engine.\n\n" - "Consider: reducing risk where metrics are poor, acting on SELL recommendations, " - "and adding positions in high-conviction candidates that pass constraints. " - "For every BUY you MUST set a stop_loss price (maximum acceptable loss level, " - "typically 5-15% below entry) and a take_profit price (expected sell target, " - "typically 10-30% above entry based on your thesis). " - "Output ONLY valid JSON matching this exact schema:\n" - "{\n" - ' "sells": [{"ticker": "...", "shares": 0.0, "rationale": "..."}],\n' - ' "buys": [{"ticker": "...", "shares": 0.0, "price_target": 0.0, ' - '"stop_loss": 0.0, "take_profit": 0.0, ' - '"sector": "...", "rationale": "...", "thesis": "..."}],\n' - ' "holds": [{"ticker": "...", "rationale": "..."}],\n' - ' "cash_reserve_pct": 0.10,\n' - ' "portfolio_thesis": "...",\n' - ' "risk_summary": "..."\n' - "}\n\n" - "IMPORTANT: Output ONLY valid JSON. Start your response with '{' and end with '}'. " - "Do NOT use markdown code fences. Do NOT include any explanation or preamble before or after the JSON.\n\n" + "You MUST ensure all buys adhere to the portfolio constraints. " + "If a high-conviction candidate exceeds max position size or sector limit, " + "adjust shares downward to fit. For every BUY: set stop_loss (5-15% below entry) " + "and take_profit (10-30% above entry). " + "Every buy must have macro_alignment (how it fits the regime), " + "memory_note (any relevant historical lesson), and position_sizing_logic.\n\n" f"{context}" ) @@ -100,27 +177,41 @@ def create_pm_decision_agent(llm, config: dict | None = None): MessagesPlaceholder(variable_name="messages"), ] ) - prompt = prompt.partial(system_message=system_message) prompt = prompt.partial(tool_names="none") prompt = prompt.partial(current_date=analysis_date) - chain = prompt | llm - result = chain.invoke(state["messages"]) + # Primary path: structured output via Pydantic schema + structured_llm = llm.with_structured_output(PMDecisionSchema) + chain = prompt | structured_llm - raw = result.content or "{}" try: - parsed = extract_json(raw) - decision_str = json.dumps(parsed) - except (ValueError, json.JSONDecodeError): + result = chain.invoke(state["messages"]) + decision_str = result.model_dump_json() + except Exception as exc: logger.warning( - "pm_decision_agent: could not extract JSON; storing raw (first 200): %s", - raw[:200], + "pm_decision_agent: structured output failed (%s); falling back to raw", exc ) - decision_str = raw + # Fallback: plain LLM + extract_json + chain_raw = prompt | llm + raw_result = chain_raw.invoke(state["messages"]) + raw = raw_result.content or "{}" + try: + parsed = extract_json(raw) + decision_str = json.dumps(parsed) + except (ValueError, json.JSONDecodeError): + decision_str = raw + return { + "messages": [raw_result], + "pm_decision": decision_str, + "sender": "pm_decision_agent", + } + # with_structured_output returns the Pydantic model directly, not an AIMessage. + # Wrap in a synthetic AIMessage so downstream message-history nodes stay consistent. + synthetic_msg = AIMessage(content=decision_str) return { - "messages": [result], + "messages": [synthetic_msg], "pm_decision": decision_str, "sender": "pm_decision_agent", } diff --git a/tradingagents/graph/portfolio_graph.py b/tradingagents/graph/portfolio_graph.py index 583bbc34..9a45cc69 100644 --- a/tradingagents/graph/portfolio_graph.py +++ b/tradingagents/graph/portfolio_graph.py @@ -2,13 +2,18 @@ from __future__ import annotations +import os from typing import Any, List, Optional from tradingagents.default_config import DEFAULT_CONFIG from tradingagents.llm_clients import create_llm_client +from tradingagents.memory.macro_memory import MacroMemory +from tradingagents.memory.reflexion import ReflexionMemory from tradingagents.agents.portfolio import ( create_holding_reviewer, create_pm_decision_agent, + create_macro_summary_agent, + create_micro_summary_agent, ) from .portfolio_setup import PortfolioGraphSetup @@ -50,12 +55,31 @@ class PortfolioGraph: portfolio_config = self._get_portfolio_config() + mongo_uri = self.config.get("mongo_uri") or os.environ.get("TRADINGAGENTS_MONGO_URI") + macro_mem = MacroMemory(mongo_uri=mongo_uri) + micro_mem = ReflexionMemory( + mongo_uri=mongo_uri, + collection_name="micro_reflexion", + fallback_path="reports/micro_reflexion.json", # distinct from pipeline reflexion.json + ) + agents = { "review_holdings": create_holding_reviewer(mid_llm), - "pm_decision": create_pm_decision_agent(deep_llm, config=portfolio_config), + "macro_summary": create_macro_summary_agent(mid_llm, macro_mem), + "micro_summary": create_micro_summary_agent(mid_llm, micro_mem), + "pm_decision": create_pm_decision_agent( + deep_llm, config=portfolio_config, + macro_memory=macro_mem, micro_memory=micro_mem, + ), } - setup = PortfolioGraphSetup(agents, repo=self._repo, config=portfolio_config) + setup = PortfolioGraphSetup( + agents, + repo=self._repo, + config=portfolio_config, + macro_memory=macro_mem, + micro_memory=micro_mem, + ) self.graph = setup.setup_graph() def _get_portfolio_config(self) -> dict[str, Any]: @@ -163,6 +187,10 @@ class PortfolioGraph: "risk_metrics": "", "holding_reviews": "", "prioritized_candidates": "", + "macro_brief": "", + "micro_brief": "", + "macro_memory_context": "", + "micro_memory_context": "", "pm_decision": "", "execution_result": "", "sender": "", diff --git a/tradingagents/graph/portfolio_setup.py b/tradingagents/graph/portfolio_setup.py index a2e162fe..70003004 100644 --- a/tradingagents/graph/portfolio_setup.py +++ b/tradingagents/graph/portfolio_setup.py @@ -1,12 +1,15 @@ """Portfolio Manager workflow graph setup. -Sequential workflow: +Fan-out/fan-in workflow: START → load_portfolio → compute_risk → review_holdings - → prioritize_candidates → pm_decision → execute_trades → END + → prioritize_candidates → macro_summary (parallel) + → micro_summary (parallel) + → make_pm_decision → execute_trades → END Non-LLM nodes (load_portfolio, compute_risk, prioritize_candidates, execute_trades) receive ``repo`` and ``config`` via closure. -LLM nodes (review_holdings, pm_decision) are created externally and passed in. +LLM nodes (review_holdings, macro_summary, micro_summary, pm_decision) +are created externally and passed in. """ from __future__ import annotations @@ -34,13 +37,16 @@ _EMPTY_PORTFOLIO_DICT = { class PortfolioGraphSetup: - """Builds the sequential Portfolio Manager workflow graph. + """Builds the Portfolio Manager workflow graph with parallel summary fan-out. Args: - agents: Dict with keys ``review_holdings`` and ``pm_decision`` - mapping to LLM agent node functions. + agents: Dict with keys ``review_holdings``, ``macro_summary``, + ``micro_summary``, and ``pm_decision`` mapping to LLM agent + node functions. repo: PortfolioRepository instance (injected into closure nodes). config: Portfolio config dict. + macro_memory: MacroMemory instance forwarded to summary agents. + micro_memory: ReflexionMemory instance forwarded to summary agents. """ def __init__( @@ -48,10 +54,17 @@ class PortfolioGraphSetup: agents: dict[str, Any], repo=None, config: dict[str, Any] | None = None, + macro_memory=None, + micro_memory=None, ) -> None: self.agents = agents self._repo = repo self._config = config or {} + # Memory instances are already baked into the agent closures at the call site + # (portfolio_graph.py passes them to create_macro/micro_summary_agent). + # Stored here for future direct access by non-LLM closure nodes if needed. + self._macro_memory = macro_memory + self._micro_memory = micro_memory # ------------------------------------------------------------------ # Node factories (non-LLM) @@ -206,7 +219,13 @@ class PortfolioGraphSetup: # ------------------------------------------------------------------ def setup_graph(self): - """Build and compile the sequential portfolio workflow graph. + """Build and compile the portfolio workflow graph with parallel summary fan-out. + + Topology: + START → load_portfolio → compute_risk → review_holdings + → prioritize_candidates → macro_summary (parallel) + → micro_summary (parallel) + → make_pm_decision → execute_trades → END Returns: A compiled LangGraph graph ready to invoke. @@ -221,14 +240,25 @@ class PortfolioGraphSetup: # Register LLM nodes workflow.add_node("review_holdings", self.agents["review_holdings"]) + workflow.add_node("macro_summary", self.agents["macro_summary"]) + workflow.add_node("micro_summary", self.agents["micro_summary"]) workflow.add_node("make_pm_decision", self.agents["pm_decision"]) - # Sequential edges + # Sequential backbone workflow.add_edge(START, "load_portfolio") workflow.add_edge("load_portfolio", "compute_risk") workflow.add_edge("compute_risk", "review_holdings") workflow.add_edge("review_holdings", "prioritize_candidates") - workflow.add_edge("prioritize_candidates", "make_pm_decision") + + # Fan-out: prioritize_candidates → both summary nodes (parallel) + workflow.add_edge("prioritize_candidates", "macro_summary") + workflow.add_edge("prioritize_candidates", "micro_summary") + + # Fan-in: both summary nodes → make_pm_decision + workflow.add_edge("macro_summary", "make_pm_decision") + workflow.add_edge("micro_summary", "make_pm_decision") + + # Tail workflow.add_edge("make_pm_decision", "execute_trades") workflow.add_edge("execute_trades", END) diff --git a/tradingagents/memory/__init__.py b/tradingagents/memory/__init__.py index 547e6576..a1d111d2 100644 --- a/tradingagents/memory/__init__.py +++ b/tradingagents/memory/__init__.py @@ -1 +1,6 @@ """Agent memory subsystem for TradingAgents.""" + +from tradingagents.memory.reflexion import ReflexionMemory +from tradingagents.memory.macro_memory import MacroMemory + +__all__ = ["ReflexionMemory", "MacroMemory"] diff --git a/tradingagents/memory/macro_memory.py b/tradingagents/memory/macro_memory.py new file mode 100644 index 00000000..6c75a023 --- /dev/null +++ b/tradingagents/memory/macro_memory.py @@ -0,0 +1,281 @@ +"""Macro memory — learn from past regime-level market context. + +Stores macro regime states (VIX level, risk-on/off call, sector thesis, key +themes) and later associates outcomes, enabling agents to *reflect* on +regime accuracy and adjust forward-looking bias accordingly. + +Unlike ReflexionMemory (which is per-ticker), MacroMemory operates at the +market-wide level. Each record captures the macro environment on a given date, +independent of any single security. + +Backed by MongoDB when available; falls back to a local JSON file when not. + +Schema (``macro_memory`` collection):: + + { + "regime_date": str, # ISO date "2026-03-26" + "vix_level": float, # e.g. 25.3 + "macro_call": str, # "risk-on" | "risk-off" | "neutral" | "transition" + "sector_thesis": str, # free-form regime summary + "key_themes": list, # list of top macro theme strings + "run_id": str | None, + "outcome": dict | None, # filled later by record_outcome() + "created_at": datetime, + } + +Usage:: + + from tradingagents.memory.macro_memory import MacroMemory + + mem = MacroMemory("mongodb://localhost:27017") + mem.record_macro_state( + date="2026-03-26", + vix_level=25.3, + macro_call="risk-off", + sector_thesis="Energy under pressure, Fed hawkish", + key_themes=["rate hikes", "oil volatility"], + ) + context = mem.build_macro_context(limit=3) +""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +_COLLECTION = "macro_memory" + +_VALID_MACRO_CALLS = {"risk-on", "risk-off", "neutral", "transition"} + + +class MacroMemory: + """MongoDB-backed macro regime memory. + + Falls back to a local JSON file when MongoDB is unavailable, so the + feature always works (though with degraded query performance on the + local variant). + """ + + def __init__( + self, + mongo_uri: str | None = None, + db_name: str = "tradingagents", + fallback_path: str | Path = "reports/macro_memory.json", + ) -> None: + self._col = None + self._fallback_path = Path(fallback_path) + + if mongo_uri: + try: + from pymongo import DESCENDING, MongoClient + + client = MongoClient(mongo_uri) + db = client[db_name] + self._col = db[_COLLECTION] + self._col.create_index([("regime_date", DESCENDING)]) + self._col.create_index("created_at") + logger.info("MacroMemory using MongoDB (db=%s)", db_name) + except Exception: + logger.warning( + "MacroMemory: MongoDB unavailable — using local file", + exc_info=True, + ) + + # ------------------------------------------------------------------ + # Record macro state + # ------------------------------------------------------------------ + + def record_macro_state( + self, + date: str, + vix_level: float, + macro_call: str, + sector_thesis: str, + key_themes: list[str], + run_id: str | None = None, + ) -> None: + """Store a macro regime state for later reflection. + + Args: + date: ISO date string, e.g. "2026-03-26". + vix_level: VIX index level at the time of the call. + macro_call: Regime classification: "risk-on", "risk-off", + "neutral", or "transition". + sector_thesis: Free-form summary of the prevailing sector view. + key_themes: Top macro themes driving the regime call. + run_id: Optional run identifier for traceability. + """ + normalized_call = macro_call.lower() + if normalized_call not in _VALID_MACRO_CALLS: + logger.warning( + "MacroMemory: unexpected macro_call %r (expected one of %s)", + macro_call, + _VALID_MACRO_CALLS, + ) + + doc: dict[str, Any] = { + "regime_date": date, + "vix_level": float(vix_level), + "macro_call": normalized_call, + "sector_thesis": sector_thesis, + "key_themes": list(key_themes), + "run_id": run_id, + "outcome": None, + "created_at": datetime.now(timezone.utc), + } + + if self._col is not None: + self._col.insert_one(doc) + else: + # Local JSON fallback uses ISO string (JSON has no datetime type) + doc["created_at"] = doc["created_at"].isoformat() + self._append_local(doc) + + # ------------------------------------------------------------------ + # Record outcome (feedback loop) + # ------------------------------------------------------------------ + + def record_outcome(self, date: str, outcome: dict[str, Any]) -> bool: + """Attach outcome to the most recent macro state for a given date. + + Args: + date: ISO date string matching the original ``regime_date``. + outcome: Dict with evaluation data, e.g.:: + + { + "evaluation_date": "2026-04-26", + "vix_at_evaluation": 18.2, + "regime_confirmed": True, + "notes": "Risk-off call was correct; market sold off", + } + + Returns: + True if a matching state was found and updated. + """ + if self._col is not None: + from pymongo import DESCENDING + + doc = self._col.find_one_and_update( + {"regime_date": date, "outcome": None}, + {"$set": {"outcome": outcome}}, + sort=[("created_at", DESCENDING)], + ) + return doc is not None + else: + return self._update_local_outcome(date, outcome) + + # ------------------------------------------------------------------ + # Query + # ------------------------------------------------------------------ + + def get_recent(self, limit: int = 3) -> list[dict[str, Any]]: + """Return most recent macro states, newest first. + + Args: + limit: Maximum number of results. + """ + if self._col is not None: + from pymongo import DESCENDING + + cursor = self._col.find( + {}, + {"_id": 0}, + ).sort("regime_date", DESCENDING).limit(limit) + return list(cursor) + else: + return self._load_recent_local(limit) + + def build_macro_context(self, limit: int = 3) -> str: + """Build a human-readable context string from recent macro states. + + Suitable for injection into agent prompts. Returns a multi-line string + summarising recent regime calls and outcomes. + + Format example:: + + - [2026-03-20] risk-off (VIX: 25.3) + Thesis: Energy sector under pressure, Fed hawkish + Themes: ['rate hikes', 'oil volatility'] + Outcome: pending + + Args: + limit: How many past states to include. + + Returns: + Multi-line string summarising recent macro regime states. + """ + recent = self.get_recent(limit=limit) + if not recent: + return "No prior macro regime states recorded." + + lines: list[str] = [] + for rec in recent: + dt = rec.get("regime_date", "?") + call = rec.get("macro_call", "?") + vix = rec.get("vix_level", "?") + thesis = rec.get("sector_thesis", "")[:300] + themes = rec.get("key_themes", []) + + outcome = rec.get("outcome") + if outcome: + confirmed = outcome.get("regime_confirmed", "?") + notes = outcome.get("notes", "") + outcome_str = f" Outcome: confirmed={confirmed} — {notes}" if notes else f" Outcome: confirmed={confirmed}" + else: + outcome_str = " Outcome: pending" + + lines.append( + f"- [{dt}] {call} (VIX: {vix})\n" + f" Thesis: {thesis}\n" + f" Themes: {themes}\n" + f"{outcome_str}" + ) + return "\n".join(lines) + + # ------------------------------------------------------------------ + # Local JSON fallback + # ------------------------------------------------------------------ + + def _load_all_local(self) -> list[dict[str, Any]]: + """Load all records from the local JSON file.""" + if not self._fallback_path.exists(): + return [] + try: + return json.loads(self._fallback_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return [] + + def _save_all_local(self, records: list[dict[str, Any]]) -> None: + """Overwrite the local JSON file with all records.""" + self._fallback_path.parent.mkdir(parents=True, exist_ok=True) + self._fallback_path.write_text( + json.dumps(records, indent=2), encoding="utf-8" + ) + + def _append_local(self, doc: dict[str, Any]) -> None: + """Append a single record to the local file.""" + records = self._load_all_local() + records.append(doc) + self._save_all_local(records) + + def _load_recent_local(self, limit: int) -> list[dict[str, Any]]: + """Load and sort all records by regime_date descending from the local file.""" + records = self._load_all_local() + records.sort(key=lambda r: r.get("regime_date", ""), reverse=True) + return records[:limit] + + def _update_local_outcome(self, date: str, outcome: dict[str, Any]) -> bool: + """Update the most recent matching macro state in the local file.""" + records = self._load_all_local() + # Iterate newest first (reversed insertion order is a proxy) + for rec in reversed(records): + if rec.get("regime_date") == date and rec.get("outcome") is None: + rec["outcome"] = outcome + self._save_all_local(records) + return True + return False diff --git a/tradingagents/memory/reflexion.py b/tradingagents/memory/reflexion.py index da427a1b..a2e82260 100644 --- a/tradingagents/memory/reflexion.py +++ b/tradingagents/memory/reflexion.py @@ -56,6 +56,7 @@ class ReflexionMemory: mongo_uri: str | None = None, db_name: str = "tradingagents", fallback_path: str | Path = "reports/reflexion.json", + collection_name: str = "reflexion", ) -> None: self._col = None self._fallback_path = Path(fallback_path) @@ -66,7 +67,7 @@ class ReflexionMemory: client = MongoClient(mongo_uri) db = client[db_name] - self._col = db[_COLLECTION] + self._col = db[collection_name] self._col.create_index( [("ticker", 1), ("decision_date", DESCENDING)] ) @@ -184,7 +185,7 @@ class ReflexionMemory: from pymongo import DESCENDING cursor = self._col.find( - {"ticker": ticker.upper()}, + {"ticker": ticker.upper()}, # Hard metadata filter — prevents cross-ticker contamination {"_id": 0}, ).sort("decision_date", DESCENDING).limit(limit) return list(cursor) @@ -260,7 +261,7 @@ class ReflexionMemory: def _load_local(self, ticker: str, limit: int) -> list[dict[str, Any]]: """Load and filter records for a ticker from the local file.""" records = self._load_all_local() - filtered = [r for r in records if r.get("ticker") == ticker] + filtered = [r for r in records if r.get("ticker") == ticker] # Hard metadata filter — local fallback filtered.sort(key=lambda r: r.get("decision_date", ""), reverse=True) return filtered[:limit] diff --git a/tradingagents/portfolio/dual_report_store.py b/tradingagents/portfolio/dual_report_store.py index 38d9a358..9f1bd526 100644 --- a/tradingagents/portfolio/dual_report_store.py +++ b/tradingagents/portfolio/dual_report_store.py @@ -67,6 +67,11 @@ class DualReportStore: """The flow identifier set on this store, if any.""" return self._local.flow_id + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._local.flow_id + @property def run_id(self) -> str | None: """The run/flow identifier (flow_id takes precedence).""" diff --git a/tradingagents/portfolio/mongo_report_store.py b/tradingagents/portfolio/mongo_report_store.py index 99aa146c..e90719b7 100644 --- a/tradingagents/portfolio/mongo_report_store.py +++ b/tradingagents/portfolio/mongo_report_store.py @@ -96,6 +96,11 @@ class MongoReportStore: """The flow identifier set on this store, if any.""" return self._flow_id + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._flow_id + @property def run_id(self) -> str | None: """The run/flow identifier (flow_id takes precedence for backward compat).""" diff --git a/tradingagents/portfolio/portfolio_states.py b/tradingagents/portfolio/portfolio_states.py index e9608045..ae840862 100644 --- a/tradingagents/portfolio/portfolio_states.py +++ b/tradingagents/portfolio/portfolio_states.py @@ -35,6 +35,15 @@ class PortfolioManagerState(MessagesState): risk_metrics: Annotated[str, _last_value] holding_reviews: Annotated[str, _last_value] prioritized_candidates: Annotated[str, _last_value] + + # Summary briefs (written by parallel summary agents) + macro_brief: Annotated[str, _last_value] + micro_brief: Annotated[str, _last_value] + + # Pre-fetched memory context strings + macro_memory_context: Annotated[str, _last_value] + micro_memory_context: Annotated[str, _last_value] + pm_decision: Annotated[str, _last_value] execution_result: Annotated[str, _last_value]