TradingAgents/tradingagents/dataflows/stockstats_utils.py

103 lines
3.4 KiB
Python

import time
import logging
import pandas as pd
import yfinance as yf
from yfinance.exceptions import YFRateLimitError
from stockstats import wrap
from typing import Annotated
import os
from .config import get_config
logger = logging.getLogger(__name__)
def yf_retry(func, max_retries=3, base_delay=2.0):
"""Execute a yfinance call with exponential backoff on rate limits.
yfinance raises YFRateLimitError on HTTP 429 responses but does not
retry them internally. This wrapper adds retry logic specifically
for rate limits. Other exceptions propagate immediately.
"""
for attempt in range(max_retries + 1):
try:
return func()
except YFRateLimitError:
if attempt < max_retries:
delay = base_delay * (2 ** attempt)
logger.warning(f"Yahoo Finance rate limited, retrying in {delay:.0f}s (attempt {attempt + 1}/{max_retries})")
time.sleep(delay)
else:
raise
def _clean_dataframe(data: pd.DataFrame) -> pd.DataFrame:
"""Normalize a stock DataFrame for stockstats: parse dates, drop invalid rows, fill price gaps."""
data["Date"] = pd.to_datetime(data["Date"], errors="coerce")
data = data.dropna(subset=["Date"])
price_cols = [c for c in ["Open", "High", "Low", "Close", "Volume"] if c in data.columns]
data[price_cols] = data[price_cols].apply(pd.to_numeric, errors="coerce")
data = data.dropna(subset=["Close"])
data[price_cols] = data[price_cols].ffill().bfill()
return data
class StockstatsUtils:
@staticmethod
def get_stock_stats(
symbol: Annotated[str, "ticker symbol for the company"],
indicator: Annotated[
str, "quantitative indicators based off of the stock data for the company"
],
curr_date: Annotated[
str, "curr date for retrieving stock price data, YYYY-mm-dd"
],
):
config = get_config()
today_date = pd.Timestamp.today()
curr_date_dt = pd.to_datetime(curr_date)
end_date = today_date
start_date = today_date - pd.DateOffset(years=15)
start_date_str = start_date.strftime("%Y-%m-%d")
end_date_str = end_date.strftime("%Y-%m-%d")
# Ensure cache directory exists
os.makedirs(config["data_cache_dir"], exist_ok=True)
data_file = os.path.join(
config["data_cache_dir"],
f"{symbol}-YFin-data-{start_date_str}-{end_date_str}.csv",
)
if os.path.exists(data_file):
data = pd.read_csv(data_file, on_bad_lines="skip")
else:
data = yf_retry(lambda: yf.download(
symbol,
start=start_date_str,
end=end_date_str,
multi_level_index=False,
progress=False,
auto_adjust=True,
))
data = data.reset_index()
data.to_csv(data_file, index=False)
data = _clean_dataframe(data)
df = wrap(data)
df["Date"] = df["Date"].dt.strftime("%Y-%m-%d")
curr_date_str = curr_date_dt.strftime("%Y-%m-%d")
df[indicator] # trigger stockstats to calculate the indicator
matching_rows = df[df["Date"].str.startswith(curr_date_str)]
if not matching_rows.empty:
indicator_value = matching_rows[indicator].values[0]
return indicator_value
else:
return "N/A: Not a trading day (weekend or holiday)"