feat: finalise storage layout, run history loading & phase-level re-run (#121)

* feat: introduce flow_id with timestamp-based report versioning

Replace run_id with flow_id as the primary grouping concept (one flow =
one user analysis intent spanning scan + pipeline + portfolio). Reports
are now written as {timestamp}_{name}.json so load methods always return
the latest version by lexicographic sort, eliminating the latest.json
pointer pattern for new flows.

Key changes:
- report_paths.py: add generate_flow_id(), ts_now() (ms precision),
  flow_id kwarg on all path helpers; keep run_id / pointer helpers for
  backward compatibility
- ReportStore: dual-mode save/load — flow_id uses timestamped layout,
  run_id uses legacy runs/{id}/ layout with latest.json
- MongoReportStore: add flow_id field and index; run_id stays for compat
- DualReportStore: expose flow_id property
- store_factory: accept flow_id as primary param, run_id as alias
- runs.py / langgraph_engine.py: generate and thread flow_id through all
  trigger endpoints and run methods
- Tests: add flow_id coverage for all layers; 905 tests pass

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat: finalise storage layout, run history loading & phase-level re-run

Storage / persistence
- flow_id (8-char hex) replaces run_id as the disk storage key; all
  sub-phases of one auto run share the same flow_id directory
- Startup hydration: hydrate_runs_from_disk() rebuilds in-memory run
  store from run_meta.json on server restart (events lazy-loaded)

WebSocket / run history fixes
- Lazy-load events from run_events.jsonl on first WS connect; fixes
  blank terminal when clicking a historical run after restart
- Orphaned "running" runs (server restarted mid-run) auto-detected and
  marked "failed" with partial events replayed correctly

Phase re-run fixes
- Analysts checkpoint: use any() instead of all() — Social Analyst is
  optional; all() silently blocked checkpoint saves in typical runs
- Checkpoint lookup: pass original flow_id through rerun_params so
  _date_root() resolves to the correct flow_id subdirectory
- Selective event filtering on re-run: preserves scan nodes and other
  tickers; only removes stale events for the re-run phase+ticker
- Frontend graph now shows full auto-flow context during phase re-runs

Documentation
- ADR 018: canonical reference for storage layout, event schema,
  WebSocket streaming flows, checkpoint structure, MongoDB vs local
- ADR 013 updated: reflects background-task + lazy-loading evolution
- ADR 015 marked superseded by ADR 018
- CLAUDE.md: AgentOS storage section + 4 new critical patterns
- CURRENT_STATE.md updated

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Commit 6b3dd4172a (parent 6b644c6058)
ahmet guzererler, 2026-03-26 11:12:16 +01:00, committed by GitHub
13 changed files with 651 additions and 27 deletions

View File

@ -10,7 +10,7 @@
{
"name": "Frontend (Vite / React)",
"runtimeExecutable": "/opt/homebrew/bin/node",
"runtimeArgs": ["/Users/Ahmet/Repo/TradingAgents/agent_os/frontend/node_modules/.bin/vite", "/Users/Ahmet/Repo/TradingAgents/agent_os/frontend"],
"runtimeArgs": ["/Users/Ahmet/Repo/TradingAgents/agent_os/frontend/node_modules/.bin/vite", "/Users/Ahmet/Repo/TradingAgents/.claude/worktrees/wizardly-poitras/agent_os/frontend"],
"port": 5173,
"autoPort": true
}

View File

@ -29,7 +29,10 @@ conda activate tradingagents
- `tradingagents/agents/` - Agent implementations
- `tradingagents/graph/` - Workflow graphs and setup
- `tradingagents/dataflows/` - Data access layer
- `tradingagents/portfolio/` - Portfolio models, report stores, store factory
- `cli/` - Command-line interface
- `agent_os/backend/` - FastAPI backend (routes, engine, services)
- `agent_os/frontend/` - React + Chakra UI + ReactFlow dashboard
## Agent Flow (Existing Trading Analysis)
@ -87,6 +90,75 @@ OpenAI, Anthropic, Google, xAI, OpenRouter, Ollama
- Graph setup (scanner): `tradingagents/graph/scanner_setup.py`
- Inline tool loop: `tradingagents/agents/utils/tool_runner.py`
## AgentOS — Storage, Events & Phase Re-run (see ADR 018 for full detail)
### Storage Layout
Reports are scoped by `flow_id` (8-char hex), NOT `run_id` (UUID):
```
reports/daily/{date}/{flow_id}/
run_meta.json ← run metadata persisted on completion
run_events.jsonl ← all WebSocket events, newline-delimited JSON
{TICKER}/report/ ← e.g. RIG/report/
{ts}_complete_report.json
{ts}_analysts_checkpoint.json ← written after analysts phase
{ts}_trader_checkpoint.json ← written after trader phase
market/report/ ← scan output
portfolio/report/ ← PM decisions, execution results
```
- **`flow_id`** = stable disk key, shared across all sub-phases of one auto run
- **`run_id`** = ephemeral in-memory UUID (WebSocket endpoint key only)
### Store Factory — Always Use It
```python
from tradingagents.portfolio.store_factory import create_report_store
# Writing: always pass flow_id
writer = create_report_store(flow_id=flow_id)
# Reading / checkpoint lookup: always pass the ORIGINAL flow_id
reader = create_report_store(flow_id=original_flow_id)
# Reading latest (skip-if-exists checks): omit flow_id
reader = create_report_store()
```
**Never** instantiate `ReportStore()` or `MongoReportStore()` directly in engine code.
### Phase Re-run
Node → phase mapping lives in `NODE_TO_PHASE` (langgraph_engine.py):
| Nodes | Phase | Checkpoint loaded |
|-------|-------|-------------------|
| Market/News/Fundamentals/Social Analyst | `analysts` | none |
| Bull/Bear Researcher, Research Manager, Trader | `debate_and_trader` | analysts_checkpoint |
| Aggressive/Conservative/Neutral Analyst, Portfolio Manager | `risk` | trader_checkpoint |
- **Checkpoint lookup requires the original `flow_id`** — pass it through `rerun_params["flow_id"]`
- **Analysts checkpoint**: saved when `any()` analyst report is populated (Social Analyst is optional — never use `all()`)
- **Selective event filtering**: re-run preserves events from other tickers and earlier phases; only clears nodes in the re-run scope
- **Cascade**: every phase re-run ends with a `run_portfolio()` call to update the PM decision
### WebSocket Event Flow
```
POST /api/run/{type} → BackgroundTask drives engine → caches events in runs[run_id]
WS /ws/stream/{run_id} → replays cached events (polling 50ms) → streams new ones
On reconnect (history) → lazy-loads run_events.jsonl from disk if events == []
Orphaned "running" run with disk events → auto-marked "failed"
```
### MongoDB vs Local Storage
- **Local (default)**: development, single-machine, offline. Set via `TRADINGAGENTS_REPORTS_DIR`.
- **MongoDB**: multi-process, production, reflexion memory. Set `TRADINGAGENTS_MONGO_URI`.
- `DualReportStore` writes to both when Mongo is configured; reads Mongo first, falls back to disk.
- Mongo failures always fall back gracefully — never crash on missing Mongo.
## Critical Patterns (see `docs/agent/decisions/008-lessons-learned.md` for full details)
- **Tool execution**: Trading graph uses `ToolNode` in graph. Scanner agents use `run_tool_loop()` inline. If `bind_tools()` is used, there MUST be a tool execution path.
@ -99,6 +171,10 @@ OpenAI, Anthropic, Google, xAI, OpenRouter, Ollama
- **Rate limiter locks**: Never hold a lock during `sleep()` or IO. Release, sleep, re-acquire.
- **LLM policy errors**: `_is_policy_error(exc)` detects 404 from any provider (checks `status_code` attribute or message content). `_build_fallback_config(config)` substitutes per-tier fallback models. Both live in `agent_os/backend/services/langgraph_engine.py`.
- **Config fallback keys**: `llm_provider` and `backend_url` must always exist at top level — `scanner_graph.py` and `trading_graph.py` use them as fallbacks.
- **Report store writes**: always pass `flow_id` to `create_report_store(flow_id=…)`. Omitting it writes to the flat legacy path and overwrites across runs.
- **Checkpoint lookup on re-run**: pass the original run's `flow_id` (from `run.get("flow_id") or run.get("short_rid") or run["params"]["flow_id"]`). Without it, `_date_root()` falls back to flat layout and finds nothing.
- **Analysts checkpoint condition**: use `any()` not `all()` over analyst keys — Social Analyst is not in the default analysts list, so `sentiment_report` is empty in typical runs.
- **Re-run event filtering**: use `_filter_rerun_events(events, ticker, phase)` — never clear all events on re-run. Clearing all loses scan nodes and other tickers from the graph.
## Agentic Memory (docs/agent/)

View File

@ -201,18 +201,67 @@ async def trigger_mock(
)
return {"run_id": run_id, "flow_id": flow_id, "status": "queued"}
async def _append_and_store(run_id: str, gen) -> None:
"""Append events from a re-run generator to an existing run entry."""
# Nodes produced by each phase (used to selectively remove stale events on re-run)
_DEBATE_TRADER_NODES = frozenset({
"Bull Researcher", "Bear Researcher", "Research Manager", "Trader",
"Aggressive Analyst", "Conservative Analyst", "Neutral Analyst", "Portfolio Manager",
})
_RISK_NODES = frozenset({
"Aggressive Analyst", "Conservative Analyst", "Neutral Analyst", "Portfolio Manager",
})
# Portfolio-level cascade nodes always re-run after any phase re-run
_PORTFOLIO_NODES = frozenset({"review_holdings", "make_pm_decision"})
def _filter_rerun_events(events: list, ticker: str, phase: str) -> list:
"""Remove stale events for ticker+phase so fresh re-run events can replace them.
Events for other tickers and phases earlier than the requested phase are kept,
preserving the full auto-flow graph context in the UI.
"""
if phase == "analysts":
ticker_nodes_to_clear = None # clear all nodes for this ticker
elif phase == "debate_and_trader":
ticker_nodes_to_clear = _DEBATE_TRADER_NODES
elif phase == "risk":
ticker_nodes_to_clear = _RISK_NODES
else:
return events # unknown phase — keep everything
kept = []
for e in events:
ident = e.get("identifier", "")
node_id = e.get("node_id", "")
parent = e.get("parent_node_id", "")
# Always remove portfolio-level cascade events (they will be re-emitted)
if node_id in _PORTFOLIO_NODES:
continue
# Remove stale ticker events for the phase being re-run
if ident == ticker:
if ticker_nodes_to_clear is None:
continue
if node_id in ticker_nodes_to_clear or parent in ticker_nodes_to_clear:
continue
kept.append(e)
return kept
async def _append_and_store(run_id: str, gen, ticker: str | None = None, phase: str | None = None) -> None:
"""Drive a re-run generator, preserving events from other tickers/phases."""
run = runs.get(run_id)
if not run:
return
run["rerun_seq"] = run.get("rerun_seq", 0) + 1
run["status"] = "running"
# Preserve events for other tickers and earlier phases; remove only the stale
# nodes that the re-run will replace.
if ticker and phase:
run["events"] = _filter_rerun_events(run.get("events") or [], ticker, phase)
else:
run["events"] = []
try:
async for event in gen:
event["rerun_seq"] = run["rerun_seq"]
if "events" not in run:
run["events"] = []
run["events"].append(event)
run["status"] = "completed"
except Exception as exc:
@ -247,20 +296,32 @@ async def trigger_rerun_node(
raise HTTPException(status_code=422, detail="identifier (ticker) is required")
phase = NODE_TO_PHASE[node_id]
_run = runs[run_id]
_orig_flow_id = (
_run.get("flow_id")
or _run.get("short_rid")
or (_run.get("params") or {}).get("flow_id")
)
rerun_params = {
"ticker": identifier,
"date": date or (runs[run_id].get("params") or {}).get("date", ""),
"date": date or (_run.get("params") or {}).get("date", ""),
"portfolio_id": portfolio_id,
"flow_id": _orig_flow_id, # preserve original flow_id for checkpoint lookup
}
logger.info(
"Queued RERUN run=%s node=%s phase=%s ticker=%s user=%s",
run_id, node_id, phase, identifier, user["user_id"],
)
# Set status synchronously so the WebSocket that reconnects immediately after
# this response sees "running" and enters the polling loop instead of closing.
runs[run_id]["status"] = "running"
background_tasks.add_task(
_append_and_store,
run_id,
engine.run_pipeline_from_phase(f"{run_id}_rerun_{phase}", rerun_params, phase),
ticker=identifier,
phase=phase,
)
return {"run_id": run_id, "phase": phase, "status": "queued"}

View File

@ -37,6 +37,29 @@ async def websocket_endpoint(
run_type = run_info["type"]
params = run_info.get("params", {})
# Lazy-load events from disk when not in memory.
# Covers three cases: hydrated completed/failed runs (events=[]), and orphaned "running"
# runs where the server restarted mid-run. New genuine runs return [] from disk (safe no-op).
if not run_info.get("events"):
try:
from tradingagents.portfolio.store_factory import create_report_store
flow_id = run_info.get("flow_id") or run_info.get("short_rid") or run_id[:8]
store = create_report_store(flow_id=flow_id)
date = (run_info.get("params") or {}).get("date", "")
if date:
disk_events = store.load_run_events(date)
if disk_events:
run_info["events"] = disk_events
logger.info("Lazy-loaded %d events from disk run=%s", len(disk_events), run_id)
# If still marked "running" but disk has events, the producer is gone
# (server restarted mid-run) — mark failed so the replay loop terminates
# and the UI shows the partial output with an error indicator.
if run_info.get("status") == "running":
run_info["status"] = "failed"
run_info["error"] = "Run did not complete (server restarted)"
except Exception:
logger.warning("Failed to lazy-load events for run=%s", run_id)
try:
status = run_info.get("status", "queued")

View File

@ -422,9 +422,9 @@ class LangGraphEngine:
if digest_content:
append_to_digest(date, "analyze", ticker, digest_content)
# Save analysts checkpoint (all 4 analyst reports populated)
# Save analysts checkpoint (any analyst report populated — social is optional)
_analyst_keys = ("market_report", "sentiment_report", "news_report", "fundamentals_report")
if all(final_state.get(k) for k in _analyst_keys):
if any(final_state.get(k) for k in _analyst_keys):
analysts_ckpt = {
"company_of_interest": ticker,
"trade_date": date,
@ -694,9 +694,9 @@ class LangGraphEngine:
date = params.get("date", time.strftime("%Y-%m-%d"))
portfolio_id = params.get("portfolio_id", "main_portfolio")
store = create_report_store()
flow_id = params.get("flow_id") or generate_flow_id()
writer_store = create_report_store(flow_id=flow_id)
store = create_report_store(flow_id=flow_id)
writer_store = store
if phase == "analysts":
# Full re-run

View File

@ -565,7 +565,8 @@ export const Dashboard: React.FC = () => {
date: params.date,
portfolio_id: params.portfolio_id,
});
// Force WebSocket reconnect to stream new events
// Clear terminal and reconnect WebSocket to stream only the new phase's events
clearEvents();
setActiveRunId(null);
setTimeout(() => setActiveRunId(res.data.run_id), 0);
toast({

View File

@ -4,6 +4,9 @@ import react from '@vitejs/plugin-react'
// https://vitejs.dev/config/
export default defineConfig({
plugins: [react()],
resolve: {
dedupe: ['react', 'react-dom'],
},
server: {
port: 5173,
host: true,

View File

@ -1,23 +1,45 @@
# Current Milestone
FE improvements: configurable max_auto_tickers + run persistence with phase-level node re-run. PR pending review on `feat/fe-max-tickers-load-run`.
Storage finalisation + run history UX. Branch `claude/wizardly-poitras` (PR pending).
All storage, event, checkpoint, and phase re-run logic is now documented in ADR 018.
# Recent Progress
- **feat/fe-max-tickers-load-run**: Two features implemented:
- Feature 1: `max_auto_tickers` config key + macro synthesis prompt injection + frontend number input + backend safety cap
- Feature 2: Run persistence (run_meta.json + run_events.jsonl), intermediate phase checkpoints (analysts/trader), phase subgraphs (debate + risk), POST /api/run/rerun-node endpoint, frontend history panel + modified node re-run
- **PR#108 merged**: Per-tier LLM fallback for 404/policy errors
- **PR#107 merged**: `save_holding_review` per-ticker fix, `RunLogger` threading.local to contextvars.ContextVar
- **PR#106 merged**: MongoDB report store, RunLogger observability, reflexion memory, run-ID namespaced reports
- **Smart Money Scanner**: Finviz integration with Golden Overlap strategy (ADR 014)
- **AgentOS**: Full-stack visual observability layer (FastAPI + React + ReactFlow)
- **Portfolio Manager**: Phases 1-10 complete (models, agents, CLI, stop-loss/take-profit)
- **feat/fe-max-tickers-load-run** (merged base):
- `max_auto_tickers` config + macro synthesis prompt injection + frontend input
- Run persistence: `run_meta.json` + `run_events.jsonl` per flow_id
- Phase subgraphs (debate_graph, risk_graph) in LangGraphEngine
- `POST /api/run/rerun-node` endpoint + frontend Re-run buttons on graph nodes
- Run History popover in UI
- **claude/wizardly-poitras** (this PR — storage finalisation):
- **flow_id layout** replaces run_id namespacing: `reports/daily/{date}/{flow_id}/`
- **Startup hydration**: `hydrate_runs_from_disk()` rebuilds runs dict from disk on restart
- **WebSocket lazy-loading**: events loaded from disk on first WS connect (fixes blank Run History)
- **Orphaned run detection**: runs stuck in "running" with disk events → marked "failed"
- **Analysts checkpoint fix**: `any()` instead of `all()` — Social Analyst is optional
- **flow_id checkpoint lookup**: re-run passes original flow_id to store so checkpoints resolve correctly
- **Selective event filtering**: phase re-run preserves scan + other tickers; only clears stale nodes for the re-run scope
- **ADR 018**: definitive documentation of storage, events, checkpoints, MongoDB vs local
- **PR#108 merged**: Per-tier LLM fallback for 404/policy errors (ADR 017)
- **PR#107 merged**: `save_holding_review` per-ticker fix; RunLogger threading.local → contextvars
- **PR#106 merged**: MongoDB report store, RunLogger observability, reflexion memory
# In Progress
- feat/fe-max-tickers-load-run PR under review
- claude/wizardly-poitras PR: storage finalisation + run history UX
# Active Blockers
- None
# Key Architectural Decisions Active
| ADR | Topic | Status |
|-----|-------|--------|
| 013 | WebSocket streaming (extended by 018) | accepted |
| 015 | MongoDB/run-id namespacing | superseded by 018 (flow_id replaces run_id) |
| 016 | PR#106 review findings | accepted |
| 017 | LLM policy fallback | accepted |
| 018 | Storage layout, events, checkpoints, MongoDB vs local | **accepted — canonical reference** |

View File

@ -2,7 +2,7 @@
## Status
Accepted
Accepted — extended by ADR 018 (adds lazy-loading, run history, phase re-run)
## Context
@ -39,12 +39,25 @@ Portfolio models use different field names than the frontend expects. The `/late
`run_pipeline()` passes `config={"recursion_limit": propagator.max_recur_limit}` (default 100) to `astream_events()`. Without this, LangGraph defaults to 25, which is insufficient for the debate + risk cycles (up to ~10 iterations).
## Updated Architecture (see ADR 018 for full detail)
The original "WebSocket is the sole executor" model was revised. REST endpoints
now start a `BackgroundTask` that drives the engine generator and caches events
in `runs[run_id]["events"]`. The WebSocket streams from this cache, polling every
50ms. This allows the connection to disconnect and reconnect without losing events.
On server restart, `hydrate_runs_from_disk()` rebuilds the runs dict from
`run_meta.json` files (events are lazy-loaded on first WebSocket connect). Runs
stuck in `"running"` state with events on disk are detected as orphaned and
marked `"failed"` automatically.
## Consequences
- **Pro**: Real-time visibility into agent execution with zero CLI changes
- **Pro**: Crash-proof event mapping — one bad event doesn't kill the stream
- **Pro**: Clean separation — frontend can reconnect to ongoing runs
- **Con**: In-memory run store is not persistent (acceptable for V1)
- **Pro**: Run history survives server restarts via disk persistence
- **Pro**: Phase-level re-run preserves full graph context across tickers
- **Con**: In-memory run store cleared on restart (mitigated by disk hydration)
- **Con**: Single-tenant auth (hardcoded user) — needs JWT for production
## Source Files

View File

@ -1,9 +1,13 @@
# ADR 015 — MongoDB Report Store, Run-ID Namespacing, and Reflexion Memory
**Status**: accepted
**Date**: 2026-03-24
**Status**: superseded by ADR 018
**Date**: 2026-03-24
**Deciders**: @aguzererler
> **Superseded**: The `run_id` namespacing described here has been replaced by
> `flow_id`-based layout. The MongoDB vs local storage decision guide has been
> expanded in ADR 018, which is now the canonical reference.
## Context
Three problems with the existing filesystem report store:

View File

@ -0,0 +1,411 @@
# ADR 018 — Storage Layout, Event Persistence, WebSocket Streaming & Phase Re-run
**Status**: accepted
**Date**: 2026-03-26
**Supersedes**: ADR 015 (run_id namespacing — replaced by flow_id layout)
**Extends**: ADR 013 (WebSocket streaming — adds lazy-loading and run history)
---
## Context
This ADR finalises the storage architecture introduced across several PRs
(`feat/fe-max-tickers-load-run`, PR#106, PR#107, PR#108) and documents the
decisions made while fixing the Run History loading bug and phase-level re-run
capability.
Key problems solved:
1. **Run history lost on server restart** — in-memory run store (`runs` dict) is
not durable. Users could not replay completed runs after a restart.
2. **Checkpoint-less re-runs** — re-running a node started from scratch (full
analysts → debate → risk) instead of resuming from the correct phase.
3. **Re-run wiped full graph context** — clearing all events on re-run removed
scan nodes and other tickers from the graph, leaving only the re-run phase.
4. **Analysts checkpoint never saved** — Social Analyst is optional; requiring
*all* four analyst keys caused the checkpoint to be skipped silently.
---
## 1. Directory Structure (flow_id Layout)
### Layout
```
reports/
└── daily/
└── {date}/ ← e.g. 2026-03-26/
├── latest.json ← pointer to most-recent flow_id (legacy compat)
├── daily_digest.md ← appended by every run on this date
├── {flow_id}/ ← 8-char hex, e.g. 021f29ef/
│ ├── run_meta.json ← run metadata (id, status, params, …)
│ ├── run_events.jsonl ← newline-delimited JSON events
│ ├── market/
│ │ └── report/
│ │ ├── {ts}_scan_report.json
│ │ └── {ts}_complete_report.json
│ ├── {TICKER}/ ← e.g. RIG/, TSDD/
│ │ └── report/
│ │ ├── {ts}_complete_report.json
│ │ ├── {ts}_analysts_checkpoint.json
│ │ ├── {ts}_trader_checkpoint.json
│ │ └── complete_report.md
│ └── portfolio/
│ └── report/
│ ├── {ts}_pm_decision.json
│ └── {ts}_execution_result.json
└── runs/ ← legacy run_id layout (backward compat only)
└── {run_id}/
```
### flow_id vs run_id
| Concept | Type | Purpose |
|---------|------|---------|
| `run_id` | UUID | In-memory identity for a live run; used as WebSocket endpoint key |
| `flow_id` | 8-char hex | Disk storage key; stable across server restarts |
`flow_id` is generated once per run via `generate_flow_id()` and threaded through
all sub-phases of an auto run so scan + pipeline + portfolio share the same folder.
`run_id` is ephemeral — it exists only in the `runs` dict and is not persisted.
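
This ADR does not show how the id is derived; one plausible sketch of `generate_flow_id()` (the implementation is assumed, only the 8-char-hex contract comes from this document):

```python
import uuid

def generate_flow_id() -> str:
    """Sketch of generate_flow_id() from report_paths.py (derivation assumed):
    eight lowercase hex chars, random here so concurrent runs cannot collide.
    """
    return uuid.uuid4().hex[:8]
```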
### Startup Hydration
On server start, `hydrate_runs_from_disk()` scans `reports/daily/*/` for
`run_meta.json` files and rebuilds the `runs` dict with `events: []` (lazy).
Events are only loaded when actually needed (WebSocket connect or GET run detail).
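
A sketch of that hydration pass (directory depth and metadata keys are assumed from the layout in section 1):

```python
import json
from pathlib import Path

def hydrate_runs_from_disk(reports_root: Path) -> dict:
    """Rebuild the in-memory runs dict from run_meta.json files.

    Events stay empty ([]) and are lazy-loaded later on WebSocket connect.
    """
    runs: dict = {}
    # Layout: reports/daily/{date}/{flow_id}/run_meta.json
    for meta_path in reports_root.glob("daily/*/*/run_meta.json"):
        try:
            meta = json.loads(meta_path.read_text())
        except (OSError, json.JSONDecodeError):
            continue  # skip corrupt metadata; never fail server startup
        run_id = meta.get("run_id")
        if run_id:
            runs[run_id] = {**meta, "events": []}  # events lazy-loaded
    return runs
```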
---
## 2. Event Structure
Every event sent over WebSocket or persisted to `run_events.jsonl` follows this
schema:
```jsonc
{
// Core identity
"type": "thought" | "tool" | "tool_result" | "result" | "log" | "system",
"node_id": "Bull Researcher", // LangGraph node name
"parent_node_id": "Bull Researcher", // parent node (for tool events)
"identifier": "RIG", // ticker, "MARKET", or portfolio_id
"agent": "BULL RESEARCHER", // uppercase display name
"timestamp": "10:28:49", // HH:MM:SS
// Content
"message": "Thinking... (gls...)", // truncated display text
"prompt": "You are a bull researcher…", // full prompt (thought/result only)
"response": "Based on the analysis…", // full response (result/tool_result)
// Metrics (result events)
"metrics": {
"model": "glm-4.7-flash:q4_K_M",
"tokens_in": 1240,
"tokens_out": 856,
"latency_ms": 17843
},
// Tool events
"status": "running" | "success" | "error" | "graceful_skip",
"service": "yfinance", // data vendor used
// Re-run tracking
"rerun_seq": 1 // incremented on each phase re-run; 0 = original
}
```
### Event Types
| Type | Emitted by | Content |
|------|-----------|---------|
| `thought` | LLM streaming chunk | `message` (truncated), `prompt` |
| `result` | LLM final output | `message`, `prompt`, `response`, `metrics` |
| `tool` | Tool invocation start | `node_id`, `status: "running"`, `service` |
| `tool_result` | Tool completion | `status`, `response` (tool output), `service` |
| `log` | `RunLogger` | structured log line |
| `system` | Engine | human-readable status update; special messages `"Run completed."` and `"Error: …"` control frontend state machine |
### Graph Rendering Rules
The frontend renders graph nodes by grouping events on `(node_id, identifier)`.
For each unique pair, the node shows the **latest** event's metrics (last result
event wins). Nodes within the same `identifier` are stacked vertically; each
`identifier` becomes a column.
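
The grouping rule can be sketched as follows (a simplified Python stand-in; the real logic lives in the React frontend):

```python
def group_for_graph(events: list[dict]) -> dict:
    """Group events by (node_id, identifier): one graph node per pair.

    The latest result event's metrics win, so re-run events replace the
    originals; dict insertion order gives the column/stack ordering.
    """
    nodes: dict = {}
    for e in events:
        key = (e.get("node_id", ""), e.get("identifier", ""))
        node = nodes.setdefault(key, {"events": []})
        node["events"].append(e)
        if e.get("type") == "result":
            node["metrics"] = e.get("metrics", {})  # last result wins
    return nodes
```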
---
## 3. How Events Are Sent
### Normal Run Flow
```
POST /api/run/{type} → queues run, returns run_id + flow_id
status: "queued"
WS /ws/stream/{run_id} → connects
if status == "queued" → WebSocket IS the executor
engine.run_*() → streams events live to socket
run_info["events"].append() → events cached in memory
run_info["status"] = completed/failed
if status in running/completed/failed
→ replay cached events, poll for new ones until terminal state
```
### Background Task Flow (POST → BackgroundTask)
```
POST /api/run/{type}
BackgroundTask(_run_and_store) → drives engine generator
events cached in runs[run_id]["events"]
status updated to running → completed/failed
WS /ws/stream/{run_id}
→ enters "streaming from cache" loop
→ polls events[sent:] every 50ms until status is terminal
```
### Lazy-Loading (Server Restart / Run History)
```
Server restart
hydrate_runs_from_disk() → runs[run_id] = {..., "events": []}
WS /ws/stream/{run_id}
run_info.events == []
→ create_report_store(flow_id=flow_id)
→ store.load_run_events(date)
→ run_info["events"] = disk_events
if status == "running" and disk_events:
→ status = "failed", error = "Run did not complete (server restarted)"
→ replay all events, send "Run completed." or "Error: …"
```
### Key Invariants
- **Events are append-only** during a live run. Never modified in place.
- **run_events.jsonl is written on run completion** (not streamed to disk in real time).
This is acceptable for V1; periodic flush is a future enhancement.
- **WebSocket polling interval** is 50ms (`_EVENT_POLL_INTERVAL_SECONDS = 0.05`).
- **System messages** `"Run completed."` and `"Error: <msg>"` are terminal — the
frontend transitions to `completed` or `error` state on receiving them.
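
The cache-replay loop described above can be sketched as follows (simplified; `ws_send` stands in for the actual WebSocket send call):

```python
import asyncio

_EVENT_POLL_INTERVAL_SECONDS = 0.05  # matches the engine constant

async def replay_and_stream(ws_send, run_info: dict) -> None:
    """Send every cached event, then poll for new ones every 50 ms
    until the run reaches a terminal state."""
    sent = 0
    while True:
        events = run_info.get("events") or []
        for event in events[sent:]:
            await ws_send(event)
        sent = len(events)
        if run_info.get("status") in ("completed", "failed"):
            break  # all events flushed before status went terminal
        await asyncio.sleep(_EVENT_POLL_INTERVAL_SECONDS)
```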
---
## 4. Checkpoint Structure
Checkpoints are intermediate snapshots that allow phase-level re-runs without
re-executing earlier phases.
### Analysts Checkpoint
**Written by**: `run_pipeline()` after the graph completes
**Condition**: at least one of `market_report`, `sentiment_report`,
`news_report`, `fundamentals_report` is populated (Social Analyst is optional)
**Path**: `{flow_id}/{TICKER}/report/{ts}_analysts_checkpoint.json`
```jsonc
{
"company_of_interest": "RIG",
"trade_date": "2026-03-26",
"market_report": "…", // from Market Analyst
"news_report": "…", // from News Analyst
"fundamentals_report": "…", // from Fundamentals Analyst
"sentiment_report": "", // from Social Analyst (may be empty — that's OK)
"macro_regime_report": "…", // from Macro Synthesis scan
"messages": [...] // LangGraph message history (for debate context)
}
```
**Used by**: `run_pipeline_from_phase()` when `phase == "debate_and_trader"`.
Overlaid onto `initial_state` before running `debate_graph`.
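
The overlay step can be sketched as follows; the rule for skipping empty checkpoint values is an assumption (the engine may simply dict-update):

```python
def overlay_checkpoint(initial_state: dict, checkpoint: dict) -> dict:
    """Resume a phase by overlaying a saved checkpoint onto the fresh
    initial state. Populated checkpoint values win; empty values (e.g. an
    optional sentiment_report) leave the initial-state defaults in place."""
    merged = dict(initial_state)
    merged.update({k: v for k, v in checkpoint.items() if v not in ("", None, [])})
    return merged
```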
### Trader Checkpoint
**Written by**: `run_pipeline()` after the graph completes
**Condition**: `trader_investment_plan` is populated
**Path**: `{flow_id}/{TICKER}/report/{ts}_trader_checkpoint.json`
```jsonc
{
"company_of_interest": "RIG",
"trade_date": "2026-03-26",
"market_report": "…",
"news_report": "…",
"fundamentals_report": "…",
"sentiment_report": "",
"macro_regime_report": "…",
"investment_debate_state": {...}, // full bull/bear debate transcript
"investment_plan": "…", // Research Manager output
"trader_investment_plan": "…", // Trader output
"messages": [...]
}
```
**Used by**: `run_pipeline_from_phase()` when `phase == "risk"`.
### Phase Re-run Routing
```
node_id → phase → checkpoint loaded
──────────────────────────────────────────────────────────────
Market Analyst → analysts → none (full re-run)
News Analyst → analysts → none
Fundamentals Analyst → analysts → none
Social Analyst → analysts → none
Bull Researcher → debate_and_trader → analysts_checkpoint
Bear Researcher → debate_and_trader → analysts_checkpoint
Research Manager → debate_and_trader → analysts_checkpoint
Trader → debate_and_trader → analysts_checkpoint
Aggressive Analyst → risk → trader_checkpoint
Conservative Analyst → risk → trader_checkpoint
Neutral Analyst → risk → trader_checkpoint
Portfolio Manager → risk → trader_checkpoint
```
After any phase re-run completes, the engine **cascades** to `run_portfolio()`
so the PM decision incorporates the updated ticker analysis.
### Checkpoint Lookup Rule
**CRITICAL**: The read store used to load checkpoints **must use the same
`flow_id` as the original run**. Without the `flow_id`, `_date_root()` falls
back to the legacy flat layout and will never find checkpoints stored under
`{flow_id}/{TICKER}/report/`.
In `trigger_rerun_node`, the original flow_id is resolved as:
```python
flow_id = run.get("flow_id") or run.get("short_rid") or run["params"].get("flow_id")
```
This is then passed through `rerun_params["flow_id"]` to `run_pipeline_from_phase`,
which passes it to `create_report_store(flow_id=flow_id)`.
---
## 5. Selective Event Filtering on Re-run
When a phase re-run is triggered, the run's event list is **selectively filtered**
to remove stale events for the re-run scope while preserving events from:
- Other tickers (TSDD events preserved when re-running RIG)
- Earlier phases of the same ticker (analyst events preserved when re-running debate)
- Scan/market events (always preserved)
```python
# Nodes cleared per phase (plus all tool events with matching parent_node_id)
debate_and_trader → {Bull Researcher, Bear Researcher, Research Manager, Trader,
Aggressive Analyst, Conservative Analyst, Neutral Analyst,
Portfolio Manager}
risk → {Aggressive Analyst, Conservative Analyst, Neutral Analyst,
Portfolio Manager}
analysts → all nodes for the ticker
# Portfolio cascade nodes (always cleared — re-run always cascades to PM)
{review_holdings, make_pm_decision}
```
The WebSocket replays this filtered set first (rebuilding the full graph), then
streams the new re-run events on top. The frontend's `clearEvents()` + WebSocket
reconnect ensures a clean state before replay.
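
The preservation guarantees can be exercised with a condensed mirror of `_filter_rerun_events` from `runs.py` (node sets abbreviated to the ones used here):

```python
_RISK = {"Aggressive Analyst", "Conservative Analyst", "Neutral Analyst",
         "Portfolio Manager"}
_DEBATE = {"Bull Researcher", "Bear Researcher", "Research Manager", "Trader"} | _RISK
_PORTFOLIO = {"review_holdings", "make_pm_decision"}
_PHASE_NODES = {"analysts": None, "debate_and_trader": _DEBATE, "risk": _RISK}

def filter_rerun_events(events: list, ticker: str, phase: str) -> list:
    """Keep other tickers and earlier phases; drop the re-run scope plus the
    always-cascaded portfolio nodes (condensed mirror of the real function)."""
    if phase not in _PHASE_NODES:
        return events  # unknown phase: keep everything
    clear = _PHASE_NODES[phase]  # None means clear all nodes for this ticker
    kept = []
    for e in events:
        if e.get("node_id") in _PORTFOLIO:
            continue
        if e.get("identifier") == ticker and (
            clear is None
            or e.get("node_id") in clear
            or e.get("parent_node_id") in clear
        ):
            continue
        kept.append(e)
    return kept
```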
---
## 6. MongoDB vs Local Storage — Decision Guide
### Use Local Storage (ReportStore) when:
- **Development or single-machine deployment** — no infrastructure required
- **Offline / air-gapped environments** — no network dependency
- **Report files are the primary output** — reports as .json/.md files that
can be read with any tool
- **Simplicity over scalability** — one process, one machine
### Use MongoDB (MongoReportStore) when:
- **Multi-process or multi-node deployment** — local files are not shared
- **Run history across restarts** — hydration from MongoDB is more reliable
than scanning the filesystem
- **Reflexion memory**: `ReflexionMemory` works best with MongoDB for
efficient per-ticker history queries
- **Future: TTL / retention** — MongoDB TTL indexes make automatic cleanup easy
- **Production environments** — MongoDB provides durability, replication, and
backup
### Configuration
```env
# Enable MongoDB:
TRADINGAGENTS_MONGO_URI=mongodb://localhost:27017
TRADINGAGENTS_MONGO_DB=tradingagents # optional, default: "tradingagents"
# Local storage (default when MONGO_URI is unset):
TRADINGAGENTS_REPORTS_DIR=/path/to/reports # optional, default: ./reports
```
### Factory Behaviour
```python
# Always use the factory — never instantiate stores directly
from tradingagents.portfolio.store_factory import create_report_store
# Writing: always pass flow_id (scopes writes to the correct run folder)
writer = create_report_store(flow_id=flow_id)
# Reading: omit flow_id (resolves via latest.json or MongoDB latest query)
reader = create_report_store()
```
`create_report_store()` returns:
1. `DualReportStore(MongoReportStore, ReportStore)` — when `MONGO_URI` is set
and pymongo is installed (writes to both; reads from Mongo first, falls back
to disk)
2. `ReportStore` — when MongoDB is unavailable or not configured
MongoDB failures **always** fall back to filesystem with a warning log. The
application must remain functional without MongoDB.
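A hedged sketch of that fallback behaviour, with the store classes stubbed out for illustration (the real implementations live in `tradingagents/portfolio/` and take different constructor arguments):

```python
# Stub stores for illustration only; the real classes live in
# tradingagents/portfolio/ and have richer APIs.
import logging
import os

class ReportStore:              # filesystem stub
    def __init__(self, flow_id=None):
        self.flow_id = flow_id

class MongoReportStore:         # stub that simulates Mongo being unreachable
    def __init__(self, uri, flow_id=None):
        raise ConnectionError("mongo unreachable")

class DualReportStore:          # writes to both; reads Mongo-first
    def __init__(self, mongo, local):
        self.mongo, self.local = mongo, local

def create_report_store(flow_id=None):
    uri = os.getenv("TRADINGAGENTS_MONGO_URI")
    if uri:
        try:
            return DualReportStore(MongoReportStore(uri, flow_id),
                                   ReportStore(flow_id))
        except Exception:
            # Mongo failures always degrade to filesystem with a warning.
            logging.warning("MongoDB unavailable; falling back to ReportStore")
    return ReportStore(flow_id=flow_id)
```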
### Known V1 Limitations (Future Work)
| Issue | Status |
|-------|--------|
| `pymongo` is synchronous — blocks asyncio event loop | Deferred: migrate to `motor` before production |
| No TTL index — reports accumulate indefinitely | Deferred: requires retention policy decision |
| `MongoClient` created per store instance | Deferred: singleton via FastAPI app lifespan |
| `run_events.jsonl` written on completion, not streaming | Deferred: periodic flush for long runs |
---
## Consequences & Constraints
### MUST
- **Always use `create_report_store(flow_id=…)` for writes** — never call it
  without arguments when writing, since the flat fallback path overwrites
  reports across runs.
- **Always pass the original `flow_id` when loading checkpoints for re-run** —
  checkpoint lookup will silently return `None` otherwise, causing fallback to
  a full re-run.
- **Save analysts checkpoint if `any()` analyst report is populated** — Social
Analyst is optional; `all()` silently blocks checkpoints when social is disabled.
- **Selective event filtering on re-run** — never clear all events; always use
`_filter_rerun_events(events, ticker, phase)` to preserve other tickers and
earlier phases.
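The `any()` rule above can be sketched as follows; the state keys are assumptions for illustration, not the project's actual field names:

```python
# Hypothetical state keys; the real graph state may name these differently.
def should_save_analysts_checkpoint(state: dict) -> bool:
    reports = [state.get("market_report"), state.get("news_report"),
               state.get("fundamentals_report"), state.get("social_report")]
    # any(): save as soon as one analyst has produced a report.
    # all() would silently block the save whenever Social Analyst is disabled.
    return any(reports)
```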
### MUST NOT
- **Never hard-code `ReportStore()` in engine methods** — always use the factory.
- **Never run blocking pymongo calls in the async hot path** — wrap them in
  `asyncio.to_thread` if the blocking becomes measurable.
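The `asyncio.to_thread` pattern looks like this; the store class and its `save_report` method below are stand-ins, not the project's real API:

```python
import asyncio

class BlockingStore:
    """Stand-in for a store whose writes block (e.g. synchronous pymongo)."""
    def save_report(self, name: str, payload: dict) -> str:
        # Imagine a synchronous pymongo insert here.
        return f"saved:{name}"

async def save_report_async(store: BlockingStore, name: str, payload: dict) -> str:
    # asyncio.to_thread runs the blocking call in the default thread pool,
    # keeping the event loop responsive.
    return await asyncio.to_thread(store.save_report, name, payload)
```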
### Source Files
```
tradingagents/portfolio/report_store.py ← ReportStore (filesystem)
tradingagents/portfolio/mongo_report_store.py ← MongoReportStore
tradingagents/portfolio/dual_report_store.py ← DualReportStore (both)
tradingagents/portfolio/store_factory.py ← create_report_store()
tradingagents/report_paths.py ← flow_id/run_id helpers, ts_now()
agent_os/backend/main.py ← hydrate_runs_from_disk()
agent_os/backend/routes/runs.py ← _run_and_store, _append_and_store,
_filter_rerun_events, trigger_rerun_node
agent_os/backend/routes/websocket.py ← lazy-loading, orphaned run detection
agent_os/backend/services/langgraph_engine.py ← run_pipeline_from_phase, NODE_TO_PHASE,
checkpoint save/load logic
agent_os/frontend/src/hooks/useAgentStream.ts ← WebSocket client, event accumulation
agent_os/frontend/src/Dashboard.tsx ← triggerNodeRerun, loadRun, clearEvents
```