diff --git a/.env.example b/.env.example index 0cb52e02..b9dc54dc 100644 --- a/.env.example +++ b/.env.example @@ -84,6 +84,13 @@ FINNHUB_API_KEY= # PostgreSQL connection string for Supabase (required for portfolio commands) # SUPABASE_CONNECTION_STRING=postgresql://postgres.:@aws-1-.pooler.supabase.com:6543/postgres +# ── MongoDB report store (optional) ────────────────────────────────── +# When set, all reports (scans, analyses, decisions) are stored in MongoDB +# instead of the filesystem. Each run creates separate documents so +# same-day re-runs never overwrite earlier results. +# TRADINGAGENTS_MONGO_URI=mongodb://localhost:27017 +# TRADINGAGENTS_MONGO_DB=tradingagents + # Root directory for all reports (scans, analysis, portfolio artifacts). # All output lands under {REPORTS_DIR}/daily/{date}/... # PORTFOLIO_DATA_DIR overrides this for portfolio-only reports if you need them split. diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index 8b114f0c..3810cab4 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -150,12 +150,12 @@ async def reset_portfolio_stage( After calling this, an auto run will re-run Phase 3 from scratch (Phases 1 & 2 are skipped if their cached results still exist). """ - from tradingagents.portfolio.report_store import ReportStore + from tradingagents.portfolio.store_factory import create_report_store date = params.get("date") portfolio_id = params.get("portfolio_id") if not date or not portfolio_id: raise HTTPException(status_code=422, detail="date and portfolio_id are required") - store = ReportStore() + store = create_report_store() deleted = store.clear_portfolio_stage(date, portfolio_id) logger.info("reset_portfolio_stage date=%s portfolio=%s deleted=%s user=%s", date, portfolio_id, deleted, user["user_id"]) return {"deleted": deleted, "date": date, "portfolio_id": portfolio_id} diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 04901a9b..261c1d61 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -9,10 +9,12 @@ from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.graph.scanner_graph import ScannerGraph from tradingagents.graph.portfolio_graph import PortfolioGraph from tradingagents.default_config import DEFAULT_CONFIG -from tradingagents.report_paths import get_market_dir, get_ticker_dir +from tradingagents.report_paths import get_market_dir, get_ticker_dir, get_daily_dir, generate_run_id from tradingagents.portfolio.report_store import ReportStore +from tradingagents.portfolio.store_factory import create_report_store from tradingagents.daily_digest import append_to_digest from tradingagents.agents.utils.json_utils import extract_json +from tradingagents.observability import RunLogger, set_run_logger logger = logging.getLogger("agent_os.engine") @@ -61,6 +63,48 @@ def _tickers_from_decision(decision: dict) -> list[str]: # Maximum characters of prompt/response for the full fields (generous limit) _MAX_FULL_LEN = 50_000 +# Keywords in tool output that indicate the error was handled gracefully +_GRACEFUL_SKIP_KEYWORDS = ("gracefully", "fallback", "skipped") + +# ────────────────────────────────────────────────────────────────────────────── +# Tool-name → primary service mapping (best-effort, used for display only) +# ────────────────────────────────────────────────────────────────────────────── +_TOOL_SERVICE_MAP: Dict[str, str] = { + # Core stock APIs + "get_stock_data": "yfinance", + "get_indicators": "yfinance", + # Fundamental data + "get_fundamentals": "yfinance", + "get_balance_sheet": "yfinance", + "get_cashflow": "yfinance", + "get_income_statement": "yfinance", + "get_ttm_analysis": "yfinance (derived)", + "get_peer_comparison": "yfinance (derived)", + "get_sector_relative": "yfinance (derived)", + "get_macro_regime": "yfinance (derived)", + # News + "get_news": "yfinance", + "get_global_news": "yfinance", + "get_insider_transactions": "finnhub", + # Scanner + "get_market_movers": "yfinance", + "get_market_indices": "finnhub", + "get_sector_performance": "finnhub", + "get_industry_performance": "yfinance", + "get_topic_news": "finnhub", + "get_earnings_calendar": "finnhub", + "get_economic_calendar": "finnhub", + # Finviz smart money + "get_insider_buying_stocks": "finviz", + "get_unusual_volume_stocks": "finviz", + "get_breakout_accumulation_stocks": "finviz", + # Portfolio (local) + "get_enriched_holdings": "local", + "compute_portfolio_risk_metrics": "local", + "load_portfolio_risk_metrics": "local", + "load_portfolio_decision": "local", +} + class LangGraphEngine: """Orchestrates LangGraph pipeline executions and streams events.""" @@ -74,6 +118,32 @@ class LangGraphEngine: self._node_prompts: Dict[str, Dict[str, str]] = {} # Track the human-readable identifier (ticker / "MARKET" / portfolio_id) per run self._run_identifiers: Dict[str, str] = {} + # Track RunLogger instances per run for JSONL persistence + self._run_loggers: Dict[str, RunLogger] = {} + + # ------------------------------------------------------------------ + # Run logger lifecycle + # ------------------------------------------------------------------ + + def _start_run_logger(self, run_id: str) -> RunLogger: + """Create and register a ``RunLogger`` for the given run.""" + rl = RunLogger() + self._run_loggers[run_id] = rl + set_run_logger(rl) + return rl + + def _finish_run_logger(self, run_id: str, log_dir: Path) -> None: + """Persist the run log to *log_dir*/run_log.jsonl and clean up.""" + rl = self._run_loggers.pop(run_id, None) + if rl is None: + return + try: + log_dir.mkdir(parents=True, exist_ok=True) + rl.write_log(log_dir / "run_log.jsonl") + except Exception: + logger.exception("Failed to write run log for run=%s", run_id) + finally: + set_run_logger(None) # ------------------------------------------------------------------ # Run helpers @@ -85,9 +155,14 @@ class LangGraphEngine: """Run the 3-phase macro scanner and stream events.""" date = params.get("date", time.strftime("%Y-%m-%d")) + # Generate a short run_id for report namespacing + short_rid = generate_run_id() + store = create_report_store(run_id=short_rid) + + rl = self._start_run_logger(run_id) scanner = ScannerGraph(config=self.config) - logger.info("Starting SCAN run=%s date=%s", run_id, date) + logger.info("Starting SCAN run=%s date=%s rid=%s", run_id, date, short_rid) yield self._system_log(f"Starting macro scan for {date}") initial_state = { @@ -105,7 +180,9 @@ class LangGraphEngine: self._run_identifiers[run_id] = "MARKET" final_state: Dict[str, Any] = {} - async for event in scanner.graph.astream_events(initial_state, version="v2"): + async for event in scanner.graph.astream_events( + initial_state, version="v2", config={"callbacks": [rl.callback]} + ): # Capture the complete final state from the root graph's terminal event. # LangGraph v2 emits one root-level on_chain_end (parent_ids=[], no # langgraph_node in metadata) whose data.output is the full accumulated state. @@ -133,11 +210,11 @@ class LangGraphEngine: except Exception as exc: logger.warning("SCAN fallback ainvoke failed run=%s: %s", run_id, exc) - # Save scan reports to disk + # Save scan reports if final_state: - yield self._system_log("Saving scan reports to disk…") + yield self._system_log("Saving scan reports…") try: - save_dir = get_market_dir(date) + save_dir = get_market_dir(date, run_id=short_rid) save_dir.mkdir(parents=True, exist_ok=True) for key in ( @@ -151,12 +228,12 @@ class LangGraphEngine: if content: (save_dir / f"{key}.md").write_text(content) - # Parse and save macro_scan_summary.json via ReportStore for downstream use + # Parse and save macro_scan_summary.json via store for downstream use summary_text = final_state.get("macro_scan_summary", "") if summary_text: try: summary_data = extract_json(summary_text) - ReportStore().save_scan(date, summary_data) + store.save_scan(date, summary_data) except (ValueError, KeyError, TypeError): logger.warning( "macro_scan_summary for date=%s is not valid JSON " @@ -186,6 +263,7 @@ class LangGraphEngine: yield self._system_log(f"Warning: could not save scan reports: {exc}") logger.info("Completed SCAN run=%s", run_id) + self._finish_run_logger(run_id, get_market_dir(date, run_id=short_rid)) async def run_pipeline( self, run_id: str, params: Dict[str, Any] @@ -195,8 +273,14 @@ class LangGraphEngine: date = params.get("date", time.strftime("%Y-%m-%d")) analysts = params.get("analysts", ["market", "news", "fundamentals"]) + # Generate a short run_id for report namespacing + short_rid = generate_run_id() + store = create_report_store(run_id=short_rid) + + rl = self._start_run_logger(run_id) + logger.info( - "Starting PIPELINE run=%s ticker=%s date=%s", run_id, ticker, date + "Starting PIPELINE run=%s ticker=%s date=%s rid=%s", run_id, ticker, date, short_rid ) yield self._system_log(f"Starting analysis pipeline for {ticker} on {date}") @@ -215,7 +299,10 @@ class LangGraphEngine: async for event in graph_wrapper.graph.astream_events( initial_state, version="v2", - config={"recursion_limit": graph_wrapper.propagator.max_recur_limit}, + config={ + "recursion_limit": graph_wrapper.propagator.max_recur_limit, + "callbacks": [rl.callback], + }, ): # Capture the complete final state from the root graph's terminal event. if self._is_root_chain_end(event): @@ -246,19 +333,19 @@ class LangGraphEngine: except Exception as exc: logger.warning("PIPELINE fallback ainvoke failed run=%s: %s", run_id, exc) - # Save pipeline reports to disk + # Save pipeline reports if final_state: yield self._system_log(f"Saving analysis report for {ticker}…") try: - save_dir = get_ticker_dir(date, ticker) + save_dir = get_ticker_dir(date, ticker, run_id=short_rid) save_dir.mkdir(parents=True, exist_ok=True) # Sanitize final_state to remove non-JSON-serializable objects # (e.g. LangChain HumanMessage, AIMessage objects in "messages") serializable_state = self._sanitize_for_json(final_state) - # Save JSON via ReportStore (complete_report.json) - ReportStore().save_analysis(date, ticker, serializable_state) + # Save JSON via store (complete_report.json) + store.save_analysis(date, ticker, serializable_state) # Write human-readable complete_report.md self._write_complete_report_md(final_state, ticker, save_dir) @@ -279,6 +366,7 @@ class LangGraphEngine: yield self._system_log(f"Warning: could not save analysis report for {ticker}: {exc}") logger.info("Completed PIPELINE run=%s", run_id) + self._finish_run_logger(run_id, get_ticker_dir(date, ticker, run_id=short_rid)) async def run_portfolio( self, run_id: str, params: Dict[str, Any] @@ -287,9 +375,17 @@ class LangGraphEngine: date = params.get("date", time.strftime("%Y-%m-%d")) portfolio_id = params.get("portfolio_id", "main_portfolio") + # Generate a short run_id for report namespacing + short_rid = generate_run_id() + store = create_report_store(run_id=short_rid) + # A reader store with no run_id resolves to the latest run for loading + reader_store = create_report_store() + + rl = self._start_run_logger(run_id) + logger.info( - "Starting PORTFOLIO run=%s portfolio=%s date=%s", - run_id, portfolio_id, date, + "Starting PORTFOLIO run=%s portfolio=%s date=%s rid=%s", + run_id, portfolio_id, date, short_rid, ) yield self._system_log( f"Starting portfolio manager for {portfolio_id} on {date}" @@ -297,19 +393,33 @@ class LangGraphEngine: portfolio_graph = PortfolioGraph(config=self.config) - # Load scan summary and per-ticker analyses from the daily report folder - store = ReportStore() - scan_summary = store.load_scan(date) or {} + # Load scan summary and per-ticker analyses from the latest report + scan_summary = reader_store.load_scan(date) or {} ticker_analyses: Dict[str, Any] = {} - from tradingagents.report_paths import get_daily_dir + # Search both run-scoped and legacy flat layouts for ticker directories daily_dir = get_daily_dir(date) + search_dirs: list[Path] = [] + runs_dir = daily_dir / "runs" + if runs_dir.exists(): + for run_dir in runs_dir.iterdir(): + if run_dir.is_dir(): + search_dirs.append(run_dir) if daily_dir.exists(): - for ticker_dir in daily_dir.iterdir(): - if ticker_dir.is_dir() and ticker_dir.name not in ("market", "portfolio"): - analysis = store.load_analysis(date, ticker_dir.name) + search_dirs.append(daily_dir) + + seen_tickers: set[str] = set() + for base in search_dirs: + for ticker_dir in base.iterdir(): + if ( + ticker_dir.is_dir() + and ticker_dir.name not in ("market", "portfolio", "runs") + and ticker_dir.name.upper() not in seen_tickers + ): + analysis = reader_store.load_analysis(date, ticker_dir.name) if analysis: - ticker_analyses[ticker_dir.name] = analysis + ticker_analyses[ticker_dir.name.upper()] = analysis + seen_tickers.add(ticker_dir.name.upper()) if scan_summary: yield self._system_log(f"Loaded macro scan summary for {date}") @@ -362,7 +472,7 @@ class LangGraphEngine: final_state: Dict[str, Any] = {} async for event in portfolio_graph.graph.astream_events( - initial_state, version="v2" + initial_state, version="v2", config={"callbacks": [rl.callback]} ): if self._is_root_chain_end(event): output = (event.get("data") or {}).get("output") @@ -390,12 +500,16 @@ class LangGraphEngine: # Save portfolio reports (Holding Reviews, Risk Metrics, PM Decision, Execution Result) if final_state: try: - # 1. Holding Reviews — save the raw string via ReportStore + # 1. Holding Reviews — save the raw string via store holding_reviews_str = final_state.get("holding_reviews") if holding_reviews_str: try: reviews = json.loads(holding_reviews_str) if isinstance(holding_reviews_str, str) else holding_reviews_str - store.save_holding_reviews(date, portfolio_id, reviews) + if isinstance(reviews, dict): + for ticker, review_data in reviews.items(): + store.save_holding_review(date, ticker, review_data) + else: + logger.warning("Unexpected holding_reviews format run=%s: %s", run_id, type(reviews)) except Exception as exc: logger.warning("Failed to save holding_reviews run=%s: %s", run_id, exc) @@ -432,6 +546,7 @@ class LangGraphEngine: yield self._system_log(f"Warning: could not save portfolio reports: {exc}") logger.info("Completed PORTFOLIO run=%s", run_id) + self._finish_run_logger(run_id, get_daily_dir(date, run_id=short_rid) / "portfolio") async def run_trade_execution( self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict, @@ -454,7 +569,7 @@ class LangGraphEngine: logger.warning("TRADE_EXECUTION run=%s: no prices available — execution may produce incomplete results", run_id) yield self._system_log(f"Warning: no prices found for {portfolio_id} on {date} — trade execution may be incomplete.") - _store = store or ReportStore() + _store = store or create_report_store() try: repo = PortfolioRepository() @@ -480,12 +595,18 @@ class LangGraphEngine: date = params.get("date", time.strftime("%Y-%m-%d")) force = params.get("force", False) + # Use a reader store (no run_id) for skip-if-exists checks. + # Each sub-phase (run_scan, run_pipeline, run_portfolio) creates + # its own writer store with a fresh run_id internally. + store = create_report_store() + + self._start_run_logger(run_id) # auto-run's own logger; sub-phases create their own + logger.info("Starting AUTO run=%s date=%s force=%s", run_id, date, force) yield self._system_log(f"Starting full auto workflow for {date} (force={force})") # Phase 1: Market scan yield self._system_log("Phase 1/3: Running market scan…") - store = ReportStore() if not force and store.load_scan(date): yield self._system_log(f"Phase 1: Macro scan for {date} already exists, skipping.") else: @@ -621,6 +742,7 @@ class LangGraphEngine: yield evt logger.info("Completed AUTO run=%s", run_id) + self._finish_run_logger(run_id, get_daily_dir(date)) # ------------------------------------------------------------------ # Report helpers @@ -973,7 +1095,9 @@ class LangGraphEngine: full_input = str(inp)[:_MAX_FULL_LEN] tool_input = self._truncate(str(inp)) - logger.info("Tool start tool=%s node=%s run=%s", name, node_name, run_id) + service = _TOOL_SERVICE_MAP.get(name, "") + + logger.info("Tool start tool=%s service=%s node=%s run=%s", name, service, node_name, run_id) return { "id": event.get("run_id", f"tool_{time.time_ns()}").strip(), @@ -985,6 +1109,8 @@ class LangGraphEngine: "message": f"▶ Tool: {name}" + (f" | {tool_input}" if tool_input else ""), "prompt": full_input, + "service": service, + "status": "running", "metrics": {}, } except Exception: @@ -996,13 +1122,37 @@ class LangGraphEngine: try: full_output = "" tool_output = "" + is_error = False + error_message = "" + graceful = False out = (event.get("data") or {}).get("output") if out is not None: raw = self._extract_content(out) full_output = raw[:_MAX_FULL_LEN] tool_output = self._truncate(raw) + # Detect errors in tool output + if raw.startswith("Error") or raw.startswith("Error calling "): + is_error = True + error_message = raw[:500] + # Detect graceful degradation (vendor fallback / empty-but-ok) + raw_lower = raw.lower() + if any(kw in raw_lower for kw in _GRACEFUL_SKIP_KEYWORDS): + graceful = True + # Some LangGraph versions pass errors through the event status + evt_status = (event.get("data") or {}).get("status") + if evt_status == "error": + is_error = True + if not error_message: + error_message = tool_output or "Unknown tool error" - logger.info("Tool end tool=%s node=%s run=%s", name, node_name, run_id) + service = _TOOL_SERVICE_MAP.get(name, "") + status = "error" if is_error else ("graceful_skip" if graceful else "success") + icon = "✗" if is_error else ("⚠" if graceful else "✓") + + logger.info( + "Tool end tool=%s status=%s node=%s run=%s", + name, status, node_name, run_id, + ) return { "id": f"{event.get('run_id', 'tool_end')}_{time.time_ns()}", @@ -1011,9 +1161,12 @@ class LangGraphEngine: "type": "tool_result", "agent": node_name.upper(), "identifier": identifier, - "message": f"✓ Tool result: {name}" + "message": f"{icon} Tool result: {name}" + (f" | {tool_output}" if tool_output else ""), "response": full_output, + "service": service, + "status": status, + "error": error_message if is_error else None, "metrics": {}, } except Exception: diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index 408fe38b..1c4d54c1 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -77,7 +77,11 @@ const REQUIRED_PARAMS: Record = { }; /** Return the colour token for a given event type. */ -const eventColor = (type: AgentEvent['type']): string => { +const eventColor = (type: AgentEvent['type'], status?: AgentEvent['status']): string => { + // Error events always show in red + if (status === 'error') return 'red.400'; + // Graceful skips show in orange/yellow + if (status === 'graceful_skip') return 'orange.300'; switch (type) { case 'tool': return 'purple.400'; case 'tool_result': return 'purple.300'; @@ -88,7 +92,9 @@ const eventColor = (type: AgentEvent['type']): string => { }; /** Return a short label badge for the event type. */ -const eventLabel = (type: AgentEvent['type']): string => { +const eventLabel = (type: AgentEvent['type'], status?: AgentEvent['status']): string => { + if (status === 'error') return '❌'; + if (status === 'graceful_skip') return '⚠️'; switch (type) { case 'thought': return '💭'; case 'tool': return '🔧'; @@ -101,10 +107,20 @@ const eventLabel = (type: AgentEvent['type']): string => { /** Short summary for terminal — no inline prompts, just agent + type. */ const eventSummary = (evt: AgentEvent): string => { + const svc = evt.service ? ` [${evt.service}]` : ''; switch (evt.type) { case 'thought': return `Thinking… (${evt.metrics?.model || 'LLM'})`; - case 'tool': return evt.message.startsWith('✓') ? 'Tool result received' : `Tool call: ${evt.message.replace(/^▶ Tool: /, '').split(' | ')[0]}`; - case 'tool_result': return `Tool done: ${evt.message.replace(/^✓ Tool result: /, '').split(' | ')[0]}`; + case 'tool': { + if (evt.message.startsWith('✓')) return 'Tool result received'; + const toolName = evt.message.replace(/^▶ Tool: /, '').split(' | ')[0]; + return `Tool call: ${toolName}${svc}`; + } + case 'tool_result': { + const resultToolName = evt.message.replace(/^[✓✗⚠] Tool result: /, '').split(' | ')[0]; + if (evt.status === 'error') return `Tool error: ${resultToolName}${svc}`; + if (evt.status === 'graceful_skip') return `Tool skipped: ${resultToolName}${svc}`; + return `Tool done: ${resultToolName}${svc}`; + } case 'result': return 'Completed'; case 'log': return evt.message; default: return evt.type; @@ -115,6 +131,12 @@ const eventSummary = (evt: AgentEvent): string => { const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; onClose: () => void }> = ({ event, isOpen, onClose }) => { if (!event) return null; + const headerBadgeColor = event.status === 'error' ? 'red' + : event.status === 'graceful_skip' ? 'orange' + : event.type === 'result' ? 'green' + : event.type === 'tool' || event.type === 'tool_result' ? 'purple' + : 'cyan'; + return ( @@ -122,10 +144,13 @@ const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; on - + {event.type.toUpperCase()} {event.agent} + {event.status === 'error' && ERROR} + {event.status === 'graceful_skip' && GRACEFUL SKIP} + {event.service && {event.service}} {event.timestamp} @@ -134,6 +159,7 @@ const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; on {event.prompt && Prompt / Request} {(event.response || (event.type === 'result' && event.message)) && Response} + {event.error && Error} Summary {event.metrics && Metrics} @@ -149,13 +175,22 @@ const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; on )} {(event.response || (event.type === 'result' && event.message)) && ( - - + + {event.response || event.message} )} + {event.error && ( + + + + {event.error} + + + + )} @@ -169,6 +204,9 @@ const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; on {event.metrics.model && event.metrics.model !== 'unknown' && ( Model:{event.metrics.model} )} + {event.service && ( + Service:{event.service} + )} {event.metrics.tokens_in != null && event.metrics.tokens_in > 0 && ( Tokens In:{event.metrics.tokens_in} )} @@ -193,11 +231,20 @@ const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; on }; // ─── Detail card for a single event in the drawer ───────────────────── -const EventDetail: React.FC<{ event: AgentEvent; onOpenModal?: (evt: AgentEvent) => void }> = ({ event, onOpenModal }) => ( +const EventDetail: React.FC<{ event: AgentEvent; onOpenModal?: (evt: AgentEvent) => void }> = ({ event, onOpenModal }) => { + const badgeColor = event.status === 'error' ? 'red' + : event.status === 'graceful_skip' ? 'orange' + : event.type === 'result' ? 'green' + : event.type === 'tool' || event.type === 'tool_result' ? 'purple' + : 'cyan'; + + return ( - {event.type.toUpperCase()} + {event.type.toUpperCase()} {event.agent} + {event.status === 'error' && ERROR} + {event.status === 'graceful_skip' && GRACEFUL SKIP} {event.timestamp} {onOpenModal && (