409 lines
10 KiB
Markdown
409 lines
10 KiB
Markdown
# Data Flow Architecture
|
|
|
|
This document describes how data flows through the TradingAgents system, from external data sources to final trading decisions.
|
|
|
|
## Overview
|
|
|
|
TradingAgents implements a flexible data abstraction layer that allows seamless switching between data vendors without changing agent code.
|
|
|
|
## Data Flow Diagram
|
|
|
|
```
|
|
External Sources Abstraction Layer Agents Decision
|
|
───────────────── ────────────────── ──────── ─────────
|
|
|
|
yfinance ─┐
|
|
Alpha Vantage ─┼→ Interface Layer → Analysts → Researchers → Trader
|
|
Google News ─┤ (config-driven) ↓ ↓ ↓
|
|
Local Cache ─┘ Reports Debates Decision
|
|
↓ ↓ ↓
|
|
Vector Memory Synthesis Risk Check
|
|
```
|
|
|
|
## Data Vendors
|
|
|
|
### Core Data Vendors
|
|
|
|
TradingAgents supports multiple data vendors, configurable per data category:
|
|
|
|
#### yfinance
|
|
- **Purpose**: Stock prices, technical indicators
|
|
- **Pros**: Free, reliable, comprehensive market data
|
|
- **Cons**: Limited fundamental data
|
|
- **Rate Limits**: None (public data)
|
|
- **Location**: `tradingagents/dataflows/yfinance.py`
|
|
|
|
#### Alpha Vantage
|
|
- **Purpose**: Fundamental data, news, company financials
|
|
- **Pros**: Rich fundamental data, partnership with TradingAgents for enhanced limits
|
|
- **Cons**: Requires API key
|
|
- **Rate Limits**: 60 requests/minute for TradingAgents users (normally 25/day free tier)
|
|
- **Location**: `tradingagents/dataflows/alpha_vantage.py`
|
|
|
|
#### Google News
|
|
- **Purpose**: News articles and headlines
|
|
- **Pros**: Real-time news, comprehensive coverage
|
|
- **Cons**: Requires API key for full access
|
|
- **Location**: `tradingagents/dataflows/google.py`
|
|
|
|
#### Local Cache
|
|
- **Purpose**: Offline backtesting, development
|
|
- **Pros**: Fast, no API limits, reproducible
|
|
- **Cons**: Data must be pre-downloaded
|
|
- **Location**: `tradingagents/dataflows/local.py`
|
|
|
|
### Data Categories
|
|
|
|
Data vendor configuration is organized by category:
|
|
|
|
```python
|
|
config["data_vendors"] = {
|
|
"core_stock_apis": "yfinance", # Price data, quotes
|
|
"technical_indicators": "yfinance", # MACD, RSI, etc.
|
|
"fundamental_data": "alpha_vantage", # Financials, ratios
|
|
"news_data": "alpha_vantage", # News and events
|
|
}
|
|
```
|
|
|
|
## Interface Layer
|
|
|
|
### Unified Interface
|
|
|
|
All agents access data through a unified interface:
|
|
|
|
```python
|
|
from tradingagents.agents.utils.agent_utils import (
|
|
get_stock_data,
|
|
get_indicators,
|
|
get_fundamentals,
|
|
get_news
|
|
)
|
|
```
|
|
|
|
### Interface Routing
|
|
|
|
The interface layer routes requests to the configured vendor:
|
|
|
|
**Configuration:**
|
|
```python
|
|
from tradingagents.dataflows.config import set_config
|
|
|
|
config = {
|
|
"data_vendors": {
|
|
"core_stock_apis": "yfinance"
|
|
}
|
|
}
|
|
set_config(config)
|
|
```
|
|
|
|
**Usage:**
|
|
```python
|
|
# Automatically routes to yfinance based on config
|
|
data = get_stock_data("NVDA", "2024-01-01", "2024-12-31")
|
|
```
|
|
|
|
**Implementation:**
|
|
```python
|
|
def get_stock_data(ticker: str, start_date: str, end_date: str):
|
|
vendor = get_vendor_for_category("core_stock_apis")
|
|
|
|
if vendor == "yfinance":
|
|
return yfinance_get_stock_data(ticker, start_date, end_date)
|
|
elif vendor == "alpha_vantage":
|
|
return alphavantage_get_stock_data(ticker, start_date, end_date)
|
|
elif vendor == "local":
|
|
return local_get_stock_data(ticker, start_date, end_date)
|
|
```
|
|
|
|
Location: `tradingagents/dataflows/interface.py`
|
|
|
|
## Data Types
|
|
|
|
### Price Data
|
|
|
|
Historical stock prices (OHLCV):
|
|
|
|
```python
|
|
{
|
|
"dates": ["2024-01-01", "2024-01-02", ...],
|
|
"open": [150.0, 151.2, ...],
|
|
"high": [152.5, 153.0, ...],
|
|
"low": [149.8, 150.5, ...],
|
|
"close": [151.0, 152.0, ...],
|
|
"volume": [1000000, 1200000, ...]
|
|
}
|
|
```
|
|
|
|
### Technical Indicators
|
|
|
|
Calculated technical analysis metrics:
|
|
|
|
```python
|
|
{
|
|
"MACD": {
|
|
"macd": [...],
|
|
"signal": [...],
|
|
"histogram": [...]
|
|
},
|
|
"RSI": {
|
|
"rsi": [...]
|
|
},
|
|
"BollingerBands": {
|
|
"upper": [...],
|
|
"middle": [...],
|
|
"lower": [...]
|
|
}
|
|
}
|
|
```
|
|
|
|
### Fundamental Data
|
|
|
|
Company financial metrics:
|
|
|
|
```python
|
|
{
|
|
"MarketCapitalization": 2800000000000,
|
|
"PERatio": 35.2,
|
|
"PEGRatio": 1.8,
|
|
"BookValue": 25.50,
|
|
"DividendYield": 0.005,
|
|
"ProfitMargin": 0.25,
|
|
"OperatingMarginTTM": 0.30,
|
|
"ReturnOnAssetsTTM": 0.22,
|
|
"ReturnOnEquityTTM": 0.45
|
|
}
|
|
```
|
|
|
|
### News Data
|
|
|
|
News articles and headlines:
|
|
|
|
```python
|
|
{
|
|
"articles": [
|
|
{
|
|
"title": "Company Announces Record Earnings",
|
|
"source": "Reuters",
|
|
"published_at": "2024-01-15T10:30:00Z",
|
|
"sentiment": 0.8, # -1 to 1
|
|
"summary": "..."
|
|
},
|
|
...
|
|
]
|
|
}
|
|
```
|
|
|
|
## Data Caching
|
|
|
|
### Cache Strategy
|
|
|
|
TradingAgents implements multi-level caching:
|
|
|
|
1. **Memory Cache**: In-process cache for repeated requests within a session
|
|
2. **Disk Cache**: Persistent cache for expensive API calls
|
|
3. **Vector Store**: Semantic cache for analysis results
|
|
|
|
### Cache Configuration
|
|
|
|
```python
|
|
config["data_cache_dir"] = "./dataflows/data_cache"
|
|
```
|
|
|
|
### Cache Keys
|
|
|
|
Cache keys are generated from request parameters:
|
|
|
|
```python
|
|
cache_key = f"{vendor}_{function}_{ticker}_{start_date}_{end_date}"
|
|
```
|
|
|
|
### Cache Invalidation
|
|
|
|
Caches expire based on data freshness requirements:
|
|
|
|
- **Price Data**: 1 hour (intraday), 1 day (historical)
|
|
- **Fundamental Data**: 1 day
|
|
- **News Data**: 1 hour
|
|
- **Technical Indicators**: Based on underlying price data
|
|
|
|
Location: `tradingagents/dataflows/cache.py`
|
|
|
|
## Data Validation
|
|
|
|
### Input Validation
|
|
|
|
All data inputs are validated before processing:
|
|
|
|
```python
|
|
def validate_ticker(ticker: str) -> bool:
|
|
"""Validate ticker symbol format."""
|
|
return bool(re.match(r'^[A-Z]{1,5}$', ticker))
|
|
|
|
def validate_date(date_str: str) -> bool:
|
|
"""Validate date format (YYYY-MM-DD)."""
|
|
try:
|
|
datetime.strptime(date_str, '%Y-%m-%d')
|
|
return True
|
|
except ValueError:
|
|
return False
|
|
```
|
|
|
|
### Output Validation
|
|
|
|
Data vendor responses are validated for completeness:
|
|
|
|
```python
|
|
def validate_stock_data(data: dict) -> bool:
|
|
"""Ensure stock data has required fields."""
|
|
required = ["dates", "open", "high", "low", "close", "volume"]
|
|
return all(field in data for field in required)
|
|
```
|
|
|
|
## Error Handling
|
|
|
|
### Vendor Fallback
|
|
|
|
If a vendor fails, the system can fall back to alternatives:
|
|
|
|
```python
|
|
def get_stock_data_with_fallback(ticker, start_date, end_date):
|
|
vendors = ["yfinance", "alpha_vantage", "local"]
|
|
|
|
for vendor in vendors:
|
|
try:
|
|
return get_stock_data(ticker, start_date, end_date, vendor=vendor)
|
|
except VendorError:
|
|
continue
|
|
|
|
raise DataUnavailableError(f"No vendor could provide data for {ticker}")
|
|
```
|
|
|
|
### Rate Limit Handling
|
|
|
|
Automatic retry with exponential backoff for rate limits:
|
|
|
|
```python
|
|
def handle_rate_limit(func, max_retries=3):
|
|
for attempt in range(max_retries):
|
|
try:
|
|
return func()
|
|
except RateLimitError as e:
|
|
wait_time = e.retry_after or (2 ** attempt)
|
|
time.sleep(wait_time)
|
|
|
|
raise RateLimitExceeded("Max retries exceeded")
|
|
```
|
|
|
|
## Data Flow Examples
|
|
|
|
### Market Analyst Workflow
|
|
|
|
```python
|
|
# 1. Market Analyst requests technical data
|
|
data = get_stock_data("NVDA", "2024-01-01", "2024-12-31")
|
|
indicators = get_indicators("NVDA", ["MACD", "RSI", "BollingerBands"])
|
|
|
|
# 2. Interface routes to configured vendor (yfinance)
|
|
# 3. Data is fetched, cached, and validated
|
|
# 4. Analyst processes data and generates report
|
|
report = analyze_technical_signals(data, indicators)
|
|
|
|
# 5. Report is stored in agent state
|
|
state.analyst_reports["market"] = report
|
|
```
|
|
|
|
### Fundamentals Analyst Workflow
|
|
|
|
```python
|
|
# 1. Fundamentals Analyst requests financial data
|
|
fundamentals = get_fundamentals("NVDA")
|
|
balance_sheet = get_balance_sheet("NVDA")
|
|
income_statement = get_income_statement("NVDA")
|
|
|
|
# 2. Interface routes to configured vendor (alpha_vantage)
|
|
# 3. Data is fetched from Alpha Vantage API
|
|
# 4. Analyst evaluates financial health
|
|
report = analyze_financial_health(fundamentals, balance_sheet, income_statement)
|
|
|
|
# 5. Report is stored in agent state
|
|
state.analyst_reports["fundamentals"] = report
|
|
```
|
|
|
|
### News Analyst Workflow
|
|
|
|
```python
|
|
# 1. News Analyst requests news data
|
|
company_news = get_news("NVDA", "2024-01-15")
|
|
global_news = get_global_news("2024-01-15")
|
|
|
|
# 2. Interface routes to configured vendor (alpha_vantage or google)
|
|
# 3. News articles are fetched and sentiment scored
|
|
# 4. Analyst identifies market-moving events
|
|
report = analyze_news_impact(company_news, global_news)
|
|
|
|
# 5. Report is stored in agent state
|
|
state.analyst_reports["news"] = report
|
|
```
|
|
|
|
## Performance Optimization
|
|
|
|
### Batch Requests
|
|
|
|
Request multiple data points in a single API call:
|
|
|
|
```python
|
|
# Bad: Multiple API calls
|
|
data1 = get_stock_data("NVDA", "2024-01-01", "2024-01-02")
|
|
data2 = get_stock_data("NVDA", "2024-01-03", "2024-01-04")
|
|
|
|
# Good: Single API call
|
|
data = get_stock_data("NVDA", "2024-01-01", "2024-01-04")
|
|
```
|
|
|
|
### Parallel Requests
|
|
|
|
Fetch data for multiple tickers in parallel:
|
|
|
|
```python
|
|
import asyncio
|
|
|
|
async def fetch_multiple_tickers(tickers):
|
|
tasks = [get_stock_data_async(ticker, start, end) for ticker in tickers]
|
|
return await asyncio.gather(*tasks)
|
|
```
|
|
|
|
### Data Preprocessing
|
|
|
|
Preprocess data once and cache results:
|
|
|
|
```python
|
|
def get_preprocessed_indicators(ticker, start_date, end_date):
|
|
cache_key = f"preprocessed_{ticker}_{start_date}_{end_date}"
|
|
|
|
if cached := get_from_cache(cache_key):
|
|
return cached
|
|
|
|
data = get_stock_data(ticker, start_date, end_date)
|
|
indicators = calculate_all_indicators(data)
|
|
|
|
save_to_cache(cache_key, indicators)
|
|
return indicators
|
|
```
|
|
|
|
## Best Practices
|
|
|
|
1. **Use Configuration**: Always configure vendors through config, not hardcoded
|
|
2. **Handle Errors Gracefully**: Implement fallbacks and retries
|
|
3. **Cache Aggressively**: Cache expensive API calls with appropriate TTL
|
|
4. **Validate Data**: Check data completeness before using
|
|
5. **Monitor Usage**: Track API quotas and rate limits
|
|
6. **Batch When Possible**: Minimize API calls through batching
|
|
7. **Use Async for Parallelism**: Fetch multiple resources concurrently
|
|
|
|
## References
|
|
|
|
- [Multi-Agent System](multi-agent-system.md)
|
|
- [Data Flows API](../api/dataflows.md)
|
|
- [Configuration Guide](../guides/configuration.md)
|
|
- [Adding Data Vendor Guide](../guides/adding-data-vendor.md)
|