refactor cli

Marvin Gabler 2025-10-21 17:55:32 +02:00
parent b80eb0b905
commit 167e53a86e
8 changed files with 525 additions and 388 deletions

78
cli/file_handlers.py Normal file
View File

@@ -0,0 +1,78 @@
"""File handling and logging decorators for the TradingAgents CLI."""
from functools import wraps
from pathlib import Path
def save_message_decorator(message_buffer, log_file):
"""Decorator to save messages to log file after they're added to buffer."""
original_add_message = message_buffer.add_message
@wraps(original_add_message)
def wrapper(*args, **kwargs):
original_add_message(*args, **kwargs)
timestamp, message_type, content = message_buffer.messages[-1]
content = content.replace("\n", " ") # Replace newlines with spaces
with open(log_file, "a") as f:
f.write(f"{timestamp} [{message_type}] {content}\n")
return wrapper
def save_tool_call_decorator(message_buffer, log_file):
"""Decorator to save tool calls to log file after they're added to buffer."""
original_add_tool_call = message_buffer.add_tool_call
@wraps(original_add_tool_call)
def wrapper(*args, **kwargs):
original_add_tool_call(*args, **kwargs)
timestamp, tool_name, args = message_buffer.tool_calls[-1]
args_str = ", ".join(f"{k}={v}" for k, v in args.items())
with open(log_file, "a") as f:
f.write(f"{timestamp} [Tool Call] {tool_name}({args_str})\n")
return wrapper
def save_report_section_decorator(message_buffer, report_dir):
"""Decorator to save report sections to markdown files after they're updated."""
original_update_report_section = message_buffer.update_report_section
@wraps(original_update_report_section)
def wrapper(section_name, content):
original_update_report_section(section_name, content)
if section_name in message_buffer.report_sections and message_buffer.report_sections[section_name] is not None:
content = message_buffer.report_sections[section_name]
if content:
file_name = f"{section_name}.md"
with open(report_dir / file_name, "w") as f:
f.write(content)
return wrapper
def setup_file_handlers(message_buffer, results_dir: Path):
"""
Set up file handlers for logging messages, tool calls, and reports.
Args:
message_buffer: The MessageBuffer instance to decorate
results_dir: Directory where results should be saved
Returns:
tuple: (log_file, report_dir) paths for reference
"""
# Create directories
results_dir.mkdir(parents=True, exist_ok=True)
report_dir = results_dir / "reports"
report_dir.mkdir(parents=True, exist_ok=True)
log_file = results_dir / "message_tool.log"
log_file.touch(exist_ok=True)
# Apply decorators
message_buffer.add_message = save_message_decorator(message_buffer, log_file)
message_buffer.add_tool_call = save_tool_call_decorator(message_buffer, log_file)
message_buffer.update_report_section = save_report_section_decorator(message_buffer, report_dir)
return log_file, report_dir
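setup_file_handlers patches the buffer's methods in place, so existing call sites keep working while every message, tool call and report section is mirrored to disk. A minimal usage sketch follows; StubBuffer is a hypothetical stand-in that only mimics the attributes the decorators touch (the real class is cli.message_buffer.MessageBuffer), and the results path is illustrative:
import datetime
from pathlib import Path
from cli.file_handlers import setup_file_handlers
class StubBuffer:  # hypothetical stand-in for MessageBuffer, for illustration only
    def __init__(self):
        self.messages = []
        self.tool_calls = []
        self.report_sections = {"market_report": None}
    def add_message(self, message_type, content):
        self.messages.append((datetime.datetime.now().isoformat(), message_type, content))
    def add_tool_call(self, tool_name, args):
        self.tool_calls.append((datetime.datetime.now().isoformat(), tool_name, args))
    def update_report_section(self, section_name, content):
        self.report_sections[section_name] = content
buffer = StubBuffer()
log_file, report_dir = setup_file_handlers(buffer, Path("results/AAPL/2025-10-21"))
buffer.add_message("System", "Selected ticker: AAPL")             # also appended to message_tool.log
buffer.add_tool_call("get_prices", {"symbol": "AAPL"})            # also appended to message_tool.log
buffer.update_report_section("market_report", "# Market Report")  # also written to reports/market_report.md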

cli/helpers.py
View File

@@ -1,5 +1,19 @@
"""Helper functions for the TradingAgents CLI."""
"""Helper utilities and models for the TradingAgents CLI."""
from enum import Enum
# ===== Models =====
class AnalystType(str, Enum):
"""Enumeration of available analyst types."""
MARKET = "market"
SOCIAL = "social"
NEWS = "news"
FUNDAMENTALS = "fundamentals"
# ===== Helper Functions =====
def update_research_team_status(message_buffer, status):
"""Update status for all research team members and trader."""
@@ -26,3 +40,4 @@ def extract_content_string(content):
return ' '.join(text_parts)
else:
return str(content)
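With AnalystType now living here as a str-valued Enum, call sites can keep treating members as plain strings. A quick illustration of how the rest of the CLI consumes it:
from cli.helpers import AnalystType
assert AnalystType.MARKET.value == "market"
assert AnalystType.MARKET == "market"              # str-Enum members compare equal to their values
selected = [AnalystType.MARKET, AnalystType.NEWS]
print(", ".join(a.value for a in selected))        # "market, news", as main.py builds its system message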

77
cli/llm_config.py Normal file
View File

@@ -0,0 +1,77 @@
"""LLM provider and model configuration data for the TradingAgents CLI."""
# LLM provider base URLs
LLM_PROVIDERS = [
("OpenAI", "https://api.openai.com/v1"),
("Anthropic", "https://api.anthropic.com/"),
("Google", "https://generativelanguage.googleapis.com/v1"),
("Openrouter", "https://openrouter.ai/api/v1"),
("Ollama", "http://localhost:11434/v1"),
]
# Quick-thinking (shallow) model options per provider
SHALLOW_AGENT_OPTIONS = {
"openai": [
("GPT-4o-mini - Fast and efficient for quick tasks", "gpt-4o-mini"),
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("GPT-5 - Next generation model with enhanced capabilities", "gpt-5"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
("Claude Sonnet 3.5 - Highly capable standard model", "claude-3-5-sonnet-latest"),
("Claude Sonnet 3.7 - Exceptional hybrid reasoning and agentic capabilities", "claude-3-7-sonnet-latest"),
("Claude Sonnet 4 - High performance and excellent reasoning", "claude-sonnet-4-0"),
],
"google": [
("Gemini 2.0 Flash-Lite - Cost efficiency and low latency", "gemini-2.0-flash-lite"),
("Gemini 2.0 Flash - Next generation features, speed, and thinking", "gemini-2.0-flash"),
("Gemini 2.5 Flash - Adaptive thinking, cost efficiency", "gemini-2.5-flash-preview-05-20"),
],
"openrouter": [
("Meta: Llama 4 Scout", "meta-llama/llama-4-scout:free"),
("Meta: Llama 3.3 8B Instruct - A lightweight and ultra-fast variant of Llama 3.3 70B", "meta-llama/llama-3.3-8b-instruct:free"),
("google/gemini-2.0-flash-exp:free - Gemini Flash 2.0 offers a significantly faster time to first token", "google/gemini-2.0-flash-exp:free"),
],
"ollama": [
("llama3.1 local", "llama3.1"),
("llama3.2 local", "llama3.2"),
]
}
# Deep-thinking model options per provider
DEEP_AGENT_OPTIONS = {
"openai": [
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("GPT-5 - Next generation model with enhanced capabilities", "gpt-5"),
("o4-mini - Specialized reasoning model (compact)", "o4-mini"),
("o3-mini - Advanced reasoning model (lightweight)", "o3-mini"),
("o3 - Full advanced reasoning model", "o3"),
("o1 - Premier reasoning and problem-solving model", "o1"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
("Claude Sonnet 3.5 - Highly capable standard model", "claude-3-5-sonnet-latest"),
("Claude Sonnet 3.7 - Exceptional hybrid reasoning and agentic capabilities", "claude-3-7-sonnet-latest"),
("Claude Sonnet 4 - High performance and excellent reasoning", "claude-sonnet-4-0"),
("Claude Opus 4 - Most powerful Anthropic model", " claude-opus-4-0"),
],
"google": [
("Gemini 2.0 Flash-Lite - Cost efficiency and low latency", "gemini-2.0-flash-lite"),
("Gemini 2.0 Flash - Next generation features, speed, and thinking", "gemini-2.0-flash"),
("Gemini 2.5 Flash - Adaptive thinking, cost efficiency", "gemini-2.5-flash-preview-05-20"),
("Gemini 2.5 Pro", "gemini-2.5-pro-preview-06-05"),
],
"openrouter": [
("DeepSeek V3 - a 685B-parameter, mixture-of-experts model", "deepseek/deepseek-chat-v3-0324:free"),
("Deepseek - latest iteration of the flagship chat model family from the DeepSeek team.", "deepseek/deepseek-chat-v3-0324:free"),
],
"ollama": [
("llama3.1 local", "llama3.1"),
("qwen3", "qwen3"),
]
}
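The tables are keyed by the lower-cased provider name and hold (display label, model id) pairs; cli/prompts.py renders them as questionary choices. A rough sketch of the lookup, assuming the provider string is the display name returned by select_llm_provider:
from cli.llm_config import LLM_PROVIDERS, SHALLOW_AGENT_OPTIONS, DEEP_AGENT_OPTIONS
provider = "OpenAI"  # illustrative; in the CLI this comes from select_llm_provider()
for display, model_id in SHALLOW_AGENT_OPTIONS[provider.lower()]:
    print(f"{display} -> {model_id}")
for display, model_id in DEEP_AGENT_OPTIONS[provider.lower()]:
    print(f"{display} -> {model_id}")
for name, base_url in LLM_PROVIDERS:  # plain (name, base URL) pairs
    print(name, base_url)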

cli/main.py
View File

@@ -2,7 +2,6 @@ from typing import Optional
import datetime
import typer
from pathlib import Path
from functools import wraps
from rich.console import Console
from dotenv import load_dotenv
@@ -26,13 +25,14 @@ from rich.rule import Rule
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
from cli.models import AnalystType
from cli.utils import *
from cli.helpers import AnalystType, update_research_team_status, extract_content_string
from cli.prompts import *
from cli.message_buffer import MessageBuffer
from cli.ui_display import create_layout, update_display
from cli.report_display import display_complete_report
from cli.helper_functions import update_research_team_status, extract_content_string
from cli.asset_detection import detect_asset_class, get_asset_class_display_name
from cli.file_handlers import setup_file_handlers
from cli.stream_processor import process_chunk
console = Console()
@@ -223,11 +223,8 @@ def get_analysis_date():
def run_analysis():
# First get all user selections
selections = get_user_selections()
# Create config with selected research depth
def setup_config(selections):
"""Create and configure the analysis config from user selections."""
config = DEFAULT_CONFIG.copy()
config["max_debate_rounds"] = selections["research_depth"]
config["max_risk_discuss_rounds"] = selections["research_depth"]
@@ -236,299 +233,125 @@
config["backend_url"] = selections["backend_url"]
config["llm_provider"] = selections["llm_provider"].lower()
config["asset_class"] = selections["asset_class"]
return config
def setup_analysis(selections, config):
"""Initialize the trading graph and file handlers."""
# Initialize the graph
graph = TradingAgentsGraph(
[analyst.value for analyst in selections["analysts"]], config=config, debug=True
)
# Create result directory
results_dir = Path(config["results_dir"]) / selections["ticker"] / selections["analysis_date"]
results_dir.mkdir(parents=True, exist_ok=True)
report_dir = results_dir / "reports"
report_dir.mkdir(parents=True, exist_ok=True)
log_file = results_dir / "message_tool.log"
log_file.touch(exist_ok=True)
def save_message_decorator(obj, func_name):
func = getattr(obj, func_name)
@wraps(func)
def wrapper(*args, **kwargs):
func(*args, **kwargs)
timestamp, message_type, content = obj.messages[-1]
content = content.replace("\n", " ") # Replace newlines with spaces
with open(log_file, "a") as f:
f.write(f"{timestamp} [{message_type}] {content}\n")
return wrapper
def save_tool_call_decorator(obj, func_name):
func = getattr(obj, func_name)
@wraps(func)
def wrapper(*args, **kwargs):
func(*args, **kwargs)
timestamp, tool_name, args = obj.tool_calls[-1]
args_str = ", ".join(f"{k}={v}" for k, v in args.items())
with open(log_file, "a") as f:
f.write(f"{timestamp} [Tool Call] {tool_name}({args_str})\n")
return wrapper
# Create result directory and setup file handlers
results_dir = Path(config["results_dir"]) / selections["ticker"] / selections["analysis_date"]
log_file, report_dir = setup_file_handlers(message_buffer, results_dir)
return graph, log_file, report_dir
def save_report_section_decorator(obj, func_name):
func = getattr(obj, func_name)
@wraps(func)
def wrapper(section_name, content):
func(section_name, content)
if section_name in obj.report_sections and obj.report_sections[section_name] is not None:
content = obj.report_sections[section_name]
if content:
file_name = f"{section_name}.md"
with open(report_dir / file_name, "w") as f:
f.write(content)
return wrapper
message_buffer.add_message = save_message_decorator(message_buffer, "add_message")
message_buffer.add_tool_call = save_tool_call_decorator(message_buffer, "add_tool_call")
message_buffer.update_report_section = save_report_section_decorator(message_buffer, "update_report_section")
def initialize_display(layout, selections):
"""Initialize the display with startup messages and agent statuses."""
# Initial display
update_display(layout, message_buffer)
# Add initial messages
message_buffer.add_message("System", f"Selected ticker: {selections['ticker']}")
message_buffer.add_message("System", f"Analysis date: {selections['analysis_date']}")
message_buffer.add_message(
"System",
f"Selected analysts: {', '.join(analyst.value for analyst in selections['analysts'])}",
)
update_display(layout, message_buffer)
# Reset agent statuses
for agent in message_buffer.agent_status:
message_buffer.update_agent_status(agent, "pending")
# Reset report sections
for section in message_buffer.report_sections:
message_buffer.report_sections[section] = None
message_buffer.current_report = None
message_buffer.final_report = None
# Update agent status to in_progress for the first analyst
first_analyst = f"{selections['analysts'][0].value.capitalize()} Analyst"
message_buffer.update_agent_status(first_analyst, "in_progress")
update_display(layout, message_buffer)
# Create spinner text
spinner_text = f"Analyzing {selections['ticker']} on {selections['analysis_date']}..."
update_display(layout, message_buffer, spinner_text)
# Now start the display layout
def run_stream_analysis(graph, selections, layout):
"""Stream the analysis and process chunks in real-time."""
# Initialize state and get graph args
init_agent_state = graph.propagator.create_initial_state(
selections["ticker"], selections["analysis_date"]
)
# CRITICAL: Add asset_class to state so market analyst can branch correctly
init_agent_state["asset_class"] = selections["asset_class"]
args = graph.propagator.get_graph_args()
# Stream the analysis
trace = []
for chunk in graph.graph.stream(init_agent_state, **args):
# Process the chunk and update message buffer
if process_chunk(chunk, message_buffer, selections["analysts"]):
# Update the display after successful chunk processing
update_display(layout, message_buffer)
trace.append(chunk)
return trace
def finalize_analysis(trace, graph, selections, layout):
"""Process final results and display the complete report."""
# Get final state and decision
final_state = trace[-1]
decision = graph.process_signal(final_state["final_trade_decision"])
# Update all agent statuses to completed
for agent in message_buffer.agent_status:
message_buffer.update_agent_status(agent, "completed")
message_buffer.add_message(
"Analysis", f"Completed analysis for {selections['analysis_date']}"
)
# Update final report sections
for section in message_buffer.report_sections.keys():
if section in final_state:
message_buffer.update_report_section(section, final_state[section])
# Display the complete final report
display_complete_report(final_state)
update_display(layout, message_buffer)
def run_analysis():
"""Main analysis orchestrator - coordinates the entire trading analysis workflow."""
# Get user selections
selections = get_user_selections()
# Setup configuration and initialize components
config = setup_config(selections)
graph, log_file, report_dir = setup_analysis(selections, config)
# Create display layout and run analysis
layout = create_layout()
with Live(layout, refresh_per_second=4) as live:
# Initial display
update_display(layout, message_buffer)
# Add initial messages
message_buffer.add_message("System", f"Selected ticker: {selections['ticker']}")
message_buffer.add_message(
"System", f"Analysis date: {selections['analysis_date']}"
)
message_buffer.add_message(
"System",
f"Selected analysts: {', '.join(analyst.value for analyst in selections['analysts'])}",
)
update_display(layout, message_buffer)
# Reset agent statuses
for agent in message_buffer.agent_status:
message_buffer.update_agent_status(agent, "pending")
# Reset report sections
for section in message_buffer.report_sections:
message_buffer.report_sections[section] = None
message_buffer.current_report = None
message_buffer.final_report = None
# Update agent status to in_progress for the first analyst
first_analyst = f"{selections['analysts'][0].value.capitalize()} Analyst"
message_buffer.update_agent_status(first_analyst, "in_progress")
update_display(layout, message_buffer)
# Create spinner text
spinner_text = (
f"Analyzing {selections['ticker']} on {selections['analysis_date']}..."
)
update_display(layout, message_buffer, spinner_text)
# Initialize state and get graph args
init_agent_state = graph.propagator.create_initial_state(
selections["ticker"], selections["analysis_date"]
)
# CRITICAL: Add asset_class to state so market analyst can branch correctly
init_agent_state["asset_class"] = selections["asset_class"]
args = graph.propagator.get_graph_args()
# Stream the analysis
trace = []
for chunk in graph.graph.stream(init_agent_state, **args):
if len(chunk["messages"]) > 0:
# Get the last message from the chunk
last_message = chunk["messages"][-1]
# Extract message content and type
if hasattr(last_message, "content"):
content = extract_content_string(last_message.content) # Use the helper function
msg_type = "Reasoning"
else:
content = str(last_message)
msg_type = "System"
# Add message to buffer
message_buffer.add_message(msg_type, content)
# If it's a tool call, add it to tool calls
if hasattr(last_message, "tool_calls"):
for tool_call in last_message.tool_calls:
# Handle both dictionary and object tool calls
if isinstance(tool_call, dict):
message_buffer.add_tool_call(
tool_call["name"], tool_call["args"]
)
else:
message_buffer.add_tool_call(tool_call.name, tool_call.args)
# Update reports and agent status based on chunk content
# Analyst Team Reports - use a mapping to reduce repetition
analyst_mappings = [
("market_report", "Market Analyst", "social", "Social Analyst"),
("sentiment_report", "Social Analyst", "news", "News Analyst"),
("news_report", "News Analyst", "fundamentals", "Fundamentals Analyst"),
("fundamentals_report", "Fundamentals Analyst", None, None),
]
for report_key, analyst_name, next_type, next_analyst in analyst_mappings:
if report_key in chunk and chunk[report_key]:
message_buffer.update_report_section(report_key, chunk[report_key])
message_buffer.update_agent_status(analyst_name, "completed")
if report_key == "fundamentals_report":
# Special case: set all research team to in_progress
update_research_team_status(message_buffer, "in_progress")
elif next_type and next_type in [a.value for a in selections["analysts"]]:
message_buffer.update_agent_status(next_analyst, "in_progress")
# Research Team - Handle Investment Debate State
if (
"investment_debate_state" in chunk
and chunk["investment_debate_state"]
):
debate_state = chunk["investment_debate_state"]
# Update Bull Researcher status and report
if "bull_history" in debate_state and debate_state["bull_history"]:
# Keep all research team members in progress
update_research_team_status(message_buffer, "in_progress")
# Extract latest bull response
bull_responses = debate_state["bull_history"].split("\n")
latest_bull = bull_responses[-1] if bull_responses else ""
if latest_bull:
message_buffer.add_message("Reasoning", latest_bull)
# Update research report with bull's latest analysis
message_buffer.update_report_section(
"investment_plan",
f"### Bull Researcher Analysis\n{latest_bull}",
)
# Update Bear Researcher status and report
if "bear_history" in debate_state and debate_state["bear_history"]:
# Keep all research team members in progress
update_research_team_status(message_buffer, "in_progress")
# Extract latest bear response
bear_responses = debate_state["bear_history"].split("\n")
latest_bear = bear_responses[-1] if bear_responses else ""
if latest_bear:
message_buffer.add_message("Reasoning", latest_bear)
# Update research report with bear's latest analysis
message_buffer.update_report_section(
"investment_plan",
f"{message_buffer.report_sections['investment_plan']}\n\n### Bear Researcher Analysis\n{latest_bear}",
)
# Update Research Manager status and final decision
if (
"judge_decision" in debate_state
and debate_state["judge_decision"]
):
# Keep all research team members in progress until final decision
update_research_team_status(message_buffer, "in_progress")
message_buffer.add_message(
"Reasoning",
f"Research Manager: {debate_state['judge_decision']}",
)
# Update research report with final decision
message_buffer.update_report_section(
"investment_plan",
f"{message_buffer.report_sections['investment_plan']}\n\n### Research Manager Decision\n{debate_state['judge_decision']}",
)
# Mark all research team members as completed
update_research_team_status(message_buffer, "completed")
# Set first risk analyst to in_progress
message_buffer.update_agent_status(
"Risky Analyst", "in_progress"
)
# Trading Team
if (
"trader_investment_plan" in chunk
and chunk["trader_investment_plan"]
):
message_buffer.update_report_section(
"trader_investment_plan", chunk["trader_investment_plan"]
)
# Set first risk analyst to in_progress
message_buffer.update_agent_status("Risky Analyst", "in_progress")
# Risk Management Team - Handle Risk Debate State
if "risk_debate_state" in chunk and chunk["risk_debate_state"]:
risk_state = chunk["risk_debate_state"]
# Handle all risk analysts with a mapping
risk_analysts = [
("current_risky_response", "Risky Analyst"),
("current_safe_response", "Safe Analyst"),
("current_neutral_response", "Neutral Analyst"),
]
for response_key, analyst_name in risk_analysts:
if response_key in risk_state and risk_state[response_key]:
message_buffer.update_agent_status(analyst_name, "in_progress")
message_buffer.add_message(
"Reasoning",
f"{analyst_name}: {risk_state[response_key]}",
)
message_buffer.update_report_section(
"final_trade_decision",
f"### {analyst_name} Analysis\n{risk_state[response_key]}",
)
# Update Portfolio Manager status and final decision
if "judge_decision" in risk_state and risk_state["judge_decision"]:
message_buffer.update_agent_status(
"Portfolio Manager", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Portfolio Manager: {risk_state['judge_decision']}",
)
# Update risk report with final decision only
message_buffer.update_report_section(
"final_trade_decision",
f"### Portfolio Manager Decision\n{risk_state['judge_decision']}",
)
# Mark risk analysts as completed
message_buffer.update_agent_status("Risky Analyst", "completed")
message_buffer.update_agent_status("Safe Analyst", "completed")
message_buffer.update_agent_status(
"Neutral Analyst", "completed"
)
message_buffer.update_agent_status(
"Portfolio Manager", "completed"
)
# Update the display
update_display(layout, message_buffer)
trace.append(chunk)
# Get final state and decision
final_state = trace[-1]
decision = graph.process_signal(final_state["final_trade_decision"])
# Update all agent statuses to completed
for agent in message_buffer.agent_status:
message_buffer.update_agent_status(agent, "completed")
message_buffer.add_message(
"Analysis", f"Completed analysis for {selections['analysis_date']}"
)
# Update final report sections
for section in message_buffer.report_sections.keys():
if section in final_state:
message_buffer.update_report_section(section, final_state[section])
# Display the complete final report
display_complete_report(final_state)
update_display(layout, message_buffer)
# Initialize display with startup messages
initialize_display(layout, selections)
# Run the streaming analysis
trace = run_stream_analysis(graph, selections, layout)
# Process and display final results
finalize_analysis(trace, graph, selections, layout)
@app.command()

cli/models.py
View File

@@ -1,10 +0,0 @@
from enum import Enum
from typing import List, Optional, Dict
from pydantic import BaseModel
class AnalystType(str, Enum):
MARKET = "market"
SOCIAL = "social"
NEWS = "news"
FUNDAMENTALS = "fundamentals"

cli/prompts.py
View File

@@ -1,7 +1,10 @@
"""Interactive user prompts for the TradingAgents CLI."""
import questionary
from typing import List, Optional, Tuple, Dict
from cli.models import AnalystType
from cli.helpers import AnalystType
from cli.llm_config import SHALLOW_AGENT_OPTIONS, DEEP_AGENT_OPTIONS, LLM_PROVIDERS
ANALYST_ORDER = [
("Market Analyst", AnalystType.MARKET),
@@ -131,38 +134,6 @@ def select_research_depth() -> int:
def select_shallow_thinking_agent(provider) -> str:
"""Select shallow thinking llm engine using an interactive selection."""
# Define shallow thinking llm engine options with their corresponding model names
SHALLOW_AGENT_OPTIONS = {
"openai": [
("GPT-4o-mini - Fast and efficient for quick tasks", "gpt-4o-mini"),
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("GPT-5 - Next generation model with enhanced capabilities", "gpt-5"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
("Claude Sonnet 3.5 - Highly capable standard model", "claude-3-5-sonnet-latest"),
("Claude Sonnet 3.7 - Exceptional hybrid reasoning and agentic capabilities", "claude-3-7-sonnet-latest"),
("Claude Sonnet 4 - High performance and excellent reasoning", "claude-sonnet-4-0"),
],
"google": [
("Gemini 2.0 Flash-Lite - Cost efficiency and low latency", "gemini-2.0-flash-lite"),
("Gemini 2.0 Flash - Next generation features, speed, and thinking", "gemini-2.0-flash"),
("Gemini 2.5 Flash - Adaptive thinking, cost efficiency", "gemini-2.5-flash-preview-05-20"),
],
"openrouter": [
("Meta: Llama 4 Scout", "meta-llama/llama-4-scout:free"),
("Meta: Llama 3.3 8B Instruct - A lightweight and ultra-fast variant of Llama 3.3 70B", "meta-llama/llama-3.3-8b-instruct:free"),
("google/gemini-2.0-flash-exp:free - Gemini Flash 2.0 offers a significantly faster time to first token", "google/gemini-2.0-flash-exp:free"),
],
"ollama": [
("llama3.1 local", "llama3.1"),
("llama3.2 local", "llama3.2"),
]
}
choice = questionary.select(
"Select Your [Quick-Thinking LLM Engine]:",
choices=[
@@ -190,42 +161,6 @@ def select_shallow_thinking_agent(provider) -> str:
def select_deep_thinking_agent(provider) -> str:
"""Select deep thinking llm engine using an interactive selection."""
# Define deep thinking llm engine options with their corresponding model names
DEEP_AGENT_OPTIONS = {
"openai": [
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("GPT-5 - Next generation model with enhanced capabilities", "gpt-5"),
("o4-mini - Specialized reasoning model (compact)", "o4-mini"),
("o3-mini - Advanced reasoning model (lightweight)", "o3-mini"),
("o3 - Full advanced reasoning model", "o3"),
("o1 - Premier reasoning and problem-solving model", "o1"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
("Claude Sonnet 3.5 - Highly capable standard model", "claude-3-5-sonnet-latest"),
("Claude Sonnet 3.7 - Exceptional hybrid reasoning and agentic capabilities", "claude-3-7-sonnet-latest"),
("Claude Sonnet 4 - High performance and excellent reasoning", "claude-sonnet-4-0"),
("Claude Opus 4 - Most powerful Anthropic model", " claude-opus-4-0"),
],
"google": [
("Gemini 2.0 Flash-Lite - Cost efficiency and low latency", "gemini-2.0-flash-lite"),
("Gemini 2.0 Flash - Next generation features, speed, and thinking", "gemini-2.0-flash"),
("Gemini 2.5 Flash - Adaptive thinking, cost efficiency", "gemini-2.5-flash-preview-05-20"),
("Gemini 2.5 Pro", "gemini-2.5-pro-preview-06-05"),
],
"openrouter": [
("DeepSeek V3 - a 685B-parameter, mixture-of-experts model", "deepseek/deepseek-chat-v3-0324:free"),
("Deepseek - latest iteration of the flagship chat model family from the DeepSeek team.", "deepseek/deepseek-chat-v3-0324:free"),
],
"ollama": [
("llama3.1 local", "llama3.1"),
("qwen3", "qwen3"),
]
}
choice = questionary.select(
"Select Your [Deep-Thinking LLM Engine]:",
choices=[
@@ -250,20 +185,11 @@ def select_deep_thinking_agent(provider) -> str:
def select_llm_provider() -> tuple[str, str]:
"""Select the OpenAI api url using interactive selection."""
# Define OpenAI api options with their corresponding endpoints
BASE_URLS = [
("OpenAI", "https://api.openai.com/v1"),
("Anthropic", "https://api.anthropic.com/"),
("Google", "https://generativelanguage.googleapis.com/v1"),
("Openrouter", "https://openrouter.ai/api/v1"),
("Ollama", "http://localhost:11434/v1"),
]
choice = questionary.select(
"Select your LLM Provider:",
choices=[
questionary.Choice(display, value=(display, value))
for display, value in BASE_URLS
for display, value in LLM_PROVIDERS
],
instruction="\n- Use arrow keys to navigate\n- Press Enter to select",
style=questionary.Style(
@@ -283,3 +209,4 @@ def select_llm_provider() -> tuple[str, str]:
print(f"You selected: {display_name}\tURL: {url}")
return display_name, url

View File

@@ -1,7 +1,5 @@
______ ___ ___ __
/_ __/________ _____/ (_)___ ____ _/ | ____ ____ ____ / /______
/ / / ___/ __ `/ __ / / __ \/ __ `/ /| |/ __ `/ _ \/ __ \/ __/ ___/
/ / / / / /_/ / /_/ / / / / / /_/ / ___ / /_/ / __/ / / / /_(__ )
/_/ /_/ \__,_/\__,_/_/_/ /_/\__, /_/ |_\__, /\___/_/ /_/\__/____/
/____/ /____/
██ ██ ████████ █████ ██████ ███████ ██
██ ██ ██ ██ ██ ██ ██ ██ ██
██ ██ ██ ███████ ██ ██ █████ ██
██ ██ ██ ██ ██ ██ ██ ██ ██
███████ ██ ██ ██ ██ ██████ ███████ ███████

229
cli/stream_processor.py Normal file
View File

@@ -0,0 +1,229 @@
"""Stream processing logic for handling agent analysis chunks in the TradingAgents CLI."""
from cli.helpers import extract_content_string, update_research_team_status
def process_message_chunk(chunk, message_buffer):
"""
Process message content and tool calls from a chunk.
Args:
chunk: The chunk dictionary from the graph stream
message_buffer: The MessageBuffer instance to update
"""
if len(chunk["messages"]) == 0:
return
# Get the last message from the chunk
last_message = chunk["messages"][-1]
# Extract message content and type
if hasattr(last_message, "content"):
content = extract_content_string(last_message.content)
msg_type = "Reasoning"
else:
content = str(last_message)
msg_type = "System"
# Add message to buffer
message_buffer.add_message(msg_type, content)
# If it's a tool call, add it to tool calls
if hasattr(last_message, "tool_calls"):
for tool_call in last_message.tool_calls:
# Handle both dictionary and object tool calls
if isinstance(tool_call, dict):
message_buffer.add_tool_call(
tool_call["name"], tool_call["args"]
)
else:
message_buffer.add_tool_call(tool_call.name, tool_call.args)
def process_analyst_reports(chunk, message_buffer, selected_analysts):
"""
Process analyst team reports from chunk.
Args:
chunk: The chunk dictionary from the graph stream
message_buffer: The MessageBuffer instance to update
selected_analysts: List of selected analyst types
"""
# Mapping of report keys to analyst info
analyst_mappings = [
("market_report", "Market Analyst", "social", "Social Analyst"),
("sentiment_report", "Social Analyst", "news", "News Analyst"),
("news_report", "News Analyst", "fundamentals", "Fundamentals Analyst"),
("fundamentals_report", "Fundamentals Analyst", None, None),
]
for report_key, analyst_name, next_type, next_analyst in analyst_mappings:
if report_key in chunk and chunk[report_key]:
message_buffer.update_report_section(report_key, chunk[report_key])
message_buffer.update_agent_status(analyst_name, "completed")
if report_key == "fundamentals_report":
# Special case: set all research team to in_progress
update_research_team_status(message_buffer, "in_progress")
elif next_type and next_type in [a.value for a in selected_analysts]:
message_buffer.update_agent_status(next_analyst, "in_progress")
def process_research_debate(chunk, message_buffer):
"""
Process research team investment debate state from chunk.
Args:
chunk: The chunk dictionary from the graph stream
message_buffer: The MessageBuffer instance to update
"""
if "investment_debate_state" not in chunk or not chunk["investment_debate_state"]:
return
debate_state = chunk["investment_debate_state"]
# Update Bull Researcher status and report
if "bull_history" in debate_state and debate_state["bull_history"]:
# Keep all research team members in progress
update_research_team_status(message_buffer, "in_progress")
# Extract latest bull response
bull_responses = debate_state["bull_history"].split("\n")
latest_bull = bull_responses[-1] if bull_responses else ""
if latest_bull:
message_buffer.add_message("Reasoning", latest_bull)
# Update research report with bull's latest analysis
message_buffer.update_report_section(
"investment_plan",
f"### Bull Researcher Analysis\n{latest_bull}",
)
# Update Bear Researcher status and report
if "bear_history" in debate_state and debate_state["bear_history"]:
# Keep all research team members in progress
update_research_team_status(message_buffer, "in_progress")
# Extract latest bear response
bear_responses = debate_state["bear_history"].split("\n")
latest_bear = bear_responses[-1] if bear_responses else ""
if latest_bear:
message_buffer.add_message("Reasoning", latest_bear)
# Update research report with bear's latest analysis
message_buffer.update_report_section(
"investment_plan",
f"{message_buffer.report_sections['investment_plan']}\n\n### Bear Researcher Analysis\n{latest_bear}",
)
# Update Research Manager status and final decision
if "judge_decision" in debate_state and debate_state["judge_decision"]:
# Keep all research team members in progress until final decision
update_research_team_status(message_buffer, "in_progress")
message_buffer.add_message(
"Reasoning",
f"Research Manager: {debate_state['judge_decision']}",
)
# Update research report with final decision
message_buffer.update_report_section(
"investment_plan",
f"{message_buffer.report_sections['investment_plan']}\n\n### Research Manager Decision\n{debate_state['judge_decision']}",
)
# Mark all research team members as completed
update_research_team_status(message_buffer, "completed")
# Set first risk analyst to in_progress
message_buffer.update_agent_status("Risky Analyst", "in_progress")
def process_trader_report(chunk, message_buffer):
"""
Process trader investment plan from chunk.
Args:
chunk: The chunk dictionary from the graph stream
message_buffer: The MessageBuffer instance to update
"""
if "trader_investment_plan" in chunk and chunk["trader_investment_plan"]:
message_buffer.update_report_section(
"trader_investment_plan", chunk["trader_investment_plan"]
)
# Set first risk analyst to in_progress
message_buffer.update_agent_status("Risky Analyst", "in_progress")
def process_risk_debate(chunk, message_buffer):
"""
Process risk management team debate state from chunk.
Args:
chunk: The chunk dictionary from the graph stream
message_buffer: The MessageBuffer instance to update
"""
if "risk_debate_state" not in chunk or not chunk["risk_debate_state"]:
return
risk_state = chunk["risk_debate_state"]
# Handle all risk analysts with a mapping
risk_analysts = [
("current_risky_response", "Risky Analyst"),
("current_safe_response", "Safe Analyst"),
("current_neutral_response", "Neutral Analyst"),
]
for response_key, analyst_name in risk_analysts:
if response_key in risk_state and risk_state[response_key]:
message_buffer.update_agent_status(analyst_name, "in_progress")
message_buffer.add_message(
"Reasoning",
f"{analyst_name}: {risk_state[response_key]}",
)
message_buffer.update_report_section(
"final_trade_decision",
f"### {analyst_name} Analysis\n{risk_state[response_key]}",
)
# Update Portfolio Manager status and final decision
if "judge_decision" in risk_state and risk_state["judge_decision"]:
message_buffer.update_agent_status(
"Portfolio Manager", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Portfolio Manager: {risk_state['judge_decision']}",
)
# Update risk report with final decision only
message_buffer.update_report_section(
"final_trade_decision",
f"### Portfolio Manager Decision\n{risk_state['judge_decision']}",
)
# Mark risk analysts as completed
message_buffer.update_agent_status("Risky Analyst", "completed")
message_buffer.update_agent_status("Safe Analyst", "completed")
message_buffer.update_agent_status("Neutral Analyst", "completed")
message_buffer.update_agent_status("Portfolio Manager", "completed")
def process_chunk(chunk, message_buffer, selected_analysts):
"""
Process a single chunk from the graph stream, updating the message buffer
with messages, reports, and agent statuses.
Args:
chunk: The chunk dictionary from the graph stream
message_buffer: The MessageBuffer instance to update
selected_analysts: List of selected analyst types
Returns:
bool: True if chunk was processed, False if chunk had no messages
"""
if len(chunk["messages"]) == 0:
return False
# Process messages and tool calls
process_message_chunk(chunk, message_buffer)
# Process different report types
process_analyst_reports(chunk, message_buffer, selected_analysts)
process_research_debate(chunk, message_buffer)
process_trader_report(chunk, message_buffer)
process_risk_debate(chunk, message_buffer)
return True
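process_chunk is the single entry point main.py now calls per streamed graph chunk; each helper only inspects the keys it owns, so a chunk carrying just a market_report skips the debate, trader and risk stages. A rough driver sketch; FakeMessage and StubBuffer are illustrative stand-ins (not the real LangChain message or cli.message_buffer.MessageBuffer) that expose only what this code path touches:
from cli.helpers import AnalystType
from cli.stream_processor import process_chunk
class FakeMessage:  # illustrative stand-in for a LangChain message
    def __init__(self, content):
        self.content = content
class StubBuffer:  # hypothetical minimal buffer exposing only what this path needs
    def __init__(self):
        self.report_sections = {}
        self.agent_status = {}
    def add_message(self, msg_type, content):
        print(f"[{msg_type}] {content}")
    def update_report_section(self, name, content):
        self.report_sections[name] = content
    def update_agent_status(self, agent, status):
        self.agent_status[agent] = status
chunk = {
    "messages": [FakeMessage("Market analysis in progress...")],
    "market_report": "# Market Report\nPrice action looks constructive.",
}
handled = process_chunk(chunk, StubBuffer(), [AnalystType.MARKET, AnalystType.SOCIAL])
print(handled)  # True: message logged, Market Analyst completed, Social Analyst set to in_progress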