Fix langgraph engine: fallback ainvoke, portfolio state shape, JSON serialization
Co-authored-by: aguzererler <6199053+aguzererler@users.noreply.github.com> Agent-Logs-Url: https://github.com/aguzererler/TradingAgents/sessions/43d04354-154a-442c-8bfc-ede05860e7f9
This commit is contained in:
parent
90c49dc1f4
commit
c0d13b9207
|
|
@ -1,5 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import datetime as _dt
|
import datetime as _dt
|
||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
@ -77,6 +78,18 @@ class LangGraphEngine:
|
||||||
self._node_start_times.pop(run_id, None)
|
self._node_start_times.pop(run_id, None)
|
||||||
self._node_prompts.pop(run_id, None)
|
self._node_prompts.pop(run_id, None)
|
||||||
|
|
||||||
|
# Fallback: if the root on_chain_end event was never captured (can happen
|
||||||
|
# with deeply nested sub-graphs), re-invoke to get the complete final state.
|
||||||
|
if not final_state:
|
||||||
|
logger.warning(
|
||||||
|
"SCAN run=%s: root on_chain_end not captured — falling back to ainvoke",
|
||||||
|
run_id,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
final_state = await scanner.graph.ainvoke(initial_state)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("SCAN fallback ainvoke failed run=%s: %s", run_id, exc)
|
||||||
|
|
||||||
# Save scan reports to disk
|
# Save scan reports to disk
|
||||||
if final_state:
|
if final_state:
|
||||||
yield self._system_log("Saving scan reports to disk…")
|
yield self._system_log("Saving scan reports to disk…")
|
||||||
|
|
@ -172,6 +185,22 @@ class LangGraphEngine:
|
||||||
self._node_start_times.pop(run_id, None)
|
self._node_start_times.pop(run_id, None)
|
||||||
self._node_prompts.pop(run_id, None)
|
self._node_prompts.pop(run_id, None)
|
||||||
|
|
||||||
|
# Fallback: if the root on_chain_end event was never captured (can happen
|
||||||
|
# with deeply nested sub-graphs), re-invoke to get the complete final state.
|
||||||
|
if not final_state:
|
||||||
|
logger.warning(
|
||||||
|
"PIPELINE run=%s ticker=%s: root on_chain_end not captured — "
|
||||||
|
"falling back to ainvoke",
|
||||||
|
run_id, ticker,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
final_state = await graph_wrapper.graph.ainvoke(
|
||||||
|
initial_state,
|
||||||
|
config={"recursion_limit": graph_wrapper.propagator.max_recur_limit},
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("PIPELINE fallback ainvoke failed run=%s: %s", run_id, exc)
|
||||||
|
|
||||||
# Save pipeline reports to disk
|
# Save pipeline reports to disk
|
||||||
if final_state:
|
if final_state:
|
||||||
yield self._system_log(f"Saving analysis report for {ticker}…")
|
yield self._system_log(f"Saving analysis report for {ticker}…")
|
||||||
|
|
@ -179,8 +208,12 @@ class LangGraphEngine:
|
||||||
save_dir = get_ticker_dir(date, ticker)
|
save_dir = get_ticker_dir(date, ticker)
|
||||||
save_dir.mkdir(parents=True, exist_ok=True)
|
save_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Sanitize final_state to remove non-JSON-serializable objects
|
||||||
|
# (e.g. LangChain HumanMessage, AIMessage objects in "messages")
|
||||||
|
serializable_state = self._sanitize_for_json(final_state)
|
||||||
|
|
||||||
# Save JSON via ReportStore (complete_report.json)
|
# Save JSON via ReportStore (complete_report.json)
|
||||||
ReportStore().save_analysis(date, ticker, final_state)
|
ReportStore().save_analysis(date, ticker, serializable_state)
|
||||||
|
|
||||||
# Write human-readable complete_report.md
|
# Write human-readable complete_report.md
|
||||||
self._write_complete_report_md(final_state, ticker, save_dir)
|
self._write_complete_report_md(final_state, ticker, save_dir)
|
||||||
|
|
@ -242,25 +275,80 @@ class LangGraphEngine:
|
||||||
else:
|
else:
|
||||||
yield self._system_log("No per-ticker analyses found for this date")
|
yield self._system_log("No per-ticker analyses found for this date")
|
||||||
|
|
||||||
|
# Merge ticker_analyses into scan_summary so portfolio graph nodes can access
|
||||||
|
# per-ticker analysis data (PortfolioManagerState has no ticker_analyses field).
|
||||||
|
if ticker_analyses:
|
||||||
|
scan_summary["ticker_analyses"] = ticker_analyses
|
||||||
|
|
||||||
|
# Fetch prices from scan_summary if available, else default to empty dict
|
||||||
|
prices = scan_summary.get("prices") or {}
|
||||||
|
|
||||||
initial_state = {
|
initial_state = {
|
||||||
"portfolio_id": portfolio_id,
|
"portfolio_id": portfolio_id,
|
||||||
"scan_date": date,
|
"analysis_date": date, # PortfolioManagerState uses analysis_date
|
||||||
|
"prices": prices,
|
||||||
"scan_summary": scan_summary,
|
"scan_summary": scan_summary,
|
||||||
"ticker_analyses": ticker_analyses,
|
|
||||||
"messages": [],
|
"messages": [],
|
||||||
|
"portfolio_data": "",
|
||||||
|
"risk_metrics": "",
|
||||||
|
"holding_reviews": "",
|
||||||
|
"prioritized_candidates": "",
|
||||||
|
"pm_decision": "",
|
||||||
|
"execution_result": "",
|
||||||
|
"sender": "",
|
||||||
}
|
}
|
||||||
|
|
||||||
self._node_start_times[run_id] = {}
|
self._node_start_times[run_id] = {}
|
||||||
|
final_state: Dict[str, Any] = {}
|
||||||
|
|
||||||
async for event in portfolio_graph.graph.astream_events(
|
async for event in portfolio_graph.graph.astream_events(
|
||||||
initial_state, version="v2"
|
initial_state, version="v2"
|
||||||
):
|
):
|
||||||
|
if self._is_root_chain_end(event):
|
||||||
|
output = (event.get("data") or {}).get("output")
|
||||||
|
if isinstance(output, dict):
|
||||||
|
final_state = output
|
||||||
mapped = self._map_langgraph_event(run_id, event)
|
mapped = self._map_langgraph_event(run_id, event)
|
||||||
if mapped:
|
if mapped:
|
||||||
yield mapped
|
yield mapped
|
||||||
|
|
||||||
self._node_start_times.pop(run_id, None)
|
self._node_start_times.pop(run_id, None)
|
||||||
self._node_prompts.pop(run_id, None)
|
self._node_prompts.pop(run_id, None)
|
||||||
|
|
||||||
|
# Fallback: if the root on_chain_end event was never captured, re-invoke.
|
||||||
|
if not final_state:
|
||||||
|
logger.warning(
|
||||||
|
"PORTFOLIO run=%s: root on_chain_end not captured — falling back to ainvoke",
|
||||||
|
run_id,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
final_state = await portfolio_graph.graph.ainvoke(initial_state)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("PORTFOLIO fallback ainvoke failed run=%s: %s", run_id, exc)
|
||||||
|
|
||||||
|
# Save PM decision report
|
||||||
|
if final_state:
|
||||||
|
try:
|
||||||
|
pm_decision_str = final_state.get("pm_decision", "")
|
||||||
|
if pm_decision_str:
|
||||||
|
try:
|
||||||
|
pm_decision_dict = (
|
||||||
|
json.loads(pm_decision_str)
|
||||||
|
if isinstance(pm_decision_str, str)
|
||||||
|
else pm_decision_str
|
||||||
|
)
|
||||||
|
except (json.JSONDecodeError, TypeError):
|
||||||
|
pm_decision_dict = {"raw": pm_decision_str}
|
||||||
|
ReportStore().save_pm_decision(date, portfolio_id, pm_decision_dict)
|
||||||
|
yield self._system_log(
|
||||||
|
f"Portfolio reports saved for {portfolio_id} on {date}"
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.exception("Failed to save portfolio reports run=%s", run_id)
|
||||||
|
yield self._system_log(
|
||||||
|
f"Warning: could not save portfolio reports: {exc}"
|
||||||
|
)
|
||||||
|
|
||||||
logger.info("Completed PORTFOLIO run=%s", run_id)
|
logger.info("Completed PORTFOLIO run=%s", run_id)
|
||||||
|
|
||||||
async def run_auto(
|
async def run_auto(
|
||||||
|
|
@ -310,6 +398,32 @@ class LangGraphEngine:
|
||||||
# Report helpers
|
# Report helpers
|
||||||
# ------------------------------------------------------------------
|
# ------------------------------------------------------------------
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _sanitize_for_json(obj: Any) -> Any:
|
||||||
|
"""Recursively convert non-JSON-serializable objects to plain types.
|
||||||
|
|
||||||
|
LangGraph final states may contain LangChain message objects
|
||||||
|
(HumanMessage, AIMessage, etc.) in the ``messages`` field, as well as
|
||||||
|
other non-serializable objects from third-party libraries. All such
|
||||||
|
objects are converted to strings as a last resort so ``json.dumps``
|
||||||
|
never raises ``TypeError``.
|
||||||
|
"""
|
||||||
|
if isinstance(obj, dict):
|
||||||
|
return {k: LangGraphEngine._sanitize_for_json(v) for k, v in obj.items()}
|
||||||
|
if isinstance(obj, (list, tuple)):
|
||||||
|
return [LangGraphEngine._sanitize_for_json(v) for v in obj]
|
||||||
|
# LangChain message objects: convert to a safe dict representation
|
||||||
|
if hasattr(obj, "content") and hasattr(obj, "type"):
|
||||||
|
return {
|
||||||
|
"type": str(getattr(obj, "type", "unknown")),
|
||||||
|
"content": str(getattr(obj, "content", "")),
|
||||||
|
}
|
||||||
|
# Native JSON-serializable scalar types — return as-is
|
||||||
|
if isinstance(obj, (str, int, float, bool, type(None))):
|
||||||
|
return obj
|
||||||
|
# Anything else (custom objects, datetimes, etc.) — stringify
|
||||||
|
return str(obj)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _write_complete_report_md(
|
def _write_complete_report_md(
|
||||||
final_state: Dict[str, Any], ticker: str, save_dir: Path
|
final_state: Dict[str, Any], ticker: str, save_dir: Path
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue