TradingAgents/tradingagents/dataflows/interface.py

179 lines
5.9 KiB
Python

from typing import Annotated
# Import from vendor-specific modules
from .local import get_YFin_data, get_finnhub_news, get_finnhub_company_insider_sentiment, get_finnhub_company_insider_transactions, get_simfin_balance_sheet, get_simfin_cashflow, get_simfin_income_statements, get_reddit_global_news, get_reddit_company_news
from .yahoo_finance import get_YFin_data_online, get_stock_stats_indicators_window
from .google import get_google_news
from .openai import get_stock_news_openai, get_global_news_openai, get_fundamentals_openai
from .alpha_vantage import (
get_stock as get_alpha_vantage_stock,
get_indicator as get_alpha_vantage_indicator,
get_fundamentals as get_alpha_vantage_fundamentals,
get_balance_sheet as get_alpha_vantage_balance_sheet,
get_cashflow as get_alpha_vantage_cashflow,
get_income_statement as get_alpha_vantage_income_statement,
get_insider_transactions as get_alpha_vantage_insider_transactions,
get_news as get_alpha_vantage_news
)
# Configuration and routing logic
from .config import get_config
# Tools organized by category
TOOLS_CATEGORIES = {
"core_stock_apis": {
"description": "OHLCV stock price data",
"tools": [
"get_stock_data"
]
},
"technical_indicators": {
"description": "Technical analysis indicators",
"tools": [
"get_indicators"
]
},
"fundamental_data": {
"description": "Company fundamentals",
"tools": [
"get_fundamentals",
"get_balance_sheet",
"get_cashflow",
"get_income_statement"
]
},
"news_data": {
"description": "News (public/insiders, original/processed)",
"tools": [
"get_news",
"get_global_news",
"get_insider_sentiment",
"get_insider_transactions",
]
}
}
VENDOR_LIST = [
"local",
"yahoo_finance",
"openai",
"google"
]
# Mapping of methods to their vendor-specific implementations
VENDOR_METHODS = {
# core_stock_apis
"get_stock_data": {
"alpha_vantage": get_alpha_vantage_stock,
"yahoo_finance": get_YFin_data_online,
"local": get_YFin_data,
},
# technical_indicators
"get_indicators": {
"alpha_vantage": get_alpha_vantage_indicator,
"yahoo_finance": get_stock_stats_indicators_window,
"local": get_stock_stats_indicators_window
},
# fundamental_data
"get_fundamentals": {
"alpha_vantage": get_alpha_vantage_fundamentals,
"openai": get_fundamentals_openai,
},
"get_balance_sheet": {
"alpha_vantage": get_alpha_vantage_balance_sheet,
"local": get_simfin_balance_sheet,
},
"get_cashflow": {
"alpha_vantage": get_alpha_vantage_cashflow,
"local": get_simfin_cashflow,
},
"get_income_statement": {
"alpha_vantage": get_alpha_vantage_income_statement,
"local": get_simfin_income_statements,
},
# news_data
"get_news": {
"alpha_vantage": get_alpha_vantage_news,
"openai": get_stock_news_openai,
"google": get_google_news,
"local": [get_finnhub_news, get_reddit_company_news, get_google_news],
},
"get_global_news": {
"openai": get_global_news_openai,
"local": get_reddit_global_news
},
"get_insider_sentiment": {
"local": get_finnhub_company_insider_sentiment
},
"get_insider_transactions": {
"alpha_vantage": get_alpha_vantage_insider_transactions,
"local": get_finnhub_company_insider_transactions,
},
}
def get_category_for_method(method: str) -> str:
"""Get the category that contains the specified method."""
for category, info in TOOLS_CATEGORIES.items():
if method in info["tools"]:
return category
raise ValueError(f"Method '{method}' not found in any category")
def get_vendor(category: str, method: str = None) -> str:
"""Get the configured vendor for a data category or specific tool method.
Tool-level configuration takes precedence over category-level.
"""
config = get_config()
# Check tool-level configuration first (if method provided)
if method:
tool_vendors = config.get("tool_vendors", {})
if method in tool_vendors:
return tool_vendors[method]
# Fall back to category-level configuration
return config.get("data_vendors", {}).get(category, "default")
def route_to_vender(method: str, *args, **kwargs):
"""Route method calls to appropriate vendor implementation."""
category = get_category_for_method(method)
vendor_config = get_vendor(category, method)
# Handle comma-separated vendors
vendors = [v.strip() for v in vendor_config.split(',')]
if method not in VENDOR_METHODS:
raise ValueError(f"Method '{method}' not supported")
# Collect all methods to run
methods_to_run = []
for vendor in vendors:
if vendor not in VENDOR_METHODS[method]:
print(f"Info: Vendor '{vendor}' not supported for method '{method}', ignoring")
continue
vendor_impl = VENDOR_METHODS[method][vendor]
# Handle list of methods for a vendor
if isinstance(vendor_impl, list):
methods_to_run.extend(vendor_impl)
else:
# Single method implementation
methods_to_run.append(vendor_impl)
# Run all methods and collect results
results = []
for impl_func in methods_to_run:
try:
result = impl_func(*args, **kwargs)
results.append(result)
except Exception as e:
# Log error but continue with other implementations
print(f"Warning: {impl_func.__name__} failed: {e}")
# Return single result if only one, otherwise concatenate as string
if len(results) == 1:
return results[0]
else:
# Convert all results to strings and concatenate
return '\n'.join(str(result) for result in results)