diff --git a/app.py b/app.py index 5188c389..a2f66d40 100644 --- a/app.py +++ b/app.py @@ -239,6 +239,8 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): trader_emitted = True buf.update_agent_status("Trader", "completed") buf.update_agent_status("Aggressive Analyst", "in_progress") + buf.update_agent_status("Conservative Analyst", "in_progress") + buf.update_agent_status("Neutral Analyst", "in_progress") buf.update_report_section("trader_investment_plan", chunk["trader_investment_plan"]) evt = { "type": "trader", diff --git a/cli/main.py b/cli/main.py index fb97d189..68b364dd 100644 --- a/cli/main.py +++ b/cli/main.py @@ -820,6 +820,7 @@ def update_analyst_statuses(message_buffer, chunk): if not found_active and selected: if message_buffer.agent_status.get("Bull Researcher") == "pending": message_buffer.update_agent_status("Bull Researcher", "in_progress") + message_buffer.update_agent_status("Bear Researcher", "in_progress") def extract_content_string(content): """Extract string content from various message formats. diff --git a/tradingagents/graph/parallel_analysts.py b/tradingagents/graph/parallel_analysts.py index 1fddd8a6..246058f4 100644 --- a/tradingagents/graph/parallel_analysts.py +++ b/tradingagents/graph/parallel_analysts.py @@ -1,7 +1,9 @@ -"""Parallel analyst execution for TradingAgents. +"""Parallel execution nodes for TradingAgents. -Runs all analyst agents (Market, Social, News, Fundamentals) concurrently -instead of sequentially, cutting the analyst phase from ~8-9 min to ~2-3 min. +Provides parallel wrappers for: +- Analyst phase (Market, Social, News, Fundamentals) +- Research debate phase (Bull + Bear) +- Risk debate phase (Aggressive + Conservative + Neutral) """ import asyncio @@ -74,3 +76,81 @@ def create_parallel_analyst_node(analyst_fns, tool_nodes, selected_analysts): return merged return parallel_analysts_node + + +def create_parallel_research_node(bull_fn, bear_fn): + """Create a node that runs Bull and Bear researchers in parallel. + + Both agents receive the same state (reports + empty debate state) and + produce independent arguments. Results are merged into a single + investment_debate_state with both histories and count=2. + """ + + async def parallel_research_node(state): + bull_result, bear_result = await asyncio.gather( + asyncio.to_thread(bull_fn, state), + asyncio.to_thread(bear_fn, state), + ) + + bull_debate = bull_result["investment_debate_state"] + bear_debate = bear_result["investment_debate_state"] + + merged_debate = { + "bull_history": bull_debate.get("bull_history", ""), + "bear_history": bear_debate.get("bear_history", ""), + "history": bull_debate.get("bull_history", "") + + "\n" + + bear_debate.get("bear_history", ""), + "current_response": bear_debate.get("current_response", ""), + "judge_decision": "", + "count": 2, + } + return {"investment_debate_state": merged_debate} + + return parallel_research_node + + +def create_parallel_risk_node(aggressive_fn, conservative_fn, neutral_fn): + """Create a node that runs all 3 risk analysts in parallel. + + All agents receive the same state (trader plan + empty risk debate state) + and produce independent arguments. Results are merged into a single + risk_debate_state with all histories and count=3. + """ + + async def parallel_risk_node(state): + agg_result, con_result, neu_result = await asyncio.gather( + asyncio.to_thread(aggressive_fn, state), + asyncio.to_thread(conservative_fn, state), + asyncio.to_thread(neutral_fn, state), + ) + + agg_debate = agg_result["risk_debate_state"] + con_debate = con_result["risk_debate_state"] + neu_debate = neu_result["risk_debate_state"] + + merged_debate = { + "aggressive_history": agg_debate.get("aggressive_history", ""), + "conservative_history": con_debate.get("conservative_history", ""), + "neutral_history": neu_debate.get("neutral_history", ""), + "history": agg_debate.get("aggressive_history", "") + + "\n" + + con_debate.get("conservative_history", "") + + "\n" + + neu_debate.get("neutral_history", ""), + "latest_speaker": "Neutral", + "current_aggressive_response": agg_debate.get( + "current_aggressive_response", "" + ), + "current_conservative_response": con_debate.get( + "current_conservative_response", "" + ), + "current_neutral_response": neu_debate.get( + "current_neutral_response", "" + ), + "judge_decision": "", + "count": 3, + } + return {"risk_debate_state": merged_debate} + + return parallel_risk_node diff --git a/tradingagents/graph/setup.py b/tradingagents/graph/setup.py index 41c69140..08f6815c 100644 --- a/tradingagents/graph/setup.py +++ b/tradingagents/graph/setup.py @@ -9,7 +9,11 @@ from tradingagents.agents import * from tradingagents.agents.utils.agent_states import AgentState from .conditional_logic import ConditionalLogic -from .parallel_analysts import create_parallel_analyst_node +from .parallel_analysts import ( + create_parallel_analyst_node, + create_parallel_research_node, + create_parallel_risk_node, +) class GraphSetup: @@ -126,23 +130,49 @@ class GraphSetup: ) workflow.add_node(f"tools_{analyst_type}", tool_nodes[analyst_type]) - # Add other nodes - workflow.add_node("Bull Researcher", bull_researcher_node) - workflow.add_node("Bear Researcher", bear_researcher_node) - workflow.add_node("Research Manager", research_manager_node) - workflow.add_node("Trader", trader_node) - workflow.add_node("Aggressive Analyst", aggressive_analyst) - workflow.add_node("Neutral Analyst", neutral_analyst) - workflow.add_node("Conservative Analyst", conservative_analyst) - workflow.add_node("Risk Judge", risk_manager_node) - - # Define edges if parallel: - # Parallel: START → Parallel Analysts → Bull Researcher + # --- Parallel mode --- + # Analysts: single parallel node + # Research: Bull+Bear run concurrently in one node + # Risk: Agg+Con+Neu run concurrently in one node + + parallel_research = create_parallel_research_node( + bull_researcher_node, bear_researcher_node + ) + parallel_risk = create_parallel_risk_node( + aggressive_analyst, conservative_analyst, neutral_analyst + ) + + workflow.add_node("Research Manager", research_manager_node) + workflow.add_node("Trader", trader_node) + workflow.add_node("Parallel Research", parallel_research) + workflow.add_node("Parallel Risk", parallel_risk) + workflow.add_node("Risk Judge", risk_manager_node) + + # Parallel Analysts → Parallel Research → Manager → Trader → Parallel Risk → Judge → END workflow.add_edge(START, "Parallel Analysts") - workflow.add_edge("Parallel Analysts", "Bull Researcher") + workflow.add_edge("Parallel Analysts", "Parallel Research") + workflow.add_edge("Parallel Research", "Research Manager") + workflow.add_edge("Research Manager", "Trader") + workflow.add_edge("Trader", "Parallel Risk") + workflow.add_edge("Parallel Risk", "Risk Judge") + workflow.add_edge("Risk Judge", END) + else: - # Sequential: START → Analyst 1 → ... → Analyst N → Bull Researcher + # --- Sequential mode --- + # Individual analyst nodes with tool-calling loops + # Bull/Bear debate with conditional routing + # Agg/Con/Neu risk debate with conditional routing + + workflow.add_node("Bull Researcher", bull_researcher_node) + workflow.add_node("Bear Researcher", bear_researcher_node) + workflow.add_node("Research Manager", research_manager_node) + workflow.add_node("Trader", trader_node) + workflow.add_node("Aggressive Analyst", aggressive_analyst) + workflow.add_node("Neutral Analyst", neutral_analyst) + workflow.add_node("Conservative Analyst", conservative_analyst) + workflow.add_node("Risk Judge", risk_manager_node) + first_analyst = selected_analysts[0] workflow.add_edge(START, f"{first_analyst.capitalize()} Analyst") @@ -164,51 +194,50 @@ class GraphSetup: else: workflow.add_edge(current_clear, "Bull Researcher") - # Add remaining edges (same for both modes) - workflow.add_conditional_edges( - "Bull Researcher", - self.conditional_logic.should_continue_debate, - { - "Bear Researcher": "Bear Researcher", - "Research Manager": "Research Manager", - }, - ) - workflow.add_conditional_edges( - "Bear Researcher", - self.conditional_logic.should_continue_debate, - { - "Bull Researcher": "Bull Researcher", - "Research Manager": "Research Manager", - }, - ) - workflow.add_edge("Research Manager", "Trader") - workflow.add_edge("Trader", "Aggressive Analyst") - workflow.add_conditional_edges( - "Aggressive Analyst", - self.conditional_logic.should_continue_risk_analysis, - { - "Conservative Analyst": "Conservative Analyst", - "Risk Judge": "Risk Judge", - }, - ) - workflow.add_conditional_edges( - "Conservative Analyst", - self.conditional_logic.should_continue_risk_analysis, - { - "Neutral Analyst": "Neutral Analyst", - "Risk Judge": "Risk Judge", - }, - ) - workflow.add_conditional_edges( - "Neutral Analyst", - self.conditional_logic.should_continue_risk_analysis, - { - "Aggressive Analyst": "Aggressive Analyst", - "Risk Judge": "Risk Judge", - }, - ) + workflow.add_conditional_edges( + "Bull Researcher", + self.conditional_logic.should_continue_debate, + { + "Bear Researcher": "Bear Researcher", + "Research Manager": "Research Manager", + }, + ) + workflow.add_conditional_edges( + "Bear Researcher", + self.conditional_logic.should_continue_debate, + { + "Bull Researcher": "Bull Researcher", + "Research Manager": "Research Manager", + }, + ) + workflow.add_edge("Research Manager", "Trader") + workflow.add_edge("Trader", "Aggressive Analyst") + workflow.add_conditional_edges( + "Aggressive Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Conservative Analyst": "Conservative Analyst", + "Risk Judge": "Risk Judge", + }, + ) + workflow.add_conditional_edges( + "Conservative Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Neutral Analyst": "Neutral Analyst", + "Risk Judge": "Risk Judge", + }, + ) + workflow.add_conditional_edges( + "Neutral Analyst", + self.conditional_logic.should_continue_risk_analysis, + { + "Aggressive Analyst": "Aggressive Analyst", + "Risk Judge": "Risk Judge", + }, + ) - workflow.add_edge("Risk Judge", END) + workflow.add_edge("Risk Judge", END) # Compile and return return workflow.compile()