From c0d13b9207797f74afeb9451984e8f43bf4f820e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Mar 2026 19:57:35 +0000 Subject: [PATCH] Fix langgraph engine: fallback ainvoke, portfolio state shape, JSON serialization Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com> Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/43d04354-154a-442c-8bfc-ede05860e7f9 --- agent_os/backend/services/langgraph_engine.py | 120 +++++++++++++++++- 1 file changed, 117 insertions(+), 3 deletions(-) diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 6e318b4f..5cb498ce 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -1,5 +1,6 @@ import asyncio import datetime as _dt +import json import logging import time from pathlib import Path @@ -77,6 +78,18 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + # Fallback: if the root on_chain_end event was never captured (can happen + # with deeply nested sub-graphs), re-invoke to get the complete final state. + if not final_state: + logger.warning( + "SCAN run=%s: root on_chain_end not captured — falling back to ainvoke", + run_id, + ) + try: + final_state = await scanner.graph.ainvoke(initial_state) + except Exception as exc: + logger.warning("SCAN fallback ainvoke failed run=%s: %s", run_id, exc) + # Save scan reports to disk if final_state: yield self._system_log("Saving scan reports to disk…") @@ -172,6 +185,22 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + # Fallback: if the root on_chain_end event was never captured (can happen + # with deeply nested sub-graphs), re-invoke to get the complete final state. + if not final_state: + logger.warning( + "PIPELINE run=%s ticker=%s: root on_chain_end not captured — " + "falling back to ainvoke", + run_id, ticker, + ) + try: + final_state = await graph_wrapper.graph.ainvoke( + initial_state, + config={"recursion_limit": graph_wrapper.propagator.max_recur_limit}, + ) + except Exception as exc: + logger.warning("PIPELINE fallback ainvoke failed run=%s: %s", run_id, exc) + # Save pipeline reports to disk if final_state: yield self._system_log(f"Saving analysis report for {ticker}…") @@ -179,8 +208,12 @@ class LangGraphEngine: save_dir = get_ticker_dir(date, ticker) save_dir.mkdir(parents=True, exist_ok=True) + # Sanitize final_state to remove non-JSON-serializable objects + # (e.g. LangChain HumanMessage, AIMessage objects in "messages") + serializable_state = self._sanitize_for_json(final_state) + # Save JSON via ReportStore (complete_report.json) - ReportStore().save_analysis(date, ticker, final_state) + ReportStore().save_analysis(date, ticker, serializable_state) # Write human-readable complete_report.md self._write_complete_report_md(final_state, ticker, save_dir) @@ -242,25 +275,80 @@ class LangGraphEngine: else: yield self._system_log("No per-ticker analyses found for this date") + # Merge ticker_analyses into scan_summary so portfolio graph nodes can access + # per-ticker analysis data (PortfolioManagerState has no ticker_analyses field). + if ticker_analyses: + scan_summary["ticker_analyses"] = ticker_analyses + + # Fetch prices from scan_summary if available, else default to empty dict + prices = scan_summary.get("prices") or {} + initial_state = { "portfolio_id": portfolio_id, - "scan_date": date, + "analysis_date": date, # PortfolioManagerState uses analysis_date + "prices": prices, "scan_summary": scan_summary, - "ticker_analyses": ticker_analyses, "messages": [], + "portfolio_data": "", + "risk_metrics": "", + "holding_reviews": "", + "prioritized_candidates": "", + "pm_decision": "", + "execution_result": "", + "sender": "", } self._node_start_times[run_id] = {} + final_state: Dict[str, Any] = {} async for event in portfolio_graph.graph.astream_events( initial_state, version="v2" ): + if self._is_root_chain_end(event): + output = (event.get("data") or {}).get("output") + if isinstance(output, dict): + final_state = output mapped = self._map_langgraph_event(run_id, event) if mapped: yield mapped self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + + # Fallback: if the root on_chain_end event was never captured, re-invoke. + if not final_state: + logger.warning( + "PORTFOLIO run=%s: root on_chain_end not captured — falling back to ainvoke", + run_id, + ) + try: + final_state = await portfolio_graph.graph.ainvoke(initial_state) + except Exception as exc: + logger.warning("PORTFOLIO fallback ainvoke failed run=%s: %s", run_id, exc) + + # Save PM decision report + if final_state: + try: + pm_decision_str = final_state.get("pm_decision", "") + if pm_decision_str: + try: + pm_decision_dict = ( + json.loads(pm_decision_str) + if isinstance(pm_decision_str, str) + else pm_decision_str + ) + except (json.JSONDecodeError, TypeError): + pm_decision_dict = {"raw": pm_decision_str} + ReportStore().save_pm_decision(date, portfolio_id, pm_decision_dict) + yield self._system_log( + f"Portfolio reports saved for {portfolio_id} on {date}" + ) + except Exception as exc: + logger.exception("Failed to save portfolio reports run=%s", run_id) + yield self._system_log( + f"Warning: could not save portfolio reports: {exc}" + ) + logger.info("Completed PORTFOLIO run=%s", run_id) async def run_auto( @@ -310,6 +398,32 @@ class LangGraphEngine: # Report helpers # ------------------------------------------------------------------ + @staticmethod + def _sanitize_for_json(obj: Any) -> Any: + """Recursively convert non-JSON-serializable objects to plain types. + + LangGraph final states may contain LangChain message objects + (HumanMessage, AIMessage, etc.) in the ``messages`` field, as well as + other non-serializable objects from third-party libraries. All such + objects are converted to strings as a last resort so ``json.dumps`` + never raises ``TypeError``. + """ + if isinstance(obj, dict): + return {k: LangGraphEngine._sanitize_for_json(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple)): + return [LangGraphEngine._sanitize_for_json(v) for v in obj] + # LangChain message objects: convert to a safe dict representation + if hasattr(obj, "content") and hasattr(obj, "type"): + return { + "type": str(getattr(obj, "type", "unknown")), + "content": str(getattr(obj, "content", "")), + } + # Native JSON-serializable scalar types — return as-is + if isinstance(obj, (str, int, float, bool, type(None))): + return obj + # Anything else (custom objects, datetimes, etc.) — stringify + return str(obj) + @staticmethod def _write_complete_report_md( final_state: Dict[str, Any], ticker: str, save_dir: Path