TradingAgents/docs/agent-development.md

581 lines
17 KiB
Markdown

# Agent Development Guide
This guide covers how to develop, modify, and extend agents in the TradingAgents framework.
## Agent Architecture Overview
The TradingAgents framework uses a multi-agent system where each agent has specific responsibilities in the trading decision workflow.
### Agent Categories
1. **Analyst Team** (`tradingagents/agents/analysts/`): Data analysis and market intelligence
2. **Research Team** (`tradingagents/agents/researchers/`): Investment debate and recommendation synthesis
3. **Trading Team** (`tradingagents/agents/trader/`): Trading decision execution
4. **Risk Management** (`tradingagents/agents/risk_mgmt/`): Risk assessment and portfolio management
### Agent Implementation Pattern
All agents follow a consistent implementation pattern:
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import SystemMessage
from tradingagents.agents.libs.agent_toolkit import AgentToolkit
def create_market_analyst(toolkit: AgentToolkit, config: dict) -> dict:
"""Factory function to create a market analyst agent."""
# Define agent's role and responsibilities
system_prompt = """You are a Market Analyst specializing in technical analysis.
Your responsibilities:
- Analyze price trends and trading patterns
- Calculate and interpret technical indicators
- Identify support and resistance levels
- Assess market momentum and volatility
Use the available tools to gather market data and provide actionable insights."""
# Create prompt template
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content=system_prompt),
("human", "{input}")
])
# Create the agent with tools
agent = create_agent(
llm=get_llm(config.get("quick_think_llm", "gpt-4o-mini")),
tools=[
toolkit.get_market_data,
toolkit.get_ta_report,
],
prompt=prompt
)
return agent
```
## Adding New Agents
### Step 1: Define Agent Role
Create a new file for your agent (e.g., `custom_analyst.py`):
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import SystemMessage
from tradingagents.agents.libs.agent_toolkit import AgentToolkit
from tradingagents.agents.libs.agent_base import create_agent, get_llm
def create_custom_analyst(toolkit: AgentToolkit, config: dict) -> dict:
"""Create a custom analyst with specific expertise."""
system_prompt = """You are a Custom Market Analyst specializing in [specific domain].
Your responsibilities:
- [List specific responsibilities]
- [Define analysis focus]
- [Specify output format]
Always provide:
1. Clear analysis summary
2. Key findings with evidence
3. Confidence level in your assessment
4. Risk factors to consider
"""
prompt = ChatPromptTemplate.from_messages([
SystemMessage(content=system_prompt),
("human", "Analyze {symbol} for date {date}. Context: {context}")
])
# Select appropriate tools for this agent
agent_tools = [
toolkit.get_market_data,
toolkit.get_news,
toolkit.get_socialmedia_stock_info,
# Add other relevant tools
]
agent = create_agent(
llm=get_llm(config.get("deep_think_llm", "o4-mini")),
tools=agent_tools,
prompt=prompt
)
return agent
```
### Step 2: Create Agent Node Function
Create a node function for LangGraph integration:
```python
from langchain_core.messages import HumanMessage
from tradingagents.graph.state_models import AgentState
def custom_analyst_node(state: AgentState, config: dict) -> dict:
"""Node function for custom analyst in the trading workflow."""
# Extract relevant information from state
symbol = state.get("symbol", "")
date = state.get("date", "")
context = state.get("context", {})
# Get the agent instance
toolkit = state.get("toolkit")
agent = create_custom_analyst(toolkit, config)
# Prepare input message
input_msg = f"""Analyze {symbol} for {date}.
Previous analysis context:
{context}
Provide detailed analysis focusing on [specific aspects].
"""
# Execute agent
response = agent.invoke({
"input": input_msg,
"symbol": symbol,
"date": date,
"context": context
})
# Extract analysis from response
analysis = response.get("output", "")
# Update state with results
return {
"custom_analysis": analysis,
"agents_completed": state.get("agents_completed", []) + ["custom_analyst"]
}
```
### Step 3: Integrate into Workflow
Add your agent to the trading graph:
```python
# In tradingagents/graph/trading_graph.py
from tradingagents.agents.analysts.custom_analyst import custom_analyst_node
class TradingAgentsGraph:
def _build_graph(self):
# ... existing code ...
# Add your custom analyst node
graph.add_node("custom_analyst", custom_analyst_node)
# Define connections in the workflow
graph.add_edge("market_analyst", "custom_analyst")
graph.add_edge("custom_analyst", "research_team")
# ... rest of graph construction ...
```
## Extending Existing Agents
### Modifying Agent Prompts
To customize an existing agent's behavior, modify its system prompt:
```python
def create_enhanced_fundamentals_analyst(toolkit: AgentToolkit, config: dict) -> dict:
"""Enhanced version of fundamentals analyst with additional capabilities."""
enhanced_prompt = """You are a Senior Fundamentals Analyst with expertise in financial modeling.
Your enhanced responsibilities:
- Analyze financial statements with deep ratio analysis
- Build DCF models when appropriate
- Compare metrics against industry benchmarks
- Assess management quality and corporate governance
- Evaluate ESG factors and sustainability metrics
Analysis Framework:
1. Financial Health Assessment (40%)
2. Valuation Analysis (30%)
3. Growth Potential (20%)
4. Risk Assessment (10%)
Always provide quantitative metrics with qualitative insights.
"""
# ... rest of implementation
```
### Adding New Tools
Create custom tools for specialized data sources:
```python
from langchain_core.tools import tool
from typing import Annotated
@tool
def get_industry_comparison(
symbol: Annotated[str, "Stock symbol to analyze"],
metrics: Annotated[list[str], "List of metrics to compare"]
) -> str:
"""Compare stock metrics against industry averages."""
# Implementation to fetch industry data
# This could integrate with additional APIs or databases
industry_data = fetch_industry_metrics(symbol, metrics)
stock_data = fetch_stock_metrics(symbol, metrics)
comparison = compare_metrics(stock_data, industry_data)
return format_comparison_report(comparison)
# Add to agent toolkit
def create_enhanced_toolkit(config: dict) -> AgentToolkit:
"""Create toolkit with additional custom tools."""
toolkit = AgentToolkit.build(config)
# Add custom tools
toolkit.add_tool(get_industry_comparison)
return toolkit
```
## Agent Communication Patterns
### Structured Information Passing
Agents communicate through structured state objects:
```python
@dataclass
class AnalysisState:
symbol: str
date: str
market_analysis: str | None = None
fundamental_analysis: str | None = None
sentiment_analysis: str | None = None
risk_assessment: str | None = None
final_recommendation: str | None = None
confidence_score: float | None = None
def analyst_coordination_node(state: AnalysisState, config: dict) -> dict:
"""Coordinate multiple analysts and synthesize results."""
analyses = {
"market": state.market_analysis,
"fundamental": state.fundamental_analysis,
"sentiment": state.sentiment_analysis
}
# Synthesis logic
synthesized_analysis = synthesize_analyses(analyses)
return {
"synthesized_analysis": synthesized_analysis,
"confidence_score": calculate_confidence(analyses)
}
```
### Debate Mechanisms
Implement structured debates between agents:
```python
def investment_debate_node(state: AgentState, config: dict) -> dict:
"""Facilitate debate between bull and bear researchers."""
max_rounds = config.get("max_debate_rounds", 3)
current_round = state.get("debate_round", 0)
if current_round >= max_rounds:
# End debate and synthesize
return finalize_debate(state)
if current_round % 2 == 0:
# Bull researcher's turn
response = bull_researcher.invoke(state)
return {
"bull_arguments": state.get("bull_arguments", []) + [response],
"debate_round": current_round + 1,
"current_speaker": "bear"
}
else:
# Bear researcher's turn
response = bear_researcher.invoke(state)
return {
"bear_arguments": state.get("bear_arguments", []) + [response],
"debate_round": current_round + 1,
"current_speaker": "bull"
}
```
## Agent Testing
### Unit Testing Agents
Create comprehensive tests for agent behavior:
```python
import pytest
from unittest.mock import Mock, patch
from tradingagents.agents.analysts.custom_analyst import create_custom_analyst
def test_custom_analyst_creation():
"""Test agent creation with mock toolkit."""
mock_toolkit = Mock()
config = {"deep_think_llm": "gpt-4o-mini"}
agent = create_custom_analyst(mock_toolkit, config)
assert agent is not None
assert hasattr(agent, 'invoke')
def test_custom_analyst_analysis():
"""Test agent analysis with mock data."""
mock_toolkit = Mock()
# Configure mock responses
mock_toolkit.get_market_data.return_value = "Mock market data"
mock_toolkit.get_news.return_value = "Mock news data"
agent = create_custom_analyst(mock_toolkit, {})
with patch('tradingagents.agents.libs.agent_base.get_llm') as mock_llm:
mock_llm.return_value.invoke.return_value = {"output": "Test analysis"}
result = agent.invoke({
"input": "Analyze AAPL",
"symbol": "AAPL",
"date": "2024-01-15"
})
assert "output" in result
assert result["output"] == "Test analysis"
def test_agent_node_integration():
"""Test agent node function."""
state = {
"symbol": "AAPL",
"date": "2024-01-15",
"toolkit": Mock(),
"agents_completed": []
}
result = custom_analyst_node(state, {})
assert "custom_analysis" in result
assert "custom_analyst" in result["agents_completed"]
```
### Integration Testing
Test agent interactions within the workflow:
```python
def test_agent_workflow_integration():
"""Test agent integration in trading workflow."""
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.config import TradingAgentsConfig
config = TradingAgentsConfig(
online_tools=False, # Use cached data for testing
max_debate_rounds=1
)
graph = TradingAgentsGraph(debug=True, config=config)
# Test with known stock and date
result, decision = graph.propagate("AAPL", "2024-01-15")
assert result is not None
assert decision in ["BUY", "SELL", "HOLD"]
assert "custom_analysis" in result # If your agent was integrated
```
## Agent Performance Optimization
### Model Selection Strategy
Choose appropriate models for different agent types:
```python
def get_optimal_model(agent_type: str, config: dict) -> str:
"""Select optimal model based on agent requirements."""
model_mapping = {
# Fast models for data retrieval and formatting
"data_fetchers": config.get("quick_think_llm", "gpt-4o-mini"),
# Powerful models for complex analysis
"analysts": config.get("deep_think_llm", "o4-mini"),
# Balanced models for decision making
"traders": config.get("deep_think_llm", "o4-mini"),
# Conservative models for risk assessment
"risk_managers": config.get("deep_think_llm", "o4-mini")
}
return model_mapping.get(agent_type, config.get("quick_think_llm"))
```
### Caching Agent Responses
Implement caching for expensive agent operations:
```python
from functools import lru_cache
import hashlib
def cache_key(symbol: str, date: str, analysis_type: str) -> str:
"""Generate cache key for agent analysis."""
return hashlib.md5(f"{symbol}_{date}_{analysis_type}".encode()).hexdigest()
@lru_cache(maxsize=100)
def cached_analysis(cache_key: str, symbol: str, date: str) -> str:
"""Cache expensive analysis operations."""
# This would be called by agents that perform expensive operations
pass
def create_cached_agent(base_agent, cache_size: int = 100):
"""Wrap agent with caching functionality."""
@lru_cache(maxsize=cache_size)
def cached_invoke(input_hash: str, **kwargs):
return base_agent.invoke(kwargs)
def invoke(inputs: dict):
# Create hash of inputs for cache key
input_str = str(sorted(inputs.items()))
input_hash = hashlib.md5(input_str.encode()).hexdigest()
return cached_invoke(input_hash, **inputs)
base_agent.invoke = invoke
return base_agent
```
### Parallel Agent Execution
Implement parallel execution for independent agents:
```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def parallel_analysis_node(state: AgentState, config: dict) -> dict:
"""Execute multiple analysts in parallel."""
symbol = state.get("symbol")
date = state.get("date")
toolkit = state.get("toolkit")
# Create analysts
market_analyst = create_market_analyst(toolkit, config)
fundamentals_analyst = create_fundamentals_analyst(toolkit, config)
sentiment_analyst = create_sentiment_analyst(toolkit, config)
# Define analysis tasks
async def run_analyst(analyst, input_data):
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
return await loop.run_in_executor(executor, analyst.invoke, input_data)
# Execute in parallel
tasks = [
run_analyst(market_analyst, {"symbol": symbol, "date": date}),
run_analyst(fundamentals_analyst, {"symbol": symbol, "date": date}),
run_analyst(sentiment_analyst, {"symbol": symbol, "date": date})
]
results = await asyncio.gather(*tasks)
return {
"market_analysis": results[0]["output"],
"fundamental_analysis": results[1]["output"],
"sentiment_analysis": results[2]["output"]
}
```
## Best Practices
### Agent Design Principles
1. **Single Responsibility**: Each agent should have one clear purpose
2. **Clear Communication**: Use structured inputs/outputs
3. **Error Handling**: Gracefully handle missing data or API failures
4. **Testability**: Design agents to be easily testable with mocks
5. **Performance**: Consider token usage and API costs
### Code Organization
```
tradingagents/agents/
├── analysts/
│ ├── __init__.py
│ ├── market_analyst.py
│ ├── fundamentals_analyst.py
│ ├── custom_analyst.py
│ └── analyst_base.py # Common analyst functionality
├── researchers/
│ ├── __init__.py
│ ├── bull_researcher.py
│ ├── bear_researcher.py
│ └── researcher_base.py
├── libs/
│ ├── agent_toolkit.py # Anti-corruption layer
│ ├── agent_base.py # Common agent utilities
│ └── context_helpers.py # Helper functions
└── tests/
├── test_analysts.py
├── test_researchers.py
└── test_integration.py
```
### Error Handling
Implement robust error handling in agents:
```python
def safe_agent_invoke(agent, inputs: dict, default_response: str = "Analysis unavailable") -> str:
"""Safely invoke agent with error handling."""
try:
result = agent.invoke(inputs)
return result.get("output", default_response)
except Exception as e:
logger.error(f"Agent invocation failed: {e}")
return f"{default_response}. Error: {str(e)}"
def create_resilient_analyst(toolkit: AgentToolkit, config: dict) -> dict:
"""Create analyst with built-in resilience."""
base_agent = create_custom_analyst(toolkit, config)
def resilient_invoke(inputs: dict):
# Add retry logic
max_retries = 3
for attempt in range(max_retries):
try:
return base_agent.invoke(inputs)
except Exception as e:
if attempt == max_retries - 1:
# Final attempt failed
return {
"output": "Analysis failed after multiple attempts",
"error": str(e),
"confidence": 0.0
}
# Wait before retry
time.sleep(2 ** attempt) # Exponential backoff
base_agent.invoke = resilient_invoke
return base_agent
```
This guide provides a comprehensive foundation for developing and extending agents in the TradingAgents framework. Follow these patterns and best practices to create robust, testable, and performant agents.