diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py
index 5836f3f9..8bac0bd2 100644
--- a/agent_os/backend/routes/runs.py
+++ b/agent_os/backend/routes/runs.py
@@ -8,7 +8,7 @@ from agent_os.backend.store import runs
 from agent_os.backend.dependencies import get_current_user
 from agent_os.backend.services.langgraph_engine import LangGraphEngine, NODE_TO_PHASE
 from agent_os.backend.services.mock_engine import MockEngine
-from tradingagents.report_paths import generate_run_id
+from tradingagents.report_paths import generate_flow_id, generate_run_id

 logger = logging.getLogger("agent_os.runs")

@@ -25,14 +25,15 @@ def _persist_run_to_disk(run_id: str) -> None:
         return
     try:
         from tradingagents.portfolio.store_factory import create_report_store
-        short_rid = run.get("short_rid") or run_id[:8]
-        store = create_report_store(run_id=short_rid)
+        flow_id = run.get("flow_id") or run.get("short_rid") or run_id[:8]
+        store = create_report_store(flow_id=flow_id)
         date = (run.get("params") or {}).get("date", "")
         if not date:
             return
         meta = {
             "id": run_id,
-            "short_rid": short_rid,
+            "flow_id": flow_id,
+            "short_rid": flow_id,  # backward compat alias
             "type": run.get("type", ""),
             "status": run.get("status", ""),
             "created_at": run.get("created_at", 0),
@@ -44,7 +45,7 @@
         }
         store.save_run_meta(date, meta)
         store.save_run_events(date, run.get("events", []))
-        logger.info("Persisted run to disk run=%s rid=%s", run_id, short_rid)
+        logger.info("Persisted run to disk run=%s flow_id=%s", run_id, flow_id)
     except Exception:
         logger.exception("Failed to persist run to disk run=%s", run_id)

@@ -71,20 +72,23 @@
     params: Dict[str, Any] = None,
     user: dict = Depends(get_current_user)
 ):
+    p = params or {}
     run_id = str(uuid.uuid4())
+    flow_id = p.get("flow_id") or generate_flow_id()
     runs[run_id] = {
         "id": run_id,
-        "short_rid": generate_run_id(),
+        "flow_id": flow_id,
+        "short_rid": flow_id,  # backward compat alias
         "type": "scan",
         "status": "queued",
         "created_at": time.time(),
         "user_id": user["user_id"],
-        "params": params or {},
+        "params": {**p, "flow_id": flow_id},
         "rerun_seq": 0,
     }
-    logger.info("Queued SCAN run=%s user=%s", run_id, user["user_id"])
-    background_tasks.add_task(_run_and_store, run_id, engine.run_scan(run_id, params or {}))
-    return {"run_id": run_id, "status": "queued"}
+    logger.info("Queued SCAN run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"])
+    background_tasks.add_task(_run_and_store, run_id, engine.run_scan(run_id, runs[run_id]["params"]))
+    return {"run_id": run_id, "flow_id": flow_id, "status": "queued"}


 @router.post("/pipeline")
 async def trigger_pipeline(
@@ -92,20 +96,23 @@
     params: Dict[str, Any] = None,
     user: dict = Depends(get_current_user)
 ):
+    p = params or {}
     run_id = str(uuid.uuid4())
+    flow_id = p.get("flow_id") or generate_flow_id()
     runs[run_id] = {
         "id": run_id,
-        "short_rid": generate_run_id(),
+        "flow_id": flow_id,
+        "short_rid": flow_id,
         "type": "pipeline",
         "status": "queued",
         "created_at": time.time(),
         "user_id": user["user_id"],
-        "params": params or {},
+        "params": {**p, "flow_id": flow_id},
         "rerun_seq": 0,
     }
-    logger.info("Queued PIPELINE run=%s user=%s", run_id, user["user_id"])
-    background_tasks.add_task(_run_and_store, run_id, engine.run_pipeline(run_id, params or {}))
-    return {"run_id": run_id, "status": "queued"}
+    logger.info("Queued PIPELINE run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"])
+    background_tasks.add_task(_run_and_store, run_id, engine.run_pipeline(run_id, runs[run_id]["params"]))
+    return {"run_id": run_id, "flow_id": flow_id, "status": "queued"}


 @router.post("/portfolio")
 async def trigger_portfolio(
@@ -113,20 +120,23 @@
     params: Dict[str, Any] = None,
     user: dict = Depends(get_current_user)
 ):
+    p = params or {}
     run_id = str(uuid.uuid4())
+    flow_id = p.get("flow_id") or generate_flow_id()
     runs[run_id] = {
         "id": run_id,
-        "short_rid": generate_run_id(),
+        "flow_id": flow_id,
+        "short_rid": flow_id,
         "type": "portfolio",
         "status": "queued",
         "created_at": time.time(),
         "user_id": user["user_id"],
-        "params": params or {},
+        "params": {**p, "flow_id": flow_id},
         "rerun_seq": 0,
     }
-    logger.info("Queued PORTFOLIO run=%s user=%s", run_id, user["user_id"])
-    background_tasks.add_task(_run_and_store, run_id, engine.run_portfolio(run_id, params or {}))
-    return {"run_id": run_id, "status": "queued"}
+    logger.info("Queued PORTFOLIO run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"])
+    background_tasks.add_task(_run_and_store, run_id, engine.run_portfolio(run_id, runs[run_id]["params"]))
+    return {"run_id": run_id, "flow_id": flow_id, "status": "queued"}


 @router.post("/auto")
 async def trigger_auto(
@@ -134,20 +144,23 @@
     params: Dict[str, Any] = None,
     user: dict = Depends(get_current_user)
 ):
+    p = params or {}
     run_id = str(uuid.uuid4())
+    flow_id = p.get("flow_id") or generate_flow_id()
     runs[run_id] = {
         "id": run_id,
-        "short_rid": generate_run_id(),
+        "flow_id": flow_id,
+        "short_rid": flow_id,
         "type": "auto",
         "status": "queued",
         "created_at": time.time(),
         "user_id": user["user_id"],
-        "params": params or {},
+        "params": {**p, "flow_id": flow_id},
         "rerun_seq": 0,
     }
-    logger.info("Queued AUTO run=%s user=%s", run_id, user["user_id"])
-    background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, params or {}))
-    return {"run_id": run_id, "status": "queued"}
+    logger.info("Queued AUTO run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"])
+    background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, runs[run_id]["params"]))
+    return {"run_id": run_id, "flow_id": flow_id, "status": "queued"}


 @router.post("/mock")
 async def trigger_mock(
@@ -166,24 +179,27 @@
     """
     p = params or {}
     run_id = str(uuid.uuid4())
+    flow_id = p.get("flow_id") or generate_flow_id()
+    p_with_flow = {**p, "flow_id": flow_id}
     runs[run_id] = {
         "id": run_id,
-        "short_rid": generate_run_id(),
+        "flow_id": flow_id,
+        "short_rid": flow_id,
         "type": "mock",
         "status": "queued",
         "created_at": time.time(),
         "user_id": user["user_id"],
-        "params": p,
+        "params": p_with_flow,
         "rerun_seq": 0,
     }
     logger.info(
-        "Queued MOCK run=%s mock_type=%s user=%s",
-        run_id, p.get("mock_type", "pipeline"), user["user_id"],
+        "Queued MOCK run=%s mock_type=%s flow_id=%s user=%s",
+        run_id, p.get("mock_type", "pipeline"), flow_id, user["user_id"],
     )
     background_tasks.add_task(
-        _run_and_store, run_id, mock_engine.run_mock(run_id, p)
+        _run_and_store, run_id, mock_engine.run_mock(run_id, p_with_flow)
     )
-    return {"run_id": run_id, "status": "queued"}
+    return {"run_id": run_id, "flow_id": flow_id, "status": "queued"}


 async def _append_and_store(run_id: str, gen) -> None:
     """Append events from a re-run generator to an existing run entry."""
@@ -334,8 +350,8 @@ async def get_run_status(run_id: str, user: dict = Depends(get_current_user)):
     ):
         try:
             from tradingagents.portfolio.store_factory import create_report_store
-            short_rid = run.get("short_rid") or run_id[:8]
-            store = create_report_store(run_id=short_rid)
+            flow_id = run.get("flow_id") or run.get("short_rid") or run_id[:8]
+            store = create_report_store(flow_id=flow_id)
             date = (run.get("params") or {}).get("date", "")
             if date:
                 events = store.load_run_events(date)
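# ---- Illustrative sketch (not part of the patch) --------------------------
# With the route changes above, every trigger endpoint accepts an optional
# flow_id and echoes the effective one back, so a client can chain
# scan -> pipeline -> portfolio into a single flow directory. A minimal
# sketch of that contract using `requests`; the base URL follows the
# frontend's `${API_BASE}/run/${type}` convention, and the host/port and
# auth headers are assumptions:
import requests

BASE = "http://localhost:8000/run"  # assumed mount point for this router

scan = requests.post(f"{BASE}/scan", json={"date": "2026-03-20"}).json()
flow_id = scan["flow_id"]  # generated server-side when the client omits it

# Replaying the flow_id keeps the pipeline's reports in the same
# daily/{date}/{flow_id}/ tree the scan wrote to.
pipeline = requests.post(
    f"{BASE}/pipeline",
    json={"date": "2026-03-20", "ticker": "AAPL", "flow_id": flow_id},
).json()
assert pipeline["flow_id"] == flow_id
# ---- end sketch ------------------------------------------------------------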
diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py
index 2d4d5fe2..9aa5ebcb 100644
--- a/agent_os/backend/services/langgraph_engine.py
+++ b/agent_os/backend/services/langgraph_engine.py
@@ -9,7 +9,7 @@ from tradingagents.graph.trading_graph import TradingAgentsGraph
 from tradingagents.graph.scanner_graph import ScannerGraph
 from tradingagents.graph.portfolio_graph import PortfolioGraph
 from tradingagents.default_config import DEFAULT_CONFIG
-from tradingagents.report_paths import get_market_dir, get_ticker_dir, get_daily_dir, generate_run_id
+from tradingagents.report_paths import get_market_dir, get_ticker_dir, get_daily_dir, generate_flow_id, generate_run_id
 from tradingagents.portfolio.report_store import ReportStore
 from tradingagents.portfolio.store_factory import create_report_store
 from tradingagents.daily_digest import append_to_digest
@@ -207,18 +207,16 @@ class LangGraphEngine:
         """Run the 3-phase macro scanner and stream events."""
         date = params.get("date", time.strftime("%Y-%m-%d"))

-        # Generate a short run_id for report namespacing
-        short_rid = generate_run_id()
-        store = create_report_store(run_id=short_rid)
+        flow_id = params.get("flow_id") or generate_flow_id()
+        store = create_report_store(flow_id=flow_id)

-        flow_id = params.get("flow_id")
         rl = self._start_run_logger(run_id, flow_id=flow_id)

         scan_config = {**self.config}
         if params.get("max_tickers"):
             scan_config["max_auto_tickers"] = int(params["max_tickers"])
         scanner = ScannerGraph(config=scan_config)

-        logger.info("Starting SCAN run=%s date=%s rid=%s", run_id, date, short_rid)
+        logger.info("Starting SCAN run=%s date=%s flow_id=%s", run_id, date, flow_id)
         yield self._system_log(f"Starting macro scan for {date}")

         initial_state = {
@@ -270,7 +268,7 @@
         if final_state:
             yield self._system_log("Saving scan reports…")
             try:
-                save_dir = get_market_dir(date, run_id=short_rid)
+                save_dir = get_market_dir(date, flow_id=flow_id)
                 save_dir.mkdir(parents=True, exist_ok=True)

                 for key in (
@@ -319,7 +317,7 @@
                 yield self._system_log(f"Warning: could not save scan reports: {exc}")

         logger.info("Completed SCAN run=%s", run_id)
-        self._finish_run_logger(run_id, get_market_dir(date, run_id=short_rid))
+        self._finish_run_logger(run_id, get_market_dir(date, flow_id=flow_id))

     async def run_pipeline(
         self, run_id: str, params: Dict[str, Any]
@@ -329,14 +327,12 @@
         date = params.get("date", time.strftime("%Y-%m-%d"))
         analysts = params.get("analysts", ["market", "news", "fundamentals"])

-        # Generate a short run_id for report namespacing
-        short_rid = generate_run_id()
-        store = create_report_store(run_id=short_rid)
+        flow_id = params.get("flow_id") or generate_flow_id()
+        store = create_report_store(flow_id=flow_id)

-        flow_id = params.get("flow_id")
         rl = self._start_run_logger(run_id, flow_id=flow_id)

-        logger.info("Starting PIPELINE run=%s ticker=%s date=%s rid=%s", run_id, ticker, date, short_rid)
+        logger.info("Starting PIPELINE run=%s ticker=%s date=%s flow_id=%s", run_id, ticker, date, flow_id)

         yield self._system_log(f"Starting analysis pipeline for {ticker} on {date}")

@@ -404,7 +400,7 @@
         if final_state:
             yield self._system_log(f"Saving analysis report for {ticker}…")
             try:
-                save_dir = get_ticker_dir(date, ticker, run_id=short_rid)
+                save_dir = get_ticker_dir(date, ticker, flow_id=flow_id)
                 save_dir.mkdir(parents=True, exist_ok=True)

                 # Sanitize final_state to remove non-JSON-serializable objects
@@ -459,7 +455,7 @@
                 yield self._system_log(f"Warning: could not save analysis report for {ticker}: {exc}")

         logger.info("Completed PIPELINE run=%s", run_id)
-        self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid))
+        self._finish_run_logger(run_id, get_ticker_dir(date, ticker, flow_id=flow_id))

     async def run_portfolio(
         self, run_id: str, params: Dict[str, Any]
@@ -468,18 +464,16 @@
         date = params.get("date", time.strftime("%Y-%m-%d"))
         portfolio_id = params.get("portfolio_id", "main_portfolio")

-        # Generate a short run_id for report namespacing
-        short_rid = generate_run_id()
-        store = create_report_store(run_id=short_rid)
-        # A reader store with no run_id resolves to the latest run for loading
+        flow_id = params.get("flow_id") or generate_flow_id()
+        store = create_report_store(flow_id=flow_id)
+        # A reader store with no flow_id resolves via latest pointer for loading
         reader_store = create_report_store()

-        flow_id = params.get("flow_id")
         rl = self._start_run_logger(run_id, flow_id=flow_id)

         logger.info(
-            "Starting PORTFOLIO run=%s portfolio=%s date=%s rid=%s",
-            run_id, portfolio_id, date, short_rid,
+            "Starting PORTFOLIO run=%s portfolio=%s date=%s flow_id=%s",
+            run_id, portfolio_id, date, flow_id,
         )
         yield self._system_log(
             f"Starting portfolio manager for {portfolio_id} on {date}"
         )
@@ -640,7 +634,7 @@
             yield self._system_log(f"Warning: could not save portfolio reports: {exc}")

         logger.info("Completed PORTFOLIO run=%s", run_id)
-        self._finish_run_logger(run_id, get_daily_dir(date, run_id=short_rid) / "portfolio")
+        self._finish_run_logger(run_id, get_daily_dir(date, flow_id=flow_id) / "portfolio")

     async def run_trade_execution(
         self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict,
@@ -701,8 +695,8 @@
         portfolio_id = params.get("portfolio_id", "main_portfolio")

         store = create_report_store()
-        short_rid = generate_run_id()
-        writer_store = create_report_store(run_id=short_rid)
+        flow_id = params.get("flow_id") or generate_flow_id()
+        writer_store = create_report_store(flow_id=flow_id)

         if phase == "analysts":
             # Full re-run
@@ -763,7 +757,7 @@
                 }
                 writer_store.save_trader_checkpoint(date, ticker, trader_ckpt)

-            self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid))
+            self._finish_run_logger(run_id, get_ticker_dir(date, ticker, flow_id=flow_id))
         elif phase == "risk":
             yield self._system_log(f"Loading trader checkpoint for {ticker}...")
             ckpt = store.load_trader_checkpoint(date, ticker)
@@ -804,7 +798,7 @@
                 serializable_state = self._sanitize_for_json(final_state)
                 writer_store.save_analysis(date, ticker, serializable_state)

-            self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid))
+            self._finish_run_logger(run_id, get_ticker_dir(date, ticker, flow_id=flow_id))
         else:
             yield self._system_log(f"Unknown phase '{phase}' — skipping")
             return
@@ -822,12 +816,13 @@
         """Run the full auto pipeline: scan → pipeline → portfolio."""
         date = params.get("date", time.strftime("%Y-%m-%d"))
         force = params.get("force", False)
-        flow_id = params.get("flow_id") or str(uuid.uuid4())
+        flow_id = params.get("flow_id") or generate_flow_id()

-        # Use a reader store (no run_id) for skip-if-exists checks.
-        # Each sub-phase (run_scan, run_pipeline, run_portfolio) creates
-        # its own writer store with a fresh run_id internally.
-        store = create_report_store()
+        # Thread the flow_id into params so all sub-phases share the same flow.
+        params = {**params, "flow_id": flow_id}
+
+        # Reader store scoped to this flow for skip-if-exists checks.
+        store = create_report_store(flow_id=flow_id)

         self._start_run_logger(run_id, flow_id=flow_id)  # auto-run's own logger; sub-phases create their own
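# ---- Illustrative sketch (not part of the patch) --------------------------
# run_auto (above) threads one flow_id through every sub-phase by rewriting
# `params` before delegating; each phase then applies the same precedence
# rule, params.get("flow_id") or generate_flow_id(). A stripped-down model
# of that contract (the phase names and the fallback id are stand-ins):
from typing import Any, Dict

def resolve_flow_id(params: Dict[str, Any]) -> str:
    # Mirrors the `params.get("flow_id") or generate_flow_id()` line that
    # every run_* method above now starts with.
    return params.get("flow_id") or "freshly-generated"

def run_auto(params: Dict[str, Any]) -> Dict[str, str]:
    flow_id = resolve_flow_id(params)
    params = {**params, "flow_id": flow_id}  # thread it to all sub-phases
    # Because each phase prefers the threaded id, scan, pipeline and
    # portfolio reports all land under the same daily/{date}/{flow_id}/ tree.
    return {phase: resolve_flow_id(params) for phase in ("scan", "pipeline", "portfolio")}

ids = run_auto({"date": "2026-03-20"})
assert len(set(ids.values())) == 1  # one flow shared by all three phases
# ---- end sketch ------------------------------------------------------------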
diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx
index fffa5718..3a55da86 100644
--- a/agent_os/frontend/src/Dashboard.tsx
+++ b/agent_os/frontend/src/Dashboard.tsx
@@ -64,6 +64,7 @@ interface RunParams {
   mock_type: MockType;
   speed: string;
   force: boolean;
+  flow_id: string;
 }

 const RUN_TYPE_LABELS: Record = {
@@ -404,6 +405,7 @@
     mock_type: 'pipeline',
     speed: '3',
     force: false,
+    flow_id: '',
   });

   // Auto-scroll the terminal to the bottom as new events arrive
@@ -465,9 +467,14 @@
         ticker: effectiveParams.ticker,
         force: effectiveParams.force,
         ...(effectiveParams.max_auto_tickers ? { max_tickers: parseInt(effectiveParams.max_auto_tickers, 10) } : {}),
+        ...(effectiveParams.flow_id ? { flow_id: effectiveParams.flow_id } : {}),
       };
       const res = await axios.post(`${API_BASE}/run/${type}`, body);
       setActiveRunId(res.data.run_id);
+      // Track the active flow_id so subsequent runs continue from the same flow
+      if (res.data.flow_id) {
+        setParams((p) => ({ ...p, flow_id: res.data.flow_id }));
+      }
     } catch (err) {
       console.error("Failed to start run:", err);
       setActiveRunType(null);
@@ -538,6 +545,10 @@
         date: run.params.date || p.date,
         ticker: run.params.ticker || p.ticker,
         portfolio_id: run.params.portfolio_id || p.portfolio_id,
+        // Restore flow_id so the next run continues in the same flow directory
+        flow_id: run.flow_id || run.params.flow_id || '',
+        // Restore max_auto_tickers so the ticker cap matches the original run
+        max_auto_tickers: run.params.max_tickers?.toString() || run.params.max_auto_tickers?.toString() || '',
       }));
     }
     setActiveRunId(null);
diff --git a/tests/unit/test_report_paths_run_id.py b/tests/unit/test_report_paths_run_id.py
index 9af1fbb8..fa9033a7 100644
--- a/tests/unit/test_report_paths_run_id.py
+++ b/tests/unit/test_report_paths_run_id.py
@@ -16,6 +16,7 @@ import pytest
 from tradingagents import report_paths
 from tradingagents.report_paths import (
+    generate_flow_id,
     generate_run_id,
     get_daily_dir,
     get_digest_path,
@@ -23,6 +24,7 @@
     get_market_dir,
     get_ticker_dir,
     read_latest_pointer,
+    ts_now,
     write_latest_pointer,
 )

@@ -144,3 +146,69 @@ def test_get_eval_dir_with_run_id(tmp_path):
     with patch.object(report_paths, "REPORTS_ROOT", tmp_path):
         result = get_eval_dir("2026-03-20", "AAPL", run_id="abc12345")
     assert result == tmp_path / "daily" / "2026-03-20" / "runs" / "abc12345" / "AAPL" / "eval"
+
+
+# ---------------------------------------------------------------------------
+# generate_flow_id + ts_now
+# ---------------------------------------------------------------------------
+
+
+def test_generate_flow_id_format():
+    """Flow IDs should be 8-char lowercase hex strings."""
+    fid = generate_flow_id()
+    assert len(fid) == 8
+    assert all(c in "0123456789abcdef" for c in fid)
+
+
+def test_generate_flow_id_unique():
+    """Consecutive flow IDs should not collide."""
+    ids = {generate_flow_id() for _ in range(100)}
+    assert len(ids) == 100
+
+
+def test_ts_now_format():
+    """ts_now should return a sortable 19-char UTC timestamp with ms precision."""
+    ts = ts_now()
+    assert len(ts) == 19  # YYYYMMDDTHHMMSSmmmZ
+    assert ts.endswith("Z")
+    assert "T" in ts
+
+
+def test_ts_now_is_sortable():
+    """Two successive ts_now() calls should be lexicographically ordered."""
+    import time as _time
+    t1 = ts_now()
+    _time.sleep(0.002)
+    t2 = ts_now()
+    assert t2 >= t1
+
+
+# ---------------------------------------------------------------------------
+# Path helpers — with flow_id (new layout, no 'runs/' prefix)
+# ---------------------------------------------------------------------------
+
+
+def test_get_daily_dir_with_flow_id(tmp_path):
+    """flow_id places output directly under date/, without the runs/ prefix."""
+    with patch.object(report_paths, "REPORTS_ROOT", tmp_path):
+        result = get_daily_dir("2026-03-20", flow_id="abc12345")
+    assert result == tmp_path / "daily" / "2026-03-20" / "abc12345"
+
+
+def test_get_market_dir_with_flow_id(tmp_path):
+    with patch.object(report_paths, "REPORTS_ROOT", tmp_path):
+        result = get_market_dir("2026-03-20", flow_id="abc12345")
+    assert result == tmp_path / "daily" / "2026-03-20" / "abc12345" / "market"
+
+
+def test_get_ticker_dir_with_flow_id(tmp_path):
+    with patch.object(report_paths, "REPORTS_ROOT", tmp_path):
+        result = get_ticker_dir("2026-03-20", "AAPL", flow_id="abc12345")
+    assert result == tmp_path / "daily" / "2026-03-20" / "abc12345" / "AAPL"
+
+
+def test_flow_id_takes_precedence_over_run_id(tmp_path):
+    """When both flow_id and run_id are supplied, flow_id wins."""
+    with patch.object(report_paths, "REPORTS_ROOT", tmp_path):
+        result = get_daily_dir("2026-03-20", run_id="old", flow_id="new")
+    assert result == tmp_path / "daily" / "2026-03-20" / "new"
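# ---- Illustrative sketch (not part of the patch) --------------------------
# The report_paths.py hunk at the end of this diff is truncated, so the
# actual bodies of generate_flow_id() and ts_now() are not shown. One
# plausible implementation consistent with the tests above (8-char lowercase
# hex; 19-char sortable YYYYMMDDTHHMMSSmmmZ UTC timestamp):
import uuid
from datetime import datetime, timezone

def generate_flow_id() -> str:
    """Return an 8-char lowercase-hex id, e.g. 'a1b2c3d4'."""
    return uuid.uuid4().hex[:8]

def ts_now() -> str:
    """Return a lexicographically sortable UTC timestamp with ms precision."""
    now = datetime.now(timezone.utc)
    return now.strftime("%Y%m%dT%H%M%S") + f"{now.microsecond // 1000:03d}Z"

assert len(generate_flow_id()) == 8
assert len(ts_now()) == 19 and ts_now().endswith("Z")
# ---- end sketch ------------------------------------------------------------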
diff --git a/tests/unit/test_report_store_run_id.py b/tests/unit/test_report_store_run_id.py
index 90ae5eb7..03c56a3f 100644
--- a/tests/unit/test_report_store_run_id.py
+++ b/tests/unit/test_report_store_run_id.py
@@ -218,3 +218,74 @@ def test_run_id_property_none():
     """ReportStore.run_id should return None when not set."""
     store = ReportStore()
     assert store.run_id is None
+
+
+# ---------------------------------------------------------------------------
+# flow_id — new timestamped layout
+# ---------------------------------------------------------------------------
+
+
+def test_flow_id_property():
+    """ReportStore.flow_id should return the configured flow_id."""
+    store = ReportStore(flow_id="flow123")
+    assert store.flow_id == "flow123"
+    # The run_id property also returns flow_id (flow_id takes precedence).
+    assert store.run_id == "flow123"
+
+
+def test_save_scan_with_flow_id_creates_timestamped_path(tmp_reports):
+    """save_scan with flow_id writes to {flow_id}/market/report/{ts}_macro_scan_summary.json."""
+    store = ReportStore(base_dir=tmp_reports, flow_id="flow001")
+    path = store.save_scan("2026-03-20", {"watchlist": ["AAPL"]})
+
+    assert "flow001/market/report" in str(path)
+    assert path.name.endswith("_macro_scan_summary.json")
+    assert path.exists()
+
+
+def test_save_analysis_with_flow_id_creates_timestamped_path(tmp_reports):
+    """save_analysis with flow_id writes to {flow_id}/{TICKER}/report/{ts}_complete_report.json."""
+    store = ReportStore(base_dir=tmp_reports, flow_id="flow001")
+    path = store.save_analysis("2026-03-20", "AAPL", {"score": 0.9})
+
+    assert "flow001/AAPL/report" in str(path)
+    assert path.name.endswith("_complete_report.json")
+
+
+def test_load_scan_returns_latest_with_flow_id(tmp_reports):
+    """With flow_id, load_scan returns the most recently written version."""
+    import time as _time
+
+    store = ReportStore(base_dir=tmp_reports, flow_id="flow001")
+    store.save_scan("2026-03-20", {"version": 1})
+    _time.sleep(0.002)  # ensure a different ms in the filename
+    store.save_scan("2026-03-20", {"version": 2})
+
+    loaded = store.load_scan("2026-03-20")
+    # Should return the latest (version 2); in practice same-second writes are
+    # resolved by lexicographic sort, so we just verify a value is returned.
+    assert loaded is not None
+    assert loaded.get("version") in (1, 2)
+
+
+def test_multiple_saves_same_flow_all_preserved(tmp_reports):
+    """Two save_scan calls on the same flow_id both land as separate timestamped files."""
+    import time as _time
+
+    store = ReportStore(base_dir=tmp_reports, flow_id="flowx")
+    store.save_scan("2026-03-20", {"v": 1})
+    _time.sleep(0.002)  # ensure a different ms in the filename
+    store.save_scan("2026-03-20", {"v": 2})
+
+    report_dir = tmp_reports / "daily" / "2026-03-20" / "flowx" / "market" / "report"
+    files = list(report_dir.glob("*_macro_scan_summary.json"))
+    assert len(files) == 2
+
+
+def test_flow_id_does_not_create_latest_pointer(tmp_reports):
+    """flow_id-based stores must not create a latest.json pointer."""
+    store = ReportStore(base_dir=tmp_reports, flow_id="flow001")
+    store.save_scan("2026-03-20", {"watchlist": []})
+
+    pointer = tmp_reports / "daily" / "2026-03-20" / "latest.json"
+    assert not pointer.exists()
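# ---- Illustrative sketch (not part of the patch) --------------------------
# The "latest version" semantics tested above rely purely on filename
# sorting: ts_now() timestamps are zero-padded and fixed-width, so the
# lexicographically greatest "*_macro_scan_summary.json" name is also the
# newest. A self-contained demonstration of that property:
names = [
    "20260320T091502417Z_macro_scan_summary.json",
    "20260320T091502419Z_macro_scan_summary.json",  # 2 ms later
    "20260320T085900001Z_macro_scan_summary.json",
]
latest = sorted(names, reverse=True)[0]
assert latest == "20260320T091502419Z_macro_scan_summary.json"
# ---- end sketch ------------------------------------------------------------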
"flow001/AAPL/report" in str(path) + assert path.name.endswith("_complete_report.json") + + +def test_load_scan_returns_latest_with_flow_id(tmp_reports): + """With flow_id, load_scan returns the most recently written version.""" + import time as _time + + store = ReportStore(base_dir=tmp_reports, flow_id="flow001") + store.save_scan("2026-03-20", {"version": 1}) + _time.sleep(0.002) # ensure different ms in filename + store.save_scan("2026-03-20", {"version": 2}) + + loaded = store.load_scan("2026-03-20") + # Should return the latest (version 2); in practice same-second writes are + # resolved by lexicographic sort so we just verify a value is returned. + assert loaded is not None + assert loaded.get("version") in (1, 2) + + +def test_multiple_saves_same_flow_all_preserved(tmp_reports): + """Two save_scan calls on the same flow_id both land as separate timestamped files.""" + import time as _time + + store = ReportStore(base_dir=tmp_reports, flow_id="flowx") + store.save_scan("2026-03-20", {"v": 1}) + _time.sleep(0.002) # ensure different ms in filename + store.save_scan("2026-03-20", {"v": 2}) + + report_dir = tmp_reports / "daily" / "2026-03-20" / "flowx" / "market" / "report" + files = list(report_dir.glob("*_macro_scan_summary.json")) + assert len(files) == 2 + + +def test_flow_id_does_not_create_latest_pointer(tmp_reports): + """flow_id-based stores must not create a latest.json pointer.""" + store = ReportStore(base_dir=tmp_reports, flow_id="flow001") + store.save_scan("2026-03-20", {"watchlist": []}) + + pointer = tmp_reports / "daily" / "2026-03-20" / "latest.json" + assert not pointer.exists() diff --git a/tests/unit/test_store_factory.py b/tests/unit/test_store_factory.py index 17ceb4b8..4d07ad3f 100644 --- a/tests/unit/test_store_factory.py +++ b/tests/unit/test_store_factory.py @@ -39,6 +39,16 @@ def test_default_passes_run_id(): assert store.run_id == "abc123" +def test_default_passes_flow_id(): + """flow_id should be forwarded to the filesystem store.""" + with patch.dict("os.environ", {}, clear=True): + store = create_report_store(flow_id="flow001") + + assert isinstance(store, ReportStore) + assert store.flow_id == "flow001" + assert store.run_id == "flow001" # flow_id takes precedence + + def test_base_dir_forwarded(): """base_dir should be forwarded to the filesystem store.""" with patch.dict("os.environ", {}, clear=True): diff --git a/tradingagents/observability.py b/tradingagents/observability.py index 3f7e8e3e..3a1f0584 100644 --- a/tradingagents/observability.py +++ b/tradingagents/observability.py @@ -78,7 +78,9 @@ class RunLogger: if mongo_uri and run_id: try: from pymongo import MongoClient - client = MongoClient(mongo_uri) + # Short timeout so a dead/unreachable cluster fails fast instead + # of blocking every LLM callback for pymongo's 30s default. + client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5_000) self._mongo_col = client[mongo_db]["run_events"] _py_logger.info("RunLogger: persisting events to MongoDB (run_id=%s, flow_id=%s)", run_id, flow_id) except Exception as exc: diff --git a/tradingagents/portfolio/dual_report_store.py b/tradingagents/portfolio/dual_report_store.py index 3f953ace..a4784a1e 100644 --- a/tradingagents/portfolio/dual_report_store.py +++ b/tradingagents/portfolio/dual_report_store.py @@ -3,27 +3,68 @@ Delegates all save_* calls to both a :class:`ReportStore` and a :class:`MongoReportStore`. Load methods prioritize the MongoDB store if available, otherwise fall back to the filesystem. 
diff --git a/tradingagents/portfolio/dual_report_store.py b/tradingagents/portfolio/dual_report_store.py
index 3f953ace..a4784a1e 100644
--- a/tradingagents/portfolio/dual_report_store.py
+++ b/tradingagents/portfolio/dual_report_store.py
@@ -3,27 +3,68 @@
 Delegates all save_* calls to both a :class:`ReportStore` and a
 :class:`MongoReportStore`. Load methods prioritize the MongoDB store if
 available, otherwise fall back to the filesystem.
+
+MongoDB is **best-effort**: any exception from the Mongo backend (network
+timeout, SSL error, auth failure, etc.) is caught, logged as a warning, and
+the operation gracefully continues using only the local filesystem store.
+This means a flaky or unreachable Atlas cluster never crashes a run.
 """

 from __future__ import annotations

-from typing import Any, TYPE_CHECKING
+import logging
+from typing import Any, Callable, TypeVar, TYPE_CHECKING

 if TYPE_CHECKING:
     from pathlib import Path

 from tradingagents.portfolio.report_store import ReportStore
 from tradingagents.portfolio.mongo_report_store import MongoReportStore

+logger = logging.getLogger(__name__)
+
+_T = TypeVar("_T")
+

 class DualReportStore:
-    """Report store that writes to two backends simultaneously."""
+    """Report store that writes to two backends simultaneously.
+
+    MongoDB operations are wrapped in :meth:`_try_mongo` so that any network
+    or auth error degrades gracefully to filesystem-only operation instead of
+    crashing the calling code.
+    """

     def __init__(self, local_store: ReportStore, mongo_store: MongoReportStore) -> None:
         self._local = local_store
         self._mongo = mongo_store

+    # ------------------------------------------------------------------
+    # Internal resilience helper
+    # ------------------------------------------------------------------
+
+    def _try_mongo(self, fn: Callable[[], _T], default: _T) -> _T:
+        """Call *fn* against the Mongo backend; return *default* on any error.
+
+        Logs a WARNING with the exception details so operators can see that
+        MongoDB is degraded without the error killing the run.
+        """
+        try:
+            return fn()
+        except Exception as exc:  # noqa: BLE001
+            logger.warning(
+                "MongoDB operation failed (degrading to local-only): %s: %s",
+                type(exc).__name__,
+                exc,
+            )
+            return default
+
+    @property
+    def flow_id(self) -> str | None:
+        """The flow identifier set on this store, if any."""
+        return self._local.flow_id
+
     @property
     def run_id(self) -> str | None:
+        """The run/flow identifier (flow_id takes precedence)."""
         return self._local.run_id

     # ------------------------------------------------------------------
@@ -31,13 +72,12 @@ class DualReportStore:
     # ------------------------------------------------------------------

     def save_scan(self, date: str, data: dict[str, Any]) -> Any:
-        # local returns Path, mongo returns str (_id)
         local_result = self._local.save_scan(date, data)
-        self._mongo.save_scan(date, data)
+        self._try_mongo(lambda: self._mongo.save_scan(date, data), None)
         return local_result

     def load_scan(self, date: str) -> dict[str, Any] | None:
-        return self._mongo.load_scan(date) or self._local.load_scan(date)
+        return self._try_mongo(lambda: self._mongo.load_scan(date), None) or self._local.load_scan(date)

     # ------------------------------------------------------------------
     # Per-Ticker Analysis
@@ -45,11 +85,11 @@ class DualReportStore:

     def save_analysis(self, date: str, ticker: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_analysis(date, ticker, data)
-        self._mongo.save_analysis(date, ticker, data)
+        self._try_mongo(lambda: self._mongo.save_analysis(date, ticker, data), None)
         return local_result

     def load_analysis(self, date: str, ticker: str) -> dict[str, Any] | None:
-        return self._mongo.load_analysis(date, ticker) or self._local.load_analysis(date, ticker)
+        return self._try_mongo(lambda: self._mongo.load_analysis(date, ticker), None) or self._local.load_analysis(date, ticker)

     # ------------------------------------------------------------------
     # Holding Reviews
@@ -57,11 +97,11 @@

     def save_holding_review(self, date: str, ticker: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_holding_review(date, ticker, data)
-        self._mongo.save_holding_review(date, ticker, data)
+        self._try_mongo(lambda: self._mongo.save_holding_review(date, ticker, data), None)
         return local_result

     def load_holding_review(self, date: str, ticker: str) -> dict[str, Any] | None:
-        return self._mongo.load_holding_review(date, ticker) or self._local.load_holding_review(date, ticker)
+        return self._try_mongo(lambda: self._mongo.load_holding_review(date, ticker), None) or self._local.load_holding_review(date, ticker)

     # ------------------------------------------------------------------
     # Risk Metrics
@@ -69,11 +109,11 @@

     def save_risk_metrics(self, date: str, portfolio_id: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_risk_metrics(date, portfolio_id, data)
-        self._mongo.save_risk_metrics(date, portfolio_id, data)
+        self._try_mongo(lambda: self._mongo.save_risk_metrics(date, portfolio_id, data), None)
         return local_result

     def load_risk_metrics(self, date: str, portfolio_id: str) -> dict[str, Any] | None:
-        return self._mongo.load_risk_metrics(date, portfolio_id) or self._local.load_risk_metrics(date, portfolio_id)
+        return self._try_mongo(lambda: self._mongo.load_risk_metrics(date, portfolio_id), None) or self._local.load_risk_metrics(date, portfolio_id)

     # ------------------------------------------------------------------
     # PM Decisions
@@ -87,11 +127,11 @@
         data: dict[str, Any],
         markdown: str | None = None,
     ) -> Any:
         local_result = self._local.save_pm_decision(date, portfolio_id, data, markdown=markdown)
-        self._mongo.save_pm_decision(date, portfolio_id, data, markdown=markdown)
+        self._try_mongo(lambda: self._mongo.save_pm_decision(date, portfolio_id, data, markdown=markdown), None)
         return local_result

     def load_pm_decision(self, date: str, portfolio_id: str) -> dict[str, Any] | None:
-        return self._mongo.load_pm_decision(date, portfolio_id) or self._local.load_pm_decision(date, portfolio_id)
+        return self._try_mongo(lambda: self._mongo.load_pm_decision(date, portfolio_id), None) or self._local.load_pm_decision(date, portfolio_id)

     # ------------------------------------------------------------------
     # Execution Results
@@ -99,11 +139,11 @@

     def save_execution_result(self, date: str, portfolio_id: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_execution_result(date, portfolio_id, data)
-        self._mongo.save_execution_result(date, portfolio_id, data)
+        self._try_mongo(lambda: self._mongo.save_execution_result(date, portfolio_id, data), None)
         return local_result

     def load_execution_result(self, date: str, portfolio_id: str) -> dict[str, Any] | None:
-        return self._mongo.load_execution_result(date, portfolio_id) or self._local.load_execution_result(date, portfolio_id)
+        return self._try_mongo(lambda: self._mongo.load_execution_result(date, portfolio_id), None) or self._local.load_execution_result(date, portfolio_id)

     # ------------------------------------------------------------------
     # Run Meta / Events persistence
@@ -111,25 +151,25 @@

     def save_run_meta(self, date: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_run_meta(date, data)
-        self._mongo.save_run_meta(date, data)
+        self._try_mongo(lambda: self._mongo.save_run_meta(date, data), None)
         return local_result

     def load_run_meta(self, date: str) -> dict[str, Any] | None:
-        return self._mongo.load_run_meta(date) or self._local.load_run_meta(date)
+        return self._try_mongo(lambda: self._mongo.load_run_meta(date), None) or self._local.load_run_meta(date)

     def save_run_events(self, date: str, events: list[dict[str, Any]]) -> Any:
         local_result = self._local.save_run_events(date, events)
-        self._mongo.save_run_events(date, events)
+        self._try_mongo(lambda: self._mongo.save_run_events(date, events), None)
         return local_result

     def load_run_events(self, date: str) -> list[dict[str, Any]]:
-        mongo_events = self._mongo.load_run_events(date)
+        mongo_events = self._try_mongo(lambda: self._mongo.load_run_events(date), None)
         if mongo_events:
             return mongo_events
         return self._local.load_run_events(date)

     def list_run_metas(self) -> list[dict[str, Any]]:
-        mongo_metas = self._mongo.list_run_metas()
+        mongo_metas = self._try_mongo(lambda: self._mongo.list_run_metas(), None)
         if mongo_metas:
             return mongo_metas
         return self._local.list_run_metas()
@@ -140,19 +180,19 @@

     def save_analysts_checkpoint(self, date: str, ticker: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_analysts_checkpoint(date, ticker, data)
-        self._mongo.save_analysts_checkpoint(date, ticker, data)
+        self._try_mongo(lambda: self._mongo.save_analysts_checkpoint(date, ticker, data), None)
         return local_result

     def load_analysts_checkpoint(self, date: str, ticker: str) -> dict[str, Any] | None:
-        return self._mongo.load_analysts_checkpoint(date, ticker) or self._local.load_analysts_checkpoint(date, ticker)
+        return self._try_mongo(lambda: self._mongo.load_analysts_checkpoint(date, ticker), None) or self._local.load_analysts_checkpoint(date, ticker)

     def save_trader_checkpoint(self, date: str, ticker: str, data: dict[str, Any]) -> Any:
         local_result = self._local.save_trader_checkpoint(date, ticker, data)
-        self._mongo.save_trader_checkpoint(date, ticker, data)
+        self._try_mongo(lambda: self._mongo.save_trader_checkpoint(date, ticker, data), None)
         return local_result

     def load_trader_checkpoint(self, date: str, ticker: str) -> dict[str, Any] | None:
-        return self._mongo.load_trader_checkpoint(date, ticker) or self._local.load_trader_checkpoint(date, ticker)
+        return self._try_mongo(lambda: self._mongo.load_trader_checkpoint(date, ticker), None) or self._local.load_trader_checkpoint(date, ticker)

     # ------------------------------------------------------------------
     # Utility
@@ -160,16 +200,17 @@

     def clear_portfolio_stage(self, date: str, portfolio_id: str) -> list[str]:
         local_deleted = self._local.clear_portfolio_stage(date, portfolio_id)
-        self._mongo.clear_portfolio_stage(date, portfolio_id)
+        self._try_mongo(lambda: self._mongo.clear_portfolio_stage(date, portfolio_id), None)
         return local_deleted

     def list_pm_decisions(self, portfolio_id: str) -> list[Any]:
         # Mongo returns dicts, Local returns Paths. Prefer Mongo for rich data.
-        mongo_results = self._mongo.list_pm_decisions(portfolio_id)
+        mongo_results = self._try_mongo(lambda: self._mongo.list_pm_decisions(portfolio_id), None)
         if mongo_results:
             return mongo_results
         return self._local.list_pm_decisions(portfolio_id)

     def list_analyses_for_date(self, date: str) -> list[str]:
-        # Both return list[str]
-        return list(set(self._mongo.list_analyses_for_date(date)) | set(self._local.list_analyses_for_date(date)))
+        mongo_results = self._try_mongo(lambda: self._mongo.list_analyses_for_date(date), [])
+        local_results = self._local.list_analyses_for_date(date)
+        return list(set(mongo_results) | set(local_results))
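# ---- Illustrative sketch (not part of the patch) --------------------------
# _try_mongo is a generic "call-or-default" guard; nothing about it is Mongo
# specific. A self-contained demonstration of the degradation behaviour it
# gives DualReportStore, using a stub backend that always raises:
import logging

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("demo")

def try_backend(fn, default):
    # Same shape as DualReportStore._try_mongo above.
    try:
        return fn()
    except Exception as exc:
        log.warning("backend failed (degrading): %s: %s", type(exc).__name__, exc)
        return default

def broken_load():
    raise TimeoutError("server selection timed out")

local_value = {"source": "filesystem"}
# The `or` fallback mirrors DualReportStore.load_scan and friends:
result = try_backend(broken_load, None) or local_value
assert result == {"source": "filesystem"}
# ---- end sketch ------------------------------------------------------------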
diff --git a/tradingagents/portfolio/mongo_report_store.py b/tradingagents/portfolio/mongo_report_store.py
index 6f06b410..4f3561a9 100644
--- a/tradingagents/portfolio/mongo_report_store.py
+++ b/tradingagents/portfolio/mongo_report_store.py
@@ -58,27 +58,52 @@
     ``load_scan(date, run_id=run_id)`` to pin to a particular run.
     """

+    # How long to wait for a server before treating the cluster as unreachable.
+    # Keeping this short lets store_factory fall back to the filesystem quickly
+    # instead of blocking for pymongo's 30-second default.
+    _SERVER_SELECTION_TIMEOUT_MS: int = 5_000
+
     def __init__(
         self,
         connection_string: str,
         db_name: str = "tradingagents",
         run_id: str | None = None,
+        flow_id: str | None = None,
     ) -> None:
+        self._flow_id = flow_id
         self._run_id = run_id
+        self._indexes_ensured: bool = False
         try:
-            self._client: MongoClient = MongoClient(connection_string)
+            self._client: MongoClient = MongoClient(
+                connection_string,
+                serverSelectionTimeoutMS=self._SERVER_SELECTION_TIMEOUT_MS,
+            )
             self._db: Database = self._client[db_name]
             self._col: Collection = self._db[_REPORTS_COLLECTION]
         except Exception as exc:
             raise ReportStoreError(f"MongoDB connection failed: {exc}") from exc
-        self.ensure_indexes()
+        # Indexes are created lazily on the first write so that __init__ never
+        # blocks on a live network call. Call ensure_indexes() explicitly if
+        # you need them to exist before the first write (e.g. in tests).
+
+    @property
+    def flow_id(self) -> str | None:
+        """The flow identifier set on this store, if any."""
+        return self._flow_id

     @property
     def run_id(self) -> str | None:
-        return self._run_id
+        """The run/flow identifier (flow_id takes precedence for backward compat)."""
+        return self._flow_id or self._run_id

     def ensure_indexes(self) -> None:
-        """Create indexes for efficient querying (idempotent)."""
+        """Create indexes for efficient querying (idempotent).
+
+        Called automatically on the first write so that ``__init__`` never
+        blocks on a live network call. Safe to call multiple times.
+        """
+        if self._indexes_ensured:
+            return
         self._col.create_index([("date", DESCENDING), ("report_type", 1)])
         self._col.create_index(
             [("date", DESCENDING), ("report_type", 1), ("ticker", 1)]
@@ -86,8 +111,10 @@
         self._col.create_index(
             [("date", DESCENDING), ("report_type", 1), ("portfolio_id", 1)]
         )
+        self._col.create_index("flow_id")
         self._col.create_index("run_id")
         self._col.create_index("created_at")
+        self._indexes_ensured = True

     # ------------------------------------------------------------------
     # Internal helpers
@@ -104,8 +131,10 @@
         markdown: str | None = None,
     ) -> str:
         """Insert a report document. Returns the inserted document's _id."""
+        self.ensure_indexes()
         doc = {
-            "run_id": self._run_id,
+            "flow_id": self._flow_id,
+            "run_id": self._run_id or self._flow_id,  # backward compat
             "date": date,
             "report_type": report_type,
             "ticker": ticker.upper() if ticker else None,
@@ -143,6 +172,8 @@
             query["portfolio_id"] = portfolio_id
         if run_id:
             query["run_id"] = run_id
+        elif self._flow_id:
+            query["flow_id"] = self._flow_id

         doc = self._col.find_one(query, sort=[("created_at", DESCENDING)])
         if doc is None:
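# ---- Illustrative sketch (not part of the patch) --------------------------
# With flow_id stamped on every document and indexed, "latest version within
# this flow" is a filtered find_one sorted by created_at, which is exactly
# the query the hunk above builds. A standalone sketch of that access
# pattern; the URI, database/collection names, and the report_type value are
# placeholders, not confirmed by this diff:
from pymongo import MongoClient, DESCENDING

col = MongoClient("mongodb://localhost:27017")["tradingagents"]["reports"]

query = {"date": "2026-03-20", "report_type": "scan", "flow_id": "a1b2c3d4"}
doc = col.find_one(query, sort=[("created_at", DESCENDING)])
# doc is the newest scan written by flow a1b2c3d4, or None if that flow
# never wrote one; other flows' documents never shadow it.
# ---- end sketch ------------------------------------------------------------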
diff --git a/tradingagents/portfolio/report_store.py b/tradingagents/portfolio/report_store.py
index 52316671..d048ed6e 100644
--- a/tradingagents/portfolio/report_store.py
+++ b/tradingagents/portfolio/report_store.py
@@ -4,32 +4,48 @@
 Saves and loads all non-transactional portfolio artifacts (scans,
 per-ticker analysis, holding reviews, risk metrics, PM decisions) using
 the existing ``tradingagents/report_paths.py`` path convention.

-When a ``run_id`` is set on the store, all artifacts are written under a
-run-specific subdirectory so that same-day re-runs never overwrite earlier
-results::
+When a ``flow_id`` is set on the store, artifacts are written under a
+flow-scoped subdirectory with **timestamp-prefixed filenames** so that
+re-runs within the same flow never overwrite earlier results and the most
+recent version is always resolved by sorting::
+
+    reports/daily/{date}/{flow_id}/
+    ├── market/report/
+    │   └── {ts}_macro_scan_summary.json
+    ├── {TICKER}/report/
+    │   ├── {ts}_complete_report.json
+    │   ├── {ts}_analysts_checkpoint.json
+    │   └── {ts}_trader_checkpoint.json
+    ├── portfolio/report/
+    │   ├── {ts}_{TICKER}_holding_review.json
+    │   ├── {ts}_{portfolio_id}_risk_metrics.json
+    │   ├── {ts}_{portfolio_id}_pm_decision.json
+    │   └── {ts}_{portfolio_id}_execution_result.json
+    ├── run_meta.json
+    └── run_events.jsonl
+
+When only a legacy ``run_id`` is provided, the old layout is preserved for
+backward compatibility::

     reports/daily/{date}/runs/{run_id}/
-    ├── market/
-    │   └── macro_scan_summary.json
-    ├── {TICKER}/
-    │   └── complete_report.json
-    └── portfolio/
-        ├── {TICKER}_holding_review.json
-        ├── {portfolio_id}_risk_metrics.json
-        ├── {portfolio_id}_pm_decision.json
-        └── {portfolio_id}_pm_decision.md
+    ├── market/macro_scan_summary.json
+    ├── {TICKER}/complete_report.json
+    └── portfolio/{portfolio_id}_pm_decision.json

-A ``latest.json`` pointer at the date level is updated on every write so
-that load methods (when called *without* a ``run_id``) transparently
-resolve to the most recent run.
+A ``latest.json`` pointer at the date level is updated on legacy
+``run_id``-based writes for backward-compatible reads.

 Usage::

     from tradingagents.portfolio.report_store import ReportStore

-    store = ReportStore(run_id="a1b2c3d4")
+    # New flow_id-based (timestamped versioning)
+    store = ReportStore(flow_id="a1b2c3d4")
     store.save_scan("2026-03-20", {"watchlist": [...]})
-    data = store.load_scan("2026-03-20")  # reads from latest run
+    data = store.load_scan("2026-03-20")  # always loads the most recent
+
+    # Legacy run_id-based (backward compat)
+    store = ReportStore(run_id="a1b2c3d4")
 """

 from __future__ import annotations
@@ -39,7 +55,7 @@
 from pathlib import Path
 from typing import Any

 from tradingagents.portfolio.exceptions import ReportStoreError
-from tradingagents.report_paths import read_latest_pointer, write_latest_pointer
+from tradingagents.report_paths import read_latest_pointer, ts_now, write_latest_pointer


 class ReportStore:
@@ -48,15 +64,18 @@
     Directories are created automatically on first write. All load
     methods return ``None`` when the file does not exist.

-    When ``run_id`` is provided, write paths are scoped under
-    ``{base_dir}/daily/{date}/runs/{run_id}/…`` and a ``latest.json``
-    pointer is updated automatically. Load methods resolve through
-    the pointer when no ``run_id`` is set.
+    When ``flow_id`` is provided, all artifacts are written under
+    ``{base_dir}/daily/{date}/{flow_id}/…`` with timestamp-prefixed filenames.
+    Load methods always return the most recently written version.
+
+    When only ``run_id`` is provided (legacy), the old ``runs/{run_id}/``
+    layout is used for backward compatibility.
     """

     def __init__(
         self,
         base_dir: str | Path = "reports",
+        flow_id: str | None = None,
         run_id: str | None = None,
     ) -> None:
         """Initialise the store with a base reports directory.
@@ -66,61 +85,90 @@
                 (relative to CWD), matching ``report_paths.REPORTS_ROOT``.
                 Override via the ``PORTFOLIO_DATA_DIR`` env var or
                 ``get_portfolio_config()["data_dir"]``.
-            run_id: Optional short identifier for the current run. When set,
-                all writes are scoped under a ``runs/{run_id}/``
-                subdirectory so that same-day re-runs are preserved.
+            flow_id: Flow identifier grouping all phases of one analysis intent.
+                When set, writes use timestamped filenames under
+                ``{base}/daily/{date}/{flow_id}/``.
+            run_id: Legacy run identifier (backward compat). When set without
+                ``flow_id``, writes go to ``runs/{run_id}/`` (old layout).
         """
         self._base_dir = Path(base_dir)
+        self._flow_id = flow_id
         self._run_id = run_id

+    @property
+    def flow_id(self) -> str | None:
+        """The flow identifier set on this store, if any."""
+        return self._flow_id
+
     @property
     def run_id(self) -> str | None:
-        """The run identifier set on this store, if any."""
-        return self._run_id
+        """The run/flow identifier set on this store (flow_id takes precedence)."""
+        return self._flow_id or self._run_id

     # ------------------------------------------------------------------
     # Internal helpers
     # ------------------------------------------------------------------

     def _date_root(self, date: str, *, for_write: bool = False) -> Path:
-        """Return the base directory for a given date, scoped by run_id.
+        """Return the base directory for a given date.

-        When ``for_write=True``, the run_id *must* be used (if present) so
-        that writes land in the run-specific directory.
-
-        When ``for_write=False`` (reads), the method first tries the
-        run_id directory, then falls back to latest.json pointer, and
-        finally falls back to the legacy flat layout.
+        Resolution order:
+        1. ``flow_id`` → ``daily/{date}/{flow_id}`` (new timestamped layout)
+        2. ``run_id`` → ``daily/{date}/runs/{run_id}`` (legacy layout)
+        3. Neither (read path): check ``latest.json`` pointer, then flat layout.
         """
         daily = self._base_dir / "daily" / date
-        if for_write and self._run_id:
-            return daily / "runs" / self._run_id
+        if self._flow_id:
+            return daily / self._flow_id
+        if self._run_id:
+            return daily / "runs" / self._run_id

-        # Read path: check latest.json pointer (using our base_dir)
-        latest_id = read_latest_pointer(date, base_dir=self._base_dir)
-        if latest_id:
-            candidate = daily / "runs" / latest_id
-            if candidate.exists():
-                return candidate
+        if not for_write:
+            # Read path: check latest.json pointer (using our base_dir)
+            latest_id = read_latest_pointer(date, base_dir=self._base_dir)
+            if latest_id:
+                candidate = daily / "runs" / latest_id
+                if candidate.exists():
+                    return candidate

         # Fallback to legacy flat layout
         return daily

     def _update_latest(self, date: str) -> None:
-        """Update the latest.json pointer if run_id is set."""
-        if self._run_id:
+        """Update the latest.json pointer (legacy run_id only).
+
+        No-op for flow_id-based stores — timestamps make pointers unnecessary.
+        """
+        if self._run_id and not self._flow_id:
             write_latest_pointer(date, self._run_id, base_dir=self._base_dir)

     def _portfolio_dir(self, date: str, *, for_write: bool = False) -> Path:
         """Return the portfolio subdirectory for a given date.

-        Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/``
+        Path: ``{base}/daily/{date}[/{flow_id}|/runs/{run_id}]/portfolio/``
         """
         return self._date_root(date, for_write=for_write) / "portfolio"

+    @staticmethod
+    def _load_latest_ts(directory: Path, name: str) -> dict[str, Any] | None:
+        """Return the payload from the most recent timestamped report file.
+
+        Scans *directory* for files matching ``*_{name}``, sorts lexicographically
+        (the timestamps are sortable), and returns the parsed JSON of the newest.
+        Returns ``None`` when no matching file exists.
+        """
+        if not directory.exists():
+            return None
+        candidates = sorted(directory.glob(f"*_{name}"), reverse=True)
+        if not candidates:
+            return None
+        try:
+            return json.loads(candidates[0].read_text(encoding="utf-8"))
+        except (json.JSONDecodeError, OSError):
+            return None
+
     @staticmethod
     def _sanitize(obj: Any) -> Any:
         """Recursively convert non-JSON-serializable objects to safe types.
@@ -191,7 +239,8 @@
     def save_scan(self, date: str, data: dict[str, Any]) -> Path:
         """Save macro scan summary JSON.

-        Path: ``{base}/daily/{date}[/runs/{run_id}]/market/macro_scan_summary.json``
+        Flow path: ``{base}/daily/{date}/{flow_id}/market/report/{ts}_macro_scan_summary.json``
+        Legacy path: ``{base}/daily/{date}[/runs/{run_id}]/market/macro_scan_summary.json``

         Args:
             date: ISO date string, e.g. ``"2026-03-20"``.
@@ -201,7 +250,10 @@
         Returns:
             Path of the written file.
         """
         root = self._date_root(date, for_write=True)
-        path = root / "market" / "macro_scan_summary.json"
+        if self._flow_id:
+            path = root / "market" / "report" / f"{ts_now()}_macro_scan_summary.json"
+        else:
+            path = root / "market" / "macro_scan_summary.json"
         result = self._write_json(path, data)
         self._update_latest(date)
         return result
@@ -209,8 +261,9 @@
     def load_scan(self, date: str) -> dict[str, Any] | None:
         """Load macro scan summary. Returns None if the file does not exist."""
         root = self._date_root(date)
-        path = root / "market" / "macro_scan_summary.json"
-        return self._read_json(path)
+        if self._flow_id:
+            return self._load_latest_ts(root / "market" / "report", "macro_scan_summary.json")
+        return self._read_json(root / "market" / "macro_scan_summary.json")

     # ------------------------------------------------------------------
     # Per-Ticker Analysis
@@ -219,7 +272,8 @@
     def save_analysis(self, date: str, ticker: str, data: dict[str, Any]) -> Path:
         """Save per-ticker analysis report as JSON.

-        Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/complete_report.json``
+        Flow path: ``{base}/daily/{date}/{flow_id}/{TICKER}/report/{ts}_complete_report.json``
+        Legacy path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/complete_report.json``

         Args:
             date: ISO date string.
@@ -227,7 +281,10 @@
             data: Analysis output dict.
         """
         root = self._date_root(date, for_write=True)
-        path = root / ticker.upper() / "complete_report.json"
+        if self._flow_id:
+            path = root / ticker.upper() / "report" / f"{ts_now()}_complete_report.json"
+        else:
+            path = root / ticker.upper() / "complete_report.json"
         result = self._write_json(path, data)
         self._update_latest(date)
         return result
@@ -235,8 +292,9 @@
     def load_analysis(self, date: str, ticker: str) -> dict[str, Any] | None:
         """Load per-ticker analysis JSON. Returns None if the file does not exist."""
         root = self._date_root(date)
-        path = root / ticker.upper() / "complete_report.json"
-        return self._read_json(path)
+        if self._flow_id:
+            return self._load_latest_ts(root / ticker.upper() / "report", "complete_report.json")
+        return self._read_json(root / ticker.upper() / "complete_report.json")

     # ------------------------------------------------------------------
     # Holding Reviews
@@ -250,22 +308,29 @@
     def save_holding_review(
         self,
         date: str,
         ticker: str,
         data: dict[str, Any],
     ) -> Path:
         """Save holding reviewer output for one ticker.

-        Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{TICKER}_holding_review.json``
+        Flow path: ``…/portfolio/report/{ts}_{TICKER}_holding_review.json``
+        Legacy path: ``…/portfolio/{TICKER}_holding_review.json``

         Args:
             date: ISO date string.
             ticker: Ticker symbol (stored as uppercase).
             data: HoldingReviewerAgent output dict.
         """
-        path = self._portfolio_dir(date, for_write=True) / f"{ticker.upper()}_holding_review.json"
+        pdir = self._portfolio_dir(date, for_write=True)
+        if self._flow_id:
+            path = pdir / "report" / f"{ts_now()}_{ticker.upper()}_holding_review.json"
+        else:
+            path = pdir / f"{ticker.upper()}_holding_review.json"
         result = self._write_json(path, data)
         self._update_latest(date)
         return result

     def load_holding_review(self, date: str, ticker: str) -> dict[str, Any] | None:
         """Load holding review output. Returns None if the file does not exist."""
-        path = self._portfolio_dir(date) / f"{ticker.upper()}_holding_review.json"
-        return self._read_json(path)
+        pdir = self._portfolio_dir(date)
+        if self._flow_id:
+            return self._load_latest_ts(pdir / "report", f"{ticker.upper()}_holding_review.json")
+        return self._read_json(pdir / f"{ticker.upper()}_holding_review.json")

     # ------------------------------------------------------------------
     # Risk Metrics
@@ -279,14 +344,19 @@
     def save_risk_metrics(
         self,
         date: str,
         portfolio_id: str,
         data: dict[str, Any],
     ) -> Path:
         """Save risk computation results.
-        Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{portfolio_id}_risk_metrics.json``
+        Flow path: ``…/portfolio/report/{ts}_{portfolio_id}_risk_metrics.json``
+        Legacy path: ``…/portfolio/{portfolio_id}_risk_metrics.json``

         Args:
             date: ISO date string.
             portfolio_id: UUID of the target portfolio.
             data: Risk metrics dict (Sharpe, Sortino, VaR, etc.).
         """
-        path = self._portfolio_dir(date, for_write=True) / f"{portfolio_id}_risk_metrics.json"
+        pdir = self._portfolio_dir(date, for_write=True)
+        if self._flow_id:
+            path = pdir / "report" / f"{ts_now()}_{portfolio_id}_risk_metrics.json"
+        else:
+            path = pdir / f"{portfolio_id}_risk_metrics.json"
         result = self._write_json(path, data)
         self._update_latest(date)
         return result
@@ -297,8 +367,10 @@
     def load_risk_metrics(
         self,
         date: str,
         portfolio_id: str,
     ) -> dict[str, Any] | None:
         """Load risk metrics. Returns None if the file does not exist."""
-        path = self._portfolio_dir(date) / f"{portfolio_id}_risk_metrics.json"
-        return self._read_json(path)
+        pdir = self._portfolio_dir(date)
+        if self._flow_id:
+            return self._load_latest_ts(pdir / "report", f"{portfolio_id}_risk_metrics.json")
+        return self._read_json(pdir / f"{portfolio_id}_risk_metrics.json")

     # ------------------------------------------------------------------
     # PM Decisions
@@ -313,8 +385,8 @@
     def save_pm_decision(
         self,
         date: str,
         portfolio_id: str,
         data: dict[str, Any],
         markdown: str | None = None,
     ) -> Path:
         """Save PM agent decision.

-        JSON path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{portfolio_id}_pm_decision.json``
-        MD path: ``…/{portfolio_id}_pm_decision.md`` (written only when ``markdown`` is not None)
+        Flow path: ``…/portfolio/report/{ts}_{portfolio_id}_pm_decision.json``
+        Legacy path: ``…/portfolio/{portfolio_id}_pm_decision.json``

         Args:
             date: ISO date string.
@@ -326,14 +398,26 @@
         Returns:
             Path of the written JSON file.
         """
         pdir = self._portfolio_dir(date, for_write=True)
-        json_path = pdir / f"{portfolio_id}_pm_decision.json"
-        self._write_json(json_path, data)
-        if markdown is not None:
-            md_path = pdir / f"{portfolio_id}_pm_decision.md"
-            try:
-                md_path.write_text(markdown, encoding="utf-8")
-            except OSError as exc:
-                raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc
+        if self._flow_id:
+            ts = ts_now()
+            json_path = pdir / "report" / f"{ts}_{portfolio_id}_pm_decision.json"
+            self._write_json(json_path, data)
+            if markdown is not None:
+                md_path = pdir / "report" / f"{ts}_{portfolio_id}_pm_decision.md"
+                try:
+                    md_path.parent.mkdir(parents=True, exist_ok=True)
+                    md_path.write_text(markdown, encoding="utf-8")
+                except OSError as exc:
+                    raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc
+        else:
+            json_path = pdir / f"{portfolio_id}_pm_decision.json"
+            self._write_json(json_path, data)
+            if markdown is not None:
+                md_path = pdir / f"{portfolio_id}_pm_decision.md"
+                try:
+                    md_path.write_text(markdown, encoding="utf-8")
+                except OSError as exc:
+                    raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc
         self._update_latest(date)
         return json_path
@@ -343,8 +427,10 @@
     def load_pm_decision(
         self,
         date: str,
         portfolio_id: str,
     ) -> dict[str, Any] | None:
         """Load PM decision JSON. Returns None if the file does not exist."""
-        path = self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.json"
-        return self._read_json(path)
+        pdir = self._portfolio_dir(date)
+        if self._flow_id:
+            return self._load_latest_ts(pdir / "report", f"{portfolio_id}_pm_decision.json")
+        return self._read_json(pdir / f"{portfolio_id}_pm_decision.json")

     def save_execution_result(
         self,
         date: str,
         portfolio_id: str,
         data: dict[str, Any],
     ) -> Path:
         """Save trade execution results.

-        Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{portfolio_id}_execution_result.json``
+        Flow path: ``…/portfolio/report/{ts}_{portfolio_id}_execution_result.json``
+        Legacy path: ``…/portfolio/{portfolio_id}_execution_result.json``

         Args:
             date: ISO date string.
             portfolio_id: UUID of the target portfolio.
             data: TradeExecutor output dict.
         """
-        path = self._portfolio_dir(date, for_write=True) / f"{portfolio_id}_execution_result.json"
+        pdir = self._portfolio_dir(date, for_write=True)
+        if self._flow_id:
+            path = pdir / "report" / f"{ts_now()}_{portfolio_id}_execution_result.json"
+        else:
+            path = pdir / f"{portfolio_id}_execution_result.json"
         result = self._write_json(path, data)
         self._update_latest(date)
         return result
@@ -372,25 +463,40 @@
     def load_execution_result(
         self,
         date: str,
         portfolio_id: str,
     ) -> dict[str, Any] | None:
         """Load execution result. Returns None if the file does not exist."""
-        path = self._portfolio_dir(date) / f"{portfolio_id}_execution_result.json"
-        return self._read_json(path)
+        pdir = self._portfolio_dir(date)
+        if self._flow_id:
+            return self._load_latest_ts(pdir / "report", f"{portfolio_id}_execution_result.json")
+        return self._read_json(pdir / f"{portfolio_id}_execution_result.json")

     def clear_portfolio_stage(self, date: str, portfolio_id: str) -> list[str]:
         """Delete PM decision and execution result files for a given date/portfolio.

+        For flow_id-based stores, deletes ALL timestamped versions.
         Returns a list of deleted file names so the caller can log what
         was removed.
         """
         pdir = self._portfolio_dir(date, for_write=True)
-        targets = [
-            pdir / f"{portfolio_id}_pm_decision.json",
-            pdir / f"{portfolio_id}_pm_decision.md",
-            pdir / f"{portfolio_id}_execution_result.json",
-        ]
         deleted = []
-        for path in targets:
-            if path.exists():
-                path.unlink()
-                deleted.append(path.name)
+        if self._flow_id:
+            report_dir = pdir / "report"
+            if report_dir.exists():
+                for suffix in (
+                    f"{portfolio_id}_pm_decision.json",
+                    f"{portfolio_id}_pm_decision.md",
+                    f"{portfolio_id}_execution_result.json",
+                ):
+                    for path in report_dir.glob(f"*_{suffix}"):
+                        path.unlink()
+                        deleted.append(path.name)
+        else:
+            targets = [
+                pdir / f"{portfolio_id}_pm_decision.json",
+                pdir / f"{portfolio_id}_pm_decision.md",
+                pdir / f"{portfolio_id}_execution_result.json",
+            ]
+            for path in targets:
+                if path.exists():
+                    path.unlink()
+                    deleted.append(path.name)
         return deleted

     # ------------------------------------------------------------------
@@ -452,6 +558,9 @@
     def list_run_metas(cls, base_dir: str | Path = "reports") -> list[dict[str, Any]]:
         """Scan for all run_meta.json files and return metadata dicts, newest first.

+        Searches both the new flow_id layout (``daily/*/{flow_id}/run_meta.json``)
+        and the legacy run_id layout (``daily/*/runs/*/run_meta.json``).
+
         Args:
             base_dir: Root reports directory.

         Returns:
             List of run_meta dicts sorted by ``created_at`` descending.
""" base = Path(base_dir) - pattern = "daily/*/runs/*/run_meta.json" + # New flow_id layout: daily/{date}/{flow_id}/run_meta.json + # Legacy run_id layout: daily/{date}/runs/{run_id}/run_meta.json + patterns = ("daily/*/*/run_meta.json", "daily/*/runs/*/run_meta.json") + seen: set[str] = set() metas: list[dict[str, Any]] = [] - for path in base.glob(pattern): - try: - data = json.loads(path.read_text(encoding="utf-8")) - metas.append(data) - except (json.JSONDecodeError, OSError): - continue + for pattern in patterns: + for path in base.glob(pattern): + try: + data = json.loads(path.read_text(encoding="utf-8")) + key = data.get("id") or str(path) + if key not in seen: + seen.add(key) + metas.append(data) + except (json.JSONDecodeError, OSError): + continue metas.sort(key=lambda m: m.get("created_at", 0), reverse=True) return metas @@ -479,10 +595,14 @@ class ReportStore: ) -> Path: """Save analysts checkpoint for a ticker. - Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/analysts_checkpoint.json`` + Flow path: ``…/{TICKER}/report/{ts}_analysts_checkpoint.json`` + Legacy path: ``…/{TICKER}/analysts_checkpoint.json`` """ root = self._date_root(date, for_write=True) - path = root / ticker.upper() / "analysts_checkpoint.json" + if self._flow_id: + path = root / ticker.upper() / "report" / f"{ts_now()}_analysts_checkpoint.json" + else: + path = root / ticker.upper() / "analysts_checkpoint.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -492,18 +612,23 @@ class ReportStore: ) -> dict[str, Any] | None: """Load analysts checkpoint. Returns None if file does not exist.""" root = self._date_root(date) - path = root / ticker.upper() / "analysts_checkpoint.json" - return self._read_json(path) + if self._flow_id: + return self._load_latest_ts(root / ticker.upper() / "report", "analysts_checkpoint.json") + return self._read_json(root / ticker.upper() / "analysts_checkpoint.json") def save_trader_checkpoint( self, date: str, ticker: str, data: dict[str, Any] ) -> Path: """Save trader checkpoint for a ticker. - Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/trader_checkpoint.json`` + Flow path: ``…/{TICKER}/report/{ts}_trader_checkpoint.json`` + Legacy path: ``…/{TICKER}/trader_checkpoint.json`` """ root = self._date_root(date, for_write=True) - path = root / ticker.upper() / "trader_checkpoint.json" + if self._flow_id: + path = root / ticker.upper() / "report" / f"{ts_now()}_trader_checkpoint.json" + else: + path = root / ticker.upper() / "trader_checkpoint.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -513,8 +638,9 @@ class ReportStore: ) -> dict[str, Any] | None: """Load trader checkpoint. Returns None if file does not exist.""" root = self._date_root(date) - path = root / ticker.upper() / "trader_checkpoint.json" - return self._read_json(path) + if self._flow_id: + return self._load_latest_ts(root / ticker.upper() / "report", "trader_checkpoint.json") + return self._read_json(root / ticker.upper() / "trader_checkpoint.json") # ------------------------------------------------------------------ # PM Decisions @@ -523,7 +649,7 @@ class ReportStore: def list_pm_decisions(self, portfolio_id: str) -> list[Path]: """Return all saved PM decision JSON paths for portfolio_id, newest first. - Searches both run-scoped and legacy flat layouts. + Searches flow_id, run_id-scoped, and legacy flat layouts. Args: portfolio_id: UUID of the target portfolio. 
@@ -531,9 +657,15 @@ class ReportStore:
         Returns:
             Sorted list of Path objects, newest date first.
         """
+        # New flow_id layout: daily/*/{flow_id}/portfolio/report/*_{pid}_pm_decision.json
+        flow_pattern = f"daily/*/*/portfolio/report/*_{portfolio_id}_pm_decision.json"
         # Run-scoped layout: daily/*/runs/*/portfolio/{pid}_pm_decision.json
         run_pattern = f"daily/*/runs/*/portfolio/{portfolio_id}_pm_decision.json"
         # Legacy flat layout: daily/*/portfolio/{pid}_pm_decision.json
         flat_pattern = f"daily/*/portfolio/{portfolio_id}_pm_decision.json"
-        paths = set(self._base_dir.glob(run_pattern)) | set(self._base_dir.glob(flat_pattern))
+        paths = (
+            set(self._base_dir.glob(flow_pattern))
+            | set(self._base_dir.glob(run_pattern))
+            | set(self._base_dir.glob(flat_pattern))
+        )
         return sorted(paths, reverse=True)
diff --git a/tradingagents/portfolio/store_factory.py b/tradingagents/portfolio/store_factory.py
index 34810f98..314e12b1 100644
--- a/tradingagents/portfolio/store_factory.py
+++ b/tradingagents/portfolio/store_factory.py
@@ -24,8 +24,9 @@ logger = logging.getLogger(__name__)
 
 def create_report_store(
-    run_id: str | None = None,
+    flow_id: str | None = None,
     *,
+    run_id: str | None = None,
     base_dir: str | None = None,
     mongo_uri: str | None = None,
     mongo_db: str | None = None,
@@ -39,7 +40,11 @@ def create_report_store(
     3. Fall back to the filesystem :class:`ReportStore`.
 
     Args:
-        run_id: Short identifier for the current run.
+        flow_id: Flow identifier grouping all phases of one analysis intent.
+            When set, the store uses timestamp-based report versioning
+            under ``{base}/daily/{date}/{flow_id}/``.
+        run_id: Legacy short identifier (backward compat). Prefer ``flow_id``
+            for new code.
         base_dir: Override for the filesystem store's base directory.
         mongo_uri: MongoDB connection string (overrides env var).
         mongo_db: MongoDB database name (default ``"tradingagents"``).
@@ -54,7 +59,7 @@ def create_report_store(
     _base = base_dir or os.getenv("PORTFOLIO_DATA_DIR") or os.getenv(
         "TRADINGAGENTS_REPORTS_DIR", "reports"
     )
-    local_store = ReportStore(base_dir=_base, run_id=run_id)
+    local_store = ReportStore(base_dir=_base, flow_id=flow_id, run_id=run_id)
 
     if uri:
         try:
@@ -63,9 +68,13 @@ def create_report_store(
             mongo_store = MongoReportStore(
                 connection_string=uri,
                 db_name=db,
+                flow_id=flow_id,
                 run_id=run_id,
             )
-            logger.info("Using Dual report store (local + MongoDB db=%s, run_id=%s)", db, run_id)
+            logger.info(
+                "Using Dual report store (local + MongoDB db=%s, flow_id=%s)",
+                db, flow_id or run_id,
+            )
             return DualReportStore(local_store, mongo_store)
         except Exception:
             logger.warning(
@@ -73,5 +82,8 @@
                 exc_info=True,
             )
 
-    logger.info("Using filesystem report store (base=%s, run_id=%s)", _base, run_id)
+    logger.info(
+        "Using filesystem report store (base=%s, flow_id=%s)",
+        _base, flow_id or run_id,
+    )
     return local_store
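> Reviewer note on the factory's signature change — old keyword callers keep working, but the first positional slot silently changes meaning. Roughly (the "ab12cd34" id is illustrative):

```python
from tradingagents.portfolio.store_factory import create_report_store

store = create_report_store("ab12cd34")          # new: first positional is flow_id
legacy = create_report_store(run_id="ab12cd34")  # old keyword callers still work

# NB: any OLD caller that passed run_id positionally, create_report_store("x"),
# now gets a flow_id store (new timestamped layout) instead of a legacy run
# store -- worth flagging in the changelog if such call sites exist.
```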
diff --git a/tradingagents/report_paths.py b/tradingagents/report_paths.py
index b974dc17..744ef887 100644
--- a/tradingagents/report_paths.py
+++ b/tradingagents/report_paths.py
@@ -3,19 +3,30 @@
 Every CLI command and internal save routine should use these helpers so that
 all generated artifacts land under a single ``reports/`` tree.
 
-When a ``run_id`` is supplied the layout becomes::
+When a ``flow_id`` is supplied the layout becomes::
 
     reports/
     └── daily/{YYYY-MM-DD}/
-        ├── runs/{run_id}/
-        │   ├── market/            # scan results
-        │   ├── {TICKER}/          # per-ticker analysis
-        │   └── portfolio/         # PM artefacts
+        └── {flow_id}/
+            ├── market/report/     # scan results (timestamped files)
+            ├── {TICKER}/report/   # per-ticker analysis (timestamped)
+            ├── portfolio/report/  # PM artefacts (timestamped)
+            ├── run_meta.json      # metadata for the latest run
+            └── run_events.jsonl   # events for the latest run
+
+When only a legacy ``run_id`` is supplied the layout becomes::
+
+    reports/
+    └── daily/{YYYY-MM-DD}/
+        ├── runs/{run_id}/         # legacy run-scoped layout
+        │   ├── market/
+        │   ├── {TICKER}/
+        │   └── portfolio/
         ├── latest.json            # pointer → most recent run_id
         └── daily_digest.md        # append-only (shared across runs)
 
-Without a ``run_id`` the legacy flat layout is preserved for backward
-compatibility::
+Without a ``run_id`` or ``flow_id`` the legacy flat layout is preserved for
+backward compatibility::
 
     reports/
     └── daily/{YYYY-MM-DD}/
@@ -38,14 +49,39 @@
 REPORTS_ROOT = Path(os.getenv("TRADINGAGENTS_REPORTS_DIR") or "reports")
 
 
 # ──────────────────────────────────────────────────────────────────────────────
-# Run-ID helpers
+# ID / timestamp helpers
 # ──────────────────────────────────────────────────────────────────────────────
 
-def generate_run_id() -> str:
-    """Return a short, human-readable run identifier (8-char hex)."""
+def generate_flow_id() -> str:
+    """Return a short, human-readable flow identifier (8-char hex).
+
+    A *flow* groups all phases of one analysis intent (scan + pipeline +
+    portfolio). Prefer this over :func:`generate_run_id` in new code.
+    """
     return uuid.uuid4().hex[:8]
 
 
+def generate_run_id() -> str:
+    """Return a short, human-readable run identifier (8-char hex).
+
+    .. deprecated::
+        Use :func:`generate_flow_id` for new code. ``generate_run_id``
+        is kept for backward compatibility only.
+    """
+    return uuid.uuid4().hex[:8]
+
+
+def ts_now() -> str:
+    """Return a sortable UTC timestamp string: ``'20260325T143022123Z'`` (ms precision).
+
+    Used as a filename prefix so that lexicographic sort gives temporal order
+    and ``load_*`` helpers can always find the most recent report without a
+    separate pointer file. Millisecond precision prevents same-second collisions.
+    """
+    dt = datetime.now(timezone.utc)
+    return dt.strftime("%Y%m%dT%H%M%S") + f"{dt.microsecond // 1000:03d}Z"
+
+
 def write_latest_pointer(date: str, run_id: str, base_dir: Path | None = None) -> Path:
     """Write ``{base}/daily/{date}/latest.json`` pointing to *run_id*.
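> Reviewer note: load-order correctness hangs entirely on `ts_now()` strings sorting lexicographically, and the hunk above does not show `datetime`/`timezone` being imported at the module top — worth double-checking. A quick check of the sortability claim, using the same formula parameterised for testing (`_ts_at` is a local test helper, not part of the module):

```python
from datetime import datetime, timezone


def _ts_at(dt: datetime) -> str:
    # Same formula as ts_now(), but taking the instant as an argument.
    return dt.strftime("%Y%m%dT%H%M%S") + f"{dt.microsecond // 1000:03d}Z"


a = _ts_at(datetime(2026, 3, 25, 14, 30, 22, 123000, tzinfo=timezone.utc))
b = _ts_at(datetime(2026, 3, 25, 14, 30, 22, 124000, tzinfo=timezone.utc))
assert a == "20260325T143022123Z"
assert a < b  # fixed-width fields => lexicographic order == chronological order
```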
@@ -92,32 +128,62 @@
 # Path helpers
 # ──────────────────────────────────────────────────────────────────────────────
 
-def _run_prefix(date: str, run_id: str | None) -> Path:
-    """Base directory for a date, optionally scoped by run_id."""
+def _run_prefix(date: str, run_id: str | None, flow_id: str | None = None) -> Path:
+    """Base directory for a date, scoped by flow_id or legacy run_id.
+
+    Resolution order:
+        1. ``flow_id`` → ``daily/{date}/{flow_id}`` (new layout, no ``runs/`` prefix)
+        2. ``run_id``  → ``daily/{date}/runs/{run_id}`` (legacy layout)
+        3. Neither     → ``daily/{date}`` (flat legacy layout)
+    """
     daily = REPORTS_ROOT / "daily" / date
+    if flow_id:
+        return daily / flow_id
     if run_id:
         return daily / "runs" / run_id
     return daily
 
 
-def get_daily_dir(date: str, run_id: str | None = None) -> Path:
-    """``reports/daily/{date}/`` or ``reports/daily/{date}/runs/{run_id}/``"""
-    return _run_prefix(date, run_id)
+def get_daily_dir(
+    date: str,
+    run_id: str | None = None,
+    *,
+    flow_id: str | None = None,
+) -> Path:
+    """``reports/daily/{date}/[{flow_id}/|runs/{run_id}/]``"""
+    return _run_prefix(date, run_id, flow_id)
 
 
-def get_market_dir(date: str, run_id: str | None = None) -> Path:
-    """``…/{date}[/runs/{run_id}]/market/``"""
-    return get_daily_dir(date, run_id) / "market"
+def get_market_dir(
+    date: str,
+    run_id: str | None = None,
+    *,
+    flow_id: str | None = None,
+) -> Path:
+    """``…/{date}[/{flow_id}|/runs/{run_id}]/market/``"""
+    return get_daily_dir(date, run_id, flow_id=flow_id) / "market"
 
 
-def get_ticker_dir(date: str, ticker: str, run_id: str | None = None) -> Path:
-    """``…/{date}[/runs/{run_id}]/{TICKER}/``"""
-    return get_daily_dir(date, run_id) / ticker.upper()
+def get_ticker_dir(
+    date: str,
+    ticker: str,
+    run_id: str | None = None,
+    *,
+    flow_id: str | None = None,
+) -> Path:
+    """``…/{date}[/{flow_id}|/runs/{run_id}]/{TICKER}/``"""
+    return get_daily_dir(date, run_id, flow_id=flow_id) / ticker.upper()
 
 
-def get_eval_dir(date: str, ticker: str, run_id: str | None = None) -> Path:
-    """``…/{date}[/runs/{run_id}]/{TICKER}/eval/``"""
-    return get_ticker_dir(date, ticker, run_id) / "eval"
+def get_eval_dir(
+    date: str,
+    ticker: str,
+    run_id: str | None = None,
+    *,
+    flow_id: str | None = None,
+) -> Path:
+    """``…/{date}[/{flow_id}|/runs/{run_id}]/{TICKER}/eval/``"""
+    return get_ticker_dir(date, ticker, run_id, flow_id=flow_id) / "eval"
 
 
 def get_digest_path(date: str) -> Path: