fix: analyst status tracking and message deduplication

- Add update_analyst_statuses() for unified status logic (pending/in_progress/completed)
- Normalize analyst selection to predefined ANALYST_ORDER for consistent execution
- Add message deduplication to prevent duplicates from stream_mode=values
- Restructure streaming loop so state handlers run on every chunk
This commit is contained in:
Yijia Xiao 2026-02-03 19:39:25 +00:00
parent 54cdb146d0
commit 93b87d5119
No known key found for this signature in database
1 changed file with 184 additions and 234 deletions

View File

@ -79,6 +79,7 @@ class MessageBuffer:
self.current_agent = None self.current_agent = None
self.report_sections = {} self.report_sections = {}
self.selected_analysts = [] self.selected_analysts = []
self._last_message_id = None
def init_for_analysis(self, selected_analysts): def init_for_analysis(self, selected_analysts):
"""Initialize agent status and report sections based on selected analysts. """Initialize agent status and report sections based on selected analysts.
@ -113,6 +114,7 @@ class MessageBuffer:
self.current_agent = None self.current_agent = None
self.messages.clear() self.messages.clear()
self.tool_calls.clear() self.tool_calls.clear()
self._last_message_id = None
def get_completed_reports_count(self): def get_completed_reports_count(self):
"""Count reports that are finalized (their finalizing agent is completed). """Count reports that are finalized (their finalizing agent is completed).
@ -361,60 +363,31 @@ def update_display(layout, spinner_text=None, stats_handler=None, start_time=Non
# Add tool calls # Add tool calls
for timestamp, tool_name, args in message_buffer.tool_calls: for timestamp, tool_name, args in message_buffer.tool_calls:
# Truncate tool call args if too long formatted_args = format_tool_args(args)
if isinstance(args, str) and len(args) > 100: all_messages.append((timestamp, "Tool", f"{tool_name}: {formatted_args}"))
args = args[:97] + "..."
all_messages.append((timestamp, "Tool", f"{tool_name}: {args}"))
# Add regular messages # Add regular messages
for timestamp, msg_type, content in message_buffer.messages: for timestamp, msg_type, content in message_buffer.messages:
# Convert content to string if it's not already content_str = str(content) if content else ""
content_str = content
if isinstance(content, list):
# Handle list of content blocks (Anthropic format)
text_parts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'text':
text_parts.append(item.get('text', ''))
elif item.get('type') == 'tool_use':
text_parts.append(f"[Tool: {item.get('name', 'unknown')}]")
else:
text_parts.append(str(item))
content_str = ' '.join(text_parts)
elif not isinstance(content_str, str):
content_str = str(content)
# Truncate message content if too long
if len(content_str) > 200: if len(content_str) > 200:
content_str = content_str[:197] + "..." content_str = content_str[:197] + "..."
all_messages.append((timestamp, msg_type, content_str)) all_messages.append((timestamp, msg_type, content_str))
# Sort by timestamp # Sort by timestamp descending (newest first)
all_messages.sort(key=lambda x: x[0]) all_messages.sort(key=lambda x: x[0], reverse=True)
# Calculate how many messages we can show based on available space # Calculate how many messages we can show based on available space
# Start with a reasonable number and adjust based on content length max_messages = 12
max_messages = 12 # Increased from 8 to better fill the space
# Get the last N messages that will fit in the panel # Get the first N messages (newest ones)
recent_messages = all_messages[-max_messages:] recent_messages = all_messages[:max_messages]
# Add messages to table # Add messages to table (already in newest-first order)
for timestamp, msg_type, content in recent_messages: for timestamp, msg_type, content in recent_messages:
# Format content with word wrapping # Format content with word wrapping
wrapped_content = Text(content, overflow="fold") wrapped_content = Text(content, overflow="fold")
messages_table.add_row(timestamp, msg_type, wrapped_content) messages_table.add_row(timestamp, msg_type, wrapped_content)
if spinner_text:
messages_table.add_row("", "Spinner", spinner_text)
# Add a footer to indicate if messages were truncated
if len(all_messages) > max_messages:
messages_table.footer = (
f"[dim]Showing last {max_messages} of {len(all_messages)} messages[/dim]"
)
layout["messages"].update( layout["messages"].update(
Panel( Panel(
messages_table, messages_table,
@ -831,11 +804,62 @@ def display_complete_report(final_state):
def update_research_team_status(status): def update_research_team_status(status):
"""Update status for all research team members and trader.""" """Update status for research team members (not Trader)."""
research_team = ["Bull Researcher", "Bear Researcher", "Research Manager", "Trader"] research_team = ["Bull Researcher", "Bear Researcher", "Research Manager"]
for agent in research_team: for agent in research_team:
message_buffer.update_agent_status(agent, status) message_buffer.update_agent_status(agent, status)
# Single source of truth for the analyst team, listed in the order used for
# status transitions: (selection key, display agent name, report state key).
_ANALYST_SPECS = (
    ("market", "Market Analyst", "market_report"),
    ("social", "Social Analyst", "sentiment_report"),
    ("news", "News Analyst", "news_report"),
    ("fundamentals", "Fundamentals Analyst", "fundamentals_report"),
)

# Analyst selection keys in their fixed order.
ANALYST_ORDER = [key for key, _agent, _report in _ANALYST_SPECS]

# Selection key -> agent name used for status updates.
ANALYST_AGENT_NAMES = {key: agent for key, agent, _report in _ANALYST_SPECS}

# Selection key -> name of the report field looked up in each chunk.
ANALYST_REPORT_MAP = {key: report for key, _agent, report in _ANALYST_SPECS}
def update_analyst_statuses(message_buffer, chunk):
    """Synchronise the selected analysts' statuses with the reports in ``chunk``.

    Selected analysts are visited in ``ANALYST_ORDER``:

    - an analyst whose report field in ``chunk`` is truthy is marked
      "completed" and the report is stored in the buffer;
    - the first analyst without a report is marked "in_progress";
    - every later analyst without a report is marked "pending".

    When no visited analyst is left without a report and the selection is
    non-empty, "Bull Researcher" is moved from "pending" to "in_progress".

    Args:
        message_buffer: Buffer exposing ``selected_analysts``, ``agent_status``,
            ``update_agent_status`` and ``update_report_section``.
        chunk: Mapping of state fields for the current stream chunk.
    """
    chosen = message_buffer.selected_analysts
    active_assigned = False

    for key in ANALYST_ORDER:
        if key not in chosen:
            continue

        agent = ANALYST_AGENT_NAMES[key]
        section = ANALYST_REPORT_MAP[key]
        report = chunk.get(section)

        if report:
            message_buffer.update_agent_status(agent, "completed")
            message_buffer.update_report_section(section, report)
            continue

        # Only the first report-less analyst is active; the rest wait.
        status = "pending" if active_assigned else "in_progress"
        message_buffer.update_agent_status(agent, status)
        active_assigned = True

    # Someone is still working, or nothing was selected: no hand-off yet.
    if active_assigned or not chosen:
        return

    # Hand off to the research team, but never regress a later status.
    if message_buffer.agent_status.get("Bull Researcher") == "pending":
        message_buffer.update_agent_status("Bull Researcher", "in_progress")
def extract_content_string(content): def extract_content_string(content):
"""Extract string content from various message formats. """Extract string content from various message formats.
Returns None if no meaningful text content is found. Returns None if no meaningful text content is found.
@ -877,6 +901,40 @@ def extract_content_string(content):
return str(content).strip() if not is_empty(content) else None return str(content).strip() if not is_empty(content) else None
def classify_message_type(message) -> tuple[str, str | None]:
    """Map a LangChain message to a display label and its extracted text.

    Args:
        message: Message object; its ``content`` attribute (if any) is passed
            through ``extract_content_string``.

    Returns:
        ``(label, text)`` where ``label`` is one of:

        - "Control": a ``HumanMessage`` whose text is exactly "Continue"
        - "User": any other ``HumanMessage``
        - "Data": a ``ToolMessage``
        - "Agent": an ``AIMessage``
        - "System": fallback for every other message type

        ``text`` is whatever ``extract_content_string`` returned (may be None).
    """
    from langchain_core.messages import AIMessage, HumanMessage, ToolMessage

    text = extract_content_string(getattr(message, 'content', None))

    if isinstance(message, HumanMessage):
        is_control = bool(text) and text.strip() == "Continue"
        return ("Control" if is_control else "User", text)

    # Remaining known classes map one-to-one onto a label.
    for message_cls, label in ((ToolMessage, "Data"), (AIMessage, "Agent")):
        if isinstance(message, message_cls):
            return (label, text)

    # Fallback for unknown types
    return ("System", text)
def format_tool_args(args, max_length=80) -> str:
    """Format tool arguments for terminal display.

    Args:
        args: Tool-call arguments of any type; rendered with ``str()``.
        max_length: Maximum length of the returned string. Values below zero
            are treated as zero.

    Returns:
        ``str(args)`` if it fits within ``max_length``; otherwise a prefix
        followed by "..." whose total length is exactly ``max_length``. When
        ``max_length`` is too small to hold any prefix, the ellipsis itself is
        cut to fit so the result never exceeds the limit.
    """
    text = str(args)
    limit = max(max_length, 0)
    if len(text) <= limit:
        return text

    ellipsis = "..."
    # Guard against a negative slice bound, which would keep almost the whole
    # string instead of shortening it.
    if limit <= len(ellipsis):
        return ellipsis[:limit]
    return text[:limit - len(ellipsis)] + ellipsis
def run_analysis(): def run_analysis():
# First get all user selections # First get all user selections
selections = get_user_selections() selections = get_user_selections()
@ -896,16 +954,19 @@ def run_analysis():
# Create stats callback handler for tracking LLM/tool calls # Create stats callback handler for tracking LLM/tool calls
stats_handler = StatsCallbackHandler() stats_handler = StatsCallbackHandler()
# Normalize analyst selection to predefined order (selection is a 'set', order is fixed)
selected_set = {analyst.value for analyst in selections["analysts"]}
selected_analyst_keys = [a for a in ANALYST_ORDER if a in selected_set]
# Initialize the graph with callbacks bound to LLMs # Initialize the graph with callbacks bound to LLMs
graph = TradingAgentsGraph( graph = TradingAgentsGraph(
[analyst.value for analyst in selections["analysts"]], selected_analyst_keys,
config=config, config=config,
debug=True, debug=True,
callbacks=[stats_handler], callbacks=[stats_handler],
) )
# Initialize message buffer with selected analysts # Initialize message buffer with selected analysts
selected_analyst_keys = [analyst.value for analyst in selections["analysts"]]
message_buffer.init_for_analysis(selected_analyst_keys) message_buffer.init_for_analysis(selected_analyst_keys)
# Track start time for elapsed display # Track start time for elapsed display
@ -998,216 +1059,105 @@ def run_analysis():
# Stream the analysis # Stream the analysis
trace = [] trace = []
for chunk in graph.graph.stream(init_agent_state, **args): for chunk in graph.graph.stream(init_agent_state, **args):
# Process messages if present (skip duplicates via message ID)
if len(chunk["messages"]) > 0: if len(chunk["messages"]) > 0:
# Get the last message from the chunk
last_message = chunk["messages"][-1] last_message = chunk["messages"][-1]
msg_id = getattr(last_message, "id", None)
# Extract message content and type if msg_id != message_buffer._last_message_id:
content = None message_buffer._last_message_id = msg_id
msg_type = "Reasoning"
if hasattr(last_message, "content"): # Add message to buffer
content = extract_content_string(last_message.content) msg_type, content = classify_message_type(last_message)
elif last_message is not None: if content and content.strip():
raw = str(last_message).strip() message_buffer.add_message(msg_type, content)
if raw and raw != '{}':
content = raw
msg_type = "System"
# Only add message to buffer if there's actual content # Handle tool calls
if content: if hasattr(last_message, "tool_calls") and last_message.tool_calls:
message_buffer.add_message(msg_type, content) for tool_call in last_message.tool_calls:
if isinstance(tool_call, dict):
message_buffer.add_tool_call(
tool_call["name"], tool_call["args"]
)
else:
message_buffer.add_tool_call(tool_call.name, tool_call.args)
# Handle tool calls separately # Update analyst statuses based on report state (runs on every chunk)
if hasattr(last_message, "tool_calls") and last_message.tool_calls: update_analyst_statuses(message_buffer, chunk)
for tool_call in last_message.tool_calls:
# Handle both dictionary and object tool calls
if isinstance(tool_call, dict):
message_buffer.add_tool_call(
tool_call["name"], tool_call["args"]
)
else:
message_buffer.add_tool_call(tool_call.name, tool_call.args)
# Update reports and agent status based on chunk content # Research Team - Handle Investment Debate State
# Analyst Team Reports if chunk.get("investment_debate_state"):
if "market_report" in chunk and chunk["market_report"]: debate_state = chunk["investment_debate_state"]
message_buffer.update_report_section( bull_hist = debate_state.get("bull_history", "").strip()
"market_report", chunk["market_report"] bear_hist = debate_state.get("bear_history", "").strip()
) judge = debate_state.get("judge_decision", "").strip()
message_buffer.update_agent_status("Market Analyst", "completed")
# Set next analyst to in_progress
if "social" in selections["analysts"]:
message_buffer.update_agent_status(
"Social Analyst", "in_progress"
)
if "sentiment_report" in chunk and chunk["sentiment_report"]: # Only update status when there's actual content
message_buffer.update_report_section( if bull_hist or bear_hist:
"sentiment_report", chunk["sentiment_report"]
)
message_buffer.update_agent_status("Social Analyst", "completed")
# Set next analyst to in_progress
if "news" in selections["analysts"]:
message_buffer.update_agent_status(
"News Analyst", "in_progress"
)
if "news_report" in chunk and chunk["news_report"]:
message_buffer.update_report_section(
"news_report", chunk["news_report"]
)
message_buffer.update_agent_status("News Analyst", "completed")
# Set next analyst to in_progress
if "fundamentals" in selections["analysts"]:
message_buffer.update_agent_status(
"Fundamentals Analyst", "in_progress"
)
if "fundamentals_report" in chunk and chunk["fundamentals_report"]:
message_buffer.update_report_section(
"fundamentals_report", chunk["fundamentals_report"]
)
message_buffer.update_agent_status(
"Fundamentals Analyst", "completed"
)
# Set all research team members to in_progress
update_research_team_status("in_progress") update_research_team_status("in_progress")
if bull_hist:
# Research Team - Handle Investment Debate State
if (
"investment_debate_state" in chunk
and chunk["investment_debate_state"]
):
debate_state = chunk["investment_debate_state"]
# Update Bull Researcher status and report
if "bull_history" in debate_state and debate_state["bull_history"]:
update_research_team_status("in_progress")
message_buffer.update_report_section(
"investment_plan",
f"### Bull Researcher Analysis\n{debate_state['bull_history']}",
)
# Update Bear Researcher status and report
if "bear_history" in debate_state and debate_state["bear_history"]:
update_research_team_status("in_progress")
message_buffer.update_report_section(
"investment_plan",
f"### Bear Researcher Analysis\n{debate_state['bear_history']}",
)
# Update Research Manager status and final decision
if (
"judge_decision" in debate_state
and debate_state["judge_decision"]
):
update_research_team_status("in_progress")
message_buffer.update_report_section(
"investment_plan",
f"### Research Manager Decision\n{debate_state['judge_decision']}",
)
update_research_team_status("completed")
# Set first risk analyst to in_progress
message_buffer.update_agent_status(
"Aggressive Analyst", "in_progress"
)
# Trading Team
if (
"trader_investment_plan" in chunk
and chunk["trader_investment_plan"]
):
message_buffer.update_report_section( message_buffer.update_report_section(
"trader_investment_plan", chunk["trader_investment_plan"] "investment_plan", f"### Bull Researcher Analysis\n{bull_hist}"
) )
# Set first risk analyst to in_progress if bear_hist:
message_buffer.update_report_section(
"investment_plan", f"### Bear Researcher Analysis\n{bear_hist}"
)
if judge:
message_buffer.update_report_section(
"investment_plan", f"### Research Manager Decision\n{judge}"
)
update_research_team_status("completed")
message_buffer.update_agent_status("Trader", "in_progress")
# Trading Team
if chunk.get("trader_investment_plan"):
message_buffer.update_report_section(
"trader_investment_plan", chunk["trader_investment_plan"]
)
if message_buffer.agent_status.get("Trader") != "completed":
message_buffer.update_agent_status("Trader", "completed")
message_buffer.update_agent_status("Aggressive Analyst", "in_progress") message_buffer.update_agent_status("Aggressive Analyst", "in_progress")
# Risk Management Team - Handle Risk Debate State # Risk Management Team - Handle Risk Debate State
if "risk_debate_state" in chunk and chunk["risk_debate_state"]: if chunk.get("risk_debate_state"):
risk_state = chunk["risk_debate_state"] risk_state = chunk["risk_debate_state"]
agg_hist = risk_state.get("aggressive_history", "").strip()
con_hist = risk_state.get("conservative_history", "").strip()
neu_hist = risk_state.get("neutral_history", "").strip()
judge = risk_state.get("judge_decision", "").strip()
# Update Aggressive Analyst status and report if agg_hist:
if ( if message_buffer.agent_status.get("Aggressive Analyst") != "completed":
"current_aggressive_response" in risk_state message_buffer.update_agent_status("Aggressive Analyst", "in_progress")
and risk_state["current_aggressive_response"] message_buffer.update_report_section(
): "final_trade_decision", f"### Aggressive Analyst Analysis\n{agg_hist}"
message_buffer.update_agent_status( )
"Aggressive Analyst", "in_progress" if con_hist:
) if message_buffer.agent_status.get("Conservative Analyst") != "completed":
message_buffer.add_message( message_buffer.update_agent_status("Conservative Analyst", "in_progress")
"Reasoning", message_buffer.update_report_section(
f"Aggressive Analyst: {risk_state['current_aggressive_response']}", "final_trade_decision", f"### Conservative Analyst Analysis\n{con_hist}"
) )
# Update risk report with aggressive analyst's latest analysis only if neu_hist:
if message_buffer.agent_status.get("Neutral Analyst") != "completed":
message_buffer.update_agent_status("Neutral Analyst", "in_progress")
message_buffer.update_report_section(
"final_trade_decision", f"### Neutral Analyst Analysis\n{neu_hist}"
)
if judge:
if message_buffer.agent_status.get("Portfolio Manager") != "completed":
message_buffer.update_agent_status("Portfolio Manager", "in_progress")
message_buffer.update_report_section( message_buffer.update_report_section(
"final_trade_decision", "final_trade_decision", f"### Portfolio Manager Decision\n{judge}"
f"### Aggressive Analyst Analysis\n{risk_state['current_aggressive_response']}",
) )
# Update Conservative Analyst status and report
if (
"current_conservative_response" in risk_state
and risk_state["current_conservative_response"]
):
message_buffer.update_agent_status(
"Conservative Analyst", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Conservative Analyst: {risk_state['current_conservative_response']}",
)
# Update risk report with conservative analyst's latest analysis only
message_buffer.update_report_section(
"final_trade_decision",
f"### Conservative Analyst Analysis\n{risk_state['current_conservative_response']}",
)
# Update Neutral Analyst status and report
if (
"current_neutral_response" in risk_state
and risk_state["current_neutral_response"]
):
message_buffer.update_agent_status(
"Neutral Analyst", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Neutral Analyst: {risk_state['current_neutral_response']}",
)
# Update risk report with neutral analyst's latest analysis only
message_buffer.update_report_section(
"final_trade_decision",
f"### Neutral Analyst Analysis\n{risk_state['current_neutral_response']}",
)
# Update Portfolio Manager status and final decision
if "judge_decision" in risk_state and risk_state["judge_decision"]:
message_buffer.update_agent_status(
"Portfolio Manager", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Portfolio Manager: {risk_state['judge_decision']}",
)
# Update risk report with final decision only
message_buffer.update_report_section(
"final_trade_decision",
f"### Portfolio Manager Decision\n{risk_state['judge_decision']}",
)
# Mark risk analysts as completed
message_buffer.update_agent_status("Aggressive Analyst", "completed") message_buffer.update_agent_status("Aggressive Analyst", "completed")
message_buffer.update_agent_status("Conservative Analyst", "completed") message_buffer.update_agent_status("Conservative Analyst", "completed")
message_buffer.update_agent_status( message_buffer.update_agent_status("Neutral Analyst", "completed")
"Neutral Analyst", "completed" message_buffer.update_agent_status("Portfolio Manager", "completed")
)
message_buffer.update_agent_status(
"Portfolio Manager", "completed"
)
# Update the display # Update the display
update_display(layout, stats_handler=stats_handler, start_time=start_time) update_display(layout, stats_handler=stats_handler, start_time=start_time)
trace.append(chunk) trace.append(chunk)
@ -1220,7 +1170,7 @@ def run_analysis():
message_buffer.update_agent_status(agent, "completed") message_buffer.update_agent_status(agent, "completed")
message_buffer.add_message( message_buffer.add_message(
"Analysis", f"Completed analysis for {selections['analysis_date']}" "System", f"Completed analysis for {selections['analysis_date']}"
) )
# Update final report sections # Update final report sections