Fix parallel research/risk: snapshot state to avoid proxy serialization
LangGraph state proxies serialize concurrent dict access, forcing threads to run sequentially. Fix by snapshotting needed fields into plain dicts before dispatching to ThreadPoolExecutor — same pattern used by the working parallel analysts node. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
3cd0c19b35
commit
7ff05328a8
|
|
@ -7,8 +7,13 @@ Provides parallel wrappers for:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
|
||||||
from langchain_core.messages import HumanMessage, RemoveMessage
|
from langchain_core.messages import HumanMessage, RemoveMessage
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def create_parallel_analyst_node(analyst_fns, tool_nodes, selected_analysts):
|
def create_parallel_analyst_node(analyst_fns, tool_nodes, selected_analysts):
|
||||||
"""Create a single LangGraph node that runs all analysts in parallel.
|
"""Create a single LangGraph node that runs all analysts in parallel.
|
||||||
|
|
@ -87,10 +92,22 @@ def create_parallel_research_node(bull_fn, bear_fn):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def parallel_research_node(state):
|
async def parallel_research_node(state):
|
||||||
bull_result, bear_result = await asyncio.gather(
|
# Snapshot into plain dicts — LangGraph state proxies serialize
|
||||||
asyncio.to_thread(bull_fn, state),
|
# concurrent dict access, which would force sequential execution.
|
||||||
asyncio.to_thread(bear_fn, state),
|
state_snap = {
|
||||||
)
|
"investment_debate_state": dict(state.get("investment_debate_state", {})),
|
||||||
|
"market_report": state.get("market_report", ""),
|
||||||
|
"sentiment_report": state.get("sentiment_report", ""),
|
||||||
|
"news_report": state.get("news_report", ""),
|
||||||
|
"fundamentals_report": state.get("fundamentals_report", ""),
|
||||||
|
}
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
with ThreadPoolExecutor(max_workers=2) as pool:
|
||||||
|
bull_result, bear_result = await asyncio.gather(
|
||||||
|
loop.run_in_executor(pool, bull_fn, state_snap),
|
||||||
|
loop.run_in_executor(pool, bear_fn, state_snap),
|
||||||
|
)
|
||||||
|
|
||||||
bull_debate = bull_result["investment_debate_state"]
|
bull_debate = bull_result["investment_debate_state"]
|
||||||
bear_debate = bear_result["investment_debate_state"]
|
bear_debate = bear_result["investment_debate_state"]
|
||||||
|
|
@ -119,11 +136,24 @@ def create_parallel_risk_node(aggressive_fn, conservative_fn, neutral_fn):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
async def parallel_risk_node(state):
|
async def parallel_risk_node(state):
|
||||||
agg_result, con_result, neu_result = await asyncio.gather(
|
# Snapshot into plain dicts — LangGraph state proxies serialize
|
||||||
asyncio.to_thread(aggressive_fn, state),
|
# concurrent dict access, which would force sequential execution.
|
||||||
asyncio.to_thread(conservative_fn, state),
|
state_snap = {
|
||||||
asyncio.to_thread(neutral_fn, state),
|
"risk_debate_state": dict(state.get("risk_debate_state", {})),
|
||||||
)
|
"market_report": state.get("market_report", ""),
|
||||||
|
"sentiment_report": state.get("sentiment_report", ""),
|
||||||
|
"news_report": state.get("news_report", ""),
|
||||||
|
"fundamentals_report": state.get("fundamentals_report", ""),
|
||||||
|
"trader_investment_plan": state.get("trader_investment_plan", ""),
|
||||||
|
}
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
with ThreadPoolExecutor(max_workers=3) as pool:
|
||||||
|
agg_result, con_result, neu_result = await asyncio.gather(
|
||||||
|
loop.run_in_executor(pool, aggressive_fn, state_snap),
|
||||||
|
loop.run_in_executor(pool, conservative_fn, state_snap),
|
||||||
|
loop.run_in_executor(pool, neutral_fn, state_snap),
|
||||||
|
)
|
||||||
|
|
||||||
agg_debate = agg_result["risk_debate_state"]
|
agg_debate = agg_result["risk_debate_state"]
|
||||||
con_debate = con_result["risk_debate_state"]
|
con_debate = con_result["risk_debate_state"]
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue