Merge pull request #1 from deepweather/commodities-and-crypto

Commodities and crypto
This commit is contained in:
Marvin Gabler 2025-10-21 15:12:54 +01:00 committed by GitHub
commit b80eb0b905
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 2294 additions and 785 deletions

216
CONFIG_GUIDE.md Normal file
View File

@ -0,0 +1,216 @@
# Configuration Guide
TradingAgents supports configuration through multiple methods with the following priority:
**Priority (highest to lowest):**
1. Environment variables
2. `config.ini` file
3. Built-in defaults
## Quick Start
### Option 1: Using config.ini (Recommended)
1. Copy the example configuration:
```bash
cp config.example.ini config.ini
```
2. Edit `config.ini` with your preferences:
```ini
[llm]
provider = openai
deep_think_llm = o1-mini
quick_think_llm = gpt-4o-mini
[analysis]
research_depth = 1
default_analysts = market,news,social
[data]
global_news_limit = 15
```
3. Run the CLI - it will use your defaults and skip prompting for configured values:
```bash
uv run python -m cli.main
```
### Option 2: Using Environment Variables
Set environment variables in your shell or `.env` file:
```bash
export LLM_PROVIDER=openai
export DEEP_THINK_LLM=o1-mini
export QUICK_THINK_LLM=gpt-4o-mini
export RESEARCH_DEPTH=1
export GLOBAL_NEWS_LIMIT=20
```
## Configuration Options
### LLM Settings
| Config Key | Env Variable | Description | Default |
|------------|--------------|-------------|---------|
| `provider` | `LLM_PROVIDER` | LLM provider (openai, anthropic, google, ollama) | `openai` |
| `backend_url` | `LLM_BACKEND_URL` | API endpoint URL | `https://api.openai.com/v1` |
| `deep_think_llm` | `DEEP_THINK_LLM` | Model for complex analysis (gpt-5, o1, o3, etc.) | `o1-mini` |
| `quick_think_llm` | `QUICK_THINK_LLM` | Model for simple tasks (gpt-5, gpt-4o-mini, etc.) | `gpt-4o-mini` |
### Analysis Settings
| Config Key | Env Variable | Description | Default |
|------------|--------------|-------------|---------|
| `research_depth` | `RESEARCH_DEPTH` | Depth level (1=shallow, 2=medium, 3=deep) | `1` |
| `default_analysts` | `DEFAULT_ANALYSTS` | Comma-separated list of analysts | `market,news,social` |
### Data Settings
| Config Key | Env Variable | Description | Default |
|------------|--------------|-------------|---------|
| `global_news_limit` | `GLOBAL_NEWS_LIMIT` | Number of global news articles to fetch | `15` |
| `commodity_news_limit` | `COMMODITY_NEWS_LIMIT` | Number of commodity news articles | `50` |
### Data Vendors
Configure which data sources to use:
| Config Key | Env Variable | Description | Options |
|------------|--------------|-------------|---------|
| `stock` | `DATA_VENDOR_STOCK` | Stock price data | yfinance, alpha_vantage |
| `indicators` | `DATA_VENDOR_INDICATORS` | Technical indicators | yfinance, alpha_vantage |
| `fundamentals` | `DATA_VENDOR_FUNDAMENTALS` | Fundamental data | alpha_vantage, openai |
| `news` | `DATA_VENDOR_NEWS` | News data | alpha_vantage, openai, google |
| `commodity` | `DATA_VENDOR_COMMODITY` | Commodity data | alpha_vantage |
## Examples
### Example 1: Minimal config.ini for quick daily use
```ini
[llm]
provider = openai
deep_think_llm = o1-mini
quick_think_llm = gpt-4o-mini
[analysis]
research_depth = 1
default_analysts = market,news
```
**Result:** When you run the CLI, you'll **only be prompted for**:
- ✅ Ticker symbol
- ✅ Analysis date
**Auto-configured (no prompts)**:
- ✅ LLM provider (OpenAI)
- ✅ Models (o1-mini & gpt-4o-mini)
- ✅ Research depth (Shallow)
- ✅ Analysts (Market & News)
Everything else uses your configured defaults!
### Example 2: Using GPT-5
```ini
[llm]
provider = openai
deep_think_llm = gpt-5
quick_think_llm = gpt-5
[analysis]
research_depth = 2
default_analysts = market,news,social
```
### Example 3: Power user with custom news limits
```ini
[llm]
provider = anthropic
deep_think_llm = claude-3-5-sonnet-20241022
quick_think_llm = claude-3-5-haiku-20241022
[analysis]
research_depth = 3
default_analysts = market,news,social,fundamentals
[data]
global_news_limit = 30
commodity_news_limit = 100
[vendors]
news = openai
stock = alpha_vantage
```
### Example 4: Using environment variables
```bash
# In your ~/.zshrc or ~/.bashrc
export LLM_PROVIDER=openai
export DEEP_THINK_LLM=o1-mini
export QUICK_THINK_LLM=gpt-4o-mini
export GLOBAL_NEWS_LIMIT=20
```
## Configuration Priority Example
If you have:
- `config.ini` with `global_news_limit = 15`
- Environment variable `GLOBAL_NEWS_LIMIT=30`
The system will use **30** (environment variable wins).
## How It Works
When you run the CLI with a `config.ini` file, you'll see output like this:
```
Step 1: Ticker Symbol
Enter the ticker symbol to analyze
> AAPL
→ Detected asset class: Equity
Step 2: Analysis date
Enter the analysis date (YYYY-MM-DD)
> 2025-01-15
→ Using configured analysts: market, news, social
→ Using configured research depth: Shallow
→ Using configured LLM provider: OPENAI (https://api.openai.com/v1)
→ Using configured models:
Quick thinking: gpt-4o-mini
Deep thinking: o1-mini
```
The `→` lines show values being used from your config, skipping the prompts!
## Tips
1. **Start with config.ini** - Easier to manage than environment variables
2. **Use env vars for secrets** - Keep API keys in environment variables, not config files
3. **Version control** - Add `config.ini` to `.gitignore`, commit `config.example.ini`
4. **Test changes** - Run CLI after changing config to verify your settings work
5. **Partial config is OK** - You can configure only some values; the rest will be prompted
## Troubleshooting
**CLI still prompts for everything:**
- Check that `config.ini` exists in the TradingAgents directory
- Verify the file has the correct structure (see `config.example.ini`)
- Check for typos in section names and keys
**Wrong model being used:**
- Check priority: env vars override config.ini
- Verify model names match your provider's API exactly
- Check logs for which config values are being loaded
**News limit not working:**
- Limits are read from config on each run
- Clear Python cache: `find . -type d -name "__pycache__" -exec rm -rf {} +`
- Check the tool logs to see what limit is actually being used

64
cli/asset_detection.py Normal file
View File

@ -0,0 +1,64 @@
"""Asset class detection utilities for the CLI."""
# Known commodities from Alpha Vantage
KNOWN_COMMODITIES = {
"WTI",
"BRENT",
"NATURAL_GAS",
"COPPER",
"ALUMINUM",
"WHEAT",
"CORN",
"SUGAR",
"COTTON",
"COFFEE",
}
# Known cryptocurrencies supported by Alpha Vantage
KNOWN_CRYPTOS = {
"BTC",
"ETH",
"BNB",
"XRP",
"ADA",
"SOL",
"DOGE",
"DOT",
"MATIC",
"AVAX",
"LINK",
"UNI",
"ATOM",
"LTC",
"BCH",
"XLM",
"ALGO",
"VET",
"ICP",
"FIL",
}
def detect_asset_class(symbol: str) -> str:
"""
Automatically detect if a symbol is a commodity, crypto, or equity.
Args:
symbol: The ticker symbol (e.g., "BRENT", "BTC", "AAPL")
Returns:
"commodity", "crypto", or "equity" based on the symbol
"""
symbol_upper = symbol.upper()
if symbol_upper in KNOWN_COMMODITIES:
return "commodity"
elif symbol_upper in KNOWN_CRYPTOS:
return "crypto"
else:
return "equity"
def get_asset_class_display_name(asset_class: str) -> str:
"""Get a human-friendly display name for the asset class."""
return asset_class.capitalize()

28
cli/helper_functions.py Normal file
View File

@ -0,0 +1,28 @@
"""Helper functions for the TradingAgents CLI."""
def update_research_team_status(message_buffer, status):
"""Update status for all research team members and trader."""
research_team = ["Bull Researcher", "Bear Researcher", "Research Manager", "Trader"]
for agent in research_team:
message_buffer.update_agent_status(agent, status)
def extract_content_string(content):
"""Extract string content from various message formats."""
if isinstance(content, str):
return content
elif isinstance(content, list):
# Handle Anthropic's list format
text_parts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'text':
text_parts.append(item.get('text', ''))
elif item.get('type') == 'tool_use':
text_parts.append(f"[Tool: {item.get('name', 'unknown')}]")
else:
text_parts.append(str(item))
return ' '.join(text_parts)
else:
return str(content)

View File

@ -28,6 +28,11 @@ from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
from cli.models import AnalystType
from cli.utils import *
from cli.message_buffer import MessageBuffer
from cli.ui_display import create_layout, update_display
from cli.report_display import display_complete_report
from cli.helper_functions import update_research_team_status, extract_content_string
from cli.asset_detection import detect_asset_class, get_asset_class_display_name
console = Console()
@ -38,365 +43,16 @@ app = typer.Typer(
)
# Create a deque to store recent messages with a maximum length
class MessageBuffer:
def __init__(self, max_length=100):
self.messages = deque(maxlen=max_length)
self.tool_calls = deque(maxlen=max_length)
self.current_report = None
self.final_report = None # Store the complete final report
self.agent_status = {
# Analyst Team
"Market Analyst": "pending",
"Social Analyst": "pending",
"News Analyst": "pending",
"Fundamentals Analyst": "pending",
# Research Team
"Bull Researcher": "pending",
"Bear Researcher": "pending",
"Research Manager": "pending",
# Trading Team
"Trader": "pending",
# Risk Management Team
"Risky Analyst": "pending",
"Neutral Analyst": "pending",
"Safe Analyst": "pending",
# Portfolio Management Team
"Portfolio Manager": "pending",
}
self.current_agent = None
self.report_sections = {
"market_report": None,
"sentiment_report": None,
"news_report": None,
"fundamentals_report": None,
"investment_plan": None,
"trader_investment_plan": None,
"final_trade_decision": None,
}
def add_message(self, message_type, content):
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
self.messages.append((timestamp, message_type, content))
def add_tool_call(self, tool_name, args):
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
self.tool_calls.append((timestamp, tool_name, args))
def update_agent_status(self, agent, status):
if agent in self.agent_status:
self.agent_status[agent] = status
self.current_agent = agent
def update_report_section(self, section_name, content):
if section_name in self.report_sections:
self.report_sections[section_name] = content
self._update_current_report()
def _update_current_report(self):
# For the panel display, only show the most recently updated section
latest_section = None
latest_content = None
# Find the most recently updated section
for section, content in self.report_sections.items():
if content is not None:
latest_section = section
latest_content = content
if latest_section and latest_content:
# Format the current section for display
section_titles = {
"market_report": "Market Analysis",
"sentiment_report": "Social Sentiment",
"news_report": "News Analysis",
"fundamentals_report": "Fundamentals Analysis",
"investment_plan": "Research Team Decision",
"trader_investment_plan": "Trading Team Plan",
"final_trade_decision": "Portfolio Management Decision",
}
self.current_report = (
f"### {section_titles[latest_section]}\n{latest_content}"
)
# Update the final complete report
self._update_final_report()
def _update_final_report(self):
report_parts = []
# Analyst Team Reports
if any(
self.report_sections[section]
for section in [
"market_report",
"sentiment_report",
"news_report",
"fundamentals_report",
]
):
report_parts.append("## Analyst Team Reports")
if self.report_sections["market_report"]:
report_parts.append(
f"### Market Analysis\n{self.report_sections['market_report']}"
)
if self.report_sections["sentiment_report"]:
report_parts.append(
f"### Social Sentiment\n{self.report_sections['sentiment_report']}"
)
if self.report_sections["news_report"]:
report_parts.append(
f"### News Analysis\n{self.report_sections['news_report']}"
)
if self.report_sections["fundamentals_report"]:
report_parts.append(
f"### Fundamentals Analysis\n{self.report_sections['fundamentals_report']}"
)
# Research Team Reports
if self.report_sections["investment_plan"]:
report_parts.append("## Research Team Decision")
report_parts.append(f"{self.report_sections['investment_plan']}")
# Trading Team Reports
if self.report_sections["trader_investment_plan"]:
report_parts.append("## Trading Team Plan")
report_parts.append(f"{self.report_sections['trader_investment_plan']}")
# Portfolio Management Decision
if self.report_sections["final_trade_decision"]:
report_parts.append("## Portfolio Management Decision")
report_parts.append(f"{self.report_sections['final_trade_decision']}")
self.final_report = "\n\n".join(report_parts) if report_parts else None
# Create a global message buffer instance
message_buffer = MessageBuffer()
def create_layout():
layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="main"),
Layout(name="footer", size=3),
)
layout["main"].split_column(
Layout(name="upper", ratio=3), Layout(name="analysis", ratio=5)
)
layout["upper"].split_row(
Layout(name="progress", ratio=2), Layout(name="messages", ratio=3)
)
return layout
def update_display(layout, spinner_text=None):
# Header with welcome message
layout["header"].update(
Panel(
"[bold green]Welcome to TradingAgents CLI[/bold green]\n"
"[dim]© [Tauric Research](https://github.com/TauricResearch)[/dim]",
title="Welcome to TradingAgents",
border_style="green",
padding=(1, 2),
expand=True,
)
)
# Progress panel showing agent status
progress_table = Table(
show_header=True,
header_style="bold magenta",
show_footer=False,
box=box.SIMPLE_HEAD, # Use simple header with horizontal lines
title=None, # Remove the redundant Progress title
padding=(0, 2), # Add horizontal padding
expand=True, # Make table expand to fill available space
)
progress_table.add_column("Team", style="cyan", justify="center", width=20)
progress_table.add_column("Agent", style="green", justify="center", width=20)
progress_table.add_column("Status", style="yellow", justify="center", width=20)
# Group agents by team
teams = {
"Analyst Team": [
"Market Analyst",
"Social Analyst",
"News Analyst",
"Fundamentals Analyst",
],
"Research Team": ["Bull Researcher", "Bear Researcher", "Research Manager"],
"Trading Team": ["Trader"],
"Risk Management": ["Risky Analyst", "Neutral Analyst", "Safe Analyst"],
"Portfolio Management": ["Portfolio Manager"],
}
for team, agents in teams.items():
# Add first agent with team name
first_agent = agents[0]
status = message_buffer.agent_status[first_agent]
if status == "in_progress":
spinner = Spinner(
"dots", text="[blue]in_progress[/blue]", style="bold cyan"
)
status_cell = spinner
else:
status_color = {
"pending": "yellow",
"completed": "green",
"error": "red",
}.get(status, "white")
status_cell = f"[{status_color}]{status}[/{status_color}]"
progress_table.add_row(team, first_agent, status_cell)
# Add remaining agents in team
for agent in agents[1:]:
status = message_buffer.agent_status[agent]
if status == "in_progress":
spinner = Spinner(
"dots", text="[blue]in_progress[/blue]", style="bold cyan"
)
status_cell = spinner
else:
status_color = {
"pending": "yellow",
"completed": "green",
"error": "red",
}.get(status, "white")
status_cell = f"[{status_color}]{status}[/{status_color}]"
progress_table.add_row("", agent, status_cell)
# Add horizontal line after each team
progress_table.add_row("" * 20, "" * 20, "" * 20, style="dim")
layout["progress"].update(
Panel(progress_table, title="Progress", border_style="cyan", padding=(1, 2))
)
# Messages panel showing recent messages and tool calls
messages_table = Table(
show_header=True,
header_style="bold magenta",
show_footer=False,
expand=True, # Make table expand to fill available space
box=box.MINIMAL, # Use minimal box style for a lighter look
show_lines=True, # Keep horizontal lines
padding=(0, 1), # Add some padding between columns
)
messages_table.add_column("Time", style="cyan", width=8, justify="center")
messages_table.add_column("Type", style="green", width=10, justify="center")
messages_table.add_column(
"Content", style="white", no_wrap=False, ratio=1
) # Make content column expand
# Combine tool calls and messages
all_messages = []
# Add tool calls
for timestamp, tool_name, args in message_buffer.tool_calls:
# Truncate tool call args if too long
if isinstance(args, str) and len(args) > 100:
args = args[:97] + "..."
all_messages.append((timestamp, "Tool", f"{tool_name}: {args}"))
# Add regular messages
for timestamp, msg_type, content in message_buffer.messages:
# Convert content to string if it's not already
content_str = content
if isinstance(content, list):
# Handle list of content blocks (Anthropic format)
text_parts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'text':
text_parts.append(item.get('text', ''))
elif item.get('type') == 'tool_use':
text_parts.append(f"[Tool: {item.get('name', 'unknown')}]")
else:
text_parts.append(str(item))
content_str = ' '.join(text_parts)
elif not isinstance(content_str, str):
content_str = str(content)
# Truncate message content if too long
if len(content_str) > 200:
content_str = content_str[:197] + "..."
all_messages.append((timestamp, msg_type, content_str))
# Sort by timestamp
all_messages.sort(key=lambda x: x[0])
# Calculate how many messages we can show based on available space
# Start with a reasonable number and adjust based on content length
max_messages = 12 # Increased from 8 to better fill the space
# Get the last N messages that will fit in the panel
recent_messages = all_messages[-max_messages:]
# Add messages to table
for timestamp, msg_type, content in recent_messages:
# Format content with word wrapping
wrapped_content = Text(content, overflow="fold")
messages_table.add_row(timestamp, msg_type, wrapped_content)
if spinner_text:
messages_table.add_row("", "Spinner", spinner_text)
# Add a footer to indicate if messages were truncated
if len(all_messages) > max_messages:
messages_table.footer = (
f"[dim]Showing last {max_messages} of {len(all_messages)} messages[/dim]"
)
layout["messages"].update(
Panel(
messages_table,
title="Messages & Tools",
border_style="blue",
padding=(1, 2),
)
)
# Analysis panel showing current report
if message_buffer.current_report:
layout["analysis"].update(
Panel(
Markdown(message_buffer.current_report),
title="Current Report",
border_style="green",
padding=(1, 2),
)
)
else:
layout["analysis"].update(
Panel(
"[italic]Waiting for analysis report...[/italic]",
title="Current Report",
border_style="green",
padding=(1, 2),
)
)
# Footer with statistics
tool_calls_count = len(message_buffer.tool_calls)
llm_calls_count = sum(
1 for _, msg_type, _ in message_buffer.messages if msg_type == "Reasoning"
)
reports_count = sum(
1 for content in message_buffer.report_sections.values() if content is not None
)
stats_table = Table(show_header=False, box=None, padding=(0, 2), expand=True)
stats_table.add_column("Stats", justify="center")
stats_table.add_row(
f"Tool Calls: {tool_calls_count} | LLM Calls: {llm_calls_count} | Generated Reports: {reports_count}"
)
layout["footer"].update(Panel(stats_table, border_style="grey50"))
def get_user_selections():
"""Get all user selections before starting the analysis display."""
# Load config to check for pre-configured values
from tradingagents.default_config import DEFAULT_CONFIG
# Display ASCII art welcome message
with open("./cli/static/welcome.txt", "r") as f:
welcome_ascii = f.read()
@ -436,6 +92,12 @@ def get_user_selections():
)
)
selected_ticker = get_ticker()
# Auto-detect asset class from ticker
asset_class = detect_asset_class(selected_ticker)
console.print(
f"[dim]→ Detected asset class: [bold]{get_asset_class_display_name(asset_class)}[/bold][/dim]\n"
)
# Step 2: Analysis date
default_date = datetime.datetime.now().strftime("%Y-%m-%d")
@ -448,45 +110,85 @@ def get_user_selections():
)
analysis_date = get_analysis_date()
# Step 3: Select analysts
console.print(
create_question_box(
"Step 3: Analysts Team", "Select your LLM analyst agents for the analysis"
# Step 3: Select analysts (use config default if available)
if "default_analysts" in DEFAULT_CONFIG and DEFAULT_CONFIG["default_analysts"]:
# Convert analyst names to AnalystType
analyst_map = {
"market": AnalystType.MARKET,
"news": AnalystType.NEWS,
"social": AnalystType.SOCIAL,
"fundamentals": AnalystType.FUNDAMENTALS,
}
selected_analysts = [analyst_map[a] for a in DEFAULT_CONFIG["default_analysts"] if a in analyst_map]
# Filter out fundamentals for commodities
if asset_class == "commodity":
selected_analysts = [a for a in selected_analysts if a != AnalystType.FUNDAMENTALS]
console.print(
f"[dim]→ Using configured analysts: [bold]{', '.join(a.value for a in selected_analysts)}[/bold][/dim]\n"
)
else:
console.print(
create_question_box(
"Step 3: Analysts Team", "Select your LLM analyst agents for the analysis"
)
)
selected_analysts = select_analysts(asset_class)
console.print(
f"[green]Selected analysts:[/green] {', '.join(analyst.value for analyst in selected_analysts)}"
)
)
selected_analysts = select_analysts()
console.print(
f"[green]Selected analysts:[/green] {', '.join(analyst.value for analyst in selected_analysts)}"
)
# Step 4: Research depth
console.print(
create_question_box(
"Step 4: Research Depth", "Select your research depth level"
# Step 4: Research depth (use config default if available)
if "max_debate_rounds" in DEFAULT_CONFIG:
selected_research_depth = DEFAULT_CONFIG["max_debate_rounds"]
depth_labels = {1: "Shallow", 2: "Medium", 3: "Deep"}
console.print(
f"[dim]→ Using configured research depth: [bold]{depth_labels.get(selected_research_depth, selected_research_depth)}[/bold][/dim]\n"
)
)
selected_research_depth = select_research_depth()
else:
console.print(
create_question_box(
"Step 4: Research Depth", "Select your research depth level"
)
)
selected_research_depth = select_research_depth()
# Step 5: OpenAI backend
console.print(
create_question_box(
"Step 5: OpenAI backend", "Select which service to talk to"
# Step 5: LLM backend (use config default if available)
if "llm_provider" in DEFAULT_CONFIG and "backend_url" in DEFAULT_CONFIG:
selected_llm_provider = DEFAULT_CONFIG["llm_provider"]
backend_url = DEFAULT_CONFIG["backend_url"]
console.print(
f"[dim]→ Using configured LLM provider: [bold]{selected_llm_provider.upper()}[/bold] ({backend_url})[/dim]\n"
)
)
selected_llm_provider, backend_url = select_llm_provider()
else:
console.print(
create_question_box(
"Step 5: LLM Backend", "Select which service to talk to"
)
)
selected_llm_provider, backend_url = select_llm_provider()
# Step 6: Thinking agents
console.print(
create_question_box(
"Step 6: Thinking Agents", "Select your thinking agents for analysis"
# Step 6: Thinking agents (use config defaults if available)
if "quick_think_llm" in DEFAULT_CONFIG and "deep_think_llm" in DEFAULT_CONFIG:
selected_shallow_thinker = DEFAULT_CONFIG["quick_think_llm"]
selected_deep_thinker = DEFAULT_CONFIG["deep_think_llm"]
console.print(
f"[dim]→ Using configured models:[/dim]\n"
f"[dim] Quick thinking: [bold]{selected_shallow_thinker}[/bold][/dim]\n"
f"[dim] Deep thinking: [bold]{selected_deep_thinker}[/bold][/dim]\n"
)
)
selected_shallow_thinker = select_shallow_thinking_agent(selected_llm_provider)
selected_deep_thinker = select_deep_thinking_agent(selected_llm_provider)
else:
console.print(
create_question_box(
"Step 6: Thinking Agents", "Select your thinking agents for analysis"
)
)
selected_shallow_thinker = select_shallow_thinking_agent(selected_llm_provider)
selected_deep_thinker = select_deep_thinking_agent(selected_llm_provider)
return {
"ticker": selected_ticker,
"analysis_date": analysis_date,
"asset_class": asset_class,
"analysts": selected_analysts,
"research_depth": selected_research_depth,
"llm_provider": selected_llm_provider.lower(),
@ -520,220 +222,6 @@ def get_analysis_date():
)
def display_complete_report(final_state):
"""Display the complete analysis report with team-based panels."""
console.print("\n[bold green]Complete Analysis Report[/bold green]\n")
# I. Analyst Team Reports
analyst_reports = []
# Market Analyst Report
if final_state.get("market_report"):
analyst_reports.append(
Panel(
Markdown(final_state["market_report"]),
title="Market Analyst",
border_style="blue",
padding=(1, 2),
)
)
# Social Analyst Report
if final_state.get("sentiment_report"):
analyst_reports.append(
Panel(
Markdown(final_state["sentiment_report"]),
title="Social Analyst",
border_style="blue",
padding=(1, 2),
)
)
# News Analyst Report
if final_state.get("news_report"):
analyst_reports.append(
Panel(
Markdown(final_state["news_report"]),
title="News Analyst",
border_style="blue",
padding=(1, 2),
)
)
# Fundamentals Analyst Report
if final_state.get("fundamentals_report"):
analyst_reports.append(
Panel(
Markdown(final_state["fundamentals_report"]),
title="Fundamentals Analyst",
border_style="blue",
padding=(1, 2),
)
)
if analyst_reports:
console.print(
Panel(
Columns(analyst_reports, equal=True, expand=True),
title="I. Analyst Team Reports",
border_style="cyan",
padding=(1, 2),
)
)
# II. Research Team Reports
if final_state.get("investment_debate_state"):
research_reports = []
debate_state = final_state["investment_debate_state"]
# Bull Researcher Analysis
if debate_state.get("bull_history"):
research_reports.append(
Panel(
Markdown(debate_state["bull_history"]),
title="Bull Researcher",
border_style="blue",
padding=(1, 2),
)
)
# Bear Researcher Analysis
if debate_state.get("bear_history"):
research_reports.append(
Panel(
Markdown(debate_state["bear_history"]),
title="Bear Researcher",
border_style="blue",
padding=(1, 2),
)
)
# Research Manager Decision
if debate_state.get("judge_decision"):
research_reports.append(
Panel(
Markdown(debate_state["judge_decision"]),
title="Research Manager",
border_style="blue",
padding=(1, 2),
)
)
if research_reports:
console.print(
Panel(
Columns(research_reports, equal=True, expand=True),
title="II. Research Team Decision",
border_style="magenta",
padding=(1, 2),
)
)
# III. Trading Team Reports
if final_state.get("trader_investment_plan"):
console.print(
Panel(
Panel(
Markdown(final_state["trader_investment_plan"]),
title="Trader",
border_style="blue",
padding=(1, 2),
),
title="III. Trading Team Plan",
border_style="yellow",
padding=(1, 2),
)
)
# IV. Risk Management Team Reports
if final_state.get("risk_debate_state"):
risk_reports = []
risk_state = final_state["risk_debate_state"]
# Aggressive (Risky) Analyst Analysis
if risk_state.get("risky_history"):
risk_reports.append(
Panel(
Markdown(risk_state["risky_history"]),
title="Aggressive Analyst",
border_style="blue",
padding=(1, 2),
)
)
# Conservative (Safe) Analyst Analysis
if risk_state.get("safe_history"):
risk_reports.append(
Panel(
Markdown(risk_state["safe_history"]),
title="Conservative Analyst",
border_style="blue",
padding=(1, 2),
)
)
# Neutral Analyst Analysis
if risk_state.get("neutral_history"):
risk_reports.append(
Panel(
Markdown(risk_state["neutral_history"]),
title="Neutral Analyst",
border_style="blue",
padding=(1, 2),
)
)
if risk_reports:
console.print(
Panel(
Columns(risk_reports, equal=True, expand=True),
title="IV. Risk Management Team Decision",
border_style="red",
padding=(1, 2),
)
)
# V. Portfolio Manager Decision
if risk_state.get("judge_decision"):
console.print(
Panel(
Panel(
Markdown(risk_state["judge_decision"]),
title="Portfolio Manager",
border_style="blue",
padding=(1, 2),
),
title="V. Portfolio Manager Decision",
border_style="green",
padding=(1, 2),
)
)
def update_research_team_status(status):
"""Update status for all research team members and trader."""
research_team = ["Bull Researcher", "Bear Researcher", "Research Manager", "Trader"]
for agent in research_team:
message_buffer.update_agent_status(agent, status)
def extract_content_string(content):
"""Extract string content from various message formats."""
if isinstance(content, str):
return content
elif isinstance(content, list):
# Handle Anthropic's list format
text_parts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'text':
text_parts.append(item.get('text', ''))
elif item.get('type') == 'tool_use':
text_parts.append(f"[Tool: {item.get('name', 'unknown')}]")
else:
text_parts.append(str(item))
return ' '.join(text_parts)
else:
return str(content)
def run_analysis():
# First get all user selections
@ -747,6 +235,7 @@ def run_analysis():
config["deep_think_llm"] = selections["deep_thinker"]
config["backend_url"] = selections["backend_url"]
config["llm_provider"] = selections["llm_provider"].lower()
config["asset_class"] = selections["asset_class"]
# Initialize the graph
graph = TradingAgentsGraph(
@ -805,7 +294,7 @@ def run_analysis():
with Live(layout, refresh_per_second=4) as live:
# Initial display
update_display(layout)
update_display(layout, message_buffer)
# Add initial messages
message_buffer.add_message("System", f"Selected ticker: {selections['ticker']}")
@ -816,7 +305,7 @@ def run_analysis():
"System",
f"Selected analysts: {', '.join(analyst.value for analyst in selections['analysts'])}",
)
update_display(layout)
update_display(layout, message_buffer)
# Reset agent statuses
for agent in message_buffer.agent_status:
@ -831,18 +320,20 @@ def run_analysis():
# Update agent status to in_progress for the first analyst
first_analyst = f"{selections['analysts'][0].value.capitalize()} Analyst"
message_buffer.update_agent_status(first_analyst, "in_progress")
update_display(layout)
update_display(layout, message_buffer)
# Create spinner text
spinner_text = (
f"Analyzing {selections['ticker']} on {selections['analysis_date']}..."
)
update_display(layout, spinner_text)
update_display(layout, message_buffer, spinner_text)
# Initialize state and get graph args
init_agent_state = graph.propagator.create_initial_state(
selections["ticker"], selections["analysis_date"]
)
# CRITICAL: Add asset_class to state so market analyst can branch correctly
init_agent_state["asset_class"] = selections["asset_class"]
args = graph.propagator.get_graph_args()
# Stream the analysis
@ -875,49 +366,24 @@ def run_analysis():
message_buffer.add_tool_call(tool_call.name, tool_call.args)
# Update reports and agent status based on chunk content
# Analyst Team Reports
if "market_report" in chunk and chunk["market_report"]:
message_buffer.update_report_section(
"market_report", chunk["market_report"]
)
message_buffer.update_agent_status("Market Analyst", "completed")
# Set next analyst to in_progress
if "social" in selections["analysts"]:
message_buffer.update_agent_status(
"Social Analyst", "in_progress"
)
if "sentiment_report" in chunk and chunk["sentiment_report"]:
message_buffer.update_report_section(
"sentiment_report", chunk["sentiment_report"]
)
message_buffer.update_agent_status("Social Analyst", "completed")
# Set next analyst to in_progress
if "news" in selections["analysts"]:
message_buffer.update_agent_status(
"News Analyst", "in_progress"
)
if "news_report" in chunk and chunk["news_report"]:
message_buffer.update_report_section(
"news_report", chunk["news_report"]
)
message_buffer.update_agent_status("News Analyst", "completed")
# Set next analyst to in_progress
if "fundamentals" in selections["analysts"]:
message_buffer.update_agent_status(
"Fundamentals Analyst", "in_progress"
)
if "fundamentals_report" in chunk and chunk["fundamentals_report"]:
message_buffer.update_report_section(
"fundamentals_report", chunk["fundamentals_report"]
)
message_buffer.update_agent_status(
"Fundamentals Analyst", "completed"
)
# Set all research team members to in_progress
update_research_team_status("in_progress")
# Analyst Team Reports - use a mapping to reduce repetition
analyst_mappings = [
("market_report", "Market Analyst", "social", "Social Analyst"),
("sentiment_report", "Social Analyst", "news", "News Analyst"),
("news_report", "News Analyst", "fundamentals", "Fundamentals Analyst"),
("fundamentals_report", "Fundamentals Analyst", None, None),
]
for report_key, analyst_name, next_type, next_analyst in analyst_mappings:
if report_key in chunk and chunk[report_key]:
message_buffer.update_report_section(report_key, chunk[report_key])
message_buffer.update_agent_status(analyst_name, "completed")
if report_key == "fundamentals_report":
# Special case: set all research team to in_progress
update_research_team_status(message_buffer, "in_progress")
elif next_type and next_type in [a.value for a in selections["analysts"]]:
message_buffer.update_agent_status(next_analyst, "in_progress")
# Research Team - Handle Investment Debate State
if (
@ -929,7 +395,7 @@ def run_analysis():
# Update Bull Researcher status and report
if "bull_history" in debate_state and debate_state["bull_history"]:
# Keep all research team members in progress
update_research_team_status("in_progress")
update_research_team_status(message_buffer, "in_progress")
# Extract latest bull response
bull_responses = debate_state["bull_history"].split("\n")
latest_bull = bull_responses[-1] if bull_responses else ""
@ -944,7 +410,7 @@ def run_analysis():
# Update Bear Researcher status and report
if "bear_history" in debate_state and debate_state["bear_history"]:
# Keep all research team members in progress
update_research_team_status("in_progress")
update_research_team_status(message_buffer, "in_progress")
# Extract latest bear response
bear_responses = debate_state["bear_history"].split("\n")
latest_bear = bear_responses[-1] if bear_responses else ""
@ -962,7 +428,7 @@ def run_analysis():
and debate_state["judge_decision"]
):
# Keep all research team members in progress until final decision
update_research_team_status("in_progress")
update_research_team_status(message_buffer, "in_progress")
message_buffer.add_message(
"Reasoning",
f"Research Manager: {debate_state['judge_decision']}",
@ -973,7 +439,7 @@ def run_analysis():
f"{message_buffer.report_sections['investment_plan']}\n\n### Research Manager Decision\n{debate_state['judge_decision']}",
)
# Mark all research team members as completed
update_research_team_status("completed")
update_research_team_status(message_buffer, "completed")
# Set first risk analyst to in_progress
message_buffer.update_agent_status(
"Risky Analyst", "in_progress"
@ -993,60 +459,25 @@ def run_analysis():
# Risk Management Team - Handle Risk Debate State
if "risk_debate_state" in chunk and chunk["risk_debate_state"]:
risk_state = chunk["risk_debate_state"]
# Update Risky Analyst status and report
if (
"current_risky_response" in risk_state
and risk_state["current_risky_response"]
):
message_buffer.update_agent_status(
"Risky Analyst", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Risky Analyst: {risk_state['current_risky_response']}",
)
# Update risk report with risky analyst's latest analysis only
message_buffer.update_report_section(
"final_trade_decision",
f"### Risky Analyst Analysis\n{risk_state['current_risky_response']}",
)
# Update Safe Analyst status and report
if (
"current_safe_response" in risk_state
and risk_state["current_safe_response"]
):
message_buffer.update_agent_status(
"Safe Analyst", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Safe Analyst: {risk_state['current_safe_response']}",
)
# Update risk report with safe analyst's latest analysis only
message_buffer.update_report_section(
"final_trade_decision",
f"### Safe Analyst Analysis\n{risk_state['current_safe_response']}",
)
# Update Neutral Analyst status and report
if (
"current_neutral_response" in risk_state
and risk_state["current_neutral_response"]
):
message_buffer.update_agent_status(
"Neutral Analyst", "in_progress"
)
message_buffer.add_message(
"Reasoning",
f"Neutral Analyst: {risk_state['current_neutral_response']}",
)
# Update risk report with neutral analyst's latest analysis only
message_buffer.update_report_section(
"final_trade_decision",
f"### Neutral Analyst Analysis\n{risk_state['current_neutral_response']}",
)
# Handle all risk analysts with a mapping
risk_analysts = [
("current_risky_response", "Risky Analyst"),
("current_safe_response", "Safe Analyst"),
("current_neutral_response", "Neutral Analyst"),
]
for response_key, analyst_name in risk_analysts:
if response_key in risk_state and risk_state[response_key]:
message_buffer.update_agent_status(analyst_name, "in_progress")
message_buffer.add_message(
"Reasoning",
f"{analyst_name}: {risk_state[response_key]}",
)
message_buffer.update_report_section(
"final_trade_decision",
f"### {analyst_name} Analysis\n{risk_state[response_key]}",
)
# Update Portfolio Manager status and final decision
if "judge_decision" in risk_state and risk_state["judge_decision"]:
@ -1073,7 +504,7 @@ def run_analysis():
)
# Update the display
update_display(layout)
update_display(layout, message_buffer)
trace.append(chunk)
@ -1097,7 +528,7 @@ def run_analysis():
# Display the complete final report
display_complete_report(final_state)
update_display(layout)
update_display(layout, message_buffer)
@app.command()

128
cli/message_buffer.py Normal file
View File

@ -0,0 +1,128 @@
"""Message buffer for tracking agent messages and reports in the CLI."""
from collections import deque
import datetime
class MessageBuffer:
"""Stores and manages messages, tool calls, and reports for the trading agents UI."""
def __init__(self, max_length=100):
self.messages = deque(maxlen=max_length)
self.tool_calls = deque(maxlen=max_length)
self.current_report = None
self.final_report = None # Store the complete final report
# Initialize all agents as pending
all_agents = [
"Market Analyst", "Social Analyst", "News Analyst", "Fundamentals Analyst",
"Bull Researcher", "Bear Researcher", "Research Manager",
"Trader",
"Risky Analyst", "Neutral Analyst", "Safe Analyst",
"Portfolio Manager"
]
self.agent_status = {agent: "pending" for agent in all_agents}
self.current_agent = None
self.report_sections = {
"market_report": None,
"sentiment_report": None,
"news_report": None,
"fundamentals_report": None,
"investment_plan": None,
"trader_investment_plan": None,
"final_trade_decision": None,
}
def add_message(self, message_type, content):
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
self.messages.append((timestamp, message_type, content))
def add_tool_call(self, tool_name, args):
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
self.tool_calls.append((timestamp, tool_name, args))
def update_agent_status(self, agent, status):
if agent in self.agent_status:
self.agent_status[agent] = status
self.current_agent = agent
def update_report_section(self, section_name, content):
if section_name in self.report_sections:
self.report_sections[section_name] = content
self._update_current_report()
def _update_current_report(self):
# For the panel display, only show the most recently updated section
latest_section = None
latest_content = None
# Find the most recently updated section
for section, content in self.report_sections.items():
if content is not None:
latest_section = section
latest_content = content
if latest_section and latest_content:
# Format the current section for display
section_titles = {
"market_report": "Market Analysis",
"sentiment_report": "Social Sentiment",
"news_report": "News Analysis",
"fundamentals_report": "Fundamentals Analysis",
"investment_plan": "Research Team Decision",
"trader_investment_plan": "Trading Team Plan",
"final_trade_decision": "Portfolio Management Decision",
}
self.current_report = (
f"### {section_titles[latest_section]}\n{latest_content}"
)
# Update the final complete report
self._update_final_report()
def _update_final_report(self):
report_parts = []
# Analyst Team Reports
if any(
self.report_sections[section]
for section in [
"market_report",
"sentiment_report",
"news_report",
"fundamentals_report",
]
):
report_parts.append("## Analyst Team Reports")
if self.report_sections["market_report"]:
report_parts.append(
f"### Market Analysis\n{self.report_sections['market_report']}"
)
if self.report_sections["sentiment_report"]:
report_parts.append(
f"### Social Sentiment\n{self.report_sections['sentiment_report']}"
)
if self.report_sections["news_report"]:
report_parts.append(
f"### News Analysis\n{self.report_sections['news_report']}"
)
if self.report_sections["fundamentals_report"]:
report_parts.append(
f"### Fundamentals Analysis\n{self.report_sections['fundamentals_report']}"
)
# Research Team Reports
if self.report_sections["investment_plan"]:
report_parts.append("## Research Team Decision")
report_parts.append(f"{self.report_sections['investment_plan']}")
# Trading Team Reports
if self.report_sections["trader_investment_plan"]:
report_parts.append("## Trading Team Plan")
report_parts.append(f"{self.report_sections['trader_investment_plan']}")
# Portfolio Management Decision
if self.report_sections["final_trade_decision"]:
report_parts.append("## Portfolio Management Decision")
report_parts.append(f"{self.report_sections['final_trade_decision']}")
self.final_report = "\n\n".join(report_parts) if report_parts else None

159
cli/report_display.py Normal file
View File

@ -0,0 +1,159 @@
"""Report display functions for the TradingAgents CLI."""
from rich.console import Console
from rich.panel import Panel
from rich.markdown import Markdown
from rich.columns import Columns
console = Console()
def display_complete_report(final_state):
"""Display the complete analysis report with team-based panels."""
console.print("\n[bold green]Complete Analysis Report[/bold green]\n")
# I. Analyst Team Reports
analyst_reports = []
# Map report keys to analyst names
analyst_report_map = [
("market_report", "Market Analyst"),
("sentiment_report", "Social Analyst"),
("news_report", "News Analyst"),
("fundamentals_report", "Fundamentals Analyst"),
]
for report_key, analyst_name in analyst_report_map:
if final_state.get(report_key):
analyst_reports.append(
Panel(
Markdown(final_state[report_key]),
title=analyst_name,
border_style="blue",
padding=(1, 2),
)
)
if analyst_reports:
console.print(
Panel(
Columns(analyst_reports, equal=True, expand=True),
title="I. Analyst Team Reports",
border_style="cyan",
padding=(1, 2),
)
)
# II. Research Team Reports
if final_state.get("investment_debate_state"):
research_reports = []
debate_state = final_state["investment_debate_state"]
# Bull Researcher Analysis
if debate_state.get("bull_history"):
research_reports.append(
Panel(
Markdown(debate_state["bull_history"]),
title="Bull Researcher",
border_style="blue",
padding=(1, 2),
)
)
# Bear Researcher Analysis
if debate_state.get("bear_history"):
research_reports.append(
Panel(
Markdown(debate_state["bear_history"]),
title="Bear Researcher",
border_style="blue",
padding=(1, 2),
)
)
# Research Manager Decision
if debate_state.get("judge_decision"):
research_reports.append(
Panel(
Markdown(debate_state["judge_decision"]),
title="Research Manager",
border_style="blue",
padding=(1, 2),
)
)
if research_reports:
console.print(
Panel(
Columns(research_reports, equal=True, expand=True),
title="II. Research Team Decision",
border_style="magenta",
padding=(1, 2),
)
)
# III. Trading Team Reports
if final_state.get("trader_investment_plan"):
console.print(
Panel(
Panel(
Markdown(final_state["trader_investment_plan"]),
title="Trader",
border_style="blue",
padding=(1, 2),
),
title="III. Trading Team Plan",
border_style="yellow",
padding=(1, 2),
)
)
# IV. Risk Management Team Reports
if final_state.get("risk_debate_state"):
risk_reports = []
risk_state = final_state["risk_debate_state"]
# Map risk history keys to analyst names
risk_analyst_map = [
("risky_history", "Aggressive Analyst"),
("safe_history", "Conservative Analyst"),
("neutral_history", "Neutral Analyst"),
]
for history_key, analyst_name in risk_analyst_map:
if risk_state.get(history_key):
risk_reports.append(
Panel(
Markdown(risk_state[history_key]),
title=analyst_name,
border_style="blue",
padding=(1, 2),
)
)
if risk_reports:
console.print(
Panel(
Columns(risk_reports, equal=True, expand=True),
title="IV. Risk Management Team Decision",
border_style="red",
padding=(1, 2),
)
)
# V. Portfolio Manager Decision
if risk_state.get("judge_decision"):
console.print(
Panel(
Panel(
Markdown(risk_state["judge_decision"]),
title="Portfolio Manager",
border_style="blue",
padding=(1, 2),
),
title="V. Portfolio Manager Decision",
border_style="green",
padding=(1, 2),
)
)

232
cli/ui_display.py Normal file
View File

@ -0,0 +1,232 @@
"""UI display functions for the TradingAgents CLI using Rich library."""
from rich.panel import Panel
from rich.spinner import Spinner
from rich.layout import Layout
from rich.text import Text
from rich.table import Table
from rich import box
def create_layout():
"""Create the main layout structure for the CLI display."""
layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="main"),
Layout(name="footer", size=3),
)
layout["main"].split_column(
Layout(name="upper", ratio=3), Layout(name="analysis", ratio=5)
)
layout["upper"].split_row(
Layout(name="progress", ratio=2), Layout(name="messages", ratio=3)
)
return layout
def update_display(layout, message_buffer, spinner_text=None):
"""Update all panels in the display layout with current data."""
# Header with welcome message
layout["header"].update(
Panel(
"[bold green]Welcome to TradingAgents CLI[/bold green]\n"
"[dim]© [Tauric Research](https://github.com/TauricResearch)[/dim]",
title="Welcome to TradingAgents",
border_style="green",
padding=(1, 2),
expand=True,
)
)
# Progress panel showing agent status
progress_table = Table(
show_header=True,
header_style="bold magenta",
show_footer=False,
box=box.SIMPLE_HEAD, # Use simple header with horizontal lines
title=None, # Remove the redundant Progress title
padding=(0, 2), # Add horizontal padding
expand=True, # Make table expand to fill available space
)
progress_table.add_column("Team", style="cyan", justify="center", width=20)
progress_table.add_column("Agent", style="green", justify="center", width=20)
progress_table.add_column("Status", style="yellow", justify="center", width=20)
# Group agents by team
teams = {
"Analyst Team": [
"Market Analyst",
"Social Analyst",
"News Analyst",
"Fundamentals Analyst",
],
"Research Team": ["Bull Researcher", "Bear Researcher", "Research Manager"],
"Trading Team": ["Trader"],
"Risk Management": ["Risky Analyst", "Neutral Analyst", "Safe Analyst"],
"Portfolio Management": ["Portfolio Manager"],
}
for team, agents in teams.items():
# Add first agent with team name
first_agent = agents[0]
status = message_buffer.agent_status[first_agent]
if status == "in_progress":
spinner = Spinner(
"dots", text="[blue]in_progress[/blue]", style="bold cyan"
)
status_cell = spinner
else:
status_color = {
"pending": "yellow",
"completed": "green",
"error": "red",
}.get(status, "white")
status_cell = f"[{status_color}]{status}[/{status_color}]"
progress_table.add_row(team, first_agent, status_cell)
# Add remaining agents in team
for agent in agents[1:]:
status = message_buffer.agent_status[agent]
if status == "in_progress":
spinner = Spinner(
"dots", text="[blue]in_progress[/blue]", style="bold cyan"
)
status_cell = spinner
else:
status_color = {
"pending": "yellow",
"completed": "green",
"error": "red",
}.get(status, "white")
status_cell = f"[{status_color}]{status}[/{status_color}]"
progress_table.add_row("", agent, status_cell)
# Add horizontal line after each team
progress_table.add_row("" * 20, "" * 20, "" * 20, style="dim")
layout["progress"].update(
Panel(progress_table, title="Progress", border_style="cyan", padding=(1, 2))
)
# Messages panel showing recent messages and tool calls
messages_table = Table(
show_header=True,
header_style="bold magenta",
show_footer=False,
expand=True, # Make table expand to fill available space
box=box.MINIMAL, # Use minimal box style for a lighter look
show_lines=True, # Keep horizontal lines
padding=(0, 1), # Add some padding between columns
)
messages_table.add_column("Time", style="cyan", width=8, justify="center")
messages_table.add_column("Type", style="green", width=10, justify="center")
messages_table.add_column(
"Content", style="white", no_wrap=False, ratio=1
) # Make content column expand
# Combine tool calls and messages
all_messages = []
# Add tool calls
for timestamp, tool_name, args in message_buffer.tool_calls:
# Truncate tool call args if too long
if isinstance(args, str) and len(args) > 100:
args = args[:97] + "..."
all_messages.append((timestamp, "Tool", f"{tool_name}: {args}"))
# Add regular messages
for timestamp, msg_type, content in message_buffer.messages:
# Convert content to string if it's not already
content_str = content
if isinstance(content, list):
# Handle list of content blocks (Anthropic format)
text_parts = []
for item in content:
if isinstance(item, dict):
if item.get('type') == 'text':
text_parts.append(item.get('text', ''))
elif item.get('type') == 'tool_use':
text_parts.append(f"[Tool: {item.get('name', 'unknown')}]")
else:
text_parts.append(str(item))
content_str = ' '.join(text_parts)
elif not isinstance(content_str, str):
content_str = str(content)
# Truncate message content if too long
if len(content_str) > 200:
content_str = content_str[:197] + "..."
all_messages.append((timestamp, msg_type, content_str))
# Sort by timestamp
all_messages.sort(key=lambda x: x[0])
# Calculate how many messages we can show based on available space
# Start with a reasonable number and adjust based on content length
max_messages = 12 # Increased from 8 to better fill the space
# Get the last N messages that will fit in the panel
recent_messages = all_messages[-max_messages:]
# Add messages to table
for timestamp, msg_type, content in recent_messages:
# Format content with word wrapping
wrapped_content = Text(content, overflow="fold")
messages_table.add_row(timestamp, msg_type, wrapped_content)
if spinner_text:
messages_table.add_row("", "Spinner", spinner_text)
# Add a footer to indicate if messages were truncated
if len(all_messages) > max_messages:
messages_table.footer = (
f"[dim]Showing last {max_messages} of {len(all_messages)} messages[/dim]"
)
layout["messages"].update(
Panel(
messages_table,
title="Messages & Tools",
border_style="blue",
padding=(1, 2),
)
)
# Analysis panel showing current report
if message_buffer.current_report:
from rich.markdown import Markdown
layout["analysis"].update(
Panel(
Markdown(message_buffer.current_report),
title="Current Report",
border_style="green",
padding=(1, 2),
)
)
else:
layout["analysis"].update(
Panel(
"[italic]Waiting for analysis report...[/italic]",
title="Current Report",
border_style="green",
padding=(1, 2),
)
)
# Footer with statistics
tool_calls_count = len(message_buffer.tool_calls)
llm_calls_count = sum(
1 for _, msg_type, _ in message_buffer.messages if msg_type == "Reasoning"
)
reports_count = sum(
1 for content in message_buffer.report_sections.values() if content is not None
)
stats_table = Table(show_header=False, box=None, padding=(0, 2), expand=True)
stats_table.add_column("Stats", justify="center")
stats_table.add_row(
f"Tool Calls: {tool_calls_count} | LLM Calls: {llm_calls_count} | Generated Reports: {reports_count}"
)
layout["footer"].update(Panel(stats_table, border_style="grey50"))

View File

@ -64,12 +64,19 @@ def get_analysis_date() -> str:
return date.strip()
def select_analysts() -> List[AnalystType]:
"""Select analysts using an interactive checkbox."""
def select_analysts(asset_class: str | None = None) -> List[AnalystType]:
"""Select analysts using an interactive checkbox.
If asset_class is 'commodity' or 'crypto', hide Fundamentals Analyst.
"""
order = ANALYST_ORDER
if asset_class and asset_class.lower() in ["commodity", "crypto"]:
order = [(d, v) for (d, v) in ANALYST_ORDER if v != AnalystType.FUNDAMENTALS]
choices = questionary.checkbox(
"Select Your [Analysts Team]:",
choices=[
questionary.Choice(display, value=value) for display, value in ANALYST_ORDER
questionary.Choice(display, value=value) for display, value in order
],
instruction="\n- Press Space to select/unselect analysts\n- Press 'a' to select/unselect all\n- Press Enter when done",
validate=lambda x: len(x) > 0 or "You must select at least one analyst.",
@ -132,6 +139,7 @@ def select_shallow_thinking_agent(provider) -> str:
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("GPT-5 - Next generation model with enhanced capabilities", "gpt-5"),
],
"anthropic": [
("Claude Haiku 3.5 - Fast inference and standard capabilities", "claude-3-5-haiku-latest"),
@ -189,6 +197,7 @@ def select_deep_thinking_agent(provider) -> str:
("GPT-4.1-nano - Ultra-lightweight model for basic operations", "gpt-4.1-nano"),
("GPT-4.1-mini - Compact model with good performance", "gpt-4.1-mini"),
("GPT-4o - Standard model with solid capabilities", "gpt-4o"),
("GPT-5 - Next generation model with enhanced capabilities", "gpt-5"),
("o4-mini - Specialized reasoning model (compact)", "o4-mini"),
("o3-mini - Advanced reasoning model (lightweight)", "o3-mini"),
("o3 - Full advanced reasoning model", "o3"),

38
config.example.ini Normal file
View File

@ -0,0 +1,38 @@
# TradingAgents Configuration
# Copy this file to config.ini and customize your settings
[llm]
# Provider: openai, anthropic, google, openrouter, ollama
provider = openai
backend_url = https://api.openai.com/v1
# Deep thinking model (for complex analysis, debates, final decisions)
deep_think_llm = o1-mini
# Quick thinking model (for simple tasks, data retrieval)
quick_think_llm = gpt-4o-mini
[analysis]
# Research Depth: 1 (shallow), 2 (medium), 3 (deep)
research_depth = 1
# Default analysts to use (comma-separated): market,news,social,fundamentals
default_analysts = market,news,social
[data]
# Global news limit (number of articles to fetch)
global_news_limit = 15
# Commodity news limit
commodity_news_limit = 50
[vendors]
# Data vendor preferences
stock = yfinance
indicators = yfinance
fundamentals = alpha_vantage
news = alpha_vantage
commodity = alpha_vantage
[storage]
# Directory for analysis results
results_dir = ./results
# Data cache directory
data_cache_dir = ./tradingagents/dataflows/data_cache

25
config.ini Normal file
View File

@ -0,0 +1,25 @@
[llm]
provider = openai
backend_url = https://api.openai.com/v1
deep_think_llm = o4-mini
quick_think_llm = gpt-4o
[analysis]
research_depth = 1
#default_analysts = market,news,social
[data]
global_news_limit = 15
commodity_news_limit = 50
[vendors]
stock = yfinance
indicators = yfinance
fundamentals = alpha_vantage
news = alpha_vantage
commodity = alpha_vantage
crypto = alpha_vantage
[storage]
results_dir = ./results

View File

@ -1,7 +1,10 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
from tradingagents.agents.utils.agent_utils import get_stock_data, get_indicators
from tradingagents.agents.utils.agent_utils import (
get_market_data,
get_indicators,
)
from tradingagents.dataflows.config import get_config
@ -11,11 +14,14 @@ def create_market_analyst(llm):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
company_name = state["company_of_interest"]
asset_class = state.get("asset_class", "equity")
tools = [
get_stock_data,
get_indicators,
]
# Use unified tools for all asset classes
if asset_class == "equity":
tools = [get_market_data, get_indicators]
else:
# Commodities and crypto just use market data (no indicators yet)
tools = [get_market_data]
system_message = (
"""You are a trading assistant tasked with analyzing financial markets. Your role is to select the **most relevant indicators** for a given market condition or trading strategy from the following list. The goal is to choose up to **8 indicators** that provide complementary insights without redundancy. Categories and each category's indicators are:
@ -40,12 +46,19 @@ Volatility Indicators:
- atr: ATR: Averages true range to measure volatility. Usage: Set stop-loss levels and adjust position sizes based on current market volatility. Tips: It's a reactive measure, so use it as part of a broader risk management strategy.
Volume-Based Indicators:
- vwma: VWMA: A moving average weighted by volume. Usage: Confirm trends by integrating price action with volume data. Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses.
- Select indicators that provide diverse and complementary information. Avoid redundancy (e.g., do not select both rsi and stochrsi). Also briefly explain why they are suitable for the given market context. When you tool call, please use the exact name of the indicators provided above as they are defined parameters, otherwise your call will fail. Please make sure to call get_stock_data first to retrieve the CSV that is needed to generate indicators. Then use get_indicators with the specific indicator names. Write a very detailed and nuanced report of the trends you observe. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."""
- vwma: VWMA: A moving average weighted by volume. Usage: Confirm trends by integrating price action with volume data. Tips: Watch for skewed results from volume spikes; use in combination with other volume analyses.
- Select indicators that provide diverse and complementary information. Avoid redundancy (e.g., do not select both rsi and stochrsi). Also briefly explain why they are suitable for the given market context. When you tool call, please use the exact name of the indicators provided above as they are defined parameters, otherwise your call will fail. Write a very detailed and nuanced report of the trends you observe. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."""
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
)
if asset_class == "equity":
system_message += f""" IMPORTANT: When calling get_market_data, always pass asset_class="{asset_class}". First call get_market_data with asset_class="{asset_class}" to retrieve the CSV data, then use get_indicators with the specific indicator names."""
elif asset_class == "commodity":
system_message += f""" IMPORTANT: When calling get_market_data, always pass asset_class="{asset_class}". Call get_market_data with asset_class="{asset_class}" to retrieve the commodity price series (value column). You may analyze trends directly on the series."""
else: # crypto
system_message += f""" IMPORTANT: When calling get_market_data, always pass asset_class="{asset_class}". Call get_market_data with asset_class="{asset_class}" to retrieve OHLCV data. You may analyze price trends and patterns directly."""
prompt = ChatPromptTemplate.from_messages(
[
(

View File

@ -1,7 +1,10 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
from tradingagents.agents.utils.agent_utils import get_news, get_global_news
from tradingagents.agents.utils.agent_utils import (
get_asset_news,
get_global_news_unified as get_global_news,
)
from tradingagents.dataflows.config import get_config
@ -9,16 +12,44 @@ def create_news_analyst(llm):
def news_analyst_node(state):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
asset_class = state.get("asset_class", "equity")
tools = [
get_news,
get_global_news,
]
# Use unified tools for all asset classes
tools = [get_asset_news, get_global_news]
system_message = (
"You are a news researcher tasked with analyzing recent news and trends over the past week. Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. Use the available tools: get_news(query, start_date, end_date) for company-specific or targeted news searches, and get_global_news(curr_date, look_back_days, limit) for broader macroeconomic news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
)
# Asset-specific messaging
if asset_class == "commodity":
system_message = (
f"You are a news researcher tasked with analyzing recent news and trends for the commodity {ticker}. "
"Please write a comprehensive report of relevant news over the past week that impacts this commodity's price. "
f"Use the available tools: get_asset_news(symbol, start_date, end_date, asset_class=\"{asset_class}\") for commodity-specific news, "
"and get_global_news(curr_date, look_back_days) for broader macroeconomic context (do NOT specify limit). "
f"IMPORTANT: Always pass asset_class=\"{asset_class}\" when calling get_asset_news. If get_asset_news returns limited results, make sure to use get_global_news to provide additional market context. "
"Focus on supply/demand factors, geopolitical events, weather impacts (for agriculture), and macroeconomic trends. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis."
+ """ Make sure to append a Markdown table at the end of the report to organize key points."""
)
elif asset_class == "crypto":
system_message = (
f"You are a news researcher tasked with analyzing recent news and trends for the cryptocurrency {ticker}. "
"Please write a comprehensive report of relevant news over the past week that impacts this cryptocurrency's price. "
f"Use the available tools: get_asset_news(symbol, start_date, end_date, asset_class=\"{asset_class}\") for crypto-specific blockchain news, "
"and get_global_news(curr_date, look_back_days) for broader macroeconomic context (do NOT specify limit). "
f"IMPORTANT: Always pass asset_class=\"{asset_class}\" when calling get_asset_news. If get_asset_news returns limited results, make sure to use get_global_news to provide additional market context. "
"Focus on regulatory developments, adoption trends, technological updates, market sentiment, and macroeconomic factors affecting crypto. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis."
+ """ Make sure to append a Markdown table at the end of the report to organize key points."""
)
else: # equity
system_message = (
"You are a news researcher tasked with analyzing recent news and trends over the past week. "
"Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. "
f"Use the available tools: get_asset_news(symbol, start_date, end_date, asset_class=\"{asset_class}\") for company-specific or targeted news searches, "
"and get_global_news(curr_date, look_back_days) for broader macroeconomic news (omit limit parameter). "
f"IMPORTANT: Always pass asset_class=\"{asset_class}\" when calling get_asset_news. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
)
prompt = ChatPromptTemplate.from_messages(
[

View File

@ -1,7 +1,10 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
import time
import json
from tradingagents.agents.utils.agent_utils import get_news
from tradingagents.agents.utils.agent_utils import (
get_asset_news,
get_global_news_unified as get_global_news,
)
from tradingagents.dataflows.config import get_config
@ -9,16 +12,42 @@ def create_social_media_analyst(llm):
def social_media_analyst_node(state):
current_date = state["trade_date"]
ticker = state["company_of_interest"]
company_name = state["company_of_interest"]
asset_class = state.get("asset_class", "equity")
tools = [
get_news,
]
# Use unified tools for all asset classes
tools = [get_asset_news, get_global_news]
system_message = (
"You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, recent company news, and public sentiment for a specific company over the past week. You will be given a company's name your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this company's current state after looking at social media and what people are saying about that company, analyzing sentiment data of what people feel each day about the company, and looking at recent company news. Use the get_news(query, start_date, end_date) tool to search for company-specific news and social media discussions. Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and finegrained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read.""",
)
# Asset-specific messaging
if asset_class == "commodity":
system_message = (
f"You are a social media and news researcher/analyst tasked with analyzing recent discussions and sentiment for the commodity {ticker}. "
"Your objective is to write a comprehensive report detailing market sentiment, trader discussions, and public perception over the past week. "
f"Use get_asset_news(symbol, start_date, end_date, asset_class=\"{asset_class}\") to search for commodity-related news and discussions. "
f"IMPORTANT: Always pass asset_class=\"{asset_class}\" when calling get_asset_news. If get_asset_news returns limited results, supplement with get_global_news(curr_date, look_back_days) for broader market context (do NOT specify limit). "
"Focus on trader sentiment, supply/demand expectations, geopolitical concerns, and market psychology. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis."
+ """ Make sure to append a Markdown table at the end of the report to organize key points."""
)
elif asset_class == "crypto":
system_message = (
f"You are a social media and news researcher/analyst tasked with analyzing recent discussions and sentiment for the cryptocurrency {ticker}. "
"Your objective is to write a comprehensive report detailing market sentiment, community discussions, and public perception over the past week. "
f"Use get_asset_news(symbol, start_date, end_date, asset_class=\"{asset_class}\") to search for crypto-related news and discussions. "
f"IMPORTANT: Always pass asset_class=\"{asset_class}\" when calling get_asset_news. If get_asset_news returns limited results, supplement with get_global_news(curr_date, look_back_days) for broader market context (do NOT specify limit). "
"Focus on community sentiment, adoption trends, regulatory concerns, developer activity, whale movements, and market psychology. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis."
+ """ Make sure to append a Markdown table at the end of the report to organize key points."""
)
else: # equity
system_message = (
"You are a social media and company specific news researcher/analyst tasked with analyzing social media posts, recent company news, and public sentiment for a specific company over the past week. "
"Your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this company's current state after looking at social media and what people are saying about that company, "
"analyzing sentiment data of what people feel each day about the company, and looking at recent company news. "
f"Use the get_asset_news(symbol, start_date, end_date, asset_class=\"{asset_class}\") tool to search for company-specific news and social media discussions. "
f"IMPORTANT: Always pass asset_class=\"{asset_class}\" when calling get_asset_news. If needed, use get_global_news(curr_date, look_back_days) for broader market context (omit limit). "
"Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and fine-grained analysis and insights that may help traders make decisions."
+ """ Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."""
)
prompt = ChatPromptTemplate.from_messages(
[

View File

@ -0,0 +1,17 @@
"""Configuration for trading agents."""
from .analyst_config import AnalystConfig, get_analyst_config
from .prompt_builder import (
build_market_analyst_prompt,
build_news_analyst_prompt,
build_social_media_analyst_prompt,
)
__all__ = [
"AnalystConfig",
"get_analyst_config",
"build_market_analyst_prompt",
"build_news_analyst_prompt",
"build_social_media_analyst_prompt",
]

View File

@ -0,0 +1,113 @@
"""Centralized configuration for analyst tools and prompts based on asset class."""
from typing import List, Dict, Any
from langchain_core.tools import BaseTool
class AnalystConfig:
"""Configuration for analysts based on asset class."""
def __init__(self, asset_class: str = "equity"):
self.asset_class = asset_class.lower()
def get_tools_for_analyst(self, analyst_type: str) -> List[BaseTool]:
"""Get the appropriate tools for a given analyst based on asset class.
Args:
analyst_type: One of 'market', 'news', 'social', 'fundamentals'
Returns:
List of tools for that analyst
"""
# Import here to avoid circular dependencies
from tradingagents.agents.utils.agent_utils import (
get_stock_data,
get_indicators,
get_news,
get_commodity_news,
get_global_news,
get_insider_sentiment,
get_insider_transactions,
get_fundamentals,
get_balance_sheet,
get_cashflow,
get_income_statement,
)
from tradingagents.agents.utils.commodity_data_tools import get_commodity_data
tools_map = {
"equity": {
"market": [get_stock_data, get_indicators],
"news": [get_news, get_global_news, get_insider_sentiment, get_insider_transactions],
"social": [get_news, get_global_news],
"fundamentals": [get_fundamentals, get_balance_sheet, get_cashflow, get_income_statement],
},
"commodity": {
"market": [get_commodity_data],
"news": [get_commodity_news, get_global_news],
"social": [get_commodity_news, get_global_news],
"fundamentals": [], # Not applicable for commodities
}
}
return tools_map.get(self.asset_class, tools_map["equity"]).get(analyst_type, [])
def get_prompt_config(self, analyst_type: str) -> Dict[str, str]:
"""Get prompt configuration for a given analyst based on asset class.
Returns a dict with prompt templates and asset-specific terminology.
"""
if self.asset_class == "commodity":
return {
"asset_term": "commodity",
"asset_name_var": "ticker", # Still use ticker variable name for compatibility
"market": {
"focus": "supply/demand factors, geopolitical events, weather impacts (for agriculture), and macroeconomic trends",
"data_tool": "get_commodity_data",
"instructions": "call get_commodity_data to retrieve commodity price data",
},
"news": {
"focus": "supply/demand factors, geopolitical events, weather impacts (for agriculture), and macroeconomic trends",
"primary_tool": "get_commodity_news(commodity, start_date, end_date)",
"primary_note": "searches by topic like 'energy' for oil, 'economy_macro' for agriculture",
"fallback_note": "If get_commodity_news returns limited results, make sure to use get_global_news to provide additional market context.",
},
"social": {
"focus": "trader sentiment, supply/demand expectations, geopolitical concerns, and market psychology",
"primary_tool": "get_commodity_news(commodity, start_date, end_date)",
"primary_note": "searches by topic like 'energy' for oil",
"fallback_note": "If get_commodity_news returns limited results, supplement with get_global_news(curr_date, look_back_days, limit) for broader market context.",
}
}
else: # equity
return {
"asset_term": "company",
"asset_name_var": "ticker",
"market": {
"focus": "price trends, volume, volatility, and technical indicators",
"data_tool": "get_stock_data and get_indicators",
"instructions": "call get_stock_data first to retrieve historical price data, then get_indicators for technical analysis",
},
"news": {
"focus": "company-specific events, earnings, product launches, and market sentiment",
"primary_tool": "get_news(ticker, start_date, end_date)",
"primary_note": "for company-specific or targeted news searches",
"fallback_note": "Use get_global_news(curr_date, look_back_days, limit) for broader macroeconomic context.",
},
"social": {
"focus": "social media discussions, public sentiment, and community perception",
"primary_tool": "get_news(ticker, start_date, end_date)",
"primary_note": "to search for company-specific news and social media discussions",
"fallback_note": "If needed, use get_global_news(curr_date, look_back_days, limit) for broader market context.",
}
}
# Singleton instance can be created per graph
_config_instance = None
def get_analyst_config(asset_class: str = "equity") -> AnalystConfig:
"""Get or create analyst configuration for the given asset class."""
return AnalystConfig(asset_class)

View File

@ -0,0 +1,80 @@
"""Build analyst prompts dynamically based on asset class configuration."""
def build_market_analyst_prompt(asset_class: str, ticker: str) -> str:
"""Build system message for market analyst based on asset class."""
from .analyst_config import get_analyst_config
config = get_analyst_config(asset_class)
prompt_cfg = config.get_prompt_config("market")
return (
f"You are a market analyst specializing in {prompt_cfg['asset_term']} analysis. "
f"Your task is to analyze {ticker} and provide comprehensive technical analysis. "
f"Focus on {prompt_cfg['focus']}. "
f"\n\nIMPORTANT: First, {prompt_cfg['instructions']}. "
"After retrieving the data, provide detailed analysis with specific numbers, dates, and trends. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis and insights that may help traders make decisions."
" Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."
)
def build_news_analyst_prompt(asset_class: str, ticker: str) -> str:
"""Build system message for news analyst based on asset class."""
from .analyst_config import get_analyst_config
config = get_analyst_config(asset_class)
prompt_cfg = config.get_prompt_config("news")
asset_term = prompt_cfg["asset_term"]
if asset_class.lower() == "commodity":
return (
f"You are a news researcher tasked with analyzing recent news and trends for the commodity {ticker}. "
"Please write a comprehensive report of relevant news over the past week that impacts this commodity's price. "
f"Use the available tools: {prompt_cfg['primary_tool']} for commodity-specific news ({prompt_cfg['primary_note']}), "
f"and get_global_news(curr_date, look_back_days) for broader macroeconomic context (do NOT specify limit - it uses optimal configured value). "
f"IMPORTANT: {prompt_cfg['fallback_note']} "
f"Focus on {prompt_cfg['focus']}. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis."
" Make sure to append a Markdown table at the end of the report to organize key points."
)
else:
return (
"You are a news researcher tasked with analyzing recent news and trends over the past week. "
"Please write a comprehensive report of the current state of the world that is relevant for trading and macroeconomics. "
f"Use the available tools: {prompt_cfg['primary_tool']} {prompt_cfg['primary_note']}, "
f"and get_global_news(curr_date, look_back_days) for broader macroeconomic news (omit limit parameter to use configured optimal value). "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis and insights that may help traders make decisions."
" Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."
)
def build_social_media_analyst_prompt(asset_class: str, ticker: str) -> str:
"""Build system message for social media analyst based on asset class."""
from .analyst_config import get_analyst_config
config = get_analyst_config(asset_class)
prompt_cfg = config.get_prompt_config("social")
asset_term = prompt_cfg["asset_term"]
if asset_class.lower() == "commodity":
return (
f"You are a social media and news researcher/analyst tasked with analyzing recent discussions and sentiment for the commodity {ticker}. "
"Your objective is to write a comprehensive report detailing market sentiment, trader discussions, and public perception over the past week. "
f"Use {prompt_cfg['primary_tool']} to search for commodity-related news and discussions ({prompt_cfg['primary_note']}). "
f"IMPORTANT: {prompt_cfg['fallback_note']} When using get_global_news, do NOT specify limit parameter - use configured optimal value. "
f"Focus on {prompt_cfg['focus']}. "
"Do not simply state the trends are mixed, provide detailed and fine-grained analysis."
" Make sure to append a Markdown table at the end of the report to organize key points."
)
else:
return (
f"You are a social media and {asset_term} specific news researcher/analyst tasked with analyzing social media posts, recent {asset_term} news, and public sentiment for a specific {asset_term} over the past week. "
f"Your objective is to write a comprehensive long report detailing your analysis, insights, and implications for traders and investors on this {asset_term}'s current state after looking at social media and what people are saying about that {asset_term}, "
f"analyzing sentiment data of what people feel each day about the {asset_term}, and looking at recent {asset_term} news. "
f"Use the {prompt_cfg['primary_tool']} tool {prompt_cfg['primary_note']}. "
f"{prompt_cfg['fallback_note']} When using get_global_news, omit the limit parameter to use the configured optimal value. "
"Try to look at all sources possible from social media to sentiment to news. Do not simply state the trends are mixed, provide detailed and fine-grained analysis and insights that may help traders make decisions."
" Make sure to append a Markdown table at the end of the report to organize key points in the report, organized and easy to read."
)

View File

@ -50,6 +50,7 @@ class RiskDebateState(TypedDict):
class AgentState(MessagesState):
company_of_interest: Annotated[str, "Company that we are interested in trading"]
trade_date: Annotated[str, "What date we are trading at"]
asset_class: Annotated[str, "Asset class: equity, commodity, or crypto"]
sender: Annotated[str, "Agent that sent this message"]

View File

@ -15,10 +15,21 @@ from tradingagents.agents.utils.fundamental_data_tools import (
)
from tradingagents.agents.utils.news_data_tools import (
get_news,
get_commodity_news,
get_crypto_news,
get_insider_sentiment,
get_insider_transactions,
get_global_news
)
from tradingagents.agents.utils.crypto_data_tools import (
get_crypto_data
)
from tradingagents.agents.utils.unified_market_tools import (
get_market_data,
get_asset_news,
get_indicators,
get_global_news as get_global_news_unified,
)
def create_msg_delete():
def delete_messages(state):

View File

@ -0,0 +1,19 @@
from langchain_core.tools import tool
from typing import Annotated
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_commodity_data(
commodity: Annotated[str, "name like WTI, BRENT, NATURAL_GAS, COPPER"],
start_date: Annotated[str, "YYYY-mm-dd"],
end_date: Annotated[str, "YYYY-mm-dd"],
interval: Annotated[str, "daily|weekly|monthly"] = "monthly",
) -> str:
"""
Retrieve commodity price data for a given commodity symbol.
Uses the configured commodity_data vendor.
"""
return route_to_vendor("get_commodity_data", commodity, start_date, end_date, interval)

View File

@ -0,0 +1,34 @@
"""Cryptocurrency data tools for LangChain agents."""
from langchain_core.tools import tool
from typing import Annotated
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_crypto_data(
symbol: Annotated[str, "Crypto symbol like BTC, ETH, SOL, BNB"],
start_date: Annotated[str, "Start date in YYYY-mm-dd format"],
end_date: Annotated[str, "End date in YYYY-mm-dd format"],
market: Annotated[str, "Market currency like USD, EUR"] = "USD",
) -> str:
"""
Retrieve cryptocurrency OHLCV (Open, High, Low, Close, Volume) price data.
This tool fetches historical cryptocurrency price data for analysis.
Supports major cryptocurrencies like Bitcoin (BTC), Ethereum (ETH), Solana (SOL), etc.
Args:
symbol: Cryptocurrency symbol (e.g., "BTC", "ETH", "SOL")
start_date: Start date in YYYY-mm-dd format
end_date: End date in YYYY-mm-dd format
market: Market currency (default "USD", can also be "EUR", "GBP")
Returns:
CSV string with columns: time,open,high,low,close,volume
Example:
get_crypto_data("BTC", "2025-01-01", "2025-01-31", "USD")
"""
return route_to_vendor("get_crypto_data", symbol, start_date, end_date, market)

View File

@ -24,20 +24,67 @@ def get_news(
def get_global_news(
curr_date: Annotated[str, "Current date in yyyy-mm-dd format"],
look_back_days: Annotated[int, "Number of days to look back"] = 7,
limit: Annotated[int, "Maximum number of articles to return"] = 5,
limit: Annotated[int, "Number of articles (leave unset to use configured default, typically 15-20)"] = None,
) -> str:
"""
Retrieve global news data.
Uses the configured news_data vendor.
DO NOT specify limit parameter - it will use the system-configured optimal value.
Only override if you have a specific reason to fetch more/fewer articles.
Args:
curr_date (str): Current date in yyyy-mm-dd format
look_back_days (int): Number of days to look back (default 7)
limit (int): Maximum number of articles to return (default 5)
limit (int): OPTIONAL - Number of articles (omit to use configured default)
Returns:
str: A formatted string containing global news data
"""
# Use config default if not specified
if limit is None:
from tradingagents.dataflows.config import get_config
limit = get_config().get("global_news_limit", 15)
return route_to_vendor("get_global_news", curr_date, look_back_days, limit)
@tool
def get_commodity_news(
commodity: Annotated[str, "Commodity symbol like BRENT, WTI, COPPER"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve news data for a commodity (oil, metals, agriculture).
Uses topic-based search since commodities don't have stock tickers.
Searches news by relevant topics (energy, economy, etc.) and filters for the commodity.
Args:
commodity (str): Commodity symbol (e.g., "BRENT", "WTI", "COPPER")
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted string containing commodity-related news data
"""
return route_to_vendor("get_commodity_news", commodity, start_date, end_date)
@tool
def get_crypto_news(
crypto: Annotated[str, "Crypto symbol like BTC, ETH, SOL"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"],
) -> str:
"""
Retrieve news data for a cryptocurrency (Bitcoin, Ethereum, etc.).
Uses topic-based search since crypto doesn't have traditional stock tickers.
Searches news by blockchain/technology topics and filters for the specific cryptocurrency.
Args:
crypto (str): Cryptocurrency symbol (e.g., "BTC", "ETH", "SOL")
start_date (str): Start date in yyyy-mm-dd format
end_date (str): End date in yyyy-mm-dd format
Returns:
str: A formatted string containing crypto-related news data
"""
return route_to_vendor("get_crypto_news", crypto, start_date, end_date)
@tool
def get_insider_sentiment(
ticker: Annotated[str, "ticker symbol for the company"],

View File

@ -0,0 +1,132 @@
"""
Unified tools for market data and news that work across all asset classes.
These tools automatically route to the correct vendor implementation based on asset class.
"""
from langchain_core.tools import tool
from typing import Annotated, Optional
from tradingagents.dataflows.interface import route_to_vendor
@tool
def get_market_data(
symbol: Annotated[str, "Asset symbol (AAPL for stocks, BRENT for commodities, BTC for crypto)"],
start_date: Annotated[str, "Start date in YYYY-mm-dd format"],
end_date: Annotated[str, "End date in YYYY-mm-dd format"],
asset_class: Annotated[str, "Asset class: equity, commodity, or crypto"] = "equity",
interval: Annotated[str, "Data interval: daily, weekly, or monthly (for commodities)"] = "daily",
market: Annotated[str, "Market currency for crypto (USD, EUR, etc.)"] = "USD",
) -> str:
"""
Retrieve price data for any asset: stocks, commodities, or cryptocurrencies.
Automatically routes to the appropriate data source based on asset class.
For stocks: Returns OHLCV data (interval is ignored for stocks)
For commodities: Returns price data from Alpha Vantage commodity API
For crypto: Returns OHLCV data with specified market currency
Args:
symbol: Ticker or symbol (e.g., "AAPL", "BRENT", "BTC")
start_date: Start date in YYYY-mm-dd format
end_date: End date in YYYY-mm-dd format
asset_class: The type of asset (equity, commodity, crypto)
interval: Data interval (daily, weekly, monthly) - only used for commodities
market: Market currency for crypto pairs (default: USD) - only used for crypto
Returns:
CSV-formatted price data with appropriate columns for the asset type
"""
if asset_class == "crypto":
return route_to_vendor("get_crypto_data", symbol, start_date, end_date, market)
elif asset_class == "commodity":
return route_to_vendor("get_commodity_data", symbol, start_date, end_date, interval)
else: # equity
# Stock data functions only take 3 params (no interval)
return route_to_vendor("get_stock_data", symbol, start_date, end_date)
@tool
def get_indicators(
symbol: Annotated[str, "Stock ticker symbol"],
start_date: Annotated[str, "Start date in YYYY-mm-dd format"],
end_date: Annotated[str, "End date in YYYY-mm-dd format"],
indicators: Annotated[str, "Comma-separated list of indicators (sma_20, rsi, macd, etc.)"],
) -> str:
"""
Calculate technical indicators for stock data.
Note: Currently only supported for equities.
Supported indicators: sma_X, ema_X, rsi, macd, boll (Bollinger Bands), adx, cci, stoch
Args:
symbol: Stock ticker symbol
start_date: Start date in YYYY-mm-dd format
end_date: End date in YYYY-mm-dd format
indicators: Comma-separated list of indicator names
Returns:
CSV-formatted data with requested technical indicators
"""
# Indicators are equity-specific for now
return route_to_vendor("get_indicators", symbol, start_date, end_date, indicators)
@tool
def get_asset_news(
symbol: Annotated[str, "Asset symbol (AAPL for stocks, BRENT for commodities, BTC for crypto)"],
start_date: Annotated[str, "Start date in YYYY-mm-dd format"],
end_date: Annotated[str, "End date in YYYY-mm-dd format"],
asset_class: Annotated[str, "Asset class: equity, commodity, or crypto"] = "equity",
) -> str:
"""
Retrieve news and sentiment data for any asset type.
Automatically routes to the appropriate news source based on asset class.
For stocks: Returns ticker-specific news from financial outlets
For commodities: Returns topic-based news (energy, metals, agriculture)
For crypto: Returns blockchain/technology news filtered for the specific cryptocurrency
Args:
symbol: Ticker or symbol
start_date: Start date in YYYY-mm-dd format
end_date: End date in YYYY-mm-dd format
asset_class: The type of asset (equity, commodity, crypto)
Returns:
JSON-formatted news articles with sentiment scores and metadata
"""
if asset_class == "crypto":
return route_to_vendor("get_crypto_news", symbol, start_date, end_date)
elif asset_class == "commodity":
return route_to_vendor("get_commodity_news", symbol, start_date, end_date)
else: # equity
return route_to_vendor("get_news", symbol, start_date, end_date)
@tool
def get_global_news(
curr_date: Annotated[str, "Current date in YYYY-mm-dd format"],
look_back_days: Annotated[int, "Number of days to look back"] = 7,
limit: Annotated[Optional[int], "Maximum number of articles (omit to use configured default)"] = None,
) -> str:
"""
Retrieve global macroeconomic and financial market news.
IMPORTANT: Do NOT specify the 'limit' parameter - omit it to use the configured default.
The system is configured with an optimal limit based on data source performance.
Args:
curr_date: Current date in YYYY-mm-dd format
look_back_days: Number of days to look back (default: 7)
limit: Leave this parameter unset to use the configured default
Returns:
JSON-formatted global news articles about macro trends, markets, and economy
"""
from tradingagents.dataflows.config import get_config
if limit is None:
limit = get_config().get("global_news_limit", 15)
return route_to_vendor("get_global_news", curr_date, look_back_days, limit)

View File

@ -2,4 +2,7 @@
from .alpha_vantage_stock import get_stock
from .alpha_vantage_indicator import get_indicator
from .alpha_vantage_fundamentals import get_fundamentals, get_balance_sheet, get_cashflow, get_income_statement
from .alpha_vantage_news import get_news, get_insider_transactions
from .alpha_vantage_news import get_news, get_insider_transactions, get_commodity_news, get_crypto_news
from .alpha_vantage_global_news import get_global_news_alpha_vantage
from .alpha_vantage_commodity import get_commodity
from .alpha_vantage_crypto import get_crypto

View File

@ -0,0 +1,88 @@
from .alpha_vantage_common import _make_api_request
from datetime import datetime, timedelta
# Map human-friendly names to Alpha Vantage commodity functions
FUNCTIONS = {
"WTI": "WTI",
"BRENT": "BRENT",
"NATURAL_GAS": "NATURAL_GAS",
"COPPER": "COPPER",
"ALUMINUM": "ALUMINUM",
"WHEAT": "WHEAT",
"CORN": "CORN",
"SUGAR": "SUGAR",
"COTTON": "COTTON",
"COFFEE": "COFFEE",
}
def get_commodity(
commodity: str,
start_date: str,
end_date: str,
interval: str = "monthly",
) -> str:
"""
Fetch commodity price series from Alpha Vantage and return as CSV with columns time,value.
Args:
commodity: e.g. WTI, BRENT, NATURAL_GAS, COPPER
start_date: YYYY-mm-dd
end_date: YYYY-mm-dd
interval: daily|weekly|monthly (depends on AV endpoint support)
Returns:
CSV string with headers time,value
"""
func = FUNCTIONS.get(commodity.upper())
if not func:
raise ValueError(f"Unsupported commodity: {commodity}")
params = {
"interval": interval,
"datatype": "json",
}
raw = _make_api_request(func, params)
# Convert AV JSON payload to simple CSV
import json
import io
try:
payload = json.loads(raw)
series = payload.get("data") or []
s_dt = datetime.strptime(start_date, "%Y-%m-%d")
e_dt = datetime.strptime(end_date, "%Y-%m-%d")
# If user passed a very narrow window (e.g., single day) on a monthly/weekly series,
# widen to a reasonable historical window to ensure data presence.
if interval == "monthly" and (e_dt - s_dt).days < 28:
s_dt = e_dt - timedelta(days=365)
elif interval == "weekly" and (e_dt - s_dt).days < 7:
s_dt = e_dt - timedelta(days=180)
rows = []
for item in series:
# items are like {"date": "2025-05-01", "value": "xxxx"}
d = datetime.strptime(item["date"], "%Y-%m-%d")
if s_dt <= d <= e_dt:
rows.append((item["date"], item.get("value")))
out = io.StringIO()
out.write("time,value\n")
for d, v in sorted(rows):
out.write(f"{d},{v}\n")
csv_text = out.getvalue()
# If still empty after widening, return header + latest few rows without filtering
if csv_text.strip() == "time,value":
out = io.StringIO()
out.write("time,value\n")
for item in series[:24]: # last ~2 years monthly
out.write(f"{item['date']},{item.get('value')}\n")
return out.getvalue()
return csv_text
except Exception:
# Fallback: return raw payload for debugging
return raw

View File

@ -0,0 +1,215 @@
"""Alpha Vantage cryptocurrency data fetcher."""
from .alpha_vantage_common import _make_api_request
from datetime import datetime, timedelta
import json
import io
# Map crypto symbols to full names for better context
CRYPTO_NAMES = {
"BTC": "Bitcoin",
"ETH": "Ethereum",
"BNB": "Binance Coin",
"XRP": "Ripple",
"ADA": "Cardano",
"SOL": "Solana",
"DOGE": "Dogecoin",
"DOT": "Polkadot",
"MATIC": "Polygon",
"AVAX": "Avalanche",
"LINK": "Chainlink",
"UNI": "Uniswap",
"ATOM": "Cosmos",
"LTC": "Litecoin",
"BCH": "Bitcoin Cash",
"XLM": "Stellar",
"ALGO": "Algorand",
"VET": "VeChain",
"ICP": "Internet Computer",
"FIL": "Filecoin",
}
def get_crypto(
symbol: str,
start_date: str,
end_date: str,
market: str = "USD",
) -> str:
"""
Fetch cryptocurrency OHLCV data from Alpha Vantage.
Uses the DIGITAL_CURRENCY_DAILY function to get daily crypto prices.
Args:
symbol: Cryptocurrency symbol (e.g., "BTC", "ETH", "SOL")
start_date: Start date in YYYY-mm-dd format
end_date: End date in YYYY-mm-dd format
market: Market currency (default "USD", can also be "EUR", "GBP", etc.)
Returns:
CSV string with columns: time,open,high,low,close,volume
"""
symbol_upper = symbol.upper()
market_upper = market.upper()
# Alpha Vantage uses DIGITAL_CURRENCY_DAILY
params = {
"symbol": symbol_upper,
"market": market_upper,
}
raw = _make_api_request("DIGITAL_CURRENCY_DAILY", params)
# Parse the JSON response and convert to CSV
try:
payload = json.loads(raw)
# Alpha Vantage returns data in "Time Series (Digital Currency Daily)" key
time_series_key = "Time Series (Digital Currency Daily)"
if time_series_key not in payload:
# If key not found, return raw for debugging
return raw
time_series = payload[time_series_key]
# Parse date range
s_dt = datetime.strptime(start_date, "%Y-%m-%d")
e_dt = datetime.strptime(end_date, "%Y-%m-%d")
# If very narrow window, widen to get more context
if (e_dt - s_dt).days < 7:
s_dt = e_dt - timedelta(days=90) # 90 days for crypto (more volatile)
# DEBUG: Check what keys are available in the first data point
if time_series:
first_date = next(iter(time_series))
first_data = time_series[first_date]
print(f"DEBUG: Available keys in crypto data: {list(first_data.keys())}")
# Build rows within date range
rows = []
for date_str, values in time_series.items():
try:
d = datetime.strptime(date_str, "%Y-%m-%d")
if s_dt <= d <= e_dt:
# Alpha Vantage crypto keys - try multiple formats
# Format 1: "1a. open (USD)" - standard format
# Format 2: "1b. open (USD)" - alternate
# Format 3: "1. open" - fallback
def find_value(patterns):
"""Try multiple key patterns and return first match."""
for pattern in patterns:
if pattern in values:
return values[pattern]
return ""
open_price = find_value([
f"1a. open ({market_upper})",
f"1b. open ({market_upper})",
"1a. open",
"1. open"
])
high_price = find_value([
f"2a. high ({market_upper})",
f"2b. high ({market_upper})",
"2a. high",
"2. high"
])
low_price = find_value([
f"3a. low ({market_upper})",
f"3b. low ({market_upper})",
"3a. low",
"3. low"
])
close_price = find_value([
f"4a. close ({market_upper})",
f"4b. close ({market_upper})",
"4a. close",
"4. close"
])
volume = values.get("5. volume", "")
rows.append((
date_str,
open_price,
high_price,
low_price,
close_price,
volume
))
except ValueError:
# Skip invalid dates
continue
# Sort by date (oldest first)
rows.sort(key=lambda x: x[0])
# Convert to CSV
out = io.StringIO()
out.write("time,open,high,low,close,volume\n")
for row in rows:
out.write(f"{row[0]},{row[1]},{row[2]},{row[3]},{row[4]},{row[5]}\n")
csv_text = out.getvalue()
# If still empty after widening, return recent data without filtering
if csv_text.strip() == "time,open,high,low,close,volume":
out = io.StringIO()
out.write("time,open,high,low,close,volume\n")
# Get last 90 days
sorted_dates = sorted(time_series.keys(), reverse=True)[:90]
def find_value(vals, patterns):
"""Try multiple key patterns and return first match."""
for pattern in patterns:
if pattern in vals:
return vals[pattern]
return ""
for date_str in reversed(sorted_dates):
values = time_series[date_str]
open_price = find_value(values, [
f"1a. open ({market_upper})",
f"1b. open ({market_upper})",
"1a. open",
"1. open"
])
high_price = find_value(values, [
f"2a. high ({market_upper})",
f"2b. high ({market_upper})",
"2a. high",
"2. high"
])
low_price = find_value(values, [
f"3a. low ({market_upper})",
f"3b. low ({market_upper})",
"3a. low",
"3. low"
])
close_price = find_value(values, [
f"4a. close ({market_upper})",
f"4b. close ({market_upper})",
"4a. close",
"4. close"
])
out.write(
f"{date_str},"
f"{open_price},"
f"{high_price},"
f"{low_price},"
f"{close_price},"
f"{values.get('5. volume', '')}\n"
)
return out.getvalue()
return csv_text
except (json.JSONDecodeError, KeyError, Exception) as e:
# Fallback: return raw payload for debugging
return f"Error parsing crypto data: {str(e)}\n\nRaw response:\n{raw}"

View File

@ -0,0 +1,39 @@
"""Global news fetcher using Alpha Vantage NEWS_SENTIMENT with topics."""
from .alpha_vantage_common import _make_api_request, format_datetime_for_api
from datetime import datetime, timedelta
def get_global_news_alpha_vantage(curr_date: str, look_back_days: int = 7, limit: int = 15) -> str:
"""
Fetch global macroeconomic news using Alpha Vantage NEWS_SENTIMENT.
Uses broad topics to get general market and economic news.
Args:
curr_date: Current date in YYYY-mm-dd format
look_back_days: Number of days to look back (default 7)
limit: Number of articles to return (default 15)
Returns:
JSON string with news articles
"""
# Calculate start date
curr_dt = datetime.strptime(curr_date, "%Y-%m-%d")
start_dt = curr_dt - timedelta(days=look_back_days)
start_date = start_dt.strftime("%Y-%m-%d")
# Use broad topics for global news
# Combine multiple topics: economy_macro, finance, earnings, ipo, mergers_and_acquisitions
topics = "economy_macro,finance,earnings"
params = {
"topics": topics,
"time_from": format_datetime_for_api(start_date),
"time_to": format_datetime_for_api(curr_date),
"sort": "LATEST",
"limit": str(limit),
}
return _make_api_request("NEWS_SENTIMENT", params)

View File

@ -1,5 +1,88 @@
from .alpha_vantage_common import _make_api_request, format_datetime_for_api
# Map commodity symbols to Alpha Vantage NEWS_SENTIMENT topics
COMMODITY_TOPIC_MAP = {
# Energy commodities
"WTI": "energy",
"BRENT": "energy",
"NATURAL_GAS": "energy",
# Metals
"COPPER": "technology", # Copper is heavily used in tech/manufacturing
"ALUMINUM": "technology",
# Agriculture
"WHEAT": "economy_macro", # Agriculture affects macro economy
"CORN": "economy_macro",
"SUGAR": "economy_macro",
"COTTON": "economy_macro",
"COFFEE": "economy_macro",
}
# Map commodities to search keywords for better context
COMMODITY_KEYWORDS = {
"WTI": "WTI crude oil",
"BRENT": "Brent crude oil",
"NATURAL_GAS": "natural gas",
"COPPER": "copper commodity",
"ALUMINUM": "aluminum commodity",
"WHEAT": "wheat commodity",
"CORN": "corn commodity",
"SUGAR": "sugar commodity",
"COTTON": "cotton commodity",
"COFFEE": "coffee commodity",
}
# Map crypto symbols to Alpha Vantage NEWS_SENTIMENT topics
CRYPTO_TOPIC_MAP = {
# Major cryptocurrencies - use blockchain/technology topics
"BTC": "blockchain",
"ETH": "blockchain",
"BNB": "blockchain",
"XRP": "blockchain",
"ADA": "blockchain",
"SOL": "blockchain",
"DOGE": "blockchain",
"DOT": "blockchain",
"MATIC": "blockchain",
"AVAX": "blockchain",
"LINK": "blockchain",
"UNI": "blockchain",
"ATOM": "blockchain",
"LTC": "blockchain",
"BCH": "blockchain",
"XLM": "blockchain",
"ALGO": "blockchain",
"VET": "blockchain",
"ICP": "blockchain",
"FIL": "blockchain",
}
# Map crypto symbols to search keywords for better news filtering
CRYPTO_KEYWORDS = {
"BTC": "Bitcoin BTC cryptocurrency",
"ETH": "Ethereum ETH cryptocurrency",
"BNB": "Binance Coin BNB",
"XRP": "Ripple XRP",
"ADA": "Cardano ADA",
"SOL": "Solana SOL",
"DOGE": "Dogecoin DOGE",
"DOT": "Polkadot DOT",
"MATIC": "Polygon MATIC",
"AVAX": "Avalanche AVAX",
"LINK": "Chainlink LINK",
"UNI": "Uniswap UNI",
"ATOM": "Cosmos ATOM",
"LTC": "Litecoin LTC",
"BCH": "Bitcoin Cash BCH",
"XLM": "Stellar XLM",
"ALGO": "Algorand ALGO",
"VET": "VeChain VET",
"ICP": "Internet Computer ICP",
"FIL": "Filecoin FIL",
}
def get_news(ticker, start_date, end_date) -> dict[str, str] | str:
"""Returns live and historical market news & sentiment data from premier news outlets worldwide.
@ -24,6 +107,108 @@ def get_news(ticker, start_date, end_date) -> dict[str, str] | str:
return _make_api_request("NEWS_SENTIMENT", params)
def get_commodity_news(commodity: str, start_date: str, end_date: str) -> dict[str, str] | str:
"""Returns news data for commodities using topic-based search.
Since Alpha Vantage NEWS_SENTIMENT doesn't support commodity symbols directly,
this function uses the 'topics' parameter to find relevant news.
Args:
commodity: Commodity symbol (e.g., "BRENT", "WTI", "COPPER")
start_date: Start date for news search.
end_date: End date for news search.
Returns:
Dictionary containing news sentiment data or JSON string.
"""
commodity_upper = commodity.upper()
topic = COMMODITY_TOPIC_MAP.get(commodity_upper, "economy_macro")
keyword = COMMODITY_KEYWORDS.get(commodity_upper, commodity)
# Use topics parameter instead of tickers for commodities
from .config import get_config
limit = get_config().get("commodity_news_limit", 50)
params = {
"topics": topic,
"time_from": format_datetime_for_api(start_date),
"time_to": format_datetime_for_api(end_date),
"sort": "LATEST",
"limit": str(limit), # Get more results to filter for commodity-specific news
}
result = _make_api_request("NEWS_SENTIMENT", params)
# Add metadata to help the LLM understand this is commodity-filtered news
import json
try:
data = json.loads(result) if isinstance(result, str) else result
if isinstance(data, dict) and "feed" in data:
# Add a note about the commodity and topic used
data["_commodity_context"] = {
"commodity": commodity,
"topic": topic,
"keyword_filter": keyword,
"note": f"News filtered by topic '{topic}'. Look for articles mentioning '{keyword}' for most relevant results."
}
return json.dumps(data)
except (json.JSONDecodeError, TypeError):
pass
return result
def get_crypto_news(crypto: str, start_date: str, end_date: str) -> dict[str, str] | str:
"""Returns news data for cryptocurrencies using Alpha Vantage ticker format.
Alpha Vantage supports crypto-specific tickers in the format CRYPTO:BTC.
Args:
crypto: Cryptocurrency symbol (e.g., "BTC", "ETH", "SOL")
start_date: Start date for news search.
end_date: End date for news search.
Returns:
Dictionary containing news sentiment data or JSON string.
"""
crypto_upper = crypto.upper()
# Use CRYPTO:XXX ticker format
ticker = f"CRYPTO:{crypto_upper}"
from .config import get_config
limit = get_config().get("commodity_news_limit", 50)
params = {
"tickers": ticker,
"time_from": format_datetime_for_api(start_date),
"time_to": format_datetime_for_api(end_date),
"sort": "LATEST",
"limit": str(limit),
}
result = _make_api_request("NEWS_SENTIMENT", params)
# Add metadata to help the LLM understand this is crypto-filtered news
import json
try:
data = json.loads(result) if isinstance(result, str) else result
if isinstance(data, dict) and "feed" in data:
keyword = CRYPTO_KEYWORDS.get(crypto_upper, f"{crypto} cryptocurrency")
data["_crypto_context"] = {
"cryptocurrency": crypto,
"ticker_used": ticker,
"keyword_filter": keyword,
"note": f"News filtered by ticker '{ticker}'. Articles tagged with {ticker}."
}
return json.dumps(data)
except (json.JSONDecodeError, TypeError):
pass
return result
def get_insider_transactions(symbol: str) -> dict[str, str] | str:
"""Returns latest and historical insider transactions by key stakeholders.

View File

@ -13,7 +13,12 @@ from .alpha_vantage import (
get_cashflow as get_alpha_vantage_cashflow,
get_income_statement as get_alpha_vantage_income_statement,
get_insider_transactions as get_alpha_vantage_insider_transactions,
get_news as get_alpha_vantage_news
get_news as get_alpha_vantage_news,
get_commodity_news as get_alpha_vantage_commodity_news,
get_crypto_news as get_alpha_vantage_crypto_news,
get_global_news_alpha_vantage,
get_commodity as get_alpha_vantage_commodity,
get_crypto as get_alpha_vantage_crypto
)
from .alpha_vantage_common import AlphaVantageRateLimitError
@ -28,6 +33,18 @@ TOOLS_CATEGORIES = {
"get_stock_data"
]
},
"commodity_data": {
"description": "Commodity price data",
"tools": [
"get_commodity_data"
]
},
"crypto_data": {
"description": "Cryptocurrency price data",
"tools": [
"get_crypto_data"
]
},
"technical_indicators": {
"description": "Technical analysis indicators",
"tools": [
@ -47,6 +64,8 @@ TOOLS_CATEGORIES = {
"description": "News (public/insiders, original/processed)",
"tools": [
"get_news",
"get_commodity_news",
"get_crypto_news",
"get_global_news",
"get_insider_sentiment",
"get_insider_transactions",
@ -69,6 +88,14 @@ VENDOR_METHODS = {
"yfinance": get_YFin_data_online,
"local": get_YFin_data,
},
# commodity_data
"get_commodity_data": {
"alpha_vantage": get_alpha_vantage_commodity,
},
# crypto_data
"get_crypto_data": {
"alpha_vantage": get_alpha_vantage_crypto,
},
# technical_indicators
"get_indicators": {
"alpha_vantage": get_alpha_vantage_indicator,
@ -102,7 +129,16 @@ VENDOR_METHODS = {
"google": get_google_news,
"local": [get_finnhub_news, get_reddit_company_news, get_google_news],
},
"get_commodity_news": {
"alpha_vantage": get_alpha_vantage_commodity_news,
"openai": get_stock_news_openai, # Fallback to OpenAI web search
},
"get_crypto_news": {
"alpha_vantage": get_alpha_vantage_crypto_news,
"openai": get_stock_news_openai, # Fallback to OpenAI web search
},
"get_global_news": {
"alpha_vantage": get_global_news_alpha_vantage,
"openai": get_global_news_openai,
"local": get_reddit_global_news
},

View File

@ -1,8 +1,83 @@
import os
import configparser
from pathlib import Path
DEFAULT_CONFIG = {
def _load_user_config():
"""Load configuration from config.ini if it exists."""
config_path = Path(__file__).parent.parent / "config.ini"
if not config_path.exists():
return {}
parser = configparser.ConfigParser()
parser.read(config_path)
user_config = {}
# LLM settings
if parser.has_section("llm"):
user_config["llm_provider"] = parser.get("llm", "provider", fallback=None)
user_config["backend_url"] = parser.get("llm", "backend_url", fallback=None)
user_config["deep_think_llm"] = parser.get("llm", "deep_think_llm", fallback=None)
user_config["quick_think_llm"] = parser.get("llm", "quick_think_llm", fallback=None)
# Analysis settings
if parser.has_section("analysis"):
depth = parser.get("analysis", "research_depth", fallback=None)
if depth:
user_config["max_debate_rounds"] = int(depth)
user_config["max_risk_discuss_rounds"] = int(depth)
analysts = parser.get("analysis", "default_analysts", fallback=None)
if analysts:
user_config["default_analysts"] = [a.strip() for a in analysts.split(",")]
# Data settings
if parser.has_section("data"):
limit = parser.get("data", "global_news_limit", fallback=None)
if limit:
user_config["global_news_limit"] = int(limit)
commodity_limit = parser.get("data", "commodity_news_limit", fallback=None)
if commodity_limit:
user_config["commodity_news_limit"] = int(commodity_limit)
# Vendor settings
if parser.has_section("vendors"):
vendors = {}
if parser.has_option("vendors", "stock"):
vendors["core_stock_apis"] = parser.get("vendors", "stock")
if parser.has_option("vendors", "indicators"):
vendors["technical_indicators"] = parser.get("vendors", "indicators")
if parser.has_option("vendors", "fundamentals"):
vendors["fundamental_data"] = parser.get("vendors", "fundamentals")
if parser.has_option("vendors", "news"):
vendors["news_data"] = parser.get("vendors", "news")
if parser.has_option("vendors", "commodity"):
vendors["commodity_data"] = parser.get("vendors", "commodity")
if parser.has_option("vendors", "crypto"):
vendors["crypto_data"] = parser.get("vendors", "crypto")
if vendors:
user_config["data_vendors"] = vendors
# Storage settings
if parser.has_section("storage"):
results = parser.get("storage", "results_dir", fallback=None)
if results:
user_config["results_dir"] = results
# Remove None values
return {k: v for k, v in user_config.items() if v is not None}
# Load user config from config.ini
_user_config = _load_user_config()
# Base defaults
_base_config = {
"project_dir": os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
"results_dir": os.getenv("TRADINGAGENTS_RESULTS_DIR", "./results"),
"results_dir": "./results",
"data_dir": "/Users/yluo/Documents/Code/ScAI/FR1-data",
"data_cache_dir": os.path.join(
os.path.abspath(os.path.join(os.path.dirname(__file__), ".")),
@ -10,24 +85,52 @@ DEFAULT_CONFIG = {
),
# LLM settings
"llm_provider": "openai",
"deep_think_llm": "o4-mini",
"deep_think_llm": "o1-mini",
"quick_think_llm": "gpt-4o-mini",
"backend_url": "https://api.openai.com/v1",
# Asset class (equity | commodity)
"asset_class": "equity",
# Debate and discussion settings
"max_debate_rounds": 1,
"max_risk_discuss_rounds": 1,
"max_recur_limit": 100,
# News limits
"global_news_limit": 15,
"commodity_news_limit": 50,
# Data vendor configuration
# Category-level configuration (default for all tools in category)
"data_vendors": {
"core_stock_apis": "yfinance", # Options: yfinance, alpha_vantage, local
"technical_indicators": "yfinance", # Options: yfinance, alpha_vantage, local
"fundamental_data": "alpha_vantage", # Options: openai, alpha_vantage, local
"news_data": "alpha_vantage", # Options: openai, alpha_vantage, google, local
"core_stock_apis": "yfinance",
"technical_indicators": "yfinance",
"fundamental_data": "alpha_vantage",
"news_data": "alpha_vantage",
"commodity_data": "alpha_vantage",
"crypto_data": "alpha_vantage",
},
# Tool-level configuration (takes precedence over category-level)
"tool_vendors": {
# Example: "get_stock_data": "alpha_vantage", # Override category default
# Example: "get_news": "openai", # Override category default
},
"tool_vendors": {},
}
# Merge with environment variables
_env_overrides = {
"llm_provider": os.getenv("LLM_PROVIDER"),
"deep_think_llm": os.getenv("DEEP_THINK_LLM"),
"quick_think_llm": os.getenv("QUICK_THINK_LLM"),
"backend_url": os.getenv("LLM_BACKEND_URL"),
"results_dir": os.getenv("TRADINGAGENTS_RESULTS_DIR"),
"global_news_limit": os.getenv("GLOBAL_NEWS_LIMIT"),
"commodity_news_limit": os.getenv("COMMODITY_NEWS_LIMIT"),
}
_env_overrides = {k: v for k, v in _env_overrides.items() if v is not None}
# Convert string numbers to int
if "global_news_limit" in _env_overrides:
_env_overrides["global_news_limit"] = int(_env_overrides["global_news_limit"])
if "commodity_news_limit" in _env_overrides:
_env_overrides["commodity_news_limit"] = int(_env_overrides["commodity_news_limit"])
# Priority: env vars > config.ini > base defaults
DEFAULT_CONFIG = {**_base_config, **_user_config, **_env_overrides}
# Update data_vendors if user config has partial vendors
if "data_vendors" in _user_config:
DEFAULT_CONFIG["data_vendors"] = {**_base_config["data_vendors"], **_user_config["data_vendors"]}

View File

@ -22,18 +22,18 @@ from tradingagents.agents.utils.agent_states import (
)
from tradingagents.dataflows.config import set_config
# Import the new abstract tool methods from agent_utils
# Import unified tools from agent_utils
from tradingagents.agents.utils.agent_utils import (
get_stock_data,
get_market_data,
get_indicators,
get_asset_news,
get_global_news_unified as get_global_news,
get_fundamentals,
get_balance_sheet,
get_cashflow,
get_income_statement,
get_news,
get_insider_sentiment,
get_insider_transactions,
get_global_news
)
from .conditional_logic import ConditionalLogic
@ -121,40 +121,21 @@ class TradingAgentsGraph:
self.graph = self.graph_setup.setup_graph(selected_analysts)
def _create_tool_nodes(self) -> Dict[str, ToolNode]:
"""Create tool nodes for different data sources using abstract methods."""
"""Create tool nodes using unified tools that work across all asset classes."""
# Unified tools automatically route based on asset_class context
# No more conditional logic needed here!
return {
"market": ToolNode(
[
# Core stock data tools
get_stock_data,
# Technical indicators
get_indicators,
]
),
"social": ToolNode(
[
# News tools for social media analysis
get_news,
]
),
"news": ToolNode(
[
# News and insider information
get_news,
get_global_news,
get_insider_sentiment,
get_insider_transactions,
]
),
"fundamentals": ToolNode(
[
# Fundamental analysis tools
get_fundamentals,
get_balance_sheet,
get_cashflow,
get_income_statement,
]
),
"market": ToolNode([get_market_data, get_indicators]),
"social": ToolNode([get_asset_news, get_global_news]),
"news": ToolNode([get_asset_news, get_global_news]),
"fundamentals": ToolNode([
get_fundamentals,
get_balance_sheet,
get_cashflow,
get_income_statement,
get_insider_sentiment,
get_insider_transactions,
]),
}
def propagate(self, company_name, trade_date):
@ -166,6 +147,8 @@ class TradingAgentsGraph:
init_agent_state = self.propagator.create_initial_state(
company_name, trade_date
)
# Pass asset class into state for downstream branching
init_agent_state["asset_class"] = self.config.get("asset_class", "equity")
args = self.propagator.get_graph_args()
if self.debug: