diff --git a/tradingagents/dataflows/interface.py b/tradingagents/dataflows/interface.py index 4cd5ddef..ca9759a5 100644 --- a/tradingagents/dataflows/interface.py +++ b/tradingagents/dataflows/interface.py @@ -1,4 +1,5 @@ from typing import Annotated +import time # Import from vendor-specific modules from .local import get_YFin_data, get_finnhub_news, get_finnhub_company_insider_sentiment, get_finnhub_company_insider_transactions, get_simfin_balance_sheet, get_simfin_cashflow, get_simfin_income_statements, get_reddit_global_news, get_reddit_company_news @@ -16,6 +17,7 @@ from .alpha_vantage import ( get_news as get_alpha_vantage_news ) from .alpha_vantage_common import AlphaVantageRateLimitError +from openai import APIConnectionError, APITimeoutError, RateLimitError # Configuration and routing logic from .config import get_config @@ -194,25 +196,58 @@ def route_to_vendor(method: str, *args, **kwargs): else: vendor_methods = [(vendor_impl, vendor)] - # Run methods for this vendor + # Run methods for this vendor with retry logic vendor_results = [] for impl_func, vendor_name in vendor_methods: - try: - print(f"DEBUG: Calling {impl_func.__name__} from vendor '{vendor_name}'...") - result = impl_func(*args, **kwargs) - vendor_results.append(result) - print(f"SUCCESS: {impl_func.__name__} from vendor '{vendor_name}' completed successfully") - - except AlphaVantageRateLimitError as e: - if vendor == "alpha_vantage": - print(f"RATE_LIMIT: Alpha Vantage rate limit exceeded, falling back to next available vendor") + max_retries = 3 + base_delay = 1.0 + last_error = None + + for retry_attempt in range(max_retries): + try: + if retry_attempt > 0: + print(f"RETRY: Attempt {retry_attempt + 1}/{max_retries} " + f"for {impl_func.__name__}") + else: + print(f"DEBUG: Calling {impl_func.__name__} " + f"from vendor '{vendor_name}'...") + + result = impl_func(*args, **kwargs) + vendor_results.append(result) + print(f"SUCCESS: {impl_func.__name__} from vendor " + f"'{vendor_name}' completed successfully") + last_error = None + break # Success, exit retry loop + + except AlphaVantageRateLimitError as e: + print(f"RATE_LIMIT: Alpha Vantage rate limit exceeded") print(f"DEBUG: Rate limit details: {e}") - # Continue to next vendor for fallback - continue - except Exception as e: - # Log error but continue with other implementations - print(f"FAILED: {impl_func.__name__} from vendor '{vendor_name}' failed: {e}") - continue + last_error = e + break # Don't retry rate limits, move to next vendor + + except (ConnectionError, TimeoutError, OSError, + APIConnectionError, APITimeoutError, RateLimitError) as e: + # Transient errors - retry with backoff + last_error = e + if retry_attempt < max_retries - 1: + delay = base_delay * (2 ** retry_attempt) + print(f"TRANSIENT_ERROR: {type(e).__name__} - {e}") + print(f"RETRY: Waiting {delay}s before retry...") + time.sleep(delay) + else: + print(f"FAILED: {impl_func.__name__} from vendor " + f"'{vendor_name}' failed after {max_retries} " + f"attempts: {e}") + + except Exception as e: + # Non-transient errors - don't retry + last_error = e + print(f"FAILED: {impl_func.__name__} from vendor " + f"'{vendor_name}' failed: {type(e).__name__}: {e}") + break + + if last_error is not None: + continue # Move to next implementation # Add this vendor's results if vendor_results: