From 7ff05328a8a80ff75c9af05e43c47ae5c62f7ae4 Mon Sep 17 00:00:00 2001 From: dtarkent2-sys Date: Fri, 20 Feb 2026 15:18:43 +0000 Subject: [PATCH] Fix parallel research/risk: snapshot state to avoid proxy serialization MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit LangGraph state proxies serialize concurrent dict access, forcing threads to run sequentially. Fix by snapshotting needed fields into plain dicts before dispatching to ThreadPoolExecutor — same pattern used by the working parallel analysts node. Co-Authored-By: Claude Opus 4.6 --- tradingagents/graph/parallel_analysts.py | 48 +++++++++++++++++++----- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/tradingagents/graph/parallel_analysts.py b/tradingagents/graph/parallel_analysts.py index 246058f4..b9e33635 100644 --- a/tradingagents/graph/parallel_analysts.py +++ b/tradingagents/graph/parallel_analysts.py @@ -7,8 +7,13 @@ Provides parallel wrappers for: """ import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor + from langchain_core.messages import HumanMessage, RemoveMessage +logger = logging.getLogger(__name__) + def create_parallel_analyst_node(analyst_fns, tool_nodes, selected_analysts): """Create a single LangGraph node that runs all analysts in parallel. @@ -87,10 +92,22 @@ def create_parallel_research_node(bull_fn, bear_fn): """ async def parallel_research_node(state): - bull_result, bear_result = await asyncio.gather( - asyncio.to_thread(bull_fn, state), - asyncio.to_thread(bear_fn, state), - ) + # Snapshot into plain dicts — LangGraph state proxies serialize + # concurrent dict access, which would force sequential execution. + state_snap = { + "investment_debate_state": dict(state.get("investment_debate_state", {})), + "market_report": state.get("market_report", ""), + "sentiment_report": state.get("sentiment_report", ""), + "news_report": state.get("news_report", ""), + "fundamentals_report": state.get("fundamentals_report", ""), + } + + loop = asyncio.get_running_loop() + with ThreadPoolExecutor(max_workers=2) as pool: + bull_result, bear_result = await asyncio.gather( + loop.run_in_executor(pool, bull_fn, state_snap), + loop.run_in_executor(pool, bear_fn, state_snap), + ) bull_debate = bull_result["investment_debate_state"] bear_debate = bear_result["investment_debate_state"] @@ -119,11 +136,24 @@ def create_parallel_risk_node(aggressive_fn, conservative_fn, neutral_fn): """ async def parallel_risk_node(state): - agg_result, con_result, neu_result = await asyncio.gather( - asyncio.to_thread(aggressive_fn, state), - asyncio.to_thread(conservative_fn, state), - asyncio.to_thread(neutral_fn, state), - ) + # Snapshot into plain dicts — LangGraph state proxies serialize + # concurrent dict access, which would force sequential execution. + state_snap = { + "risk_debate_state": dict(state.get("risk_debate_state", {})), + "market_report": state.get("market_report", ""), + "sentiment_report": state.get("sentiment_report", ""), + "news_report": state.get("news_report", ""), + "fundamentals_report": state.get("fundamentals_report", ""), + "trader_investment_plan": state.get("trader_investment_plan", ""), + } + + loop = asyncio.get_running_loop() + with ThreadPoolExecutor(max_workers=3) as pool: + agg_result, con_result, neu_result = await asyncio.gather( + loop.run_in_executor(pool, aggressive_fn, state_snap), + loop.run_in_executor(pool, conservative_fn, state_snap), + loop.run_in_executor(pool, neutral_fn, state_snap), + ) agg_debate = agg_result["risk_debate_state"] con_debate = con_result["risk_debate_state"]