diff --git a/tradingagents/graph/parallel_analysts.py b/tradingagents/graph/parallel_analysts.py index 246058f4..b9e33635 100644 --- a/tradingagents/graph/parallel_analysts.py +++ b/tradingagents/graph/parallel_analysts.py @@ -7,8 +7,13 @@ Provides parallel wrappers for: """ import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor + from langchain_core.messages import HumanMessage, RemoveMessage +logger = logging.getLogger(__name__) + def create_parallel_analyst_node(analyst_fns, tool_nodes, selected_analysts): """Create a single LangGraph node that runs all analysts in parallel. @@ -87,10 +92,22 @@ def create_parallel_research_node(bull_fn, bear_fn): """ async def parallel_research_node(state): - bull_result, bear_result = await asyncio.gather( - asyncio.to_thread(bull_fn, state), - asyncio.to_thread(bear_fn, state), - ) + # Snapshot into plain dicts — LangGraph state proxies serialize + # concurrent dict access, which would force sequential execution. + state_snap = { + "investment_debate_state": dict(state.get("investment_debate_state", {})), + "market_report": state.get("market_report", ""), + "sentiment_report": state.get("sentiment_report", ""), + "news_report": state.get("news_report", ""), + "fundamentals_report": state.get("fundamentals_report", ""), + } + + loop = asyncio.get_running_loop() + with ThreadPoolExecutor(max_workers=2) as pool: + bull_result, bear_result = await asyncio.gather( + loop.run_in_executor(pool, bull_fn, state_snap), + loop.run_in_executor(pool, bear_fn, state_snap), + ) bull_debate = bull_result["investment_debate_state"] bear_debate = bear_result["investment_debate_state"] @@ -119,11 +136,24 @@ def create_parallel_risk_node(aggressive_fn, conservative_fn, neutral_fn): """ async def parallel_risk_node(state): - agg_result, con_result, neu_result = await asyncio.gather( - asyncio.to_thread(aggressive_fn, state), - asyncio.to_thread(conservative_fn, state), - asyncio.to_thread(neutral_fn, state), - ) + # Snapshot into plain dicts — LangGraph state proxies serialize + # concurrent dict access, which would force sequential execution. + state_snap = { + "risk_debate_state": dict(state.get("risk_debate_state", {})), + "market_report": state.get("market_report", ""), + "sentiment_report": state.get("sentiment_report", ""), + "news_report": state.get("news_report", ""), + "fundamentals_report": state.get("fundamentals_report", ""), + "trader_investment_plan": state.get("trader_investment_plan", ""), + } + + loop = asyncio.get_running_loop() + with ThreadPoolExecutor(max_workers=3) as pool: + agg_result, con_result, neu_result = await asyncio.gather( + loop.run_in_executor(pool, aggressive_fn, state_snap), + loop.run_in_executor(pool, conservative_fn, state_snap), + loop.run_in_executor(pool, neutral_fn, state_snap), + ) agg_debate = agg_result["risk_debate_state"] con_debate = con_result["risk_debate_state"]