230 lines
9.0 KiB
Python
230 lines
9.0 KiB
Python
"""Stream processing logic for handling agent analysis chunks in the Litadel CLI."""
|
|
|
|
from cli.helpers import extract_content_string, update_research_team_status
|
|
|
|
|
|
def process_message_chunk(chunk, message_buffer):
|
|
"""
|
|
Process message content and tool calls from a chunk.
|
|
|
|
Args:
|
|
chunk: The chunk dictionary from the graph stream
|
|
message_buffer: The MessageBuffer instance to update
|
|
"""
|
|
if len(chunk["messages"]) == 0:
|
|
return
|
|
|
|
# Get the last message from the chunk
|
|
last_message = chunk["messages"][-1]
|
|
|
|
# Extract message content and type
|
|
if hasattr(last_message, "content"):
|
|
content = extract_content_string(last_message.content)
|
|
msg_type = "Reasoning"
|
|
else:
|
|
content = str(last_message)
|
|
msg_type = "System"
|
|
|
|
# Add message to buffer
|
|
message_buffer.add_message(msg_type, content)
|
|
|
|
# If it's a tool call, add it to tool calls
|
|
if hasattr(last_message, "tool_calls"):
|
|
for tool_call in last_message.tool_calls:
|
|
# Handle both dictionary and object tool calls
|
|
if isinstance(tool_call, dict):
|
|
message_buffer.add_tool_call(
|
|
tool_call["name"], tool_call["args"]
|
|
)
|
|
else:
|
|
message_buffer.add_tool_call(tool_call.name, tool_call.args)
|
|
|
|
|
|
def process_analyst_reports(chunk, message_buffer, selected_analysts):
|
|
"""
|
|
Process analyst team reports from chunk.
|
|
|
|
Args:
|
|
chunk: The chunk dictionary from the graph stream
|
|
message_buffer: The MessageBuffer instance to update
|
|
selected_analysts: List of selected analyst types
|
|
"""
|
|
# Mapping of report keys to analyst info
|
|
analyst_mappings = [
|
|
("market_report", "Market Analyst", "social", "Social Analyst"),
|
|
("sentiment_report", "Social Analyst", "news", "News Analyst"),
|
|
("news_report", "News Analyst", "fundamentals", "Fundamentals Analyst"),
|
|
("fundamentals_report", "Fundamentals Analyst", None, None),
|
|
]
|
|
|
|
for report_key, analyst_name, next_type, next_analyst in analyst_mappings:
|
|
if report_key in chunk and chunk[report_key]:
|
|
message_buffer.update_report_section(report_key, chunk[report_key])
|
|
message_buffer.update_agent_status(analyst_name, "completed")
|
|
|
|
if report_key == "fundamentals_report":
|
|
# Special case: set all research team to in_progress
|
|
update_research_team_status(message_buffer, "in_progress")
|
|
elif next_type and next_type in [a.value for a in selected_analysts]:
|
|
message_buffer.update_agent_status(next_analyst, "in_progress")
|
|
|
|
|
|
def process_research_debate(chunk, message_buffer):
|
|
"""
|
|
Process research team investment debate state from chunk.
|
|
|
|
Args:
|
|
chunk: The chunk dictionary from the graph stream
|
|
message_buffer: The MessageBuffer instance to update
|
|
"""
|
|
if "investment_debate_state" not in chunk or not chunk["investment_debate_state"]:
|
|
return
|
|
|
|
debate_state = chunk["investment_debate_state"]
|
|
|
|
# Update Bull Researcher status and report
|
|
if "bull_history" in debate_state and debate_state["bull_history"]:
|
|
# Keep all research team members in progress
|
|
update_research_team_status(message_buffer, "in_progress")
|
|
# Extract latest bull response
|
|
bull_responses = debate_state["bull_history"].split("\n")
|
|
latest_bull = bull_responses[-1] if bull_responses else ""
|
|
if latest_bull:
|
|
message_buffer.add_message("Reasoning", latest_bull)
|
|
# Update research report with bull's latest analysis
|
|
message_buffer.update_report_section(
|
|
"investment_plan",
|
|
f"### Bull Researcher Analysis\n{latest_bull}",
|
|
)
|
|
|
|
# Update Bear Researcher status and report
|
|
if "bear_history" in debate_state and debate_state["bear_history"]:
|
|
# Keep all research team members in progress
|
|
update_research_team_status(message_buffer, "in_progress")
|
|
# Extract latest bear response
|
|
bear_responses = debate_state["bear_history"].split("\n")
|
|
latest_bear = bear_responses[-1] if bear_responses else ""
|
|
if latest_bear:
|
|
message_buffer.add_message("Reasoning", latest_bear)
|
|
# Update research report with bear's latest analysis
|
|
message_buffer.update_report_section(
|
|
"investment_plan",
|
|
f"{message_buffer.report_sections['investment_plan']}\n\n### Bear Researcher Analysis\n{latest_bear}",
|
|
)
|
|
|
|
# Update Research Manager status and final decision
|
|
if "judge_decision" in debate_state and debate_state["judge_decision"]:
|
|
# Keep all research team members in progress until final decision
|
|
update_research_team_status(message_buffer, "in_progress")
|
|
message_buffer.add_message(
|
|
"Reasoning",
|
|
f"Research Manager: {debate_state['judge_decision']}",
|
|
)
|
|
# Update research report with final decision
|
|
message_buffer.update_report_section(
|
|
"investment_plan",
|
|
f"{message_buffer.report_sections['investment_plan']}\n\n### Research Manager Decision\n{debate_state['judge_decision']}",
|
|
)
|
|
# Mark all research team members as completed
|
|
update_research_team_status(message_buffer, "completed")
|
|
# Set first risk analyst to in_progress
|
|
message_buffer.update_agent_status("Risky Analyst", "in_progress")
|
|
|
|
|
|
def process_trader_report(chunk, message_buffer):
|
|
"""
|
|
Process trader investment plan from chunk.
|
|
|
|
Args:
|
|
chunk: The chunk dictionary from the graph stream
|
|
message_buffer: The MessageBuffer instance to update
|
|
"""
|
|
if "trader_investment_plan" in chunk and chunk["trader_investment_plan"]:
|
|
message_buffer.update_report_section(
|
|
"trader_investment_plan", chunk["trader_investment_plan"]
|
|
)
|
|
# Set first risk analyst to in_progress
|
|
message_buffer.update_agent_status("Risky Analyst", "in_progress")
|
|
|
|
|
|
def process_risk_debate(chunk, message_buffer):
|
|
"""
|
|
Process risk management team debate state from chunk.
|
|
|
|
Args:
|
|
chunk: The chunk dictionary from the graph stream
|
|
message_buffer: The MessageBuffer instance to update
|
|
"""
|
|
if "risk_debate_state" not in chunk or not chunk["risk_debate_state"]:
|
|
return
|
|
|
|
risk_state = chunk["risk_debate_state"]
|
|
|
|
# Handle all risk analysts with a mapping
|
|
risk_analysts = [
|
|
("current_risky_response", "Risky Analyst"),
|
|
("current_safe_response", "Safe Analyst"),
|
|
("current_neutral_response", "Neutral Analyst"),
|
|
]
|
|
|
|
for response_key, analyst_name in risk_analysts:
|
|
if response_key in risk_state and risk_state[response_key]:
|
|
message_buffer.update_agent_status(analyst_name, "in_progress")
|
|
message_buffer.add_message(
|
|
"Reasoning",
|
|
f"{analyst_name}: {risk_state[response_key]}",
|
|
)
|
|
message_buffer.update_report_section(
|
|
"final_trade_decision",
|
|
f"### {analyst_name} Analysis\n{risk_state[response_key]}",
|
|
)
|
|
|
|
# Update Portfolio Manager status and final decision
|
|
if "judge_decision" in risk_state and risk_state["judge_decision"]:
|
|
message_buffer.update_agent_status(
|
|
"Portfolio Manager", "in_progress"
|
|
)
|
|
message_buffer.add_message(
|
|
"Reasoning",
|
|
f"Portfolio Manager: {risk_state['judge_decision']}",
|
|
)
|
|
# Update risk report with final decision only
|
|
message_buffer.update_report_section(
|
|
"final_trade_decision",
|
|
f"### Portfolio Manager Decision\n{risk_state['judge_decision']}",
|
|
)
|
|
# Mark risk analysts as completed
|
|
message_buffer.update_agent_status("Risky Analyst", "completed")
|
|
message_buffer.update_agent_status("Safe Analyst", "completed")
|
|
message_buffer.update_agent_status("Neutral Analyst", "completed")
|
|
message_buffer.update_agent_status("Portfolio Manager", "completed")
|
|
|
|
|
|
def process_chunk(chunk, message_buffer, selected_analysts):
|
|
"""
|
|
Process a single chunk from the graph stream, updating the message buffer
|
|
with messages, reports, and agent statuses.
|
|
|
|
Args:
|
|
chunk: The chunk dictionary from the graph stream
|
|
message_buffer: The MessageBuffer instance to update
|
|
selected_analysts: List of selected analyst types
|
|
|
|
Returns:
|
|
bool: True if chunk was processed, False if chunk had no messages
|
|
"""
|
|
if len(chunk["messages"]) == 0:
|
|
return False
|
|
|
|
# Process messages and tool calls
|
|
process_message_chunk(chunk, message_buffer)
|
|
|
|
# Process different report types
|
|
process_analyst_reports(chunk, message_buffer, selected_analysts)
|
|
process_research_debate(chunk, message_buffer)
|
|
process_trader_report(chunk, message_buffer)
|
|
process_risk_debate(chunk, message_buffer)
|
|
|
|
return True
|
|
|