From 93b87d511947ecba58e455d742a3b62af37509fe Mon Sep 17 00:00:00 2001 From: Yijia Xiao Date: Tue, 3 Feb 2026 19:39:25 +0000 Subject: [PATCH] fix: analyst status tracking and message deduplication - Add update_analyst_statuses() for unified status logic (pending/in_progress/completed) - Normalize analyst selection to predefined ANALYST_ORDER for consistent execution - Add message deduplication to prevent duplicates from stream_mode=values - Restructure streaming loop so state handlers run on every chunk --- cli/main.py | 418 +++++++++++++++++++++++----------------------------- 1 file changed, 184 insertions(+), 234 deletions(-) diff --git a/cli/main.py b/cli/main.py index 614b43f2..fa662564 100644 --- a/cli/main.py +++ b/cli/main.py @@ -79,6 +79,7 @@ class MessageBuffer: self.current_agent = None self.report_sections = {} self.selected_analysts = [] + self._last_message_id = None def init_for_analysis(self, selected_analysts): """Initialize agent status and report sections based on selected analysts. @@ -113,6 +114,7 @@ class MessageBuffer: self.current_agent = None self.messages.clear() self.tool_calls.clear() + self._last_message_id = None def get_completed_reports_count(self): """Count reports that are finalized (their finalizing agent is completed). @@ -361,60 +363,31 @@ def update_display(layout, spinner_text=None, stats_handler=None, start_time=Non # Add tool calls for timestamp, tool_name, args in message_buffer.tool_calls: - # Truncate tool call args if too long - if isinstance(args, str) and len(args) > 100: - args = args[:97] + "..." - all_messages.append((timestamp, "Tool", f"{tool_name}: {args}")) + formatted_args = format_tool_args(args) + all_messages.append((timestamp, "Tool", f"{tool_name}: {formatted_args}")) # Add regular messages for timestamp, msg_type, content in message_buffer.messages: - # Convert content to string if it's not already - content_str = content - if isinstance(content, list): - # Handle list of content blocks (Anthropic format) - text_parts = [] - for item in content: - if isinstance(item, dict): - if item.get('type') == 'text': - text_parts.append(item.get('text', '')) - elif item.get('type') == 'tool_use': - text_parts.append(f"[Tool: {item.get('name', 'unknown')}]") - else: - text_parts.append(str(item)) - content_str = ' '.join(text_parts) - elif not isinstance(content_str, str): - content_str = str(content) - - # Truncate message content if too long + content_str = str(content) if content else "" if len(content_str) > 200: content_str = content_str[:197] + "..." all_messages.append((timestamp, msg_type, content_str)) - # Sort by timestamp - all_messages.sort(key=lambda x: x[0]) + # Sort by timestamp descending (newest first) + all_messages.sort(key=lambda x: x[0], reverse=True) # Calculate how many messages we can show based on available space - # Start with a reasonable number and adjust based on content length - max_messages = 12 # Increased from 8 to better fill the space + max_messages = 12 - # Get the last N messages that will fit in the panel - recent_messages = all_messages[-max_messages:] + # Get the first N messages (newest ones) + recent_messages = all_messages[:max_messages] - # Add messages to table + # Add messages to table (already in newest-first order) for timestamp, msg_type, content in recent_messages: # Format content with word wrapping wrapped_content = Text(content, overflow="fold") messages_table.add_row(timestamp, msg_type, wrapped_content) - if spinner_text: - messages_table.add_row("", "Spinner", spinner_text) - - # Add a footer to indicate if messages were truncated - if len(all_messages) > max_messages: - messages_table.footer = ( - f"[dim]Showing last {max_messages} of {len(all_messages)} messages[/dim]" - ) - layout["messages"].update( Panel( messages_table, @@ -831,11 +804,62 @@ def display_complete_report(final_state): def update_research_team_status(status): - """Update status for all research team members and trader.""" - research_team = ["Bull Researcher", "Bear Researcher", "Research Manager", "Trader"] + """Update status for research team members (not Trader).""" + research_team = ["Bull Researcher", "Bear Researcher", "Research Manager"] for agent in research_team: message_buffer.update_agent_status(agent, status) + +# Ordered list of analysts for status transitions +ANALYST_ORDER = ["market", "social", "news", "fundamentals"] +ANALYST_AGENT_NAMES = { + "market": "Market Analyst", + "social": "Social Analyst", + "news": "News Analyst", + "fundamentals": "Fundamentals Analyst", +} +ANALYST_REPORT_MAP = { + "market": "market_report", + "social": "sentiment_report", + "news": "news_report", + "fundamentals": "fundamentals_report", +} + + +def update_analyst_statuses(message_buffer, chunk): + """Update all analyst statuses based on current report state. + + Logic: + - Analysts with reports = completed + - First analyst without report = in_progress + - Remaining analysts without reports = pending + - When all analysts done, set Bull Researcher to in_progress + """ + selected = message_buffer.selected_analysts + found_active = False + + for analyst_key in ANALYST_ORDER: + if analyst_key not in selected: + continue + + agent_name = ANALYST_AGENT_NAMES[analyst_key] + report_key = ANALYST_REPORT_MAP[analyst_key] + has_report = bool(chunk.get(report_key)) + + if has_report: + message_buffer.update_agent_status(agent_name, "completed") + message_buffer.update_report_section(report_key, chunk[report_key]) + elif not found_active: + message_buffer.update_agent_status(agent_name, "in_progress") + found_active = True + else: + message_buffer.update_agent_status(agent_name, "pending") + + # When all analysts complete, transition research team to in_progress + if not found_active and selected: + if message_buffer.agent_status.get("Bull Researcher") == "pending": + message_buffer.update_agent_status("Bull Researcher", "in_progress") + def extract_content_string(content): """Extract string content from various message formats. Returns None if no meaningful text content is found. @@ -877,6 +901,40 @@ def extract_content_string(content): return str(content).strip() if not is_empty(content) else None + +def classify_message_type(message) -> tuple[str, str | None]: + """Classify LangChain message into display type and extract content. + + Returns: + (type, content) - type is one of: User, Agent, Data, Control + - content is extracted string or None + """ + from langchain_core.messages import AIMessage, HumanMessage, ToolMessage + + content = extract_content_string(getattr(message, 'content', None)) + + if isinstance(message, HumanMessage): + if content and content.strip() == "Continue": + return ("Control", content) + return ("User", content) + + if isinstance(message, ToolMessage): + return ("Data", content) + + if isinstance(message, AIMessage): + return ("Agent", content) + + # Fallback for unknown types + return ("System", content) + + +def format_tool_args(args, max_length=80) -> str: + """Format tool arguments for terminal display.""" + result = str(args) + if len(result) > max_length: + return result[:max_length - 3] + "..." + return result + def run_analysis(): # First get all user selections selections = get_user_selections() @@ -896,16 +954,19 @@ def run_analysis(): # Create stats callback handler for tracking LLM/tool calls stats_handler = StatsCallbackHandler() + # Normalize analyst selection to predefined order (selection is a 'set', order is fixed) + selected_set = {analyst.value for analyst in selections["analysts"]} + selected_analyst_keys = [a for a in ANALYST_ORDER if a in selected_set] + # Initialize the graph with callbacks bound to LLMs graph = TradingAgentsGraph( - [analyst.value for analyst in selections["analysts"]], + selected_analyst_keys, config=config, debug=True, callbacks=[stats_handler], ) # Initialize message buffer with selected analysts - selected_analyst_keys = [analyst.value for analyst in selections["analysts"]] message_buffer.init_for_analysis(selected_analyst_keys) # Track start time for elapsed display @@ -998,216 +1059,105 @@ def run_analysis(): # Stream the analysis trace = [] for chunk in graph.graph.stream(init_agent_state, **args): + # Process messages if present (skip duplicates via message ID) if len(chunk["messages"]) > 0: - # Get the last message from the chunk last_message = chunk["messages"][-1] + msg_id = getattr(last_message, "id", None) - # Extract message content and type - content = None - msg_type = "Reasoning" + if msg_id != message_buffer._last_message_id: + message_buffer._last_message_id = msg_id - if hasattr(last_message, "content"): - content = extract_content_string(last_message.content) - elif last_message is not None: - raw = str(last_message).strip() - if raw and raw != '{}': - content = raw - msg_type = "System" + # Add message to buffer + msg_type, content = classify_message_type(last_message) + if content and content.strip(): + message_buffer.add_message(msg_type, content) - # Only add message to buffer if there's actual content - if content: - message_buffer.add_message(msg_type, content) + # Handle tool calls + if hasattr(last_message, "tool_calls") and last_message.tool_calls: + for tool_call in last_message.tool_calls: + if isinstance(tool_call, dict): + message_buffer.add_tool_call( + tool_call["name"], tool_call["args"] + ) + else: + message_buffer.add_tool_call(tool_call.name, tool_call.args) - # Handle tool calls separately - if hasattr(last_message, "tool_calls") and last_message.tool_calls: - for tool_call in last_message.tool_calls: - # Handle both dictionary and object tool calls - if isinstance(tool_call, dict): - message_buffer.add_tool_call( - tool_call["name"], tool_call["args"] - ) - else: - message_buffer.add_tool_call(tool_call.name, tool_call.args) + # Update analyst statuses based on report state (runs on every chunk) + update_analyst_statuses(message_buffer, chunk) - # Update reports and agent status based on chunk content - # Analyst Team Reports - if "market_report" in chunk and chunk["market_report"]: - message_buffer.update_report_section( - "market_report", chunk["market_report"] - ) - message_buffer.update_agent_status("Market Analyst", "completed") - # Set next analyst to in_progress - if "social" in selections["analysts"]: - message_buffer.update_agent_status( - "Social Analyst", "in_progress" - ) + # Research Team - Handle Investment Debate State + if chunk.get("investment_debate_state"): + debate_state = chunk["investment_debate_state"] + bull_hist = debate_state.get("bull_history", "").strip() + bear_hist = debate_state.get("bear_history", "").strip() + judge = debate_state.get("judge_decision", "").strip() - if "sentiment_report" in chunk and chunk["sentiment_report"]: - message_buffer.update_report_section( - "sentiment_report", chunk["sentiment_report"] - ) - message_buffer.update_agent_status("Social Analyst", "completed") - # Set next analyst to in_progress - if "news" in selections["analysts"]: - message_buffer.update_agent_status( - "News Analyst", "in_progress" - ) - - if "news_report" in chunk and chunk["news_report"]: - message_buffer.update_report_section( - "news_report", chunk["news_report"] - ) - message_buffer.update_agent_status("News Analyst", "completed") - # Set next analyst to in_progress - if "fundamentals" in selections["analysts"]: - message_buffer.update_agent_status( - "Fundamentals Analyst", "in_progress" - ) - - if "fundamentals_report" in chunk and chunk["fundamentals_report"]: - message_buffer.update_report_section( - "fundamentals_report", chunk["fundamentals_report"] - ) - message_buffer.update_agent_status( - "Fundamentals Analyst", "completed" - ) - # Set all research team members to in_progress + # Only update status when there's actual content + if bull_hist or bear_hist: update_research_team_status("in_progress") - - # Research Team - Handle Investment Debate State - if ( - "investment_debate_state" in chunk - and chunk["investment_debate_state"] - ): - debate_state = chunk["investment_debate_state"] - - # Update Bull Researcher status and report - if "bull_history" in debate_state and debate_state["bull_history"]: - update_research_team_status("in_progress") - message_buffer.update_report_section( - "investment_plan", - f"### Bull Researcher Analysis\n{debate_state['bull_history']}", - ) - - # Update Bear Researcher status and report - if "bear_history" in debate_state and debate_state["bear_history"]: - update_research_team_status("in_progress") - message_buffer.update_report_section( - "investment_plan", - f"### Bear Researcher Analysis\n{debate_state['bear_history']}", - ) - - # Update Research Manager status and final decision - if ( - "judge_decision" in debate_state - and debate_state["judge_decision"] - ): - update_research_team_status("in_progress") - message_buffer.update_report_section( - "investment_plan", - f"### Research Manager Decision\n{debate_state['judge_decision']}", - ) - update_research_team_status("completed") - # Set first risk analyst to in_progress - message_buffer.update_agent_status( - "Aggressive Analyst", "in_progress" - ) - - # Trading Team - if ( - "trader_investment_plan" in chunk - and chunk["trader_investment_plan"] - ): + if bull_hist: message_buffer.update_report_section( - "trader_investment_plan", chunk["trader_investment_plan"] + "investment_plan", f"### Bull Researcher Analysis\n{bull_hist}" ) - # Set first risk analyst to in_progress + if bear_hist: + message_buffer.update_report_section( + "investment_plan", f"### Bear Researcher Analysis\n{bear_hist}" + ) + if judge: + message_buffer.update_report_section( + "investment_plan", f"### Research Manager Decision\n{judge}" + ) + update_research_team_status("completed") + message_buffer.update_agent_status("Trader", "in_progress") + + # Trading Team + if chunk.get("trader_investment_plan"): + message_buffer.update_report_section( + "trader_investment_plan", chunk["trader_investment_plan"] + ) + if message_buffer.agent_status.get("Trader") != "completed": + message_buffer.update_agent_status("Trader", "completed") message_buffer.update_agent_status("Aggressive Analyst", "in_progress") - # Risk Management Team - Handle Risk Debate State - if "risk_debate_state" in chunk and chunk["risk_debate_state"]: - risk_state = chunk["risk_debate_state"] + # Risk Management Team - Handle Risk Debate State + if chunk.get("risk_debate_state"): + risk_state = chunk["risk_debate_state"] + agg_hist = risk_state.get("aggressive_history", "").strip() + con_hist = risk_state.get("conservative_history", "").strip() + neu_hist = risk_state.get("neutral_history", "").strip() + judge = risk_state.get("judge_decision", "").strip() - # Update Aggressive Analyst status and report - if ( - "current_aggressive_response" in risk_state - and risk_state["current_aggressive_response"] - ): - message_buffer.update_agent_status( - "Aggressive Analyst", "in_progress" - ) - message_buffer.add_message( - "Reasoning", - f"Aggressive Analyst: {risk_state['current_aggressive_response']}", - ) - # Update risk report with aggressive analyst's latest analysis only + if agg_hist: + if message_buffer.agent_status.get("Aggressive Analyst") != "completed": + message_buffer.update_agent_status("Aggressive Analyst", "in_progress") + message_buffer.update_report_section( + "final_trade_decision", f"### Aggressive Analyst Analysis\n{agg_hist}" + ) + if con_hist: + if message_buffer.agent_status.get("Conservative Analyst") != "completed": + message_buffer.update_agent_status("Conservative Analyst", "in_progress") + message_buffer.update_report_section( + "final_trade_decision", f"### Conservative Analyst Analysis\n{con_hist}" + ) + if neu_hist: + if message_buffer.agent_status.get("Neutral Analyst") != "completed": + message_buffer.update_agent_status("Neutral Analyst", "in_progress") + message_buffer.update_report_section( + "final_trade_decision", f"### Neutral Analyst Analysis\n{neu_hist}" + ) + if judge: + if message_buffer.agent_status.get("Portfolio Manager") != "completed": + message_buffer.update_agent_status("Portfolio Manager", "in_progress") message_buffer.update_report_section( - "final_trade_decision", - f"### Aggressive Analyst Analysis\n{risk_state['current_aggressive_response']}", + "final_trade_decision", f"### Portfolio Manager Decision\n{judge}" ) - - # Update Conservative Analyst status and report - if ( - "current_conservative_response" in risk_state - and risk_state["current_conservative_response"] - ): - message_buffer.update_agent_status( - "Conservative Analyst", "in_progress" - ) - message_buffer.add_message( - "Reasoning", - f"Conservative Analyst: {risk_state['current_conservative_response']}", - ) - # Update risk report with conservative analyst's latest analysis only - message_buffer.update_report_section( - "final_trade_decision", - f"### Conservative Analyst Analysis\n{risk_state['current_conservative_response']}", - ) - - # Update Neutral Analyst status and report - if ( - "current_neutral_response" in risk_state - and risk_state["current_neutral_response"] - ): - message_buffer.update_agent_status( - "Neutral Analyst", "in_progress" - ) - message_buffer.add_message( - "Reasoning", - f"Neutral Analyst: {risk_state['current_neutral_response']}", - ) - # Update risk report with neutral analyst's latest analysis only - message_buffer.update_report_section( - "final_trade_decision", - f"### Neutral Analyst Analysis\n{risk_state['current_neutral_response']}", - ) - - # Update Portfolio Manager status and final decision - if "judge_decision" in risk_state and risk_state["judge_decision"]: - message_buffer.update_agent_status( - "Portfolio Manager", "in_progress" - ) - message_buffer.add_message( - "Reasoning", - f"Portfolio Manager: {risk_state['judge_decision']}", - ) - # Update risk report with final decision only - message_buffer.update_report_section( - "final_trade_decision", - f"### Portfolio Manager Decision\n{risk_state['judge_decision']}", - ) - # Mark risk analysts as completed message_buffer.update_agent_status("Aggressive Analyst", "completed") message_buffer.update_agent_status("Conservative Analyst", "completed") - message_buffer.update_agent_status( - "Neutral Analyst", "completed" - ) - message_buffer.update_agent_status( - "Portfolio Manager", "completed" - ) + message_buffer.update_agent_status("Neutral Analyst", "completed") + message_buffer.update_agent_status("Portfolio Manager", "completed") - # Update the display - update_display(layout, stats_handler=stats_handler, start_time=start_time) + # Update the display + update_display(layout, stats_handler=stats_handler, start_time=start_time) trace.append(chunk) @@ -1220,7 +1170,7 @@ def run_analysis(): message_buffer.update_agent_status(agent, "completed") message_buffer.add_message( - "Analysis", f"Completed analysis for {selections['analysis_date']}" + "System", f"Completed analysis for {selections['analysis_date']}" ) # Update final report sections