diff --git a/.claude/launch.json b/.claude/launch.json index ee66737b..4050717f 100644 --- a/.claude/launch.json +++ b/.claude/launch.json @@ -10,7 +10,7 @@ { "name": "Frontend (Vite / React)", "runtimeExecutable": "/opt/homebrew/bin/node", - "runtimeArgs": ["/Users/Ahmet/Repo/TradingAgents/agent_os/frontend/node_modules/.bin/vite", "/Users/Ahmet/Repo/TradingAgents/agent_os/frontend"], + "runtimeArgs": ["/Users/Ahmet/Repo/TradingAgents/agent_os/frontend/node_modules/.bin/vite", "/Users/Ahmet/Repo/TradingAgents/.claude/worktrees/wizardly-poitras/agent_os/frontend"], "port": 5173, "autoPort": true } diff --git a/CLAUDE.md b/CLAUDE.md index 2b32eac9..3f1ef2af 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -29,7 +29,10 @@ conda activate tradingagents - `tradingagents/agents/` - Agent implementations - `tradingagents/graph/` - Workflow graphs and setup - `tradingagents/dataflows/` - Data access layer +- `tradingagents/portfolio/` - Portfolio models, report stores, store factory - `cli/` - Command-line interface +- `agent_os/backend/` - FastAPI backend (routes, engine, services) +- `agent_os/frontend/` - React + Chakra UI + ReactFlow dashboard ## Agent Flow (Existing Trading Analysis) @@ -87,6 +90,75 @@ OpenAI, Anthropic, Google, xAI, OpenRouter, Ollama - Graph setup (scanner): `tradingagents/graph/scanner_setup.py` - Inline tool loop: `tradingagents/agents/utils/tool_runner.py` +## AgentOS — Storage, Events & Phase Re-run (see ADR 018 for full detail) + +### Storage Layout + +Reports are scoped by `flow_id` (8-char hex), NOT `run_id` (UUID): + +``` +reports/daily/{date}/{flow_id}/ + run_meta.json ← run metadata persisted on completion + run_events.jsonl ← all WebSocket events, newline-delimited JSON + {TICKER}/report/ ← e.g. 
RIG/report/ + {ts}_complete_report.json + {ts}_analysts_checkpoint.json ← written after analysts phase + {ts}_trader_checkpoint.json ← written after trader phase + market/report/ ← scan output + portfolio/report/ ← PM decisions, execution results +``` + +- **`flow_id`** = stable disk key, shared across all sub-phases of one auto run +- **`run_id`** = ephemeral in-memory UUID (WebSocket endpoint key only) + +### Store Factory — Always Use It + +```python +from tradingagents.portfolio.store_factory import create_report_store + +# Writing: always pass flow_id +writer = create_report_store(flow_id=flow_id) + +# Reading / checkpoint lookup: always pass the ORIGINAL flow_id +reader = create_report_store(flow_id=original_flow_id) + +# Reading latest (skip-if-exists checks): omit flow_id +reader = create_report_store() +``` + +**Never** instantiate `ReportStore()` or `MongoReportStore()` directly in engine code. + +### Phase Re-run + +Node → phase mapping lives in `NODE_TO_PHASE` (langgraph_engine.py): + +| Nodes | Phase | Checkpoint loaded | +|-------|-------|-------------------| +| Market/News/Fundamentals/Social Analyst | `analysts` | none | +| Bull/Bear Researcher, Research Manager, Trader | `debate_and_trader` | analysts_checkpoint | +| Aggressive/Conservative/Neutral Analyst, Portfolio Manager | `risk` | trader_checkpoint | + +- **Checkpoint lookup requires the original `flow_id`** — pass it through `rerun_params["flow_id"]` +- **Analysts checkpoint**: saved when `any()` analyst report is populated (Social Analyst is optional — never use `all()`) +- **Selective event filtering**: re-run preserves events from other tickers and earlier phases; only clears nodes in the re-run scope +- **Cascade**: every phase re-run ends with a `run_portfolio()` call to update the PM decision + +### WebSocket Event Flow + +``` +POST /api/run/{type} → BackgroundTask drives engine → caches events in runs[run_id] +WS /ws/stream/{run_id} → replays cached events (polling 50ms) → streams new 
ones +On reconnect (history) → lazy-loads run_events.jsonl from disk if events == [] +Orphaned "running" run with disk events → auto-marked "failed" +``` + +### MongoDB vs Local Storage + +- **Local (default)**: development, single-machine, offline. Set via `TRADINGAGENTS_REPORTS_DIR`. +- **MongoDB**: multi-process, production, reflexion memory. Set `TRADINGAGENTS_MONGO_URI`. +- `DualReportStore` writes to both when Mongo is configured; reads Mongo first, falls back to disk. +- Mongo failures always fall back gracefully — never crash on missing Mongo. + ## Critical Patterns (see `docs/agent/decisions/008-lessons-learned.md` for full details) - **Tool execution**: Trading graph uses `ToolNode` in graph. Scanner agents use `run_tool_loop()` inline. If `bind_tools()` is used, there MUST be a tool execution path. @@ -99,6 +171,10 @@ OpenAI, Anthropic, Google, xAI, OpenRouter, Ollama - **Rate limiter locks**: Never hold a lock during `sleep()` or IO. Release, sleep, re-acquire. - **LLM policy errors**: `_is_policy_error(exc)` detects 404 from any provider (checks `status_code` attribute or message content). `_build_fallback_config(config)` substitutes per-tier fallback models. Both live in `agent_os/backend/services/langgraph_engine.py`. - **Config fallback keys**: `llm_provider` and `backend_url` must always exist at top level — `scanner_graph.py` and `trading_graph.py` use them as fallbacks. +- **Report store writes**: always pass `flow_id` to `create_report_store(flow_id=…)`. Omitting it writes to the flat legacy path and overwrites across runs. +- **Checkpoint lookup on re-run**: pass the original run's `flow_id` (from `run.get("flow_id") or run.get("short_rid") or run["params"]["flow_id"]`). Without it, `_date_root()` falls back to flat layout and finds nothing. +- **Analysts checkpoint condition**: use `any()` not `all()` over analyst keys — Social Analyst is not in the default analysts list, so `sentiment_report` is empty in typical runs. 
+- **Re-run event filtering**: use `_filter_rerun_events(events, ticker, phase)` — never clear all events on re-run. Clearing all loses scan nodes and other tickers from the graph. ## Agentic Memory (docs/agent/) diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index 8bac0bd2..55c44e0b 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -201,18 +201,67 @@ async def trigger_mock( ) return {"run_id": run_id, "flow_id": flow_id, "status": "queued"} -async def _append_and_store(run_id: str, gen) -> None: - """Append events from a re-run generator to an existing run entry.""" +# Nodes produced by each phase (used to selectively remove stale events on re-run) +_DEBATE_TRADER_NODES = frozenset({ + "Bull Researcher", "Bear Researcher", "Research Manager", "Trader", + "Aggressive Analyst", "Conservative Analyst", "Neutral Analyst", "Portfolio Manager", +}) +_RISK_NODES = frozenset({ + "Aggressive Analyst", "Conservative Analyst", "Neutral Analyst", "Portfolio Manager", +}) +# Portfolio-level cascade nodes always re-run after any phase re-run +_PORTFOLIO_NODES = frozenset({"review_holdings", "make_pm_decision"}) + + +def _filter_rerun_events(events: list, ticker: str, phase: str) -> list: + """Remove stale events for ticker+phase so fresh re-run events can replace them. + + Events for other tickers and phases earlier than the requested phase are kept, + preserving the full auto-flow graph context in the UI. 
+ """ + if phase == "analysts": + ticker_nodes_to_clear = None # clear all nodes for this ticker + elif phase == "debate_and_trader": + ticker_nodes_to_clear = _DEBATE_TRADER_NODES + elif phase == "risk": + ticker_nodes_to_clear = _RISK_NODES + else: + return events # unknown phase — keep everything + + kept = [] + for e in events: + ident = e.get("identifier", "") + node_id = e.get("node_id", "") + parent = e.get("parent_node_id", "") + # Always remove portfolio-level cascade events (they will be re-emitted) + if node_id in _PORTFOLIO_NODES: + continue + # Remove stale ticker events for the phase being re-run + if ident == ticker: + if ticker_nodes_to_clear is None: + continue + if node_id in ticker_nodes_to_clear or parent in ticker_nodes_to_clear: + continue + kept.append(e) + return kept + + +async def _append_and_store(run_id: str, gen, ticker: str = None, phase: str = None) -> None: + """Drive a re-run generator, preserving events from other tickers/phases.""" run = runs.get(run_id) if not run: return run["rerun_seq"] = run.get("rerun_seq", 0) + 1 run["status"] = "running" + # Preserve events for other tickers and earlier phases; remove only the stale + # nodes that the re-run will replace. 
+ if ticker and phase: + run["events"] = _filter_rerun_events(run.get("events") or [], ticker, phase) + else: + run["events"] = [] try: async for event in gen: event["rerun_seq"] = run["rerun_seq"] - if "events" not in run: - run["events"] = [] run["events"].append(event) run["status"] = "completed" except Exception as exc: @@ -247,20 +296,32 @@ async def trigger_rerun_node( raise HTTPException(status_code=422, detail="identifier (ticker) is required") phase = NODE_TO_PHASE[node_id] + _run = runs[run_id] + _orig_flow_id = ( + _run.get("flow_id") + or _run.get("short_rid") + or (_run.get("params") or {}).get("flow_id") + ) rerun_params = { "ticker": identifier, - "date": date or (runs[run_id].get("params") or {}).get("date", ""), + "date": date or (_run.get("params") or {}).get("date", ""), "portfolio_id": portfolio_id, + "flow_id": _orig_flow_id, # preserve original flow_id for checkpoint lookup } logger.info( "Queued RERUN run=%s node=%s phase=%s ticker=%s user=%s", run_id, node_id, phase, identifier, user["user_id"], ) + # Set status synchronously so the WebSocket that reconnects immediately after + # this response sees "running" and enters the polling loop instead of closing. + runs[run_id]["status"] = "running" background_tasks.add_task( _append_and_store, run_id, engine.run_pipeline_from_phase(f"{run_id}_rerun_{phase}", rerun_params, phase), + ticker=identifier, + phase=phase, ) return {"run_id": run_id, "phase": phase, "status": "queued"} diff --git a/agent_os/backend/routes/websocket.py b/agent_os/backend/routes/websocket.py index 01db0934..cfd6c698 100644 --- a/agent_os/backend/routes/websocket.py +++ b/agent_os/backend/routes/websocket.py @@ -37,6 +37,29 @@ async def websocket_endpoint( run_type = run_info["type"] params = run_info.get("params", {}) + # Lazy-load events from disk when not in memory. + # Covers three cases: hydrated completed/failed runs (events=[]), and orphaned "running" + # runs where the server restarted mid-run. 
New genuine runs return [] from disk (safe no-op). + if not run_info.get("events"): + try: + from tradingagents.portfolio.store_factory import create_report_store + flow_id = run_info.get("flow_id") or run_info.get("short_rid") or run_id[:8] + store = create_report_store(flow_id=flow_id) + date = (run_info.get("params") or {}).get("date", "") + if date: + disk_events = store.load_run_events(date) + if disk_events: + run_info["events"] = disk_events + logger.info("Lazy-loaded %d events from disk run=%s", len(disk_events), run_id) + # If still marked "running" but disk has events, the producer is gone + # (server restarted mid-run) — mark failed so the replay loop terminates + # and the UI shows the partial output with an error indicator. + if run_info.get("status") == "running": + run_info["status"] = "failed" + run_info["error"] = "Run did not complete (server restarted)" + except Exception: + logger.warning("Failed to lazy-load events for run=%s", run_id) + try: status = run_info.get("status", "queued") diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 9aa5ebcb..989d50e3 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -422,9 +422,9 @@ class LangGraphEngine: if digest_content: append_to_digest(date, "analyze", ticker, digest_content) - # Save analysts checkpoint (all 4 analyst reports populated) + # Save analysts checkpoint (any analyst report populated — social is optional) _analyst_keys = ("market_report", "sentiment_report", "news_report", "fundamentals_report") - if all(final_state.get(k) for k in _analyst_keys): + if any(final_state.get(k) for k in _analyst_keys): analysts_ckpt = { "company_of_interest": ticker, "trade_date": date, @@ -694,9 +694,9 @@ class LangGraphEngine: date = params.get("date", time.strftime("%Y-%m-%d")) portfolio_id = params.get("portfolio_id", "main_portfolio") - store = create_report_store() flow_id = 
params.get("flow_id") or generate_flow_id() - writer_store = create_report_store(flow_id=flow_id) + store = create_report_store(flow_id=flow_id) + writer_store = store if phase == "analysts": # Full re-run diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index 3a55da86..ef97bb9e 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -565,7 +565,8 @@ export const Dashboard: React.FC = () => { date: params.date, portfolio_id: params.portfolio_id, }); - // Force WebSocket reconnect to stream new events + // Clear terminal and reconnect WebSocket to stream only the new phase's events + clearEvents(); setActiveRunId(null); setTimeout(() => setActiveRunId(res.data.run_id), 0); toast({ diff --git a/agent_os/frontend/vite.config.ts b/agent_os/frontend/vite.config.ts index e7c8807b..6509a0d1 100644 --- a/agent_os/frontend/vite.config.ts +++ b/agent_os/frontend/vite.config.ts @@ -4,6 +4,9 @@ import react from '@vitejs/plugin-react' // https://vitejs.dev/config/ export default defineConfig({ plugins: [react()], + resolve: { + dedupe: ['react', 'react-dom'], + }, server: { port: 5173, host: true, diff --git a/docs/agent/CURRENT_STATE.md b/docs/agent/CURRENT_STATE.md index 12c02d05..cea38350 100644 --- a/docs/agent/CURRENT_STATE.md +++ b/docs/agent/CURRENT_STATE.md @@ -1,23 +1,45 @@ # Current Milestone -FE improvements: configurable max_auto_tickers + run persistence with phase-level node re-run. PR pending review on `feat/fe-max-tickers-load-run`. +Storage finalisation + run history UX. Branch `claude/wizardly-poitras` (PR pending). +All storage, event, checkpoint, and phase re-run logic is now documented in ADR 018. 
# Recent Progress -- **feat/fe-max-tickers-load-run**: Two features implemented: - - Feature 1: `max_auto_tickers` config key + macro synthesis prompt injection + frontend number input + backend safety cap - - Feature 2: Run persistence (run_meta.json + run_events.jsonl), intermediate phase checkpoints (analysts/trader), phase subgraphs (debate + risk), POST /api/run/rerun-node endpoint, frontend history panel + modified node re-run -- **PR#108 merged**: Per-tier LLM fallback for 404/policy errors -- **PR#107 merged**: `save_holding_review` per-ticker fix, `RunLogger` threading.local to contextvars.ContextVar -- **PR#106 merged**: MongoDB report store, RunLogger observability, reflexion memory, run-ID namespaced reports -- **Smart Money Scanner**: Finviz integration with Golden Overlap strategy (ADR 014) -- **AgentOS**: Full-stack visual observability layer (FastAPI + React + ReactFlow) -- **Portfolio Manager**: Phases 1-10 complete (models, agents, CLI, stop-loss/take-profit) +- **feat/fe-max-tickers-load-run** (merged base): + - `max_auto_tickers` config + macro synthesis prompt injection + frontend input + - Run persistence: `run_meta.json` + `run_events.jsonl` per flow_id + - Phase subgraphs (debate_graph, risk_graph) in LangGraphEngine + - `POST /api/run/rerun-node` endpoint + frontend Re-run buttons on graph nodes + - Run History popover in UI + +- **claude/wizardly-poitras** (this PR — storage finalisation): + - **flow_id layout** replaces run_id namespacing: `reports/daily/{date}/{flow_id}/` + - **Startup hydration**: `hydrate_runs_from_disk()` rebuilds runs dict from disk on restart + - **WebSocket lazy-loading**: events loaded from disk on first WS connect (fixes blank Run History) + - **Orphaned run detection**: runs stuck in "running" with disk events → marked "failed" + - **Analysts checkpoint fix**: `any()` instead of `all()` — Social Analyst is optional + - **flow_id checkpoint lookup**: re-run passes original flow_id to store so checkpoints resolve 
correctly + - **Selective event filtering**: phase re-run preserves scan + other tickers; only clears stale nodes for the re-run scope + - **ADR 018**: definitive documentation of storage, events, checkpoints, MongoDB vs local + +- **PR#108 merged**: Per-tier LLM fallback for 404/policy errors (ADR 017) +- **PR#107 merged**: `save_holding_review` per-ticker fix; RunLogger threading.local → contextvars +- **PR#106 merged**: MongoDB report store, RunLogger observability, reflexion memory # In Progress -- feat/fe-max-tickers-load-run PR under review +- claude/wizardly-poitras PR: storage finalisation + run history UX # Active Blockers - None + +# Key Architectural Decisions Active + +| ADR | Topic | Status | +|-----|-------|--------| +| 013 | WebSocket streaming (extended by 018) | accepted | +| 015 | MongoDB/run-id namespacing | superseded by 018 (flow_id replaces run_id) | +| 016 | PR#106 review findings | accepted | +| 017 | LLM policy fallback | accepted | +| 018 | Storage layout, events, checkpoints, MongoDB vs local | **accepted — canonical reference** | diff --git a/docs/agent/decisions/013-agentos-websocket-streaming.md b/docs/agent/decisions/013-agentos-websocket-streaming.md index 2d856ee4..a026c77f 100644 --- a/docs/agent/decisions/013-agentos-websocket-streaming.md +++ b/docs/agent/decisions/013-agentos-websocket-streaming.md @@ -2,7 +2,7 @@ ## Status -Accepted +Accepted — extended by ADR 018 (adds lazy-loading, run history, phase re-run) ## Context @@ -39,12 +39,25 @@ Portfolio models use different field names than the frontend expects. The `/late `run_pipeline()` passes `config={"recursion_limit": propagator.max_recur_limit}` (default 100) to `astream_events()`. Without this, LangGraph defaults to 25, which is insufficient for the debate + risk cycles (up to ~10 iterations). +## Updated Architecture (see ADR 018 for full detail) + +The original "WebSocket is the sole executor" model was revised. 
REST endpoints +now start a `BackgroundTask` that drives the engine generator and caches events +in `runs[run_id]["events"]`. The WebSocket streams from this cache, polling every +50ms. This allows the connection to disconnect and reconnect without losing events. + +On server restart, `hydrate_runs_from_disk()` rebuilds the runs dict from +`run_meta.json` files (events are lazy-loaded on first WebSocket connect). Runs +stuck in `"running"` state with events on disk are detected as orphaned and +marked `"failed"` automatically. + ## Consequences - **Pro**: Real-time visibility into agent execution with zero CLI changes - **Pro**: Crash-proof event mapping — one bad event doesn't kill the stream -- **Pro**: Clean separation — frontend can reconnect to ongoing runs -- **Con**: In-memory run store is not persistent (acceptable for V1) +- **Pro**: Run history survives server restarts via disk persistence +- **Pro**: Phase-level re-run preserves full graph context across tickers +- **Con**: In-memory run store cleared on restart (mitigated by disk hydration) - **Con**: Single-tenant auth (hardcoded user) — needs JWT for production ## Source Files diff --git a/docs/agent/decisions/015-mongodb-report-store-reflexion.md b/docs/agent/decisions/015-mongodb-report-store-reflexion.md index 87a5c5db..74ebe577 100644 --- a/docs/agent/decisions/015-mongodb-report-store-reflexion.md +++ b/docs/agent/decisions/015-mongodb-report-store-reflexion.md @@ -1,9 +1,13 @@ # ADR 015 — MongoDB Report Store, Run-ID Namespacing, and Reflexion Memory -**Status**: accepted -**Date**: 2026-03-24 +**Status**: superseded by ADR 018 +**Date**: 2026-03-24 **Deciders**: @aguzererler +> **Superseded**: The `run_id` namespacing described here has been replaced by +> `flow_id`-based layout. The MongoDB vs local storage decision guide has been +> expanded in ADR 018, which is now the canonical reference. 
+ ## Context Three problems with the existing filesystem report store: diff --git a/docs/agent/decisions/018-storage-events-checkpoints.md b/docs/agent/decisions/018-storage-events-checkpoints.md new file mode 100644 index 00000000..7dbb2c0a --- /dev/null +++ b/docs/agent/decisions/018-storage-events-checkpoints.md @@ -0,0 +1,411 @@ +# ADR 018 — Storage Layout, Event Persistence, WebSocket Streaming & Phase Re-run + +**Status**: accepted +**Date**: 2026-03-26 +**Supersedes**: ADR 015 (run_id namespacing — replaced by flow_id layout) +**Extends**: ADR 013 (WebSocket streaming — adds lazy-loading and run history) + +--- + +## Context + +This ADR finalises the storage architecture introduced across several PRs +(`feat/fe-max-tickers-load-run`, PR#106, PR#107, PR#108) and documents the +decisions made while fixing the Run History loading bug and phase-level re-run +capability. + +Key problems solved: + +1. **Run history lost on server restart** — in-memory run store (`runs` dict) is + not durable. Users could not replay completed runs after a restart. +2. **Checkpoint-less re-runs** — re-running a node started from scratch (full + analysts → debate → risk) instead of resuming from the correct phase. +3. **Re-run wiped full graph context** — clearing all events on re-run removed + scan nodes and other tickers from the graph, leaving only the re-run phase. +4. **Analysts checkpoint never saved** — Social Analyst is optional; requiring + *all* four analyst keys caused the checkpoint to be skipped silently. + +--- + +## 1. Directory Structure (flow_id Layout) + +### Layout + +``` +reports/ +└── daily/ + └── {date}/ ← e.g. 2026-03-26/ + ├── latest.json ← pointer to most-recent flow_id (legacy compat) + ├── daily_digest.md ← appended by every run on this date + ├── {flow_id}/ ← 8-char hex, e.g. 
021f29ef/ + │ ├── run_meta.json ← run metadata (id, status, params, …) + │ ├── run_events.jsonl ← newline-delimited JSON events + │ ├── market/ + │ │ └── report/ + │ │ ├── {ts}_scan_report.json + │ │ └── {ts}_complete_report.json + │ ├── {TICKER}/ ← e.g. RIG/, TSDD/ + │ │ └── report/ + │ │ ├── {ts}_complete_report.json + │ │ ├── {ts}_analysts_checkpoint.json + │ │ ├── {ts}_trader_checkpoint.json + │ │ └── complete_report.md + │ └── portfolio/ + │ └── report/ + │ ├── {ts}_pm_decision.json + │ └── {ts}_execution_result.json + └── runs/ ← legacy run_id layout (backward compat only) + └── {run_id}/ +``` + +### flow_id vs run_id + +| Concept | Type | Purpose | +|---------|------|---------| +| `run_id` | UUID | In-memory identity for a live run; used as WebSocket endpoint key | +| `flow_id` | 8-char hex timestamp | Disk storage key; stable across server restarts | + +`flow_id` is generated once per run via `generate_flow_id()` and threaded through +all sub-phases of an auto run so scan + pipeline + portfolio share the same folder. +`run_id` is ephemeral — it exists only in the `runs` dict and is not persisted. + +### Startup Hydration + +On server start, `hydrate_runs_from_disk()` scans `reports/daily/*/` for +`run_meta.json` files and rebuilds the `runs` dict with `events: []` (lazy). +Events are only loaded when actually needed (WebSocket connect or GET run detail). + +--- + +## 2. Event Structure + +Every event sent over WebSocket or persisted to `run_events.jsonl` follows this +schema: + +```jsonc +{ + // Core identity + "type": "thought" | "tool" | "tool_result" | "result" | "log" | "system", + "node_id": "Bull Researcher", // LangGraph node name + "parent_node_id": "Bull Researcher", // parent node (for tool events) + "identifier": "RIG", // ticker, "MARKET", or portfolio_id + "agent": "BULL RESEARCHER", // uppercase display name + "timestamp": "10:28:49", // HH:MM:SS + + // Content + "message": "Thinking... 
(gls...)", // truncated display text + "prompt": "You are a bull researcher…", // full prompt (thought/result only) + "response": "Based on the analysis…", // full response (result/tool_result) + + // Metrics (result events) + "metrics": { + "model": "glm-4.7-flash:q4_K_M", + "tokens_in": 1240, + "tokens_out": 856, + "latency_ms": 17843 + }, + + // Tool events + "status": "running" | "success" | "error" | "graceful_skip", + "service": "yfinance", // data vendor used + + // Re-run tracking + "rerun_seq": 1 // incremented on each phase re-run; 0 = original +} +``` + +### Event Types + +| Type | Emitted by | Content | +|------|-----------|---------| +| `thought` | LLM streaming chunk | `message` (truncated), `prompt` | +| `result` | LLM final output | `message`, `prompt`, `response`, `metrics` | +| `tool` | Tool invocation start | `node_id`, `status: "running"`, `service` | +| `tool_result` | Tool completion | `status`, `response` (tool output), `service` | +| `log` | `RunLogger` | structured log line | +| `system` | Engine | human-readable status update; special messages `"Run completed."` and `"Error: …"` control frontend state machine | + +### Graph Rendering Rules + +The frontend renders graph nodes by grouping events on `(node_id, identifier)`. +For each unique pair, the node shows the **latest** event's metrics (last result +event wins). Nodes within the same `identifier` are stacked vertically; each +`identifier` becomes a column. + +--- + +## 3. 
How Events Are Sent + +### Normal Run Flow + +``` +POST /api/run/{type} → queues run, returns run_id + flow_id + status: "queued" +WS /ws/stream/{run_id} → connects + if status == "queued" → WebSocket IS the executor + engine.run_*() → streams events live to socket + run_info["events"].append() → events cached in memory + run_info["status"] = completed/failed + if status in running/completed/failed + → replay cached events, poll for new ones until terminal state +``` + +### Background Task Flow (POST → BackgroundTask) + +``` +POST /api/run/{type} + BackgroundTask(_run_and_store) → drives engine generator + events cached in runs[run_id]["events"] + status updated to running → completed/failed +WS /ws/stream/{run_id} + → enters "streaming from cache" loop + → polls events[sent:] every 50ms until status is terminal +``` + +### Lazy-Loading (Server Restart / Run History) + +``` +Server restart + hydrate_runs_from_disk() → runs[run_id] = {..., "events": []} + +WS /ws/stream/{run_id} + run_info.events == [] + → create_report_store(flow_id=flow_id) + → store.load_run_events(date) + → run_info["events"] = disk_events + if status == "running" and disk_events: + → status = "failed", error = "Run did not complete (server restarted)" + → replay all events, send "Run completed." or "Error: …" +``` + +### Key Invariants + +- **Events are append-only** during a live run. Never modified in place. +- **run_events.jsonl is written on run completion** (not streamed to disk in real time). + This is acceptable for V1; periodic flush is a future enhancement. +- **WebSocket polling interval** is 50ms (`_EVENT_POLL_INTERVAL_SECONDS = 0.05`). +- **System messages** `"Run completed."` and `"Error: "` are terminal — the + frontend transitions to `completed` or `error` state on receiving them. + +--- + +## 4. Checkpoint Structure + +Checkpoints are intermediate snapshots that allow phase-level re-runs without +re-executing earlier phases. 
+ +### Analysts Checkpoint + +**Written by**: `run_pipeline()` after the graph completes +**Condition**: at least one of `market_report`, `sentiment_report`, +`news_report`, `fundamentals_report` is populated (Social Analyst is optional) +**Path**: `{flow_id}/{TICKER}/report/{ts}_analysts_checkpoint.json` + +```jsonc +{ + "company_of_interest": "RIG", + "trade_date": "2026-03-26", + "market_report": "…", // from Market Analyst + "news_report": "…", // from News Analyst + "fundamentals_report": "…", // from Fundamentals Analyst + "sentiment_report": "", // from Social Analyst (may be empty — that's OK) + "macro_regime_report": "…", // from Macro Synthesis scan + "messages": [...] // LangGraph message history (for debate context) +} +``` + +**Used by**: `run_pipeline_from_phase()` when `phase == "debate_and_trader"`. +Overlaid onto `initial_state` before running `debate_graph`. + +### Trader Checkpoint + +**Written by**: `run_pipeline()` after the graph completes +**Condition**: `trader_investment_plan` is populated +**Path**: `{flow_id}/{TICKER}/report/{ts}_trader_checkpoint.json` + +```jsonc +{ + "company_of_interest": "RIG", + "trade_date": "2026-03-26", + "market_report": "…", + "news_report": "…", + "fundamentals_report": "…", + "sentiment_report": "", + "macro_regime_report": "…", + "investment_debate_state": {...}, // full bull/bear debate transcript + "investment_plan": "…", // Research Manager output + "trader_investment_plan": "…", // Trader output + "messages": [...] +} +``` + +**Used by**: `run_pipeline_from_phase()` when `phase == "risk"`. 
+ +### Phase Re-run Routing + +``` +node_id → phase → checkpoint loaded +────────────────────────────────────────────────────────────── +Market Analyst → analysts → none (full re-run) +News Analyst → analysts → none +Fundamentals Analyst → analysts → none +Social Analyst → analysts → none +Bull Researcher → debate_and_trader → analysts_checkpoint +Bear Researcher → debate_and_trader → analysts_checkpoint +Research Manager → debate_and_trader → analysts_checkpoint +Trader → debate_and_trader → analysts_checkpoint +Aggressive Analyst → risk → trader_checkpoint +Conservative Analyst → risk → trader_checkpoint +Neutral Analyst → risk → trader_checkpoint +Portfolio Manager → risk → trader_checkpoint +``` + +After any phase re-run completes, the engine **cascades** to `run_portfolio()` +so the PM decision incorporates the updated ticker analysis. + +### Checkpoint Lookup Rule + +**CRITICAL**: The read store used to load checkpoints **must use the same +`flow_id` as the original run**. Without the `flow_id`, `_date_root()` falls +back to the legacy flat layout and will never find checkpoints stored under +`{flow_id}/{TICKER}/report/`. + +In `trigger_rerun_node`, the original flow_id is resolved as: +```python +flow_id = run.get("flow_id") or run.get("short_rid") or run["params"].get("flow_id") +``` +This is then passed through `rerun_params["flow_id"]` to `run_pipeline_from_phase`, +which passes it to `create_report_store(flow_id=flow_id)`. + +--- + +## 5. 
Selective Event Filtering on Re-run + +When a phase re-run is triggered, the run's event list is **selectively filtered** +to remove stale events for the re-run scope while preserving events from: +- Other tickers (TSDD events preserved when re-running RIG) +- Earlier phases of the same ticker (analyst events preserved when re-running debate) +- Scan/market events (always preserved) + +```python +# Nodes cleared per phase (plus all tool events with matching parent_node_id) +debate_and_trader → {Bull Researcher, Bear Researcher, Research Manager, Trader, + Aggressive Analyst, Conservative Analyst, Neutral Analyst, + Portfolio Manager} +risk → {Aggressive Analyst, Conservative Analyst, Neutral Analyst, + Portfolio Manager} +analysts → all nodes for the ticker + +# Portfolio cascade nodes (always cleared — re-run always cascades to PM) +{review_holdings, make_pm_decision} +``` + +The WebSocket replays this filtered set first (rebuilding the full graph), then +streams the new re-run events on top. The frontend's `clearEvents()` + WebSocket +reconnect ensures a clean state before replay. + +--- + +## 6. 
MongoDB vs Local Storage — Decision Guide + +### Use Local Storage (ReportStore) when: + +- **Development or single-machine deployment** — no infrastructure required +- **Offline / air-gapped environments** — no network dependency +- **Report files are the primary output** — reports as .json/.md files that + can be read with any tool +- **Simplicity over scalability** — one process, one machine + +### Use MongoDB (MongoReportStore) when: + +- **Multi-process or multi-node deployment** — local files are not shared +- **Run history across restarts** — hydration from MongoDB is more reliable + than scanning the filesystem +- **Reflexion memory** — `ReflexionMemory` works best with MongoDB for + efficient per-ticker history queries +- **Future: TTL / retention** — MongoDB TTL indexes make automatic cleanup easy +- **Production environments** — MongoDB provides durability, replication, and + backup + +### Configuration + +```env +# Enable MongoDB: +TRADINGAGENTS_MONGO_URI=mongodb://localhost:27017 +TRADINGAGENTS_MONGO_DB=tradingagents # optional, default: "tradingagents" + +# Local storage (default when MONGO_URI is unset): +TRADINGAGENTS_REPORTS_DIR=/path/to/reports # optional, default: ./reports +``` + +### Factory Behaviour + +```python +# Always use the factory — never instantiate stores directly +from tradingagents.portfolio.store_factory import create_report_store + +# Writing: always pass flow_id (scopes writes to the correct run folder) +writer = create_report_store(flow_id=flow_id) + +# Reading: omit flow_id (resolves via latest.json or MongoDB latest query) +reader = create_report_store() +``` + +`create_report_store()` returns: +1. `DualReportStore(MongoReportStore, ReportStore)` — when `MONGO_URI` is set + and pymongo is installed (writes to both; reads from Mongo first, falls back + to disk) +2. `ReportStore` — when MongoDB is unavailable or not configured + +MongoDB failures **always** fall back to filesystem with a warning log. 
The application must remain functional without MongoDB.

### Known V1 Limitations (Future Work)

| Issue | Status |
|-------|--------|
| `pymongo` is synchronous — blocks asyncio event loop | Deferred: migrate to `motor` before production |
| No TTL index — reports accumulate indefinitely | Deferred: requires retention policy decision |
| `MongoClient` created per store instance | Deferred: singleton via FastAPI app lifespan |
| `run_events.jsonl` written on completion, not streaming | Deferred: periodic flush for long runs |

---

## Consequences & Constraints

### MUST

- **Always use `create_report_store(flow_id=…)` for writes** — never omit the
  `flow_id` arg when writing, as the flat fallback path will overwrite across runs.
- **Always pass the original `flow_id` when loading checkpoints for re-run** —
  checkpoint lookup will silently return `None` otherwise, causing full re-run
  fallback.
- **Save analysts checkpoint if `any()` analyst report is populated** — Social
  Analyst is optional; `all()` silently blocks checkpoints when social is disabled.
- **Selective event filtering on re-run** — never clear all events; always use
  `_filter_rerun_events(events, ticker, phase)` to preserve other tickers and
  earlier phases.

### MUST NOT

- **Never hard-code `ReportStore()` in engine methods** — always use the factory.
- **Never run blocking pymongo calls in the async hot path** — wrap in `asyncio.to_thread` if
  blocking becomes measurable. 
+ +### Source Files + +``` +tradingagents/portfolio/report_store.py ← ReportStore (filesystem) +tradingagents/portfolio/mongo_report_store.py ← MongoReportStore +tradingagents/portfolio/dual_report_store.py ← DualReportStore (both) +tradingagents/portfolio/store_factory.py ← create_report_store() +tradingagents/report_paths.py ← flow_id/run_id helpers, ts_now() +agent_os/backend/main.py ← hydrate_runs_from_disk() +agent_os/backend/routes/runs.py ← _run_and_store, _append_and_store, + _filter_rerun_events, trigger_rerun_node +agent_os/backend/routes/websocket.py ← lazy-loading, orphaned run detection +agent_os/backend/services/langgraph_engine.py ← run_pipeline_from_phase, NODE_TO_PHASE, + checkpoint save/load logic +agent_os/frontend/src/hooks/useAgentStream.ts ← WebSocket client, event accumulation +agent_os/frontend/src/Dashboard.tsx ← triggerNodeRerun, loadRun, clearEvents +``` diff --git a/tradingagents/portfolio/dual_report_store.py b/tradingagents/portfolio/dual_report_store.py index a4784a1e..38d9a358 100644 --- a/tradingagents/portfolio/dual_report_store.py +++ b/tradingagents/portfolio/dual_report_store.py @@ -62,6 +62,11 @@ class DualReportStore: """The flow identifier set on this store, if any.""" return self._local.flow_id + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._local.flow_id + @property def run_id(self) -> str | None: """The run/flow identifier (flow_id takes precedence).""" diff --git a/tradingagents/portfolio/mongo_report_store.py b/tradingagents/portfolio/mongo_report_store.py index 4f3561a9..99aa146c 100644 --- a/tradingagents/portfolio/mongo_report_store.py +++ b/tradingagents/portfolio/mongo_report_store.py @@ -91,6 +91,11 @@ class MongoReportStore: """The flow identifier set on this store, if any.""" return self._flow_id + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._flow_id + @property def 
run_id(self) -> str | None: """The run/flow identifier (flow_id takes precedence for backward compat)."""