TradingAgents/docs/architecture/data-flow.md

10 KiB

Data Flow Architecture

This document describes how data flows through the TradingAgents system, from external data sources to final trading decisions.

Overview

TradingAgents implements a flexible data abstraction layer that allows seamless switching between data vendors without changing agent code.

Data Flow Diagram

External Sources          Abstraction Layer        Agents              Decision
─────────────────        ──────────────────      ────────            ─────────

yfinance        ─┐
Alpha Vantage   ─┼→  Interface Layer  →  Analysts  →  Researchers  →  Trader
Google News     ─┤    (config-driven)      ↓             ↓              ↓
Local Cache     ─┘                      Reports      Debates       Decision
                                           ↓             ↓              ↓
                                      Vector Memory  Synthesis    Risk Check

Data Vendors

Core Data Vendors

TradingAgents supports multiple data vendors, configurable per data category:

yfinance

  • Purpose: Stock prices, technical indicators
  • Pros: Free, reliable, comprehensive market data
  • Cons: Limited fundamental data
  • Rate Limits: None (public data)
  • Location: tradingagents/dataflows/yfinance.py

Alpha Vantage

  • Purpose: Fundamental data, news, company financials
  • Pros: Rich fundamental data, partnership with TradingAgents for enhanced limits
  • Cons: Requires API key
  • Rate Limits: 60 requests/minute for TradingAgents users (normally 25/day free tier)
  • Location: tradingagents/dataflows/alpha_vantage.py

Google News

  • Purpose: News articles and headlines
  • Pros: Real-time news, comprehensive coverage
  • Cons: Requires API key for full access
  • Location: tradingagents/dataflows/google.py

Local Cache

  • Purpose: Offline backtesting, development
  • Pros: Fast, no API limits, reproducible
  • Cons: Data must be pre-downloaded
  • Location: tradingagents/dataflows/local.py

Data Categories

Data vendor configuration is organized by category:

config["data_vendors"] = {
    "core_stock_apis": "yfinance",       # Price data, quotes
    "technical_indicators": "yfinance",  # MACD, RSI, etc.
    "fundamental_data": "alpha_vantage", # Financials, ratios
    "news_data": "alpha_vantage",        # News and events
}

Interface Layer

Unified Interface

All agents access data through a unified interface:

from tradingagents.agents.utils.agent_utils import (
    get_stock_data,
    get_indicators,
    get_fundamentals,
    get_news
)

Interface Routing

The interface layer routes requests to the configured vendor:

Configuration:

from tradingagents.dataflows.config import set_config

config = {
    "data_vendors": {
        "core_stock_apis": "yfinance"
    }
}
set_config(config)

Usage:

# Automatically routes to yfinance based on config
data = get_stock_data("NVDA", "2024-01-01", "2024-12-31")

Implementation:

def get_stock_data(ticker: str, start_date: str, end_date: str):
    vendor = get_vendor_for_category("core_stock_apis")

    if vendor == "yfinance":
        return yfinance_get_stock_data(ticker, start_date, end_date)
    elif vendor == "alpha_vantage":
        return alphavantage_get_stock_data(ticker, start_date, end_date)
    elif vendor == "local":
        return local_get_stock_data(ticker, start_date, end_date)

Location: tradingagents/dataflows/interface.py

Data Types

Price Data

Historical stock prices (OHLCV):

{
    "dates": ["2024-01-01", "2024-01-02", ...],
    "open": [150.0, 151.2, ...],
    "high": [152.5, 153.0, ...],
    "low": [149.8, 150.5, ...],
    "close": [151.0, 152.0, ...],
    "volume": [1000000, 1200000, ...]
}

Technical Indicators

Calculated technical analysis metrics:

{
    "MACD": {
        "macd": [...],
        "signal": [...],
        "histogram": [...]
    },
    "RSI": {
        "rsi": [...]
    },
    "BollingerBands": {
        "upper": [...],
        "middle": [...],
        "lower": [...]
    }
}

Fundamental Data

Company financial metrics:

{
    "MarketCapitalization": 2800000000000,
    "PERatio": 35.2,
    "PEGRatio": 1.8,
    "BookValue": 25.50,
    "DividendYield": 0.005,
    "ProfitMargin": 0.25,
    "OperatingMarginTTM": 0.30,
    "ReturnOnAssetsTTM": 0.22,
    "ReturnOnEquityTTM": 0.45
}

News Data

News articles and headlines:

{
    "articles": [
        {
            "title": "Company Announces Record Earnings",
            "source": "Reuters",
            "published_at": "2024-01-15T10:30:00Z",
            "sentiment": 0.8,  # -1 to 1
            "summary": "..."
        },
        ...
    ]
}

Data Caching

Cache Strategy

TradingAgents implements multi-level caching:

  1. Memory Cache: In-process cache for repeated requests within a session
  2. Disk Cache: Persistent cache for expensive API calls
  3. Vector Store: Semantic cache for analysis results

Cache Configuration

config["data_cache_dir"] = "./dataflows/data_cache"

Cache Keys

Cache keys are generated from request parameters:

cache_key = f"{vendor}_{function}_{ticker}_{start_date}_{end_date}"

Cache Invalidation

Caches expire based on data freshness requirements:

  • Price Data: 1 hour (intraday), 1 day (historical)
  • Fundamental Data: 1 day
  • News Data: 1 hour
  • Technical Indicators: Based on underlying price data

Location: tradingagents/dataflows/cache.py

Data Validation

Input Validation

All data inputs are validated before processing:

def validate_ticker(ticker: str) -> bool:
    """Validate ticker symbol format."""
    return bool(re.match(r'^[A-Z]{1,5}$', ticker))

def validate_date(date_str: str) -> bool:
    """Validate date format (YYYY-MM-DD)."""
    try:
        datetime.strptime(date_str, '%Y-%m-%d')
        return True
    except ValueError:
        return False

Output Validation

Data vendor responses are validated for completeness:

def validate_stock_data(data: dict) -> bool:
    """Ensure stock data has required fields."""
    required = ["dates", "open", "high", "low", "close", "volume"]
    return all(field in data for field in required)

Error Handling

Vendor Fallback

If a vendor fails, the system can fall back to alternatives:

def get_stock_data_with_fallback(ticker, start_date, end_date):
    vendors = ["yfinance", "alpha_vantage", "local"]

    for vendor in vendors:
        try:
            return get_stock_data(ticker, start_date, end_date, vendor=vendor)
        except VendorError:
            continue

    raise DataUnavailableError(f"No vendor could provide data for {ticker}")

Rate Limit Handling

Automatic retry with exponential backoff for rate limits:

def handle_rate_limit(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return func()
        except RateLimitError as e:
            wait_time = e.retry_after or (2 ** attempt)
            time.sleep(wait_time)

    raise RateLimitExceeded("Max retries exceeded")

Data Flow Examples

Market Analyst Workflow

# 1. Market Analyst requests technical data
data = get_stock_data("NVDA", "2024-01-01", "2024-12-31")
indicators = get_indicators("NVDA", ["MACD", "RSI", "BollingerBands"])

# 2. Interface routes to configured vendor (yfinance)
# 3. Data is fetched, cached, and validated
# 4. Analyst processes data and generates report
report = analyze_technical_signals(data, indicators)

# 5. Report is stored in agent state
state.analyst_reports["market"] = report

Fundamentals Analyst Workflow

# 1. Fundamentals Analyst requests financial data
fundamentals = get_fundamentals("NVDA")
balance_sheet = get_balance_sheet("NVDA")
income_statement = get_income_statement("NVDA")

# 2. Interface routes to configured vendor (alpha_vantage)
# 3. Data is fetched from Alpha Vantage API
# 4. Analyst evaluates financial health
report = analyze_financial_health(fundamentals, balance_sheet, income_statement)

# 5. Report is stored in agent state
state.analyst_reports["fundamentals"] = report

News Analyst Workflow

# 1. News Analyst requests news data
company_news = get_news("NVDA", "2024-01-15")
global_news = get_global_news("2024-01-15")

# 2. Interface routes to configured vendor (alpha_vantage or google)
# 3. News articles are fetched and sentiment scored
# 4. Analyst identifies market-moving events
report = analyze_news_impact(company_news, global_news)

# 5. Report is stored in agent state
state.analyst_reports["news"] = report

Performance Optimization

Batch Requests

Request multiple data points in a single API call:

# Bad: Multiple API calls
data1 = get_stock_data("NVDA", "2024-01-01", "2024-01-02")
data2 = get_stock_data("NVDA", "2024-01-03", "2024-01-04")

# Good: Single API call
data = get_stock_data("NVDA", "2024-01-01", "2024-01-04")

Parallel Requests

Fetch data for multiple tickers in parallel:

import asyncio

async def fetch_multiple_tickers(tickers):
    tasks = [get_stock_data_async(ticker, start, end) for ticker in tickers]
    return await asyncio.gather(*tasks)

Data Preprocessing

Preprocess data once and cache results:

def get_preprocessed_indicators(ticker, start_date, end_date):
    cache_key = f"preprocessed_{ticker}_{start_date}_{end_date}"

    if cached := get_from_cache(cache_key):
        return cached

    data = get_stock_data(ticker, start_date, end_date)
    indicators = calculate_all_indicators(data)

    save_to_cache(cache_key, indicators)
    return indicators

Best Practices

  1. Use Configuration: Always configure vendors through config, not hardcoded
  2. Handle Errors Gracefully: Implement fallbacks and retries
  3. Cache Aggressively: Cache expensive API calls with appropriate TTL
  4. Validate Data: Check data completeness before using
  5. Monitor Usage: Track API quotas and rate limits
  6. Batch When Possible: Minimize API calls through batching
  7. Use Async for Parallelism: Fetch multiple resources concurrently

References