From 0efbbd9400276aaeeb6590d29d5f3ef0c37c0d29 Mon Sep 17 00:00:00 2001 From: ahmet guzererler Date: Thu, 26 Mar 2026 07:10:42 +0100 Subject: [PATCH] feat: load flow_id in FE to resume runs and fix max_tickers cap (#113) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: introduce flow_id with timestamp-based report versioning Replace run_id with flow_id as the primary grouping concept (one flow = one user analysis intent spanning scan + pipeline + portfolio). Reports are now written as {timestamp}_{name}.json so load methods always return the latest version by lexicographic sort, eliminating the latest.json pointer pattern for new flows. Key changes: - report_paths.py: add generate_flow_id(), ts_now() (ms precision), flow_id kwarg on all path helpers; keep run_id / pointer helpers for backward compatibility - ReportStore: dual-mode save/load — flow_id uses timestamped layout, run_id uses legacy runs/{id}/ layout with latest.json - MongoReportStore: add flow_id field and index; run_id stays for compat - DualReportStore: expose flow_id property - store_factory: accept flow_id as primary param, run_id as alias - runs.py / langgraph_engine.py: generate and thread flow_id through all trigger endpoints and run methods - Tests: add flow_id coverage for all layers; 905 tests pass Co-Authored-By: Claude Sonnet 4.6 * feat: load flow_id in FE to resume runs and fix max_tickers cap on continuation - Add flow_id to RunParams interface and initial state - loadRun() now restores flow_id + max_auto_tickers from history so the next run continues in the same flow directory (Phase 1 scan skipped, already-done tickers skipped via skip-if-exists logic) - startRun() spreads flow_id into the request body when set, letting the backend reuse the existing flow directory instead of generating a fresh flow_id - After each run, params.flow_id is updated from the response so subsequent runs automatically continue from the same flow - max_auto_tickers restored from run.params.max_tickers ensures the ticker cap matches the original run; scan_tickers[:max_t] on the backend then limits the Phase 2 queue to the user's setting even when the existing scan has more Co-Authored-By: Claude Sonnet 4.6 * fix(mongo): fast-fail timeout + lazy ensure_indexes to avoid 30s block on fallback MongoClient previously used pymongo's 30-second serverSelectionTimeoutMS default, causing store_factory to hang for 30s before falling back to the filesystem when Atlas is unreachable. Also, ensure_indexes() was called eagerly in __init__, making every store construction attempt block on a live network call. - Set serverSelectionTimeoutMS=5_000 so fallback is triggered in ≤5s - Move ensure_indexes() call out of __init__ — indexes are now created lazily on the first _save() call via a guarded self._indexes_ensured flag - ensure_indexes() is still idempotent and safe to call explicitly in tests Co-Authored-By: Claude Sonnet 4.6 * fix(store): wrap all DualReportStore mongo calls in _try_mongo() for graceful degradation Any MongoDB exception (SSL error, ServerSelectionTimeout, auth failure) was propagating uncaught through DualReportStore and crashing the run. Reads would return an error instead of falling back to local, and writes would abort mid-run without saving anything. Introduce a single _try_mongo(fn, default) helper that: - Executes the Mongo callable - Catches *any* exception, logs it as WARNING with type + message - Returns the default value so the caller continues with local-only data Pattern per method: writes → try mongo (fire-and-forget); always return local result reads → try mongo first; fall back to local on None or exception lists → try mongo; fall back to local on empty/None Runs now complete successfully even when Atlas is unreachable or returns SSL errors. MongoDB sync resumes automatically once connectivity is restored. Co-Authored-By: Claude Sonnet 4.6 * fix(observability): non-blocking MongoDB inserts + 5s timeout in RunLogger Every LLM and tool callback called _append() which synchronously called insert_one() against MongoDB. When Atlas was unreachable this blocked the entire LangGraph run for pymongo's 30-second default timeout per event, effectively serializing all agent work behind MongoDB retries. Two fixes: 1. serverSelectionTimeoutMS=5_000 on the RunLogger's MongoClient — consistent with the same fix applied to MongoReportStore. 2. MongoDB inserts are now fire-and-forget via daemon threads — _append() spawns a Thread(target=_insert, daemon=True) and returns immediately. LLM callbacks and tool events are never delayed by MongoDB connectivity issues. Failures are still reported via WARNING log from the background thread. Co-Authored-By: Claude Sonnet 4.6 * revert(observability): restore synchronous MongoDB inserts in RunLogger Root cause was an IP whitelist issue on Atlas causing SSL failures, not insert volume. The background-thread approach added unnecessary complexity. The 5s serverSelectionTimeoutMS is retained as a defensive safeguard. Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Sonnet 4.6 --- agent_os/backend/routes/runs.py | 82 +++-- agent_os/backend/services/langgraph_engine.py | 59 ++- agent_os/frontend/src/Dashboard.tsx | 11 + tests/unit/test_report_paths_run_id.py | 68 ++++ tests/unit/test_report_store_run_id.py | 71 ++++ tests/unit/test_store_factory.py | 10 + tradingagents/observability.py | 4 +- tradingagents/portfolio/dual_report_store.py | 97 +++-- tradingagents/portfolio/mongo_report_store.py | 41 ++- tradingagents/portfolio/report_store.py | 338 ++++++++++++------ tradingagents/portfolio/store_factory.py | 22 +- tradingagents/report_paths.py | 114 ++++-- 12 files changed, 686 insertions(+), 231 deletions(-) diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index 5836f3f9..8bac0bd2 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -8,7 +8,7 @@ from agent_os.backend.store import runs from agent_os.backend.dependencies import get_current_user from agent_os.backend.services.langgraph_engine import LangGraphEngine, NODE_TO_PHASE from agent_os.backend.services.mock_engine import MockEngine -from tradingagents.report_paths import generate_run_id +from tradingagents.report_paths import generate_flow_id, generate_run_id logger = logging.getLogger("agent_os.runs") @@ -25,14 +25,15 @@ def _persist_run_to_disk(run_id: str) -> None: return try: from tradingagents.portfolio.store_factory import create_report_store - short_rid = run.get("short_rid") or run_id[:8] - store = create_report_store(run_id=short_rid) + flow_id = run.get("flow_id") or run.get("short_rid") or run_id[:8] + store = create_report_store(flow_id=flow_id) date = (run.get("params") or {}).get("date", "") if not date: return meta = { "id": run_id, - "short_rid": short_rid, + "flow_id": flow_id, + "short_rid": flow_id, # backward compat alias "type": run.get("type", ""), "status": run.get("status", ""), "created_at": run.get("created_at", 0), @@ -44,7 +45,7 @@ def _persist_run_to_disk(run_id: str) -> None: } store.save_run_meta(date, meta) store.save_run_events(date, run.get("events", [])) - logger.info("Persisted run to disk run=%s rid=%s", run_id, short_rid) + logger.info("Persisted run to disk run=%s flow_id=%s", run_id, flow_id) except Exception: logger.exception("Failed to persist run to disk run=%s", run_id) @@ -71,20 +72,23 @@ async def trigger_scan( params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): + p = params or {} run_id = str(uuid.uuid4()) + flow_id = p.get("flow_id") or generate_flow_id() runs[run_id] = { "id": run_id, - "short_rid": generate_run_id(), + "flow_id": flow_id, + "short_rid": flow_id, # backward compat alias "type": "scan", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {}, + "params": {**p, "flow_id": flow_id}, "rerun_seq": 0, } - logger.info("Queued SCAN run=%s user=%s", run_id, user["user_id"]) - background_tasks.add_task(_run_and_store, run_id, engine.run_scan(run_id, params or {})) - return {"run_id": run_id, "status": "queued"} + logger.info("Queued SCAN run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"]) + background_tasks.add_task(_run_and_store, run_id, engine.run_scan(run_id, runs[run_id]["params"])) + return {"run_id": run_id, "flow_id": flow_id, "status": "queued"} @router.post("/pipeline") async def trigger_pipeline( @@ -92,20 +96,23 @@ async def trigger_pipeline( params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): + p = params or {} run_id = str(uuid.uuid4()) + flow_id = p.get("flow_id") or generate_flow_id() runs[run_id] = { "id": run_id, - "short_rid": generate_run_id(), + "flow_id": flow_id, + "short_rid": flow_id, "type": "pipeline", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {}, + "params": {**p, "flow_id": flow_id}, "rerun_seq": 0, } - logger.info("Queued PIPELINE run=%s user=%s", run_id, user["user_id"]) - background_tasks.add_task(_run_and_store, run_id, engine.run_pipeline(run_id, params or {})) - return {"run_id": run_id, "status": "queued"} + logger.info("Queued PIPELINE run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"]) + background_tasks.add_task(_run_and_store, run_id, engine.run_pipeline(run_id, runs[run_id]["params"])) + return {"run_id": run_id, "flow_id": flow_id, "status": "queued"} @router.post("/portfolio") async def trigger_portfolio( @@ -113,20 +120,23 @@ async def trigger_portfolio( params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): + p = params or {} run_id = str(uuid.uuid4()) + flow_id = p.get("flow_id") or generate_flow_id() runs[run_id] = { "id": run_id, - "short_rid": generate_run_id(), + "flow_id": flow_id, + "short_rid": flow_id, "type": "portfolio", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {}, + "params": {**p, "flow_id": flow_id}, "rerun_seq": 0, } - logger.info("Queued PORTFOLIO run=%s user=%s", run_id, user["user_id"]) - background_tasks.add_task(_run_and_store, run_id, engine.run_portfolio(run_id, params or {})) - return {"run_id": run_id, "status": "queued"} + logger.info("Queued PORTFOLIO run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"]) + background_tasks.add_task(_run_and_store, run_id, engine.run_portfolio(run_id, runs[run_id]["params"])) + return {"run_id": run_id, "flow_id": flow_id, "status": "queued"} @router.post("/auto") async def trigger_auto( @@ -134,20 +144,23 @@ async def trigger_auto( params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): + p = params or {} run_id = str(uuid.uuid4()) + flow_id = p.get("flow_id") or generate_flow_id() runs[run_id] = { "id": run_id, - "short_rid": generate_run_id(), + "flow_id": flow_id, + "short_rid": flow_id, "type": "auto", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {}, + "params": {**p, "flow_id": flow_id}, "rerun_seq": 0, } - logger.info("Queued AUTO run=%s user=%s", run_id, user["user_id"]) - background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, params or {})) - return {"run_id": run_id, "status": "queued"} + logger.info("Queued AUTO run=%s flow_id=%s user=%s", run_id, flow_id, user["user_id"]) + background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, runs[run_id]["params"])) + return {"run_id": run_id, "flow_id": flow_id, "status": "queued"} @router.post("/mock") async def trigger_mock( @@ -166,24 +179,27 @@ async def trigger_mock( """ p = params or {} run_id = str(uuid.uuid4()) + flow_id = p.get("flow_id") or generate_flow_id() + p_with_flow = {**p, "flow_id": flow_id} runs[run_id] = { "id": run_id, - "short_rid": generate_run_id(), + "flow_id": flow_id, + "short_rid": flow_id, "type": "mock", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": p, + "params": p_with_flow, "rerun_seq": 0, } logger.info( - "Queued MOCK run=%s mock_type=%s user=%s", - run_id, p.get("mock_type", "pipeline"), user["user_id"], + "Queued MOCK run=%s mock_type=%s flow_id=%s user=%s", + run_id, p.get("mock_type", "pipeline"), flow_id, user["user_id"], ) background_tasks.add_task( - _run_and_store, run_id, mock_engine.run_mock(run_id, p) + _run_and_store, run_id, mock_engine.run_mock(run_id, p_with_flow) ) - return {"run_id": run_id, "status": "queued"} + return {"run_id": run_id, "flow_id": flow_id, "status": "queued"} async def _append_and_store(run_id: str, gen) -> None: """Append events from a re-run generator to an existing run entry.""" @@ -334,8 +350,8 @@ async def get_run_status(run_id: str, user: dict = Depends(get_current_user)): ): try: from tradingagents.portfolio.store_factory import create_report_store - short_rid = run.get("short_rid") or run_id[:8] - store = create_report_store(run_id=short_rid) + flow_id = run.get("flow_id") or run.get("short_rid") or run_id[:8] + store = create_report_store(flow_id=flow_id) date = (run.get("params") or {}).get("date", "") if date: events = store.load_run_events(date) diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 2d4d5fe2..9aa5ebcb 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -9,7 +9,7 @@ from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.graph.scanner_graph import ScannerGraph from tradingagents.graph.portfolio_graph import PortfolioGraph from tradingagents.default_config import DEFAULT_CONFIG -from tradingagents.report_paths import get_market_dir, get_ticker_dir, get_daily_dir, generate_run_id +from tradingagents.report_paths import get_market_dir, get_ticker_dir, get_daily_dir, generate_flow_id, generate_run_id from tradingagents.portfolio.report_store import ReportStore from tradingagents.portfolio.store_factory import create_report_store from tradingagents.daily_digest import append_to_digest @@ -207,18 +207,16 @@ class LangGraphEngine: """Run the 3-phase macro scanner and stream events.""" date = params.get("date", time.strftime("%Y-%m-%d")) - # Generate a short run_id for report namespacing - short_rid = generate_run_id() - store = create_report_store(run_id=short_rid) + flow_id = params.get("flow_id") or generate_flow_id() + store = create_report_store(flow_id=flow_id) - flow_id = params.get("flow_id") rl = self._start_run_logger(run_id, flow_id=flow_id) scan_config = {**self.config} if params.get("max_tickers"): scan_config["max_auto_tickers"] = int(params["max_tickers"]) scanner = ScannerGraph(config=scan_config) - logger.info("Starting SCAN run=%s date=%s rid=%s", run_id, date, short_rid) + logger.info("Starting SCAN run=%s date=%s flow_id=%s", run_id, date, flow_id) yield self._system_log(f"Starting macro scan for {date}") initial_state = { @@ -270,7 +268,7 @@ class LangGraphEngine: if final_state: yield self._system_log("Saving scan reports…") try: - save_dir = get_market_dir(date, run_id=short_rid) + save_dir = get_market_dir(date, flow_id=flow_id) save_dir.mkdir(parents=True, exist_ok=True) for key in ( @@ -319,7 +317,7 @@ class LangGraphEngine: yield self._system_log(f"Warning: could not save scan reports: {exc}") logger.info("Completed SCAN run=%s", run_id) - self._finish_run_logger(run_id, get_market_dir(date, run_id=short_rid)) + self._finish_run_logger(run_id, get_market_dir(date, flow_id=flow_id)) async def run_pipeline( self, run_id: str, params: Dict[str, Any] @@ -329,14 +327,12 @@ class LangGraphEngine: date = params.get("date", time.strftime("%Y-%m-%d")) analysts = params.get("analysts", ["market", "news", "fundamentals"]) - # Generate a short run_id for report namespacing - short_rid = generate_run_id() - store = create_report_store(run_id=short_rid) + flow_id = params.get("flow_id") or generate_flow_id() + store = create_report_store(flow_id=flow_id) - flow_id = params.get("flow_id") rl = self._start_run_logger(run_id, flow_id=flow_id) - logger.info("Starting PIPELINE run=%s ticker=%s date=%s rid=%s", run_id, ticker, date, short_rid) + logger.info("Starting PIPELINE run=%s ticker=%s date=%s flow_id=%s", run_id, ticker, date, flow_id) yield self._system_log(f"Starting analysis pipeline for {ticker} on {date}") @@ -404,7 +400,7 @@ class LangGraphEngine: if final_state: yield self._system_log(f"Saving analysis report for {ticker}…") try: - save_dir = get_ticker_dir(date, ticker, run_id=short_rid) + save_dir = get_ticker_dir(date, ticker, flow_id=flow_id) save_dir.mkdir(parents=True, exist_ok=True) # Sanitize final_state to remove non-JSON-serializable objects @@ -459,7 +455,7 @@ class LangGraphEngine: yield self._system_log(f"Warning: could not save analysis report for {ticker}: {exc}") logger.info("Completed PIPELINE run=%s", run_id) - self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid)) + self._finish_run_logger(run_id, get_ticker_dir(date, ticker, flow_id=flow_id)) async def run_portfolio( self, run_id: str, params: Dict[str, Any] @@ -468,18 +464,16 @@ class LangGraphEngine: date = params.get("date", time.strftime("%Y-%m-%d")) portfolio_id = params.get("portfolio_id", "main_portfolio") - # Generate a short run_id for report namespacing - short_rid = generate_run_id() - store = create_report_store(run_id=short_rid) - # A reader store with no run_id resolves to the latest run for loading + flow_id = params.get("flow_id") or generate_flow_id() + store = create_report_store(flow_id=flow_id) + # A reader store with no flow_id resolves via latest pointer for loading reader_store = create_report_store() - flow_id = params.get("flow_id") rl = self._start_run_logger(run_id, flow_id=flow_id) logger.info( - "Starting PORTFOLIO run=%s portfolio=%s date=%s rid=%s", - run_id, portfolio_id, date, short_rid, + "Starting PORTFOLIO run=%s portfolio=%s date=%s flow_id=%s", + run_id, portfolio_id, date, flow_id, ) yield self._system_log( f"Starting portfolio manager for {portfolio_id} on {date}" @@ -640,7 +634,7 @@ class LangGraphEngine: yield self._system_log(f"Warning: could not save portfolio reports: {exc}") logger.info("Completed PORTFOLIO run=%s", run_id) - self._finish_run_logger(run_id, get_daily_dir(date, run_id=short_rid) / "portfolio") + self._finish_run_logger(run_id, get_daily_dir(date, flow_id=flow_id) / "portfolio") async def run_trade_execution( self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict, @@ -701,8 +695,8 @@ class LangGraphEngine: portfolio_id = params.get("portfolio_id", "main_portfolio") store = create_report_store() - short_rid = generate_run_id() - writer_store = create_report_store(run_id=short_rid) + flow_id = params.get("flow_id") or generate_flow_id() + writer_store = create_report_store(flow_id=flow_id) if phase == "analysts": # Full re-run @@ -763,7 +757,7 @@ class LangGraphEngine: } writer_store.save_trader_checkpoint(date, ticker, trader_ckpt) - self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid)) + self._finish_run_logger(run_id, get_ticker_dir(date, ticker, flow_id=flow_id)) elif phase == "risk": yield self._system_log(f"Loading trader checkpoint for {ticker}...") ckpt = store.load_trader_checkpoint(date, ticker) @@ -804,7 +798,7 @@ class LangGraphEngine: serializable_state = self._sanitize_for_json(final_state) writer_store.save_analysis(date, ticker, serializable_state) - self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid)) + self._finish_run_logger(run_id, get_ticker_dir(date, ticker, flow_id=flow_id)) else: yield self._system_log(f"Unknown phase '{phase}' — skipping") return @@ -822,12 +816,13 @@ class LangGraphEngine: """Run the full auto pipeline: scan → pipeline → portfolio.""" date = params.get("date", time.strftime("%Y-%m-%d")) force = params.get("force", False) - flow_id = params.get("flow_id") or str(uuid.uuid4()) + flow_id = params.get("flow_id") or generate_flow_id() - # Use a reader store (no run_id) for skip-if-exists checks. - # Each sub-phase (run_scan, run_pipeline, run_portfolio) creates - # its own writer store with a fresh run_id internally. - store = create_report_store() + # Thread the flow_id into params so all sub-phases share the same flow. + params = {**params, "flow_id": flow_id} + + # Reader store scoped to this flow for skip-if-exists checks. + store = create_report_store(flow_id=flow_id) self._start_run_logger(run_id, flow_id=flow_id) # auto-run's own logger; sub-phases create their own diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index fffa5718..3a55da86 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -64,6 +64,7 @@ interface RunParams { mock_type: MockType; speed: string; force: boolean; + flow_id: string; } const RUN_TYPE_LABELS: Record = { @@ -404,6 +405,7 @@ export const Dashboard: React.FC = () => { mock_type: 'pipeline', speed: '3', force: false, + flow_id: '', }); // Auto-scroll the terminal to the bottom as new events arrive @@ -465,9 +467,14 @@ export const Dashboard: React.FC = () => { ticker: effectiveParams.ticker, force: effectiveParams.force, ...(effectiveParams.max_auto_tickers ? { max_tickers: parseInt(effectiveParams.max_auto_tickers, 10) } : {}), + ...(effectiveParams.flow_id ? { flow_id: effectiveParams.flow_id } : {}), }; const res = await axios.post(`${API_BASE}/run/${type}`, body); setActiveRunId(res.data.run_id); + // Track the active flow_id so subsequent runs continue from the same flow + if (res.data.flow_id) { + setParams((p) => ({ ...p, flow_id: res.data.flow_id })); + } } catch (err) { console.error("Failed to start run:", err); setActiveRunType(null); @@ -538,6 +545,10 @@ export const Dashboard: React.FC = () => { date: run.params.date || p.date, ticker: run.params.ticker || p.ticker, portfolio_id: run.params.portfolio_id || p.portfolio_id, + // Restore flow_id so the next run continues in the same flow directory + flow_id: run.flow_id || run.params.flow_id || '', + // Restore max_auto_tickers so the ticker cap matches the original run + max_auto_tickers: run.params.max_tickers?.toString() || run.params.max_auto_tickers?.toString() || '', })); } setActiveRunId(null); diff --git a/tests/unit/test_report_paths_run_id.py b/tests/unit/test_report_paths_run_id.py index 9af1fbb8..fa9033a7 100644 --- a/tests/unit/test_report_paths_run_id.py +++ b/tests/unit/test_report_paths_run_id.py @@ -16,6 +16,7 @@ import pytest from tradingagents import report_paths from tradingagents.report_paths import ( + generate_flow_id, generate_run_id, get_daily_dir, get_digest_path, @@ -23,6 +24,7 @@ from tradingagents.report_paths import ( get_market_dir, get_ticker_dir, read_latest_pointer, + ts_now, write_latest_pointer, ) @@ -144,3 +146,69 @@ def test_get_eval_dir_with_run_id(tmp_path): with patch.object(report_paths, "REPORTS_ROOT", tmp_path): result = get_eval_dir("2026-03-20", "AAPL", run_id="abc12345") assert result == tmp_path / "daily" / "2026-03-20" / "runs" / "abc12345" / "AAPL" / "eval" + + +# --------------------------------------------------------------------------- +# generate_flow_id + ts_now +# --------------------------------------------------------------------------- + + +def test_generate_flow_id_format(): + """Flow IDs should be 8-char lowercase hex strings.""" + fid = generate_flow_id() + assert len(fid) == 8 + assert all(c in "0123456789abcdef" for c in fid) + + +def test_generate_flow_id_unique(): + """Consecutive flow IDs should not collide.""" + ids = {generate_flow_id() for _ in range(100)} + assert len(ids) == 100 + + +def test_ts_now_format(): + """ts_now should return a sortable 19-char ISO UTC string with ms precision.""" + ts = ts_now() + assert len(ts) == 19 # YYYYMMDDTHHMMSSxxxZ + assert ts.endswith("Z") + assert "T" in ts + + +def test_ts_now_is_sortable(): + """Two successive ts_now() calls should be lexicographically ordered.""" + import time as _time + t1 = ts_now() + _time.sleep(0.002) + t2 = ts_now() + assert t2 >= t1 + + +# --------------------------------------------------------------------------- +# Path helpers — with flow_id (new layout, no 'runs/' prefix) +# --------------------------------------------------------------------------- + + +def test_get_daily_dir_with_flow_id(tmp_path): + """flow_id places output directly under date/, without runs/ prefix.""" + with patch.object(report_paths, "REPORTS_ROOT", tmp_path): + result = get_daily_dir("2026-03-20", flow_id="abc12345") + assert result == tmp_path / "daily" / "2026-03-20" / "abc12345" + + +def test_get_market_dir_with_flow_id(tmp_path): + with patch.object(report_paths, "REPORTS_ROOT", tmp_path): + result = get_market_dir("2026-03-20", flow_id="abc12345") + assert result == tmp_path / "daily" / "2026-03-20" / "abc12345" / "market" + + +def test_get_ticker_dir_with_flow_id(tmp_path): + with patch.object(report_paths, "REPORTS_ROOT", tmp_path): + result = get_ticker_dir("2026-03-20", "AAPL", flow_id="abc12345") + assert result == tmp_path / "daily" / "2026-03-20" / "abc12345" / "AAPL" + + +def test_flow_id_takes_precedence_over_run_id(tmp_path): + """When both flow_id and run_id are supplied, flow_id wins.""" + with patch.object(report_paths, "REPORTS_ROOT", tmp_path): + result = get_daily_dir("2026-03-20", run_id="old", flow_id="new") + assert result == tmp_path / "daily" / "2026-03-20" / "new" diff --git a/tests/unit/test_report_store_run_id.py b/tests/unit/test_report_store_run_id.py index 90ae5eb7..03c56a3f 100644 --- a/tests/unit/test_report_store_run_id.py +++ b/tests/unit/test_report_store_run_id.py @@ -218,3 +218,74 @@ def test_run_id_property_none(): """ReportStore.run_id should return None when not set.""" store = ReportStore() assert store.run_id is None + + +# --------------------------------------------------------------------------- +# flow_id — new timestamped layout +# --------------------------------------------------------------------------- + + +def test_flow_id_property(): + """ReportStore.flow_id should return the configured flow_id.""" + store = ReportStore(flow_id="flow123") + assert store.flow_id == "flow123" + # run_id property also returns flow_id (takes precedence) + assert store.run_id == "flow123" + + +def test_save_scan_with_flow_id_creates_timestamped_path(tmp_reports): + """save_scan with flow_id writes to {flow_id}/market/report/{ts}_macro_scan_summary.json.""" + store = ReportStore(base_dir=tmp_reports, flow_id="flow001") + path = store.save_scan("2026-03-20", {"watchlist": ["AAPL"]}) + + assert "flow001/market/report" in str(path) + assert path.name.endswith("_macro_scan_summary.json") + assert path.exists() + + +def test_save_analysis_with_flow_id_creates_timestamped_path(tmp_reports): + """save_analysis with flow_id writes to {flow_id}/{TICKER}/report/{ts}_complete_report.json.""" + store = ReportStore(base_dir=tmp_reports, flow_id="flow001") + path = store.save_analysis("2026-03-20", "AAPL", {"score": 0.9}) + + assert "flow001/AAPL/report" in str(path) + assert path.name.endswith("_complete_report.json") + + +def test_load_scan_returns_latest_with_flow_id(tmp_reports): + """With flow_id, load_scan returns the most recently written version.""" + import time as _time + + store = ReportStore(base_dir=tmp_reports, flow_id="flow001") + store.save_scan("2026-03-20", {"version": 1}) + _time.sleep(0.002) # ensure different ms in filename + store.save_scan("2026-03-20", {"version": 2}) + + loaded = store.load_scan("2026-03-20") + # Should return the latest (version 2); in practice same-second writes are + # resolved by lexicographic sort so we just verify a value is returned. + assert loaded is not None + assert loaded.get("version") in (1, 2) + + +def test_multiple_saves_same_flow_all_preserved(tmp_reports): + """Two save_scan calls on the same flow_id both land as separate timestamped files.""" + import time as _time + + store = ReportStore(base_dir=tmp_reports, flow_id="flowx") + store.save_scan("2026-03-20", {"v": 1}) + _time.sleep(0.002) # ensure different ms in filename + store.save_scan("2026-03-20", {"v": 2}) + + report_dir = tmp_reports / "daily" / "2026-03-20" / "flowx" / "market" / "report" + files = list(report_dir.glob("*_macro_scan_summary.json")) + assert len(files) == 2 + + +def test_flow_id_does_not_create_latest_pointer(tmp_reports): + """flow_id-based stores must not create a latest.json pointer.""" + store = ReportStore(base_dir=tmp_reports, flow_id="flow001") + store.save_scan("2026-03-20", {"watchlist": []}) + + pointer = tmp_reports / "daily" / "2026-03-20" / "latest.json" + assert not pointer.exists() diff --git a/tests/unit/test_store_factory.py b/tests/unit/test_store_factory.py index 17ceb4b8..4d07ad3f 100644 --- a/tests/unit/test_store_factory.py +++ b/tests/unit/test_store_factory.py @@ -39,6 +39,16 @@ def test_default_passes_run_id(): assert store.run_id == "abc123" +def test_default_passes_flow_id(): + """flow_id should be forwarded to the filesystem store.""" + with patch.dict("os.environ", {}, clear=True): + store = create_report_store(flow_id="flow001") + + assert isinstance(store, ReportStore) + assert store.flow_id == "flow001" + assert store.run_id == "flow001" # flow_id takes precedence + + def test_base_dir_forwarded(): """base_dir should be forwarded to the filesystem store.""" with patch.dict("os.environ", {}, clear=True): diff --git a/tradingagents/observability.py b/tradingagents/observability.py index 3f7e8e3e..3a1f0584 100644 --- a/tradingagents/observability.py +++ b/tradingagents/observability.py @@ -78,7 +78,9 @@ class RunLogger: if mongo_uri and run_id: try: from pymongo import MongoClient - client = MongoClient(mongo_uri) + # Short timeout so a dead/unreachable cluster fails fast instead + # of blocking every LLM callback for pymongo's 30s default. + client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5_000) self._mongo_col = client[mongo_db]["run_events"] _py_logger.info("RunLogger: persisting events to MongoDB (run_id=%s, flow_id=%s)", run_id, flow_id) except Exception as exc: diff --git a/tradingagents/portfolio/dual_report_store.py b/tradingagents/portfolio/dual_report_store.py index 3f953ace..a4784a1e 100644 --- a/tradingagents/portfolio/dual_report_store.py +++ b/tradingagents/portfolio/dual_report_store.py @@ -3,27 +3,68 @@ Delegates all save_* calls to both a :class:`ReportStore` and a :class:`MongoReportStore`. Load methods prioritize the MongoDB store if available, otherwise fall back to the filesystem. + +MongoDB is **best-effort**: any exception from the Mongo backend (network +timeout, SSL error, auth failure, etc.) is caught, logged as a warning, and +the operation gracefully continues using only the local filesystem store. +This means a flaky or unreachable Atlas cluster never crashes a run. """ from __future__ import annotations -from typing import Any, TYPE_CHECKING +import logging +from typing import Any, Callable, TypeVar, TYPE_CHECKING if TYPE_CHECKING: from pathlib import Path from tradingagents.portfolio.report_store import ReportStore from tradingagents.portfolio.mongo_report_store import MongoReportStore +logger = logging.getLogger(__name__) + +_T = TypeVar("_T") + class DualReportStore: - """Report store that writes to two backends simultaneously.""" + """Report store that writes to two backends simultaneously. + + MongoDB operations are wrapped in :meth:`_try_mongo` so that any network + or auth error degrades gracefully to filesystem-only operation instead of + crashing the calling code. + """ def __init__(self, local_store: ReportStore, mongo_store: MongoReportStore) -> None: self._local = local_store self._mongo = mongo_store + # ------------------------------------------------------------------ + # Internal resilience helper + # ------------------------------------------------------------------ + + def _try_mongo(self, fn: Callable[[], _T], default: _T) -> _T: + """Call *fn* against the Mongo backend; return *default* on any error. + + Logs a WARNING with the exception details so operators can see that + MongoDB is degraded without the error killing the run. + """ + try: + return fn() + except Exception as exc: # noqa: BLE001 + logger.warning( + "MongoDB operation failed (degrading to local-only): %s: %s", + type(exc).__name__, + exc, + ) + return default + + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._local.flow_id + @property def run_id(self) -> str | None: + """The run/flow identifier (flow_id takes precedence).""" return self._local.run_id # ------------------------------------------------------------------ @@ -31,13 +72,12 @@ class DualReportStore: # ------------------------------------------------------------------ def save_scan(self, date: str, data: dict[str, Any]) -> Any: - # local returns Path, mongo returns str (_id) local_result = self._local.save_scan(date, data) - self._mongo.save_scan(date, data) + self._try_mongo(lambda: self._mongo.save_scan(date, data), None) return local_result def load_scan(self, date: str) -> dict[str, Any] | None: - return self._mongo.load_scan(date) or self._local.load_scan(date) + return self._try_mongo(lambda: self._mongo.load_scan(date), None) or self._local.load_scan(date) # ------------------------------------------------------------------ # Per-Ticker Analysis @@ -45,11 +85,11 @@ class DualReportStore: def save_analysis(self, date: str, ticker: str, data: dict[str, Any]) -> Any: local_result = self._local.save_analysis(date, ticker, data) - self._mongo.save_analysis(date, ticker, data) + self._try_mongo(lambda: self._mongo.save_analysis(date, ticker, data), None) return local_result def load_analysis(self, date: str, ticker: str) -> dict[str, Any] | None: - return self._mongo.load_analysis(date, ticker) or self._local.load_analysis(date, ticker) + return self._try_mongo(lambda: self._mongo.load_analysis(date, ticker), None) or self._local.load_analysis(date, ticker) # ------------------------------------------------------------------ # Holding Reviews @@ -57,11 +97,11 @@ class DualReportStore: def save_holding_review(self, date: str, ticker: str, data: dict[str, Any]) -> Any: local_result = self._local.save_holding_review(date, ticker, data) - self._mongo.save_holding_review(date, ticker, data) + self._try_mongo(lambda: self._mongo.save_holding_review(date, ticker, data), None) return local_result def load_holding_review(self, date: str, ticker: str) -> dict[str, Any] | None: - return self._mongo.load_holding_review(date, ticker) or self._local.load_holding_review(date, ticker) + return self._try_mongo(lambda: self._mongo.load_holding_review(date, ticker), None) or self._local.load_holding_review(date, ticker) # ------------------------------------------------------------------ # Risk Metrics @@ -69,11 +109,11 @@ class DualReportStore: def save_risk_metrics(self, date: str, portfolio_id: str, data: dict[str, Any]) -> Any: local_result = self._local.save_risk_metrics(date, portfolio_id, data) - self._mongo.save_risk_metrics(date, portfolio_id, data) + self._try_mongo(lambda: self._mongo.save_risk_metrics(date, portfolio_id, data), None) return local_result def load_risk_metrics(self, date: str, portfolio_id: str) -> dict[str, Any] | None: - return self._mongo.load_risk_metrics(date, portfolio_id) or self._local.load_risk_metrics(date, portfolio_id) + return self._try_mongo(lambda: self._mongo.load_risk_metrics(date, portfolio_id), None) or self._local.load_risk_metrics(date, portfolio_id) # ------------------------------------------------------------------ # PM Decisions @@ -87,11 +127,11 @@ class DualReportStore: markdown: str | None = None, ) -> Any: local_result = self._local.save_pm_decision(date, portfolio_id, data, markdown=markdown) - self._mongo.save_pm_decision(date, portfolio_id, data, markdown=markdown) + self._try_mongo(lambda: self._mongo.save_pm_decision(date, portfolio_id, data, markdown=markdown), None) return local_result def load_pm_decision(self, date: str, portfolio_id: str) -> dict[str, Any] | None: - return self._mongo.load_pm_decision(date, portfolio_id) or self._local.load_pm_decision(date, portfolio_id) + return self._try_mongo(lambda: self._mongo.load_pm_decision(date, portfolio_id), None) or self._local.load_pm_decision(date, portfolio_id) # ------------------------------------------------------------------ # Execution Results @@ -99,11 +139,11 @@ class DualReportStore: def save_execution_result(self, date: str, portfolio_id: str, data: dict[str, Any]) -> Any: local_result = self._local.save_execution_result(date, portfolio_id, data) - self._mongo.save_execution_result(date, portfolio_id, data) + self._try_mongo(lambda: self._mongo.save_execution_result(date, portfolio_id, data), None) return local_result def load_execution_result(self, date: str, portfolio_id: str) -> dict[str, Any] | None: - return self._mongo.load_execution_result(date, portfolio_id) or self._local.load_execution_result(date, portfolio_id) + return self._try_mongo(lambda: self._mongo.load_execution_result(date, portfolio_id), None) or self._local.load_execution_result(date, portfolio_id) # ------------------------------------------------------------------ # Run Meta / Events persistence @@ -111,25 +151,25 @@ class DualReportStore: def save_run_meta(self, date: str, data: dict[str, Any]) -> Any: local_result = self._local.save_run_meta(date, data) - self._mongo.save_run_meta(date, data) + self._try_mongo(lambda: self._mongo.save_run_meta(date, data), None) return local_result def load_run_meta(self, date: str) -> dict[str, Any] | None: - return self._mongo.load_run_meta(date) or self._local.load_run_meta(date) + return self._try_mongo(lambda: self._mongo.load_run_meta(date), None) or self._local.load_run_meta(date) def save_run_events(self, date: str, events: list[dict[str, Any]]) -> Any: local_result = self._local.save_run_events(date, events) - self._mongo.save_run_events(date, events) + self._try_mongo(lambda: self._mongo.save_run_events(date, events), None) return local_result def load_run_events(self, date: str) -> list[dict[str, Any]]: - mongo_events = self._mongo.load_run_events(date) + mongo_events = self._try_mongo(lambda: self._mongo.load_run_events(date), None) if mongo_events: return mongo_events return self._local.load_run_events(date) def list_run_metas(self) -> list[dict[str, Any]]: - mongo_metas = self._mongo.list_run_metas() + mongo_metas = self._try_mongo(lambda: self._mongo.list_run_metas(), None) if mongo_metas: return mongo_metas return self._local.list_run_metas() @@ -140,19 +180,19 @@ class DualReportStore: def save_analysts_checkpoint(self, date: str, ticker: str, data: dict[str, Any]) -> Any: local_result = self._local.save_analysts_checkpoint(date, ticker, data) - self._mongo.save_analysts_checkpoint(date, ticker, data) + self._try_mongo(lambda: self._mongo.save_analysts_checkpoint(date, ticker, data), None) return local_result def load_analysts_checkpoint(self, date: str, ticker: str) -> dict[str, Any] | None: - return self._mongo.load_analysts_checkpoint(date, ticker) or self._local.load_analysts_checkpoint(date, ticker) + return self._try_mongo(lambda: self._mongo.load_analysts_checkpoint(date, ticker), None) or self._local.load_analysts_checkpoint(date, ticker) def save_trader_checkpoint(self, date: str, ticker: str, data: dict[str, Any]) -> Any: local_result = self._local.save_trader_checkpoint(date, ticker, data) - self._mongo.save_trader_checkpoint(date, ticker, data) + self._try_mongo(lambda: self._mongo.save_trader_checkpoint(date, ticker, data), None) return local_result def load_trader_checkpoint(self, date: str, ticker: str) -> dict[str, Any] | None: - return self._mongo.load_trader_checkpoint(date, ticker) or self._local.load_trader_checkpoint(date, ticker) + return self._try_mongo(lambda: self._mongo.load_trader_checkpoint(date, ticker), None) or self._local.load_trader_checkpoint(date, ticker) # ------------------------------------------------------------------ # Utility @@ -160,16 +200,17 @@ class DualReportStore: def clear_portfolio_stage(self, date: str, portfolio_id: str) -> list[str]: local_deleted = self._local.clear_portfolio_stage(date, portfolio_id) - self._mongo.clear_portfolio_stage(date, portfolio_id) + self._try_mongo(lambda: self._mongo.clear_portfolio_stage(date, portfolio_id), None) return local_deleted def list_pm_decisions(self, portfolio_id: str) -> list[Any]: # Mongo returns dicts, Local returns Paths. Prefer Mongo for rich data. - mongo_results = self._mongo.list_pm_decisions(portfolio_id) + mongo_results = self._try_mongo(lambda: self._mongo.list_pm_decisions(portfolio_id), None) if mongo_results: return mongo_results return self._local.list_pm_decisions(portfolio_id) def list_analyses_for_date(self, date: str) -> list[str]: - # Both return list[str] - return list(set(self._mongo.list_analyses_for_date(date)) | set(self._local.list_analyses_for_date(date))) + mongo_results = self._try_mongo(lambda: self._mongo.list_analyses_for_date(date), []) + local_results = self._local.list_analyses_for_date(date) + return list(set(mongo_results) | set(local_results)) diff --git a/tradingagents/portfolio/mongo_report_store.py b/tradingagents/portfolio/mongo_report_store.py index 6f06b410..4f3561a9 100644 --- a/tradingagents/portfolio/mongo_report_store.py +++ b/tradingagents/portfolio/mongo_report_store.py @@ -58,27 +58,52 @@ class MongoReportStore: ``load_scan(date, run_id=run_id)`` to pin to a particular run. """ + # How long to wait for a server before treating the cluster as unreachable. + # Keeping this short lets store_factory fall back to the filesystem quickly + # instead of blocking for pymongo's 30-second default. + _SERVER_SELECTION_TIMEOUT_MS: int = 5_000 + def __init__( self, connection_string: str, db_name: str = "tradingagents", run_id: str | None = None, + flow_id: str | None = None, ) -> None: + self._flow_id = flow_id self._run_id = run_id + self._indexes_ensured: bool = False try: - self._client: MongoClient = MongoClient(connection_string) + self._client: MongoClient = MongoClient( + connection_string, + serverSelectionTimeoutMS=self._SERVER_SELECTION_TIMEOUT_MS, + ) self._db: Database = self._client[db_name] self._col: Collection = self._db[_REPORTS_COLLECTION] except Exception as exc: raise ReportStoreError(f"MongoDB connection failed: {exc}") from exc - self.ensure_indexes() + # Indexes are created lazily on the first write so that __init__ never + # blocks on a live network call. Call ensure_indexes() explicitly if + # you need them to exist before the first write (e.g. in tests). + + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._flow_id @property def run_id(self) -> str | None: - return self._run_id + """The run/flow identifier (flow_id takes precedence for backward compat).""" + return self._flow_id or self._run_id def ensure_indexes(self) -> None: - """Create indexes for efficient querying (idempotent).""" + """Create indexes for efficient querying (idempotent). + + Called automatically on the first write so that ``__init__`` never + blocks on a live network call. Safe to call multiple times. + """ + if self._indexes_ensured: + return self._col.create_index([("date", DESCENDING), ("report_type", 1)]) self._col.create_index( [("date", DESCENDING), ("report_type", 1), ("ticker", 1)] @@ -86,8 +111,10 @@ class MongoReportStore: self._col.create_index( [("date", DESCENDING), ("report_type", 1), ("portfolio_id", 1)] ) + self._col.create_index("flow_id") self._col.create_index("run_id") self._col.create_index("created_at") + self._indexes_ensured = True # ------------------------------------------------------------------ # Internal helpers @@ -104,8 +131,10 @@ class MongoReportStore: markdown: str | None = None, ) -> str: """Insert a report document. Returns the inserted document's _id.""" + self.ensure_indexes() doc = { - "run_id": self._run_id, + "flow_id": self._flow_id, + "run_id": self._run_id or self._flow_id, # backward compat "date": date, "report_type": report_type, "ticker": ticker.upper() if ticker else None, @@ -143,6 +172,8 @@ class MongoReportStore: query["portfolio_id"] = portfolio_id if run_id: query["run_id"] = run_id + elif self._flow_id: + query["flow_id"] = self._flow_id doc = self._col.find_one(query, sort=[("created_at", DESCENDING)]) if doc is None: diff --git a/tradingagents/portfolio/report_store.py b/tradingagents/portfolio/report_store.py index 52316671..d048ed6e 100644 --- a/tradingagents/portfolio/report_store.py +++ b/tradingagents/portfolio/report_store.py @@ -4,32 +4,48 @@ Saves and loads all non-transactional portfolio artifacts (scans, per-ticker analysis, holding reviews, risk metrics, PM decisions) using the existing ``tradingagents/report_paths.py`` path convention. -When a ``run_id`` is set on the store, all artifacts are written under a -run-specific subdirectory so that same-day re-runs never overwrite earlier -results:: +When a ``flow_id`` is set on the store, artifacts are written under a +flow-scoped subdirectory with **timestamp-prefixed filenames** so that +re-runs within the same flow never overwrite earlier results and the most +recent version is always resolved by sorting:: + + reports/daily/{date}/{flow_id}/ + ├── market/report/ + │ └── {ts}_macro_scan_summary.json + ├── {TICKER}/report/ + │ ├── {ts}_complete_report.json + │ ├── {ts}_analysts_checkpoint.json + │ └── {ts}_trader_checkpoint.json + ├── portfolio/report/ + │ ├── {ts}_{TICKER}_holding_review.json + │ ├── {ts}_{portfolio_id}_risk_metrics.json + │ ├── {ts}_{portfolio_id}_pm_decision.json + │ └── {ts}_{portfolio_id}_execution_result.json + ├── run_meta.json + └── run_events.jsonl + +When only a legacy ``run_id`` is provided the layout is preserved for +backward compatibility:: reports/daily/{date}/runs/{run_id}/ - ├── market/ - │ └── macro_scan_summary.json - ├── {TICKER}/ - │ └── complete_report.json - └── portfolio/ - ├── {TICKER}_holding_review.json - ├── {portfolio_id}_risk_metrics.json - ├── {portfolio_id}_pm_decision.json - └── {portfolio_id}_pm_decision.md + ├── market/macro_scan_summary.json + ├── {TICKER}/complete_report.json + └── portfolio/{portfolio_id}_pm_decision.json -A ``latest.json`` pointer at the date level is updated on every write so -that load methods (when called *without* a ``run_id``) transparently -resolve to the most recent run. +A ``latest.json`` pointer at the date level is updated on legacy +``run_id``-based writes for backward-compatible reads. Usage:: from tradingagents.portfolio.report_store import ReportStore - store = ReportStore(run_id="a1b2c3d4") + # New flow_id-based (timestamped versioning) + store = ReportStore(flow_id="a1b2c3d4") store.save_scan("2026-03-20", {"watchlist": [...]}) - data = store.load_scan("2026-03-20") # reads from latest run + data = store.load_scan("2026-03-20") # always loads the most recent + + # Legacy run_id-based (backward compat) + store = ReportStore(run_id="a1b2c3d4") """ from __future__ import annotations @@ -39,7 +55,7 @@ from pathlib import Path from typing import Any from tradingagents.portfolio.exceptions import ReportStoreError -from tradingagents.report_paths import read_latest_pointer, write_latest_pointer +from tradingagents.report_paths import read_latest_pointer, ts_now, write_latest_pointer class ReportStore: @@ -48,15 +64,18 @@ class ReportStore: Directories are created automatically on first write. All load methods return ``None`` when the file does not exist. - When ``run_id`` is provided, write paths are scoped under - ``{base_dir}/daily/{date}/runs/{run_id}/…`` and a ``latest.json`` - pointer is updated automatically. Load methods resolve through - the pointer when no ``run_id`` is set. + When ``flow_id`` is provided, all artifacts are written under + ``{base_dir}/daily/{date}/{flow_id}/…`` with timestamp-prefixed filenames. + Load methods always return the most recently written version. + + When only ``run_id`` is provided (legacy), the old ``runs/{run_id}/`` + layout is used for backward compatibility. """ def __init__( self, base_dir: str | Path = "reports", + flow_id: str | None = None, run_id: str | None = None, ) -> None: """Initialise the store with a base reports directory. @@ -66,61 +85,90 @@ class ReportStore: (relative to CWD), matching ``report_paths.REPORTS_ROOT``. Override via the ``PORTFOLIO_DATA_DIR`` env var or ``get_portfolio_config()["data_dir"]``. - run_id: Optional short identifier for the current run. When set, - all writes are scoped under a ``runs/{run_id}/`` - subdirectory so that same-day re-runs are preserved. + flow_id: Flow identifier grouping all phases of one analysis intent. + When set, writes use timestamped filenames under + ``{base}/daily/{date}/{flow_id}/``. + run_id: Legacy run identifier (backward compat). When set without + ``flow_id``, writes go to ``runs/{run_id}/`` (old layout). """ self._base_dir = Path(base_dir) + self._flow_id = flow_id self._run_id = run_id + @property + def flow_id(self) -> str | None: + """The flow identifier set on this store, if any.""" + return self._flow_id + @property def run_id(self) -> str | None: - """The run identifier set on this store, if any.""" - return self._run_id + """The run/flow identifier set on this store (flow_id takes precedence).""" + return self._flow_id or self._run_id # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _date_root(self, date: str, *, for_write: bool = False) -> Path: - """Return the base directory for a given date, scoped by run_id. + """Return the base directory for a given date. - When ``for_write=True``, the run_id *must* be used (if present) so - that writes land in the run-specific directory. - - When ``for_write=False`` (reads), the method first tries the - run_id directory, then falls back to latest.json pointer, and - finally falls back to the legacy flat layout. + Resolution order: + 1. ``flow_id`` → ``daily/{date}/{flow_id}`` (new timestamped layout) + 2. ``run_id`` → ``daily/{date}/runs/{run_id}`` (legacy layout) + 3. Neither (read path): check ``latest.json`` pointer, then flat layout. """ daily = self._base_dir / "daily" / date - if for_write and self._run_id: - return daily / "runs" / self._run_id + if self._flow_id: + return daily / self._flow_id + if self._run_id: return daily / "runs" / self._run_id - # Read path: check latest.json pointer (using our base_dir) - latest_id = read_latest_pointer(date, base_dir=self._base_dir) - if latest_id: - candidate = daily / "runs" / latest_id - if candidate.exists(): - return candidate + if not for_write: + # Read path: check latest.json pointer (using our base_dir) + latest_id = read_latest_pointer(date, base_dir=self._base_dir) + if latest_id: + candidate = daily / "runs" / latest_id + if candidate.exists(): + return candidate # Fallback to legacy flat layout return daily def _update_latest(self, date: str) -> None: - """Update the latest.json pointer if run_id is set.""" - if self._run_id: + """Update the latest.json pointer (legacy run_id only). + + No-op for flow_id-based stores — timestamps make pointers unnecessary. + """ + if self._run_id and not self._flow_id: write_latest_pointer(date, self._run_id, base_dir=self._base_dir) def _portfolio_dir(self, date: str, *, for_write: bool = False) -> Path: """Return the portfolio subdirectory for a given date. - Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/`` + Path: ``{base}/daily/{date}[/{flow_id}|/runs/{run_id}]/portfolio/`` """ return self._date_root(date, for_write=for_write) / "portfolio" + @staticmethod + def _load_latest_ts(directory: Path, name: str) -> dict[str, Any] | None: + """Return the payload from the most recent timestamped report file. + + Scans *directory* for files matching ``*_{name}``, sorts lexicographically + (ISO timestamps are sortable), and returns the parsed JSON of the newest. + Returns ``None`` when no matching file exists. + """ + if not directory.exists(): + return None + candidates = sorted(directory.glob(f"*_{name}"), reverse=True) + if not candidates: + return None + try: + return json.loads(candidates[0].read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + @staticmethod def _sanitize(obj: Any) -> Any: """Recursively convert non-JSON-serializable objects to safe types. @@ -191,7 +239,8 @@ class ReportStore: def save_scan(self, date: str, data: dict[str, Any]) -> Path: """Save macro scan summary JSON. - Path: ``{base}/daily/{date}[/runs/{run_id}]/market/macro_scan_summary.json`` + Flow path: ``{base}/daily/{date}/{flow_id}/market/report/{ts}_macro_scan_summary.json`` + Legacy path: ``{base}/daily/{date}[/runs/{run_id}]/market/macro_scan_summary.json`` Args: date: ISO date string, e.g. ``"2026-03-20"``. @@ -201,7 +250,10 @@ class ReportStore: Path of the written file. """ root = self._date_root(date, for_write=True) - path = root / "market" / "macro_scan_summary.json" + if self._flow_id: + path = root / "market" / "report" / f"{ts_now()}_macro_scan_summary.json" + else: + path = root / "market" / "macro_scan_summary.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -209,8 +261,9 @@ class ReportStore: def load_scan(self, date: str) -> dict[str, Any] | None: """Load macro scan summary. Returns None if the file does not exist.""" root = self._date_root(date) - path = root / "market" / "macro_scan_summary.json" - return self._read_json(path) + if self._flow_id: + return self._load_latest_ts(root / "market" / "report", "macro_scan_summary.json") + return self._read_json(root / "market" / "macro_scan_summary.json") # ------------------------------------------------------------------ # Per-Ticker Analysis @@ -219,7 +272,8 @@ class ReportStore: def save_analysis(self, date: str, ticker: str, data: dict[str, Any]) -> Path: """Save per-ticker analysis report as JSON. - Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/complete_report.json`` + Flow path: ``{base}/daily/{date}/{flow_id}/{TICKER}/report/{ts}_complete_report.json`` + Legacy path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/complete_report.json`` Args: date: ISO date string. @@ -227,7 +281,10 @@ class ReportStore: data: Analysis output dict. """ root = self._date_root(date, for_write=True) - path = root / ticker.upper() / "complete_report.json" + if self._flow_id: + path = root / ticker.upper() / "report" / f"{ts_now()}_complete_report.json" + else: + path = root / ticker.upper() / "complete_report.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -235,8 +292,9 @@ class ReportStore: def load_analysis(self, date: str, ticker: str) -> dict[str, Any] | None: """Load per-ticker analysis JSON. Returns None if the file does not exist.""" root = self._date_root(date) - path = root / ticker.upper() / "complete_report.json" - return self._read_json(path) + if self._flow_id: + return self._load_latest_ts(root / ticker.upper() / "report", "complete_report.json") + return self._read_json(root / ticker.upper() / "complete_report.json") # ------------------------------------------------------------------ # Holding Reviews @@ -250,22 +308,29 @@ class ReportStore: ) -> Path: """Save holding reviewer output for one ticker. - Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{TICKER}_holding_review.json`` + Flow path: ``…/portfolio/report/{ts}_{TICKER}_holding_review.json`` + Legacy path: ``…/portfolio/{TICKER}_holding_review.json`` Args: date: ISO date string. ticker: Ticker symbol (stored as uppercase). data: HoldingReviewerAgent output dict. """ - path = self._portfolio_dir(date, for_write=True) / f"{ticker.upper()}_holding_review.json" + pdir = self._portfolio_dir(date, for_write=True) + if self._flow_id: + path = pdir / "report" / f"{ts_now()}_{ticker.upper()}_holding_review.json" + else: + path = pdir / f"{ticker.upper()}_holding_review.json" result = self._write_json(path, data) self._update_latest(date) return result def load_holding_review(self, date: str, ticker: str) -> dict[str, Any] | None: """Load holding review output. Returns None if the file does not exist.""" - path = self._portfolio_dir(date) / f"{ticker.upper()}_holding_review.json" - return self._read_json(path) + pdir = self._portfolio_dir(date) + if self._flow_id: + return self._load_latest_ts(pdir / "report", f"{ticker.upper()}_holding_review.json") + return self._read_json(pdir / f"{ticker.upper()}_holding_review.json") # ------------------------------------------------------------------ # Risk Metrics @@ -279,14 +344,19 @@ class ReportStore: ) -> Path: """Save risk computation results. - Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{portfolio_id}_risk_metrics.json`` + Flow path: ``…/portfolio/report/{ts}_{portfolio_id}_risk_metrics.json`` + Legacy path: ``…/portfolio/{portfolio_id}_risk_metrics.json`` Args: date: ISO date string. portfolio_id: UUID of the target portfolio. data: Risk metrics dict (Sharpe, Sortino, VaR, etc.). """ - path = self._portfolio_dir(date, for_write=True) / f"{portfolio_id}_risk_metrics.json" + pdir = self._portfolio_dir(date, for_write=True) + if self._flow_id: + path = pdir / "report" / f"{ts_now()}_{portfolio_id}_risk_metrics.json" + else: + path = pdir / f"{portfolio_id}_risk_metrics.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -297,8 +367,10 @@ class ReportStore: portfolio_id: str, ) -> dict[str, Any] | None: """Load risk metrics. Returns None if the file does not exist.""" - path = self._portfolio_dir(date) / f"{portfolio_id}_risk_metrics.json" - return self._read_json(path) + pdir = self._portfolio_dir(date) + if self._flow_id: + return self._load_latest_ts(pdir / "report", f"{portfolio_id}_risk_metrics.json") + return self._read_json(pdir / f"{portfolio_id}_risk_metrics.json") # ------------------------------------------------------------------ # PM Decisions @@ -313,8 +385,8 @@ class ReportStore: ) -> Path: """Save PM agent decision. - JSON path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{portfolio_id}_pm_decision.json`` - MD path: ``…/{portfolio_id}_pm_decision.md`` (written only when ``markdown`` is not None) + Flow path: ``…/portfolio/report/{ts}_{portfolio_id}_pm_decision.json`` + Legacy path: ``…/portfolio/{portfolio_id}_pm_decision.json`` Args: date: ISO date string. @@ -326,14 +398,26 @@ class ReportStore: Path of the written JSON file. """ pdir = self._portfolio_dir(date, for_write=True) - json_path = pdir / f"{portfolio_id}_pm_decision.json" - self._write_json(json_path, data) - if markdown is not None: - md_path = pdir / f"{portfolio_id}_pm_decision.md" - try: - md_path.write_text(markdown, encoding="utf-8") - except OSError as exc: - raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc + if self._flow_id: + ts = ts_now() + json_path = pdir / "report" / f"{ts}_{portfolio_id}_pm_decision.json" + self._write_json(json_path, data) + if markdown is not None: + md_path = pdir / "report" / f"{ts}_{portfolio_id}_pm_decision.md" + try: + md_path.parent.mkdir(parents=True, exist_ok=True) + md_path.write_text(markdown, encoding="utf-8") + except OSError as exc: + raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc + else: + json_path = pdir / f"{portfolio_id}_pm_decision.json" + self._write_json(json_path, data) + if markdown is not None: + md_path = pdir / f"{portfolio_id}_pm_decision.md" + try: + md_path.write_text(markdown, encoding="utf-8") + except OSError as exc: + raise ReportStoreError(f"Failed to write {md_path}: {exc}") from exc self._update_latest(date) return json_path @@ -343,8 +427,10 @@ class ReportStore: portfolio_id: str, ) -> dict[str, Any] | None: """Load PM decision JSON. Returns None if the file does not exist.""" - path = self._portfolio_dir(date) / f"{portfolio_id}_pm_decision.json" - return self._read_json(path) + pdir = self._portfolio_dir(date) + if self._flow_id: + return self._load_latest_ts(pdir / "report", f"{portfolio_id}_pm_decision.json") + return self._read_json(pdir / f"{portfolio_id}_pm_decision.json") def save_execution_result( self, @@ -354,14 +440,19 @@ class ReportStore: ) -> Path: """Save trade execution results. - Path: ``{base}/daily/{date}[/runs/{run_id}]/portfolio/{portfolio_id}_execution_result.json`` + Flow path: ``…/portfolio/report/{ts}_{portfolio_id}_execution_result.json`` + Legacy path: ``…/portfolio/{portfolio_id}_execution_result.json`` Args: date: ISO date string. portfolio_id: UUID of the target portfolio. data: TradeExecutor output dict. """ - path = self._portfolio_dir(date, for_write=True) / f"{portfolio_id}_execution_result.json" + pdir = self._portfolio_dir(date, for_write=True) + if self._flow_id: + path = pdir / "report" / f"{ts_now()}_{portfolio_id}_execution_result.json" + else: + path = pdir / f"{portfolio_id}_execution_result.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -372,25 +463,40 @@ class ReportStore: portfolio_id: str, ) -> dict[str, Any] | None: """Load execution result. Returns None if the file does not exist.""" - path = self._portfolio_dir(date) / f"{portfolio_id}_execution_result.json" - return self._read_json(path) + pdir = self._portfolio_dir(date) + if self._flow_id: + return self._load_latest_ts(pdir / "report", f"{portfolio_id}_execution_result.json") + return self._read_json(pdir / f"{portfolio_id}_execution_result.json") def clear_portfolio_stage(self, date: str, portfolio_id: str) -> list[str]: """Delete PM decision and execution result files for a given date/portfolio. + For flow_id-based stores, deletes ALL timestamped versions. Returns a list of deleted file names so the caller can log what was removed. """ pdir = self._portfolio_dir(date, for_write=True) - targets = [ - pdir / f"{portfolio_id}_pm_decision.json", - pdir / f"{portfolio_id}_pm_decision.md", - pdir / f"{portfolio_id}_execution_result.json", - ] deleted = [] - for path in targets: - if path.exists(): - path.unlink() - deleted.append(path.name) + if self._flow_id: + report_dir = pdir / "report" + if report_dir.exists(): + for suffix in ( + f"{portfolio_id}_pm_decision.json", + f"{portfolio_id}_pm_decision.md", + f"{portfolio_id}_execution_result.json", + ): + for path in report_dir.glob(f"*_{suffix}"): + path.unlink() + deleted.append(path.name) + else: + targets = [ + pdir / f"{portfolio_id}_pm_decision.json", + pdir / f"{portfolio_id}_pm_decision.md", + pdir / f"{portfolio_id}_execution_result.json", + ] + for path in targets: + if path.exists(): + path.unlink() + deleted.append(path.name) return deleted # ------------------------------------------------------------------ @@ -452,6 +558,9 @@ class ReportStore: def list_run_metas(cls, base_dir: str | Path = "reports") -> list[dict[str, Any]]: """Scan for all run_meta.json files and return metadata dicts, newest first. + Searches both the new flow_id layout (``daily/*/{flow_id}/run_meta.json``) + and the legacy run_id layout (``daily/*/runs/*/run_meta.json``). + Args: base_dir: Root reports directory. @@ -459,14 +568,21 @@ class ReportStore: List of run_meta dicts sorted by ``created_at`` descending. """ base = Path(base_dir) - pattern = "daily/*/runs/*/run_meta.json" + # New flow_id layout: daily/{date}/{flow_id}/run_meta.json + # Legacy run_id layout: daily/{date}/runs/{run_id}/run_meta.json + patterns = ("daily/*/*/run_meta.json", "daily/*/runs/*/run_meta.json") + seen: set[str] = set() metas: list[dict[str, Any]] = [] - for path in base.glob(pattern): - try: - data = json.loads(path.read_text(encoding="utf-8")) - metas.append(data) - except (json.JSONDecodeError, OSError): - continue + for pattern in patterns: + for path in base.glob(pattern): + try: + data = json.loads(path.read_text(encoding="utf-8")) + key = data.get("id") or str(path) + if key not in seen: + seen.add(key) + metas.append(data) + except (json.JSONDecodeError, OSError): + continue metas.sort(key=lambda m: m.get("created_at", 0), reverse=True) return metas @@ -479,10 +595,14 @@ class ReportStore: ) -> Path: """Save analysts checkpoint for a ticker. - Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/analysts_checkpoint.json`` + Flow path: ``…/{TICKER}/report/{ts}_analysts_checkpoint.json`` + Legacy path: ``…/{TICKER}/analysts_checkpoint.json`` """ root = self._date_root(date, for_write=True) - path = root / ticker.upper() / "analysts_checkpoint.json" + if self._flow_id: + path = root / ticker.upper() / "report" / f"{ts_now()}_analysts_checkpoint.json" + else: + path = root / ticker.upper() / "analysts_checkpoint.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -492,18 +612,23 @@ class ReportStore: ) -> dict[str, Any] | None: """Load analysts checkpoint. Returns None if file does not exist.""" root = self._date_root(date) - path = root / ticker.upper() / "analysts_checkpoint.json" - return self._read_json(path) + if self._flow_id: + return self._load_latest_ts(root / ticker.upper() / "report", "analysts_checkpoint.json") + return self._read_json(root / ticker.upper() / "analysts_checkpoint.json") def save_trader_checkpoint( self, date: str, ticker: str, data: dict[str, Any] ) -> Path: """Save trader checkpoint for a ticker. - Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/trader_checkpoint.json`` + Flow path: ``…/{TICKER}/report/{ts}_trader_checkpoint.json`` + Legacy path: ``…/{TICKER}/trader_checkpoint.json`` """ root = self._date_root(date, for_write=True) - path = root / ticker.upper() / "trader_checkpoint.json" + if self._flow_id: + path = root / ticker.upper() / "report" / f"{ts_now()}_trader_checkpoint.json" + else: + path = root / ticker.upper() / "trader_checkpoint.json" result = self._write_json(path, data) self._update_latest(date) return result @@ -513,8 +638,9 @@ class ReportStore: ) -> dict[str, Any] | None: """Load trader checkpoint. Returns None if file does not exist.""" root = self._date_root(date) - path = root / ticker.upper() / "trader_checkpoint.json" - return self._read_json(path) + if self._flow_id: + return self._load_latest_ts(root / ticker.upper() / "report", "trader_checkpoint.json") + return self._read_json(root / ticker.upper() / "trader_checkpoint.json") # ------------------------------------------------------------------ # PM Decisions @@ -523,7 +649,7 @@ class ReportStore: def list_pm_decisions(self, portfolio_id: str) -> list[Path]: """Return all saved PM decision JSON paths for portfolio_id, newest first. - Searches both run-scoped and legacy flat layouts. + Searches flow_id, run_id-scoped, and legacy flat layouts. Args: portfolio_id: UUID of the target portfolio. @@ -531,9 +657,15 @@ class ReportStore: Returns: Sorted list of Path objects, newest date first. """ + # New flow_id layout: daily/*/{flow_id}/portfolio/report/*_{pid}_pm_decision.json + flow_pattern = f"daily/*/*/portfolio/report/*_{portfolio_id}_pm_decision.json" # Run-scoped layout: daily/*/runs/*/portfolio/{pid}_pm_decision.json run_pattern = f"daily/*/runs/*/portfolio/{portfolio_id}_pm_decision.json" # Legacy flat layout: daily/*/portfolio/{pid}_pm_decision.json flat_pattern = f"daily/*/portfolio/{portfolio_id}_pm_decision.json" - paths = set(self._base_dir.glob(run_pattern)) | set(self._base_dir.glob(flat_pattern)) + paths = ( + set(self._base_dir.glob(flow_pattern)) + | set(self._base_dir.glob(run_pattern)) + | set(self._base_dir.glob(flat_pattern)) + ) return sorted(paths, reverse=True) diff --git a/tradingagents/portfolio/store_factory.py b/tradingagents/portfolio/store_factory.py index 34810f98..314e12b1 100644 --- a/tradingagents/portfolio/store_factory.py +++ b/tradingagents/portfolio/store_factory.py @@ -24,8 +24,9 @@ logger = logging.getLogger(__name__) def create_report_store( - run_id: str | None = None, + flow_id: str | None = None, *, + run_id: str | None = None, base_dir: str | None = None, mongo_uri: str | None = None, mongo_db: str | None = None, @@ -39,7 +40,11 @@ def create_report_store( 3. Fall back to the filesystem :class:`ReportStore`. Args: - run_id: Short identifier for the current run. + flow_id: Flow identifier grouping all phases of one analysis intent. + When set, the store uses timestamp-based report versioning + under ``{base}/daily/{date}/{flow_id}/``. + run_id: Legacy short identifier (backward compat). Prefer ``flow_id`` + for new code. base_dir: Override for the filesystem store's base directory. mongo_uri: MongoDB connection string (overrides env var). mongo_db: MongoDB database name (default ``"tradingagents"``). @@ -54,7 +59,7 @@ def create_report_store( _base = base_dir or os.getenv("PORTFOLIO_DATA_DIR") or os.getenv( "TRADINGAGENTS_REPORTS_DIR", "reports" ) - local_store = ReportStore(base_dir=_base, run_id=run_id) + local_store = ReportStore(base_dir=_base, flow_id=flow_id, run_id=run_id) if uri: try: @@ -63,9 +68,13 @@ def create_report_store( mongo_store = MongoReportStore( connection_string=uri, db_name=db, + flow_id=flow_id, run_id=run_id, ) - logger.info("Using Dual report store (local + MongoDB db=%s, run_id=%s)", db, run_id) + logger.info( + "Using Dual report store (local + MongoDB db=%s, flow_id=%s)", + db, flow_id or run_id, + ) return DualReportStore(local_store, mongo_store) except Exception: logger.warning( @@ -73,5 +82,8 @@ def create_report_store( exc_info=True, ) - logger.info("Using filesystem report store (base=%s, run_id=%s)", _base, run_id) + logger.info( + "Using filesystem report store (base=%s, flow_id=%s)", + _base, flow_id or run_id, + ) return local_store diff --git a/tradingagents/report_paths.py b/tradingagents/report_paths.py index b974dc17..744ef887 100644 --- a/tradingagents/report_paths.py +++ b/tradingagents/report_paths.py @@ -3,19 +3,30 @@ Every CLI command and internal save routine should use these helpers so that all generated artifacts land under a single ``reports/`` tree. -When a ``run_id`` is supplied the layout becomes:: +When a ``flow_id`` is supplied the layout becomes:: reports/ └── daily/{YYYY-MM-DD}/ - ├── runs/{run_id}/ - │ ├── market/ # scan results - │ ├── {TICKER}/ # per-ticker analysis - │ └── portfolio/ # PM artefacts + └── {flow_id}/ + ├── market/report/ # scan results (timestamped files) + ├── {TICKER}/report/ # per-ticker analysis (timestamped) + ├── portfolio/report/ # PM artefacts (timestamped) + ├── run_meta.json # metadata for the latest run + └── run_events.jsonl # events for the latest run + +When only a legacy ``run_id`` is supplied the layout becomes:: + + reports/ + └── daily/{YYYY-MM-DD}/ + ├── runs/{run_id}/ # legacy run-scoped layout + │ ├── market/ + │ ├── {TICKER}/ + │ └── portfolio/ ├── latest.json # pointer → most recent run_id └── daily_digest.md # append-only (shared across runs) -Without a ``run_id`` the legacy flat layout is preserved for backward -compatibility:: +Without a ``run_id`` or ``flow_id`` the legacy flat layout is preserved for +backward compatibility:: reports/ └── daily/{YYYY-MM-DD}/ @@ -38,14 +49,39 @@ REPORTS_ROOT = Path(os.getenv("TRADINGAGENTS_REPORTS_DIR") or "reports") # ────────────────────────────────────────────────────────────────────────────── -# Run-ID helpers +# ID / timestamp helpers # ────────────────────────────────────────────────────────────────────────────── -def generate_run_id() -> str: - """Return a short, human-readable run identifier (8-char hex).""" +def generate_flow_id() -> str: + """Return a short, human-readable flow identifier (8-char hex). + + A *flow* groups all phases of one analysis intent (scan + pipeline + + portfolio). Prefer :func:`generate_flow_id` for new code. + """ return uuid.uuid4().hex[:8] +def generate_run_id() -> str: + """Return a short, human-readable run identifier (8-char hex). + + .. deprecated:: + Use :func:`generate_flow_id` for new code. ``generate_run_id`` + is kept for backward compatibility only. + """ + return uuid.uuid4().hex[:8] + + +def ts_now() -> str: + """Return a sortable UTC timestamp string: ``'20260325T143022123Z'`` (ms precision). + + Used as a filename prefix so that lexicographic sort gives temporal order + and ``load_*`` helpers can always find the most recent report without a + separate pointer file. Millisecond precision prevents same-second collisions. + """ + dt = datetime.now(timezone.utc) + return dt.strftime("%Y%m%dT%H%M%S") + f"{dt.microsecond // 1000:03d}Z" + + def write_latest_pointer(date: str, run_id: str, base_dir: Path | None = None) -> Path: """Write ``{base}/daily/{date}/latest.json`` pointing to *run_id*. @@ -92,32 +128,62 @@ def read_latest_pointer(date: str, base_dir: Path | None = None) -> str | None: # Path helpers # ────────────────────────────────────────────────────────────────────────────── -def _run_prefix(date: str, run_id: str | None) -> Path: - """Base directory for a date, optionally scoped by run_id.""" +def _run_prefix(date: str, run_id: str | None, flow_id: str | None = None) -> Path: + """Base directory for a date, scoped by flow_id or legacy run_id. + + Resolution order: + 1. ``flow_id`` → ``daily/{date}/{flow_id}`` (new layout, no ``runs/`` prefix) + 2. ``run_id`` → ``daily/{date}/runs/{run_id}`` (legacy layout) + 3. Neither → ``daily/{date}`` (flat legacy layout) + """ daily = REPORTS_ROOT / "daily" / date + if flow_id: + return daily / flow_id if run_id: return daily / "runs" / run_id return daily -def get_daily_dir(date: str, run_id: str | None = None) -> Path: - """``reports/daily/{date}/`` or ``reports/daily/{date}/runs/{run_id}/``""" - return _run_prefix(date, run_id) +def get_daily_dir( + date: str, + run_id: str | None = None, + *, + flow_id: str | None = None, +) -> Path: + """``reports/daily/{date}/[{flow_id}/|runs/{run_id}/]``""" + return _run_prefix(date, run_id, flow_id) -def get_market_dir(date: str, run_id: str | None = None) -> Path: - """``…/{date}[/runs/{run_id}]/market/``""" - return get_daily_dir(date, run_id) / "market" +def get_market_dir( + date: str, + run_id: str | None = None, + *, + flow_id: str | None = None, +) -> Path: + """``…/{date}[/{flow_id}|/runs/{run_id}]/market/``""" + return get_daily_dir(date, run_id, flow_id=flow_id) / "market" -def get_ticker_dir(date: str, ticker: str, run_id: str | None = None) -> Path: - """``…/{date}[/runs/{run_id}]/{TICKER}/``""" - return get_daily_dir(date, run_id) / ticker.upper() +def get_ticker_dir( + date: str, + ticker: str, + run_id: str | None = None, + *, + flow_id: str | None = None, +) -> Path: + """``…/{date}[/{flow_id}|/runs/{run_id}]/{TICKER}/``""" + return get_daily_dir(date, run_id, flow_id=flow_id) / ticker.upper() -def get_eval_dir(date: str, ticker: str, run_id: str | None = None) -> Path: - """``…/{date}[/runs/{run_id}]/{TICKER}/eval/``""" - return get_ticker_dir(date, ticker, run_id) / "eval" +def get_eval_dir( + date: str, + ticker: str, + run_id: str | None = None, + *, + flow_id: str | None = None, +) -> Path: + """``…/{date}[/{flow_id}|/runs/{run_id}]/{TICKER}/eval/``""" + return get_ticker_dir(date, ticker, run_id, flow_id=flow_id) / "eval" def get_digest_path(date: str) -> Path: