added fallbacks for tools

This commit is contained in:
Edward Sun 2025-10-03 22:40:09 -07:00
parent d23fb539e9
commit c07dcf026b
3 changed files with 218 additions and 20 deletions

View File

@ -1,6 +1,7 @@
import os
import requests
import pandas as pd
import json
from datetime import datetime
from io import StringIO
@ -34,9 +35,15 @@ def format_datetime_for_api(date_input) -> str:
else:
raise ValueError(f"Date must be string or datetime object, got {type(date_input)}")
class AlphaVantageRateLimitError(Exception):
"""Exception raised when Alpha Vantage API rate limit is exceeded."""
pass
def _make_api_request(function_name: str, params: dict) -> dict | str:
"""Helper function to make API requests and handle responses.
Raises:
AlphaVantageRateLimitError: When API rate limit is exceeded
"""
# Create a copy of params to avoid modifying the original
api_params = params.copy()
@ -61,6 +68,18 @@ def _make_api_request(function_name: str, params: dict) -> dict | str:
response_text = response.text
# Check if response is JSON (error responses are typically JSON)
try:
response_json = json.loads(response_text)
# Check for rate limit error
if "Information" in response_json:
info_message = response_json["Information"]
if "rate limit" in info_message.lower() or "api key" in info_message.lower():
raise AlphaVantageRateLimitError(f"Alpha Vantage rate limit exceeded: {info_message}")
except json.JSONDecodeError:
# Response is not JSON (likely CSV data), which is normal
pass
return response_text

View File

@ -2,7 +2,7 @@ from typing import Annotated
# Import from vendor-specific modules
from .local import get_YFin_data, get_finnhub_news, get_finnhub_company_insider_sentiment, get_finnhub_company_insider_transactions, get_simfin_balance_sheet, get_simfin_cashflow, get_simfin_income_statements, get_reddit_global_news, get_reddit_company_news
from .y_finance import get_YFin_data_online, get_stock_stats_indicators_window
from .y_finance import get_YFin_data_online, get_stock_stats_indicators_window, get_balance_sheet as get_yfinance_balance_sheet, get_cashflow as get_yfinance_cashflow, get_income_statement as get_yfinance_income_statement, get_insider_transactions as get_yfinance_insider_transactions
from .google import get_google_news
from .openai import get_stock_news_openai, get_global_news_openai, get_fundamentals_openai
from .alpha_vantage import (
@ -15,6 +15,7 @@ from .alpha_vantage import (
get_insider_transactions as get_alpha_vantage_insider_transactions,
get_news as get_alpha_vantage_news
)
from .alpha_vantage_common import AlphaVantageRateLimitError
# Configuration and routing logic
from .config import get_config
@ -81,14 +82,17 @@ VENDOR_METHODS = {
},
"get_balance_sheet": {
"alpha_vantage": get_alpha_vantage_balance_sheet,
"yfinance": get_yfinance_balance_sheet,
"local": get_simfin_balance_sheet,
},
"get_cashflow": {
"alpha_vantage": get_alpha_vantage_cashflow,
"yfinance": get_yfinance_cashflow,
"local": get_simfin_cashflow,
},
"get_income_statement": {
"alpha_vantage": get_alpha_vantage_income_statement,
"yfinance": get_yfinance_income_statement,
"local": get_simfin_income_statements,
},
# news_data
@ -107,6 +111,7 @@ VENDOR_METHODS = {
},
"get_insider_transactions": {
"alpha_vantage": get_alpha_vantage_insider_transactions,
"yfinance": get_yfinance_insider_transactions,
"local": get_finnhub_company_insider_transactions,
},
}
@ -134,42 +139,102 @@ def get_vendor(category: str, method: str = None) -> str:
return config.get("data_vendors", {}).get(category, "default")
def route_to_vendor(method: str, *args, **kwargs):
"""Route method calls to appropriate vendor implementation."""
"""Route method calls to appropriate vendor implementation with fallback support."""
category = get_category_for_method(method)
vendor_config = get_vendor(category, method)
# Handle comma-separated vendors
vendors = [v.strip() for v in vendor_config.split(',')]
primary_vendors = [v.strip() for v in vendor_config.split(',')]
if method not in VENDOR_METHODS:
raise ValueError(f"Method '{method}' not supported")
# Collect all methods to run
methods_to_run = []
# Get all available vendors for this method for fallback
all_available_vendors = list(VENDOR_METHODS[method].keys())
for vendor in vendors:
# Create fallback vendor list: primary vendors first, then remaining vendors as fallbacks
fallback_vendors = primary_vendors.copy()
for vendor in all_available_vendors:
if vendor not in fallback_vendors:
fallback_vendors.append(vendor)
# Debug: Print fallback ordering
primary_str = "".join(primary_vendors)
fallback_str = "".join(fallback_vendors)
print(f"DEBUG: {method} - Primary: [{primary_str}] | Full fallback order: [{fallback_str}]")
# Track results and execution state
results = []
vendor_attempt_count = 0
any_primary_vendor_attempted = False
successful_vendor = None
for vendor in fallback_vendors:
if vendor not in VENDOR_METHODS[method]:
print(f"Info: Vendor '{vendor}' not supported for method '{method}', ignoring")
if vendor in primary_vendors:
print(f"INFO: Vendor '{vendor}' not supported for method '{method}', falling back to next vendor")
continue
vendor_impl = VENDOR_METHODS[method][vendor]
is_primary_vendor = vendor in primary_vendors
vendor_attempt_count += 1
# Track if we attempted any primary vendor
if is_primary_vendor:
any_primary_vendor_attempted = True
# Debug: Print current attempt
vendor_type = "PRIMARY" if is_primary_vendor else "FALLBACK"
print(f"DEBUG: Attempting {vendor_type} vendor '{vendor}' for {method} (attempt #{vendor_attempt_count})")
# Handle list of methods for a vendor
if isinstance(vendor_impl, list):
methods_to_run.extend(vendor_impl)
vendor_methods = [(impl, vendor) for impl in vendor_impl]
print(f"DEBUG: Vendor '{vendor}' has multiple implementations: {len(vendor_methods)} functions")
else:
# Single method implementation
methods_to_run.append(vendor_impl)
vendor_methods = [(vendor_impl, vendor)]
# Run all methods and collect results
results = []
for impl_func in methods_to_run:
# Run methods for this vendor
vendor_results = []
for impl_func, vendor_name in vendor_methods:
try:
print(f"DEBUG: Calling {impl_func.__name__} from vendor '{vendor_name}'...")
result = impl_func(*args, **kwargs)
results.append(result)
vendor_results.append(result)
print(f"SUCCESS: {impl_func.__name__} from vendor '{vendor_name}' completed successfully")
except AlphaVantageRateLimitError as e:
if vendor == "alpha_vantage":
print(f"RATE_LIMIT: Alpha Vantage rate limit exceeded, falling back to next available vendor")
print(f"DEBUG: Rate limit details: {e}")
# Continue to next vendor for fallback
continue
except Exception as e:
# Log error but continue with other implementations
print(f"Warning: {impl_func.__name__} failed: {e}")
print(f"FAILED: {impl_func.__name__} from vendor '{vendor_name}' failed: {e}")
continue
# Add this vendor's results
if vendor_results:
results.extend(vendor_results)
successful_vendor = vendor
result_summary = f"Got {len(vendor_results)} result(s)"
print(f"SUCCESS: Vendor '{vendor}' succeeded - {result_summary}")
# Stopping logic: Stop after first successful vendor for single-vendor configs
# Multiple vendor configs (comma-separated) may want to collect from multiple sources
if len(primary_vendors) == 1:
print(f"DEBUG: Stopping after successful vendor '{vendor}' (single-vendor config)")
break
else:
print(f"FAILED: Vendor '{vendor}' produced no results")
# Final result summary
if not results:
print(f"FAILURE: All {vendor_attempt_count} vendor attempts failed for method '{method}'")
raise RuntimeError(f"All vendor implementations failed for method '{method}'")
else:
print(f"FINAL: Method '{method}' completed with {len(results)} result(s) from {vendor_attempt_count} vendor attempt(s)")
# Return single result if only one, otherwise concatenate as string
if len(results) == 1:

View File

@ -182,3 +182,117 @@ def get_stockstats_indicator(
return ""
return str(indicator_value)
def get_balance_sheet(
ticker: Annotated[str, "ticker symbol of the company"],
freq: Annotated[str, "frequency of data: 'annual' or 'quarterly'"] = "quarterly",
curr_date: Annotated[str, "current date (not used for yfinance)"] = None
):
"""Get balance sheet data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
if freq.lower() == "quarterly":
data = ticker_obj.quarterly_balance_sheet
else:
data = ticker_obj.balance_sheet
if data.empty:
return f"No balance sheet data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Balance Sheet data for {ticker.upper()} ({freq})\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving balance sheet for {ticker}: {str(e)}"
def get_cashflow(
ticker: Annotated[str, "ticker symbol of the company"],
freq: Annotated[str, "frequency of data: 'annual' or 'quarterly'"] = "quarterly",
curr_date: Annotated[str, "current date (not used for yfinance)"] = None
):
"""Get cash flow data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
if freq.lower() == "quarterly":
data = ticker_obj.quarterly_cashflow
else:
data = ticker_obj.cashflow
if data.empty:
return f"No cash flow data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Cash Flow data for {ticker.upper()} ({freq})\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving cash flow for {ticker}: {str(e)}"
def get_income_statement(
ticker: Annotated[str, "ticker symbol of the company"],
freq: Annotated[str, "frequency of data: 'annual' or 'quarterly'"] = "quarterly",
curr_date: Annotated[str, "current date (not used for yfinance)"] = None
):
"""Get income statement data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
if freq.lower() == "quarterly":
data = ticker_obj.quarterly_income_stmt
else:
data = ticker_obj.income_stmt
if data.empty:
return f"No income statement data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Income Statement data for {ticker.upper()} ({freq})\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving income statement for {ticker}: {str(e)}"
def get_insider_transactions(
ticker: Annotated[str, "ticker symbol of the company"]
):
"""Get insider transactions data from yfinance."""
try:
ticker_obj = yf.Ticker(ticker.upper())
data = ticker_obj.insider_transactions
if data is None or data.empty:
return f"No insider transactions data found for symbol '{ticker}'"
# Convert to CSV string for consistency with other functions
csv_string = data.to_csv()
# Add header information
header = f"# Insider Transactions data for {ticker.upper()}\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
return header + csv_string
except Exception as e:
return f"Error retrieving insider transactions for {ticker}: {str(e)}"