diff --git a/agent_os/backend/main.py b/agent_os/backend/main.py index 437c1cae..cc7c837b 100644 --- a/agent_os/backend/main.py +++ b/agent_os/backend/main.py @@ -35,6 +35,33 @@ app.include_router(portfolios.router) app.include_router(runs.router) app.include_router(websocket.router) +@app.on_event("startup") +async def hydrate_runs_from_disk(): + """Populate the in-memory runs store from persisted run_meta.json files.""" + from agent_os.backend.store import runs + from tradingagents.portfolio.report_store import ReportStore + try: + metas = ReportStore.list_run_metas() + for meta in metas: + rid = meta.get("id", "") + if rid and rid not in runs: + runs[rid] = { + "id": rid, + "short_rid": meta.get("short_rid", rid[:8]), + "type": meta.get("type", ""), + "status": meta.get("status", "completed"), + "created_at": meta.get("created_at", 0), + "user_id": meta.get("user_id", "anonymous"), + "params": meta.get("params", {}), + "rerun_seq": meta.get("rerun_seq", 0), + "events": [], # loaded lazily on demand + } + if metas: + logger.info("Hydrated %d historical runs from disk", len(metas)) + except Exception: + logger.exception("Failed to hydrate runs from disk on startup") + + @app.get("/") async def health_check(): return {"status": "ok", "service": "AgentOS API"} diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index 3810cab4..a2719506 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -5,8 +5,9 @@ import uuid import time from agent_os.backend.store import runs from agent_os.backend.dependencies import get_current_user -from agent_os.backend.services.langgraph_engine import LangGraphEngine +from agent_os.backend.services.langgraph_engine import LangGraphEngine, NODE_TO_PHASE from agent_os.backend.services.mock_engine import MockEngine +from tradingagents.report_paths import generate_run_id logger = logging.getLogger("agent_os.runs") @@ -16,6 +17,37 @@ engine = LangGraphEngine() mock_engine = MockEngine() 
+def _persist_run_to_disk(run_id: str) -> None: + """Persist run metadata and events to the report store.""" + run = runs.get(run_id) + if not run: + return + try: + from tradingagents.portfolio.store_factory import create_report_store + short_rid = run.get("short_rid") or run_id[:8] + store = create_report_store(run_id=short_rid) + date = (run.get("params") or {}).get("date", "") + if not date: + return + meta = { + "id": run_id, + "short_rid": short_rid, + "type": run.get("type", ""), + "status": run.get("status", ""), + "created_at": run.get("created_at", 0), + "completed_at": time.time(), + "user_id": run.get("user_id", "anonymous"), + "date": date, + "params": run.get("params", {}), + "rerun_seq": run.get("rerun_seq", 0), + } + store.save_run_meta(date, meta) + store.save_run_events(date, run.get("events", [])) + logger.info("Persisted run to disk run=%s rid=%s", run_id, short_rid) + except Exception: + logger.exception("Failed to persist run to disk run=%s", run_id) + + async def _run_and_store(run_id: str, gen: AsyncGenerator[Dict[str, Any], None]) -> None: """Drive an engine generator, updating run status and caching events.""" runs[run_id]["status"] = "running" @@ -28,22 +60,26 @@ async def _run_and_store(run_id: str, gen: AsyncGenerator[Dict[str, Any], None]) runs[run_id]["status"] = "failed" runs[run_id]["error"] = str(exc) logger.exception("Run failed run=%s", run_id) + finally: + _persist_run_to_disk(run_id) @router.post("/scan") async def trigger_scan( - background_tasks: BackgroundTasks, - params: Dict[str, Any] = None, + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): run_id = str(uuid.uuid4()) runs[run_id] = { "id": run_id, + "short_rid": generate_run_id(), "type": "scan", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {} + "params": params or {}, + "rerun_seq": 0, } logger.info("Queued SCAN run=%s user=%s", run_id, user["user_id"]) 
background_tasks.add_task(_run_and_store, run_id, engine.run_scan(run_id, params or {})) @@ -51,18 +87,20 @@ async def trigger_scan( @router.post("/pipeline") async def trigger_pipeline( - background_tasks: BackgroundTasks, - params: Dict[str, Any] = None, + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): run_id = str(uuid.uuid4()) runs[run_id] = { "id": run_id, + "short_rid": generate_run_id(), "type": "pipeline", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {} + "params": params or {}, + "rerun_seq": 0, } logger.info("Queued PIPELINE run=%s user=%s", run_id, user["user_id"]) background_tasks.add_task(_run_and_store, run_id, engine.run_pipeline(run_id, params or {})) @@ -70,18 +108,20 @@ async def trigger_pipeline( @router.post("/portfolio") async def trigger_portfolio( - background_tasks: BackgroundTasks, - params: Dict[str, Any] = None, + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): run_id = str(uuid.uuid4()) runs[run_id] = { "id": run_id, + "short_rid": generate_run_id(), "type": "portfolio", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {} + "params": params or {}, + "rerun_seq": 0, } logger.info("Queued PORTFOLIO run=%s user=%s", run_id, user["user_id"]) background_tasks.add_task(_run_and_store, run_id, engine.run_portfolio(run_id, params or {})) @@ -89,18 +129,20 @@ async def trigger_portfolio( @router.post("/auto") async def trigger_auto( - background_tasks: BackgroundTasks, - params: Dict[str, Any] = None, + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, user: dict = Depends(get_current_user) ): run_id = str(uuid.uuid4()) runs[run_id] = { "id": run_id, + "short_rid": generate_run_id(), "type": "auto", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], - "params": params or {} + 
"params": params or {}, + "rerun_seq": 0, } logger.info("Queued AUTO run=%s user=%s", run_id, user["user_id"]) background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, params or {})) @@ -125,11 +167,13 @@ async def trigger_mock( run_id = str(uuid.uuid4()) runs[run_id] = { "id": run_id, + "short_rid": generate_run_id(), "type": "mock", "status": "queued", "created_at": time.time(), "user_id": user["user_id"], "params": p, + "rerun_seq": 0, } logger.info( "Queued MOCK run=%s mock_type=%s user=%s", @@ -140,6 +184,70 @@ async def trigger_mock( ) return {"run_id": run_id, "status": "queued"} +async def _append_and_store(run_id: str, gen) -> None: + """Append events from a re-run generator to an existing run entry.""" + run = runs.get(run_id) + if not run: + return + run["rerun_seq"] = run.get("rerun_seq", 0) + 1 + run["status"] = "running" + try: + async for event in gen: + event["rerun_seq"] = run["rerun_seq"] + if "events" not in run: + run["events"] = [] + run["events"].append(event) + run["status"] = "completed" + except Exception as exc: + run["status"] = "failed" + run["error"] = str(exc) + logger.exception("Rerun failed run=%s", run_id) + finally: + _persist_run_to_disk(run_id) + + +@router.post("/rerun-node") +async def trigger_rerun_node( + background_tasks: BackgroundTasks, + params: Dict[str, Any], + user: dict = Depends(get_current_user), +): + """Re-run a phase of the trading pipeline for a specific ticker. 
+ + Body: { run_id, node_id, identifier, date, portfolio_id } + """ + run_id = params.get("run_id", "") + node_id = params.get("node_id", "") + identifier = params.get("identifier", "") + date = params.get("date", "") + portfolio_id = params.get("portfolio_id", "main_portfolio") + + if run_id not in runs: + raise HTTPException(status_code=404, detail="Run not found") + if node_id not in NODE_TO_PHASE: + raise HTTPException(status_code=422, detail=f"Unknown node_id: {node_id}") + if not identifier: + raise HTTPException(status_code=422, detail="identifier (ticker) is required") + + phase = NODE_TO_PHASE[node_id] + rerun_params = { + "ticker": identifier, + "date": date or (runs[run_id].get("params") or {}).get("date", ""), + "portfolio_id": portfolio_id, + } + + logger.info( + "Queued RERUN run=%s node=%s phase=%s ticker=%s user=%s", + run_id, node_id, phase, identifier, user["user_id"], + ) + background_tasks.add_task( + _append_and_store, + run_id, + engine.run_pipeline_from_phase(f"{run_id}_rerun_{phase}", rerun_params, phase), + ) + return {"run_id": run_id, "phase": phase, "status": "queued"} + + @router.delete("/portfolio-stage") async def reset_portfolio_stage( params: Dict[str, Any], @@ -170,4 +278,21 @@ async def list_runs(user: dict = Depends(get_current_user)): async def get_run_status(run_id: str, user: dict = Depends(get_current_user)): if run_id not in runs: raise HTTPException(status_code=404, detail="Run not found") - return runs[run_id] + run = runs[run_id] + # Lazy-load events from disk if they were not kept in memory + if ( + not run.get("events") + and run.get("status") in ("completed", "failed") + ): + try: + from tradingagents.portfolio.store_factory import create_report_store + short_rid = run.get("short_rid") or run_id[:8] + store = create_report_store(run_id=short_rid) + date = (run.get("params") or {}).get("date", "") + if date: + events = store.load_run_events(date) + if events: + run["events"] = events + except Exception: + 
logger.warning("Failed to lazy-load events for run=%s", run_id) + return run diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 868c826a..d3212a37 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -137,6 +137,25 @@ _TOOL_SERVICE_MAP: Dict[str, str] = { } +NODE_TO_PHASE = { + # Phase analysts: re-run full pipeline from scratch + "Market Analyst": "analysts", + "Social Analyst": "analysts", + "News Analyst": "analysts", + "Fundamentals Analyst": "analysts", + # Phase debate_and_trader: load analysts_checkpoint, skip analysts + "Bull Researcher": "debate_and_trader", + "Bear Researcher": "debate_and_trader", + "Research Manager": "debate_and_trader", + "Trader": "debate_and_trader", + # Phase risk: load trader_checkpoint, skip analysts+debate+trader + "Aggressive Analyst": "risk", + "Conservative Analyst": "risk", + "Neutral Analyst": "risk", + "Portfolio Manager": "risk", +} + + class LangGraphEngine: """Orchestrates LangGraph pipeline executions and streams events.""" @@ -191,7 +210,10 @@ class LangGraphEngine: store = create_report_store(run_id=short_rid) rl = self._start_run_logger(run_id) - scanner = ScannerGraph(config=self.config) + scan_config = {**self.config} + if params.get("max_tickers"): + scan_config["max_auto_tickers"] = int(params["max_tickers"]) + scanner = ScannerGraph(config=scan_config) logger.info("Starting SCAN run=%s date=%s rid=%s", run_id, date, short_rid) yield self._system_log(f"Starting macro scan for {date}") @@ -401,6 +423,32 @@ class LangGraphEngine: if digest_content: append_to_digest(date, "analyze", ticker, digest_content) + # Save analysts checkpoint (all 4 analyst reports populated) + _analyst_keys = ("market_report", "sentiment_report", "news_report", "fundamentals_report") + if all(final_state.get(k) for k in _analyst_keys): + analysts_ckpt = { + "company_of_interest": ticker, + "trade_date": date, + **{k: 
serializable_state.get(k, "") for k in _analyst_keys}, + "macro_regime_report": serializable_state.get("macro_regime_report", ""), + "messages": serializable_state.get("messages", []), + } + store.save_analysts_checkpoint(date, ticker, analysts_ckpt) + + # Save trader checkpoint (trader output populated) + if final_state.get("trader_investment_plan"): + trader_ckpt = { + "company_of_interest": ticker, + "trade_date": date, + **{k: serializable_state.get(k, "") for k in _analyst_keys}, + "macro_regime_report": serializable_state.get("macro_regime_report", ""), + "investment_debate_state": serializable_state.get("investment_debate_state", {}), + "investment_plan": serializable_state.get("investment_plan", ""), + "trader_investment_plan": serializable_state.get("trader_investment_plan", ""), + "messages": serializable_state.get("messages", []), + } + store.save_trader_checkpoint(date, ticker, trader_ckpt) + yield self._system_log(f"Analysis report for {ticker} saved to {save_dir}") logger.info("Saved pipeline report run=%s ticker=%s dir=%s", run_id, ticker, save_dir) except Exception as exc: @@ -630,6 +678,140 @@ class LangGraphEngine: yield self._system_log(f"Error during trade execution: {exc}") raise + async def run_pipeline_from_phase( + self, run_id: str, params: Dict[str, Any], phase: str, + ) -> AsyncGenerator[Dict[str, Any], None]: + """Re-run a single ticker's pipeline from a specific phase. + + Phases: + analysts - full re-run (delegates to run_pipeline) + debate_and_trader - load analysts_checkpoint, run debate+trader+risk subgraph + risk - load trader_checkpoint, run risk subgraph only + + After the subgraph completes the ticker's reports and checkpoints are + overwritten and the portfolio manager is re-run so that the PM + decision reflects the updated ticker analysis. 
+ """ + ticker = params.get("ticker", params.get("identifier", "AAPL")) + date = params.get("date", time.strftime("%Y-%m-%d")) + portfolio_id = params.get("portfolio_id", "main_portfolio") + + store = create_report_store() + short_rid = generate_run_id() + writer_store = create_report_store(run_id=short_rid) + + if phase == "analysts": + # Full re-run + async for evt in self.run_pipeline(run_id, {"ticker": ticker, "date": date}): + yield evt + elif phase == "debate_and_trader": + yield self._system_log(f"Loading analysts checkpoint for {ticker}...") + ckpt = store.load_analysts_checkpoint(date, ticker) + if not ckpt: + yield self._system_log(f"No analysts checkpoint found for {ticker} — falling back to full re-run") + async for evt in self.run_pipeline(run_id, {"ticker": ticker, "date": date}): + yield evt + else: + yield self._system_log(f"Running debate + trader + risk for {ticker} from checkpoint...") + graph_wrapper = TradingAgentsGraph(config=self.config, debug=True) + initial_state = graph_wrapper.propagator.create_initial_state(ticker, date) + # Overlay checkpoint data onto initial state + for k, v in ckpt.items(): + if k in initial_state or k in ("market_report", "sentiment_report", "news_report", "fundamentals_report", "macro_regime_report"): + initial_state[k] = v + + rl = self._start_run_logger(run_id) + self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = ticker.upper() + final_state: Dict[str, Any] = {} + + async for event in graph_wrapper.debate_graph.astream_events( + initial_state, version="v2", + config={"recursion_limit": graph_wrapper.propagator.max_recur_limit, "callbacks": [rl.callback]}, + ): + if self._is_root_chain_end(event): + output = (event.get("data") or {}).get("output") + if isinstance(output, dict): + final_state = output + mapped = self._map_langgraph_event(run_id, event) + if mapped: + yield mapped + + self._node_start_times.pop(run_id, None) + self._node_prompts.pop(run_id, None) + 
self._run_identifiers.pop(run_id, None) + + if final_state: + serializable_state = self._sanitize_for_json(final_state) + writer_store.save_analysis(date, ticker, serializable_state) + # Overwrite checkpoints + _analyst_keys = ("market_report", "sentiment_report", "news_report", "fundamentals_report") + if final_state.get("trader_investment_plan"): + trader_ckpt = { + "company_of_interest": ticker, + "trade_date": date, + **{k: serializable_state.get(k, "") for k in _analyst_keys}, + "macro_regime_report": serializable_state.get("macro_regime_report", ""), + "investment_debate_state": serializable_state.get("investment_debate_state", {}), + "investment_plan": serializable_state.get("investment_plan", ""), + "trader_investment_plan": serializable_state.get("trader_investment_plan", ""), + "messages": serializable_state.get("messages", []), + } + writer_store.save_trader_checkpoint(date, ticker, trader_ckpt) + + self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid)) + elif phase == "risk": + yield self._system_log(f"Loading trader checkpoint for {ticker}...") + ckpt = store.load_trader_checkpoint(date, ticker) + if not ckpt: + yield self._system_log(f"No trader checkpoint found for {ticker} — falling back to full re-run") + async for evt in self.run_pipeline(run_id, {"ticker": ticker, "date": date}): + yield evt + else: + yield self._system_log(f"Running risk phase for {ticker} from checkpoint...") + graph_wrapper = TradingAgentsGraph(config=self.config, debug=True) + initial_state = graph_wrapper.propagator.create_initial_state(ticker, date) + for k, v in ckpt.items(): + if k != "messages": + initial_state[k] = v + + rl = self._start_run_logger(run_id) + self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = ticker.upper() + final_state: Dict[str, Any] = {} + + async for event in graph_wrapper.risk_graph.astream_events( + initial_state, version="v2", + config={"recursion_limit": graph_wrapper.propagator.max_recur_limit, 
"callbacks": [rl.callback]}, + ): + if self._is_root_chain_end(event): + output = (event.get("data") or {}).get("output") + if isinstance(output, dict): + final_state = output + mapped = self._map_langgraph_event(run_id, event) + if mapped: + yield mapped + + self._node_start_times.pop(run_id, None) + self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) + + if final_state: + serializable_state = self._sanitize_for_json(final_state) + writer_store.save_analysis(date, ticker, serializable_state) + + self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid)) + else: + yield self._system_log(f"Unknown phase '{phase}' — skipping") + return + + # Cascade: re-run portfolio manager with updated data + yield self._system_log(f"Cascading: re-running portfolio manager after {ticker} {phase} re-run...") + async for evt in self.run_portfolio( + f"{run_id}_cascade_pm", {"date": date, "portfolio_id": portfolio_id} + ): + yield evt + async def run_auto( self, run_id: str, params: Dict[str, Any] ) -> AsyncGenerator[Dict[str, Any], None]: @@ -660,6 +842,10 @@ class LangGraphEngine: scan_data = store.load_scan(date) scan_tickers = self._extract_tickers_from_scan_data(scan_data) + # Safety cap: truncate scan candidates to max_auto_tickers (portfolio holdings added after) + max_t = int(params.get("max_tickers") or self.config.get("max_auto_tickers") or 10) + scan_tickers = scan_tickers[:max_t] + # Also include tickers from current portfolio holdings so the PM agent # has fresh analysis for existing positions (hold/sell/add decisions). 
portfolio_id = params.get("portfolio_id", "main_portfolio") diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index 1c4d54c1..fffa5718 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -34,8 +34,14 @@ import { Tooltip, Collapse, useToast, + Popover, + PopoverTrigger, + PopoverContent, + PopoverHeader, + PopoverBody, + PopoverCloseButton, } from '@chakra-ui/react'; -import { LayoutDashboard, Wallet, Settings, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot, ChevronDown, ChevronUp, FlaskConical, Trash2 } from 'lucide-react'; +import { LayoutDashboard, Wallet, Settings, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot, ChevronDown, ChevronUp, FlaskConical, Trash2, History, Loader2 } from 'lucide-react'; import { MetricHeader } from './components/MetricHeader'; import { AgentGraph } from './components/AgentGraph'; import { PortfolioViewer } from './components/PortfolioViewer'; @@ -54,6 +60,7 @@ interface RunParams { date: string; ticker: string; portfolio_id: string; + max_auto_tickers: string; mock_type: MockType; speed: string; force: boolean; @@ -393,6 +400,7 @@ export const Dashboard: React.FC = () => { date: new Date().toISOString().split('T')[0], ticker: 'AAPL', portfolio_id: 'main_portfolio', + max_auto_tickers: '', mock_type: 'pipeline', speed: '3', force: false, @@ -456,6 +464,7 @@ export const Dashboard: React.FC = () => { date: effectiveParams.date, ticker: effectiveParams.ticker, force: effectiveParams.force, + ...(effectiveParams.max_auto_tickers ? { max_tickers: parseInt(effectiveParams.max_auto_tickers, 10) } : {}), }; const res = await axios.post(`${API_BASE}/run/${type}`, body); setActiveRunId(res.data.run_id); @@ -468,14 +477,19 @@ export const Dashboard: React.FC = () => { }; /** Re-run triggered from a graph node's Re-run button. 
*/ - const handleNodeRerun = useCallback((identifier: string, _nodeId: string) => { + const handleNodeRerun = useCallback((identifier: string, nodeId: string) => { + // If we have an active loaded run and the node is in NODE_TO_PHASE, use phase-level rerun + if (activeRunId && nodeId && identifier && identifier !== 'MARKET' && identifier !== '') { + triggerNodeRerun(activeRunId, identifier, nodeId); + return; + } if (identifier === 'MARKET' || identifier === '') { startRun('scan'); } else { startRun('pipeline', { ticker: identifier }); } // eslint-disable-next-line react-hooks/exhaustive-deps - }, [isRunning, params]); + }, [isRunning, params, activeRunId]); const resetPortfolioStage = async () => { if (!params.date || !params.portfolio_id) { @@ -498,6 +512,70 @@ export const Dashboard: React.FC = () => { } }; + // ─── History panel state ─────────────────────────────────────────── + const [historyRuns, setHistoryRuns] = useState([]); + const [historyLoading, setHistoryLoading] = useState(false); + + const loadHistory = async () => { + setHistoryLoading(true); + try { + const res = await axios.get(`${API_BASE}/run/`); + const sorted = (res.data as any[]).sort((a: any, b: any) => (b.created_at || 0) - (a.created_at || 0)); + setHistoryRuns(sorted); + } catch (err) { + console.error('Failed to load run history', err); + } finally { + setHistoryLoading(false); + } + }; + + const loadRun = (run: any) => { + clearEvents(); + // Pre-fill params from run + if (run.params) { + setParams((p) => ({ + ...p, + date: run.params.date || p.date, + ticker: run.params.ticker || p.ticker, + portfolio_id: run.params.portfolio_id || p.portfolio_id, + })); + } + setActiveRunId(null); + setTimeout(() => setActiveRunId(run.id), 0); + }; + + /** Trigger a phase-level re-run for a specific node on the active run. 
*/ + const triggerNodeRerun = async (runId: string, identifier: string, nodeId: string) => { + try { + const res = await axios.post(`${API_BASE}/run/rerun-node`, { + run_id: runId, + node_id: nodeId, + identifier, + date: params.date, + portfolio_id: params.portfolio_id, + }); + // Force WebSocket reconnect to stream new events + setActiveRunId(null); + setTimeout(() => setActiveRunId(res.data.run_id), 0); + toast({ + title: `Re-running ${res.data.phase} phase for ${identifier}`, + status: 'info', + duration: 3000, + isClosable: true, + position: 'top', + }); + } catch (err: any) { + toast({ + title: 'Re-run failed', + description: err?.response?.data?.detail || String(err), + status: 'error', + duration: 5000, + isClosable: true, + position: 'top', + }); + } + }; + /** Open the full-screen event detail modal */ const openModal = useCallback((evt: AgentEvent) => { setModalEvent(evt); @@ -642,6 +720,52 @@ export const Dashboard: React.FC = () => { {status.toUpperCase()} + + + } + size="xs" + variant="ghost" + color="whiteAlpha.600" + /> + + + + Run History + + {historyLoading && } + {!historyLoading && historyRuns.length === 0 && ( + No runs found + )} + + {historyRuns.map((r) => ( + loadRun(r)} + align="center" + gap={2} + > + + {r.type} + + {(r.params || {}).date || '—'} + + {r.status} + + + {r.created_at ? new Date(r.created_at * 1000).toLocaleTimeString() : ''} + + + ))} + + + + : } @@ -689,6 +813,15 @@ export const Dashboard: React.FC = () => { onChange={(e) => setParams((p) => ({ ...p, portfolio_id: e.target.value }))} /> + + Max Tickers + setParams((p) => ({ ...p, max_auto_tickers: e.target.value }))} /> + (scan only, portfolio always included) + {/* Mock-specific controls */} Mock settings diff --git a/docs/agent/CURRENT_STATE.md b/docs/agent/CURRENT_STATE.md index cf112884..12c02d05 100644 --- a/docs/agent/CURRENT_STATE.md +++ b/docs/agent/CURRENT_STATE.md @@ -1,19 +1,22 @@ # Current Milestone -LLM provider policy error handling complete. 
Per-tier fallback models (`TRADINGAGENTS_QUICK/MID/DEEP_THINK_FALLBACK_LLM`) auto-retry blocked pipelines. PR#106 observability + MongoDB merged. PR#107 and PR#108 merged. All tests passing (2 pre-existing failures excluded). +FE improvements: configurable max_auto_tickers + run persistence with phase-level node re-run. PR pending review on `feat/fe-max-tickers-load-run`. # Recent Progress -- **PR#108 merged**: Per-tier LLM fallback for 404/policy errors — `_is_policy_error()` + `_build_fallback_config()` in engine, 6 new fallback config keys, clean `logger.error` (no traceback) for policy issues (ADR 017) -- **PR#107 merged**: `save_holding_review` per-ticker fix, `RunLogger` threading.local → contextvars.ContextVar, ADR 016 PR#106 review findings (corrected post-verification) -- **PR#106 merged**: MongoDB report store, RunLogger observability, reflexion memory, run-ID namespaced reports, store factory with graceful filesystem fallback +- **feat/fe-max-tickers-load-run**: Two features implemented: + - Feature 1: `max_auto_tickers` config key + macro synthesis prompt injection + frontend number input + backend safety cap + - Feature 2: Run persistence (run_meta.json + run_events.jsonl), intermediate phase checkpoints (analysts/trader), phase subgraphs (debate + risk), POST /api/run/rerun-node endpoint, frontend history panel + modified node re-run +- **PR#108 merged**: Per-tier LLM fallback for 404/policy errors +- **PR#107 merged**: `save_holding_review` per-ticker fix, `RunLogger` threading.local to contextvars.ContextVar +- **PR#106 merged**: MongoDB report store, RunLogger observability, reflexion memory, run-ID namespaced reports - **Smart Money Scanner**: Finviz integration with Golden Overlap strategy (ADR 014) - **AgentOS**: Full-stack visual observability layer (FastAPI + React + ReactFlow) -- **Portfolio Manager**: Phases 1–10 complete (models, agents, CLI, stop-loss/take-profit) +- **Portfolio Manager**: Phases 1-10 complete (models, agents, CLI, 
stop-loss/take-profit) # In Progress -- None +- feat/fe-max-tickers-load-run PR under review # Active Blockers diff --git a/tradingagents/agents/scanners/macro_synthesis.py b/tradingagents/agents/scanners/macro_synthesis.py index 2ae76162..39ecdc3e 100644 --- a/tradingagents/agents/scanners/macro_synthesis.py +++ b/tradingagents/agents/scanners/macro_synthesis.py @@ -8,7 +8,7 @@ from tradingagents.agents.utils.json_utils import extract_json logger = logging.getLogger(__name__) -def create_macro_synthesis(llm): +def create_macro_synthesis(llm, max_scan_tickers: int = 10): def macro_synthesis_node(state): scan_date = state["scan_date"] @@ -44,7 +44,7 @@ def create_macro_synthesis(llm): "Synthesize all reports into a structured output with: " "(1) Executive summary of the macro environment, " "(2) Top macro themes with conviction levels, " - "(3) A list of 8-10 specific stocks worth investigating with ticker, name, sector, rationale, " + f"(3) A list of exactly {max_scan_tickers} specific stocks worth investigating with ticker, name, sector, rationale, " "thesis_angle (growth/value/catalyst/turnaround/defensive/momentum), conviction (high/medium/low), " "key_catalysts, and risks. " "Output your response as valid JSON matching this schema:\n" diff --git a/tradingagents/default_config.py b/tradingagents/default_config.py index b02f3082..4a9f94c5 100644 --- a/tradingagents/default_config.py +++ b/tradingagents/default_config.py @@ -106,6 +106,10 @@ DEFAULT_CONFIG = { # Controls how many per-ticker analysis pipelines run in parallel during # 'auto' mode (CLI and AgentOS). Set higher if your API plan supports it. "max_concurrent_pipelines": _env_int("MAX_CONCURRENT_PIPELINES", 2), + # Maximum number of scan-candidate tickers the macro synthesis LLM produces + # in auto mode. Portfolio holdings are always included regardless. + # Set to 0 or leave unset for the default (10). 
+ "max_auto_tickers": _env_int("MAX_AUTO_TICKERS", 10), # Data vendor configuration # Category-level configuration (default for all tools in category) "data_vendors": { diff --git a/tradingagents/graph/scanner_graph.py b/tradingagents/graph/scanner_graph.py index 3d610d4b..655408bf 100644 --- a/tradingagents/graph/scanner_graph.py +++ b/tradingagents/graph/scanner_graph.py @@ -45,13 +45,15 @@ class ScannerGraph: mid_llm = self._create_llm("mid_think") deep_llm = self._create_llm("deep_think") + max_scan_tickers = int(self.config.get("max_auto_tickers", 10)) + agents = { "geopolitical_scanner": create_geopolitical_scanner(quick_llm), "market_movers_scanner": create_market_movers_scanner(quick_llm), "sector_scanner": create_sector_scanner(quick_llm), "smart_money_scanner": create_smart_money_scanner(quick_llm), "industry_deep_dive": create_industry_deep_dive(mid_llm), - "macro_synthesis": create_macro_synthesis(deep_llm), + "macro_synthesis": create_macro_synthesis(deep_llm, max_scan_tickers=max_scan_tickers), } setup = ScannerGraphSetup(agents) diff --git a/tradingagents/graph/setup.py b/tradingagents/graph/setup.py index 418ae227..cd67a53b 100644 --- a/tradingagents/graph/setup.py +++ b/tradingagents/graph/setup.py @@ -202,3 +202,135 @@ class GraphSetup: # Compile and return return workflow.compile() + + def build_debate_subgraph(self): + """Build a subgraph that starts from Bull Researcher (skips analysts). + + Use this to re-run the debate + trader + risk phases when analysts + checkpoints are available. Entry point: Bull Researcher. 
+ """ + # Create researcher and manager nodes + bull_researcher_node = create_bull_researcher( + self.mid_thinking_llm, self.bull_memory + ) + bear_researcher_node = create_bear_researcher( + self.mid_thinking_llm, self.bear_memory + ) + research_manager_node = create_research_manager( + self.deep_thinking_llm, self.invest_judge_memory + ) + trader_node = create_trader(self.mid_thinking_llm, self.trader_memory) + + aggressive_analyst = create_aggressive_debator(self.quick_thinking_llm) + neutral_analyst = create_neutral_debator(self.quick_thinking_llm) + conservative_analyst = create_conservative_debator(self.quick_thinking_llm) + portfolio_manager_node = create_portfolio_manager( + self.deep_thinking_llm, self.portfolio_manager_memory + ) + + workflow = StateGraph(AgentState) + + workflow.add_node("Bull Researcher", bull_researcher_node) + workflow.add_node("Bear Researcher", bear_researcher_node) + workflow.add_node("Research Manager", research_manager_node) + workflow.add_node("Trader", trader_node) + workflow.add_node("Aggressive Analyst", aggressive_analyst) + workflow.add_node("Neutral Analyst", neutral_analyst) + workflow.add_node("Conservative Analyst", conservative_analyst) + workflow.add_node("Portfolio Manager", portfolio_manager_node) + + workflow.add_edge(START, "Bull Researcher") + workflow.add_conditional_edges( + "Bull Researcher", + self.conditional_logic.should_continue_debate, + { + "Bear Researcher": "Bear Researcher", + "Research Manager": "Research Manager", + }, + ) + workflow.add_conditional_edges( + "Bear Researcher", + self.conditional_logic.should_continue_debate, + { + "Bull Researcher": "Bull Researcher", + "Research Manager": "Research Manager", + }, + ) + workflow.add_edge("Research Manager", "Trader") + workflow.add_edge("Trader", "Aggressive Analyst") + workflow.add_conditional_edges( + "Aggressive Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Conservative Analyst": "Conservative Analyst", + "Portfolio 
Manager": "Portfolio Manager", + }, + ) + workflow.add_conditional_edges( + "Conservative Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Neutral Analyst": "Neutral Analyst", + "Portfolio Manager": "Portfolio Manager", + }, + ) + workflow.add_conditional_edges( + "Neutral Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Aggressive Analyst": "Aggressive Analyst", + "Portfolio Manager": "Portfolio Manager", + }, + ) + workflow.add_edge("Portfolio Manager", END) + + return workflow.compile() + + def build_risk_subgraph(self): + """Build a subgraph that starts from Aggressive Analyst (skips analysts + debate + trader). + + Use this to re-run only the risk debate + PM phases when trader + checkpoints are available. Entry point: Aggressive Analyst. + """ + aggressive_analyst = create_aggressive_debator(self.quick_thinking_llm) + neutral_analyst = create_neutral_debator(self.quick_thinking_llm) + conservative_analyst = create_conservative_debator(self.quick_thinking_llm) + portfolio_manager_node = create_portfolio_manager( + self.deep_thinking_llm, self.portfolio_manager_memory + ) + + workflow = StateGraph(AgentState) + + workflow.add_node("Aggressive Analyst", aggressive_analyst) + workflow.add_node("Neutral Analyst", neutral_analyst) + workflow.add_node("Conservative Analyst", conservative_analyst) + workflow.add_node("Portfolio Manager", portfolio_manager_node) + + workflow.add_edge(START, "Aggressive Analyst") + workflow.add_conditional_edges( + "Aggressive Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Conservative Analyst": "Conservative Analyst", + "Portfolio Manager": "Portfolio Manager", + }, + ) + workflow.add_conditional_edges( + "Conservative Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Neutral Analyst": "Neutral Analyst", + "Portfolio Manager": "Portfolio Manager", + }, + ) + workflow.add_conditional_edges( + "Neutral Analyst", + 
@property
def debate_graph(self):
    """Compiled debate-phase subgraph (entry: Bull Researcher; analysts skipped).

    Built lazily via ``graph_setup.build_debate_subgraph()`` on first access
    and cached on the instance afterwards.
    """
    cached = self._debate_graph
    if cached is None:
        cached = self.graph_setup.build_debate_subgraph()
        self._debate_graph = cached
    return cached

@property
def risk_graph(self):
    """Compiled risk-phase subgraph (entry: Aggressive Analyst; analysts,
    debate and trader skipped).

    Built lazily via ``graph_setup.build_risk_subgraph()`` on first access
    and cached on the instance afterwards.
    """
    cached = self._risk_graph
    if cached is None:
        cached = self.graph_setup.build_risk_subgraph()
        self._risk_graph = cached
    return cached
diff --git a/tradingagents/portfolio/mongo_report_store.py b/tradingagents/portfolio/mongo_report_store.py index 6ccc334c..6f06b410 100644 --- a/tradingagents/portfolio/mongo_report_store.py +++ b/tradingagents/portfolio/mongo_report_store.py @@ -270,6 +270,64 @@ class MongoReportStore: ) ) + # ------------------------------------------------------------------ + # Run Meta / Events + # ------------------------------------------------------------------ + + def save_run_meta(self, date: str, data: dict[str, Any]) -> str: + return self._save(date, "run_meta", data) + + def load_run_meta(self, date: str, *, run_id: str | None = None) -> dict[str, Any] | None: + return self._load(date, "run_meta", run_id=run_id) + + def save_run_events(self, date: str, events: list[dict[str, Any]]) -> str: + """Save run events as a single document wrapping the events list.""" + return self._save(date, "run_events", {"events": events}) + + def load_run_events(self, date: str, *, run_id: str | None = None) -> list[dict[str, Any]]: + """Load run events. 
Returns empty list if not found.""" + doc = self._load(date, "run_events", run_id=run_id) + if doc is None: + return [] + return doc.get("events", []) + + def list_run_metas(self) -> list[dict[str, Any]]: + """Return all run_meta documents, newest first.""" + docs = self._col.find( + {"report_type": "run_meta"}, + {"_id": 0}, + sort=[("created_at", DESCENDING)], + ) + return [d.get("data", d) for d in docs] + + # ------------------------------------------------------------------ + # Analyst / Trader Checkpoints + # ------------------------------------------------------------------ + + def save_analysts_checkpoint( + self, date: str, ticker: str, data: dict[str, Any] + ) -> str: + return self._save(date, "analysts_checkpoint", data, ticker=ticker) + + def load_analysts_checkpoint( + self, date: str, ticker: str, *, run_id: str | None = None + ) -> dict[str, Any] | None: + return self._load(date, "analysts_checkpoint", ticker=ticker, run_id=run_id) + + def save_trader_checkpoint( + self, date: str, ticker: str, data: dict[str, Any] + ) -> str: + return self._save(date, "trader_checkpoint", data, ticker=ticker) + + def load_trader_checkpoint( + self, date: str, ticker: str, *, run_id: str | None = None + ) -> dict[str, Any] | None: + return self._load(date, "trader_checkpoint", ticker=ticker, run_id=run_id) + + # ------------------------------------------------------------------ + # Utility (continued) + # ------------------------------------------------------------------ + def list_analyses_for_date(self, date: str) -> list[str]: """Return ticker symbols that have an analysis for the given date.""" docs = self._col.find( diff --git a/tradingagents/portfolio/report_store.py b/tradingagents/portfolio/report_store.py index 0e368360..52316671 100644 --- a/tradingagents/portfolio/report_store.py +++ b/tradingagents/portfolio/report_store.py @@ -393,6 +393,133 @@ class ReportStore: deleted.append(path.name) return deleted + # 
------------------------------------------------------------------ + # Run Meta / Events persistence + # ------------------------------------------------------------------ + + def save_run_meta(self, date: str, data: dict[str, Any]) -> Path: + """Save run metadata JSON. + + Path: ``{base}/daily/{date}[/runs/{run_id}]/run_meta.json`` + """ + root = self._date_root(date, for_write=True) + path = root / "run_meta.json" + result = self._write_json(path, data) + self._update_latest(date) + return result + + def load_run_meta(self, date: str) -> dict[str, Any] | None: + """Load run metadata. Returns None if the file does not exist.""" + root = self._date_root(date) + path = root / "run_meta.json" + return self._read_json(path) + + def save_run_events(self, date: str, events: list[dict[str, Any]]) -> Path: + """Save run events as JSONL (one JSON object per line). + + Path: ``{base}/daily/{date}[/runs/{run_id}]/run_events.jsonl`` + """ + root = self._date_root(date, for_write=True) + path = root / "run_events.jsonl" + try: + path.parent.mkdir(parents=True, exist_ok=True) + lines = [] + for evt in events: + sanitized = self._sanitize(evt) + lines.append(json.dumps(sanitized, separators=(",", ":"))) + path.write_text("\n".join(lines) + "\n" if lines else "", encoding="utf-8") + return path + except OSError as exc: + raise ReportStoreError(f"Failed to write {path}: {exc}") from exc + + def load_run_events(self, date: str) -> list[dict[str, Any]]: + """Load run events from JSONL file. 
Returns empty list if file does not exist.""" + root = self._date_root(date) + path = root / "run_events.jsonl" + if not path.exists(): + return [] + events: list[dict[str, Any]] = [] + try: + for line in path.read_text(encoding="utf-8").splitlines(): + line = line.strip() + if line: + events.append(json.loads(line)) + except json.JSONDecodeError as exc: + raise ReportStoreError(f"Corrupt JSONL at {path}: {exc}") from exc + return events + + @classmethod + def list_run_metas(cls, base_dir: str | Path = "reports") -> list[dict[str, Any]]: + """Scan for all run_meta.json files and return metadata dicts, newest first. + + Args: + base_dir: Root reports directory. + + Returns: + List of run_meta dicts sorted by ``created_at`` descending. + """ + base = Path(base_dir) + pattern = "daily/*/runs/*/run_meta.json" + metas: list[dict[str, Any]] = [] + for path in base.glob(pattern): + try: + data = json.loads(path.read_text(encoding="utf-8")) + metas.append(data) + except (json.JSONDecodeError, OSError): + continue + metas.sort(key=lambda m: m.get("created_at", 0), reverse=True) + return metas + + # ------------------------------------------------------------------ + # Analyst / Trader Checkpoints + # ------------------------------------------------------------------ + + def save_analysts_checkpoint( + self, date: str, ticker: str, data: dict[str, Any] + ) -> Path: + """Save analysts checkpoint for a ticker. + + Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/analysts_checkpoint.json`` + """ + root = self._date_root(date, for_write=True) + path = root / ticker.upper() / "analysts_checkpoint.json" + result = self._write_json(path, data) + self._update_latest(date) + return result + + def load_analysts_checkpoint( + self, date: str, ticker: str + ) -> dict[str, Any] | None: + """Load analysts checkpoint. 
Returns None if file does not exist.""" + root = self._date_root(date) + path = root / ticker.upper() / "analysts_checkpoint.json" + return self._read_json(path) + + def save_trader_checkpoint( + self, date: str, ticker: str, data: dict[str, Any] + ) -> Path: + """Save trader checkpoint for a ticker. + + Path: ``{base}/daily/{date}[/runs/{run_id}]/{TICKER}/trader_checkpoint.json`` + """ + root = self._date_root(date, for_write=True) + path = root / ticker.upper() / "trader_checkpoint.json" + result = self._write_json(path, data) + self._update_latest(date) + return result + + def load_trader_checkpoint( + self, date: str, ticker: str + ) -> dict[str, Any] | None: + """Load trader checkpoint. Returns None if file does not exist.""" + root = self._date_root(date) + path = root / ticker.upper() / "trader_checkpoint.json" + return self._read_json(path) + + # ------------------------------------------------------------------ + # PM Decisions + # ------------------------------------------------------------------ + def list_pm_decisions(self, portfolio_id: str) -> list[Path]: """Return all saved PM decision JSON paths for portfolio_id, newest first.