feat: switch stock data to Alpaca Markets API with yfinance fallback

Alpaca provides 10k calls/min free with 7yr history via IEX feed.
Hybrid approach: Alpaca for price bars, snapshots, sector ETF perf,
and moving averages; yfinance for fundamentals (PE, margins, 13F).

- Add alpaca_data.py: bars, snapshots, MAs, sector ETF perf, news
- Update get_macro_indicators: sector ETF performance via Alpaca
- Update get_sector_rotation: compute relative strength vs SPY
- Update entry timing node: Alpaca MAs from actual bar data
- Add alpaca-py to requirements.txt

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dtarkent2-sys 2026-03-09 22:23:58 +00:00
parent 5e8c81e738
commit 84f7894768
4 changed files with 504 additions and 29 deletions

View File

@ -9,6 +9,7 @@ langgraph
rank-bm25 rank-bm25
setuptools setuptools
requests requests
alpaca-py
tqdm tqdm
pytz pytz
fastapi fastapi

View File

@ -261,21 +261,39 @@ def create_entry_timing_node(llm):
def node(state: Dict[str, Any]) -> Dict[str, Any]: def node(state: Dict[str, Any]) -> Dict[str, Any]:
ticker = state["ticker"] ticker = state["ticker"]
price = ma50 = ma200 = hi52 = lo52 = range_pct = None
# Try Alpaca first (computed from actual bar data — more reliable than yfinance info)
try: try:
t = yf.Ticker(ticker.upper()) from tradingagents.dataflows.alpaca_data import alpaca_available, get_moving_averages
info = t.info or {} if alpaca_available():
except Exception: ma_data = get_moving_averages(ticker)
info = {} if ma_data:
price = ma_data.get("current_price")
ma50 = ma_data.get("fifty_day_avg")
ma200 = ma_data.get("two_hundred_day_avg")
hi52 = ma_data.get("fifty_two_week_high")
lo52 = ma_data.get("fifty_two_week_low")
range_pct = ma_data.get("vs_52w_range_pct")
except Exception as e:
logger.debug("Alpaca MAs failed for %s: %s", ticker, e)
price = _safe(info, "currentPrice") or _safe(info, "regularMarketPrice") # Fallback: yfinance info
ma50 = _safe(info, "fiftyDayAverage") if price is None:
ma200 = _safe(info, "twoHundredDayAverage") try:
hi52 = _safe(info, "fiftyTwoWeekHigh") t = yf.Ticker(ticker.upper())
lo52 = _safe(info, "fiftyTwoWeekLow") info = t.info or {}
except Exception:
info = {}
range_pct = None price = _safe(info, "currentPrice") or _safe(info, "regularMarketPrice")
if hi52 and lo52 and price and (hi52 - lo52) > 0: ma50 = _safe(info, "fiftyDayAverage")
range_pct = round(((price - lo52) / (hi52 - lo52)) * 100, 1) ma200 = _safe(info, "twoHundredDayAverage")
hi52 = _safe(info, "fiftyTwoWeekHigh")
lo52 = _safe(info, "fiftyTwoWeekLow")
if hi52 and lo52 and price and (hi52 - lo52) > 0:
range_pct = round(((price - lo52) / (hi52 - lo52)) * 100, 1)
ma_rel = "unknown" ma_rel = "unknown"
if ma50 and ma200: if ma50 and ma200:

View File

@ -0,0 +1,321 @@
"""Alpaca Market Data API client for the equity ranking engine.
Provides price bars, snapshots, and news. Fundamentals still come from yfinance.
Free tier: 10,000 requests/min, up to 7 years of historical data.
"""
from __future__ import annotations
import json
import logging
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import pandas as pd
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Client setup (lazy init)
# ---------------------------------------------------------------------------
_stock_client = None
_news_client = None
def _get_stock_client():
global _stock_client
if _stock_client is None:
from alpaca.data.historical import StockHistoricalDataClient
key = os.environ.get("ALPACA_API_KEY", "")
secret = os.environ.get("ALPACA_API_SECRET", "")
if not key or not secret:
raise RuntimeError(
"ALPACA_API_KEY and ALPACA_API_SECRET must be set"
)
_stock_client = StockHistoricalDataClient(key, secret)
return _stock_client
def _get_news_client():
global _news_client
if _news_client is None:
from alpaca.data.historical.news import NewsClient
key = os.environ.get("ALPACA_API_KEY", "")
secret = os.environ.get("ALPACA_API_SECRET", "")
_news_client = NewsClient(key, secret)
return _news_client
def alpaca_available() -> bool:
"""Check if Alpaca credentials are configured."""
return bool(
os.environ.get("ALPACA_API_KEY")
and os.environ.get("ALPACA_API_SECRET")
)
# ---------------------------------------------------------------------------
# Price / Bar data
# ---------------------------------------------------------------------------
def get_bars(
symbol: str,
start_date: str,
end_date: str,
timeframe: str = "1Day",
) -> pd.DataFrame:
"""Fetch historical bars from Alpaca.
Args:
symbol: Ticker symbol (e.g., "AAPL")
start_date: Start date in YYYY-MM-DD format
end_date: End date in YYYY-MM-DD format
timeframe: "1Min", "5Min", "15Min", "1Hour", "1Day", "1Week", "1Month"
Returns:
DataFrame with OHLCV columns.
"""
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
tf_map = {
"1Min": TimeFrame(1, TimeFrameUnit.Minute),
"5Min": TimeFrame(5, TimeFrameUnit.Minute),
"15Min": TimeFrame(15, TimeFrameUnit.Minute),
"1Hour": TimeFrame(1, TimeFrameUnit.Hour),
"1Day": TimeFrame(1, TimeFrameUnit.Day),
"1Week": TimeFrame(1, TimeFrameUnit.Week),
"1Month": TimeFrame(1, TimeFrameUnit.Month),
}
tf = tf_map.get(timeframe, TimeFrame(1, TimeFrameUnit.Day))
client = _get_stock_client()
request = StockBarsRequest(
symbol_or_symbols=symbol.upper(),
timeframe=tf,
start=datetime.strptime(start_date, "%Y-%m-%d"),
end=datetime.strptime(end_date, "%Y-%m-%d"),
feed="iex",
)
bars = client.get_stock_bars(request)
df = bars.df
if isinstance(df.index, pd.MultiIndex):
df = df.droplevel("symbol")
return df
def get_bars_csv(symbol: str, start_date: str, end_date: str) -> str:
"""Fetch historical bars and return as CSV string (drop-in for get_YFin_data_online)."""
try:
df = get_bars(symbol, start_date, end_date)
if df.empty:
return f"No data found for '{symbol}' between {start_date} and {end_date}"
# Rename columns to match yfinance output format
df = df.rename(columns={
"open": "Open", "high": "High", "low": "Low",
"close": "Close", "volume": "Volume",
"trade_count": "Trade Count", "vwap": "VWAP",
})
for col in ["Open", "High", "Low", "Close"]:
if col in df.columns:
df[col] = df[col].round(2)
if df.index.tz is not None:
df.index = df.index.tz_localize(None)
csv = df.to_csv()
header = (
f"# Stock data for {symbol.upper()} from {start_date} to {end_date}\n"
f"# Source: Alpaca Markets (IEX feed)\n"
f"# Total records: {len(df)}\n\n"
)
return header + csv
except Exception as e:
logger.warning("Alpaca bars failed for %s: %s", symbol, e)
return f"Error fetching Alpaca data for {symbol}: {e}"
# ---------------------------------------------------------------------------
# Snapshots (latest quote/trade)
# ---------------------------------------------------------------------------
def get_snapshot(symbol: str) -> Dict[str, Any]:
"""Get the latest snapshot (quote + trade + bar) for a symbol."""
from alpaca.data.requests import StockSnapshotRequest
client = _get_stock_client()
request = StockSnapshotRequest(symbol_or_symbols=symbol.upper(), feed="iex")
snapshots = client.get_stock_snapshot(request)
snap = snapshots.get(symbol.upper())
if not snap:
return {}
result = {
"ticker": symbol.upper(),
"latest_trade_price": snap.latest_trade.price if snap.latest_trade else None,
"latest_trade_size": snap.latest_trade.size if snap.latest_trade else None,
"latest_trade_time": str(snap.latest_trade.timestamp) if snap.latest_trade else None,
}
if snap.latest_quote:
result["bid"] = snap.latest_quote.bid_price
result["ask"] = snap.latest_quote.ask_price
result["bid_size"] = snap.latest_quote.bid_size
result["ask_size"] = snap.latest_quote.ask_size
if snap.daily_bar:
result["daily_open"] = snap.daily_bar.open
result["daily_high"] = snap.daily_bar.high
result["daily_low"] = snap.daily_bar.low
result["daily_close"] = snap.daily_bar.close
result["daily_volume"] = snap.daily_bar.volume
result["daily_vwap"] = snap.daily_bar.vwap
if snap.previous_daily_bar:
result["prev_close"] = snap.previous_daily_bar.close
result["prev_volume"] = snap.previous_daily_bar.volume
return result
def get_multi_snapshots(symbols: List[str]) -> Dict[str, Dict[str, Any]]:
"""Get snapshots for multiple symbols at once."""
from alpaca.data.requests import StockSnapshotRequest
client = _get_stock_client()
request = StockSnapshotRequest(
symbol_or_symbols=[s.upper() for s in symbols],
feed="iex",
)
snapshots = client.get_stock_snapshot(request)
result = {}
for sym, snap in snapshots.items():
entry = {"ticker": sym}
if snap.latest_trade:
entry["price"] = snap.latest_trade.price
if snap.daily_bar:
entry["daily_open"] = snap.daily_bar.open
entry["daily_high"] = snap.daily_bar.high
entry["daily_low"] = snap.daily_bar.low
entry["daily_close"] = snap.daily_bar.close
entry["daily_volume"] = snap.daily_bar.volume
entry["daily_vwap"] = snap.daily_bar.vwap
if snap.previous_daily_bar:
entry["prev_close"] = snap.previous_daily_bar.close
result[sym] = entry
return result
# ---------------------------------------------------------------------------
# Computed indicators from bars
# ---------------------------------------------------------------------------
def get_moving_averages(symbol: str) -> Dict[str, Any]:
"""Compute 50-day and 200-day moving averages from Alpaca bars."""
end = datetime.now()
start = end - timedelta(days=300) # ~200 trading days + buffer
try:
df = get_bars(symbol, start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"))
if df.empty or len(df) < 50:
return {}
close = df["close"] if "close" in df.columns else df["Close"]
result = {
"current_price": float(close.iloc[-1]),
"fifty_day_avg": float(close.tail(50).mean()),
}
if len(close) >= 200:
result["two_hundred_day_avg"] = float(close.tail(200).mean())
# 52-week high/low (approx 252 trading days)
year_data = close.tail(252) if len(close) >= 252 else close
result["fifty_two_week_high"] = float(year_data.max())
result["fifty_two_week_low"] = float(year_data.min())
hi = result["fifty_two_week_high"]
lo = result["fifty_two_week_low"]
price = result["current_price"]
if (hi - lo) > 0:
result["vs_52w_range_pct"] = round((price - lo) / (hi - lo) * 100, 1)
return result
except Exception as e:
logger.warning("Alpaca moving averages failed for %s: %s", symbol, e)
return {}
def get_sector_etf_performance(etf_symbols: List[str]) -> Dict[str, Dict[str, float]]:
"""Compute 1M and 3M returns for a list of sector ETFs."""
end = datetime.now()
start_3m = end - timedelta(days=100)
result = {}
for sym in etf_symbols:
try:
df = get_bars(sym, start_3m.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d"))
if df.empty or len(df) < 5:
continue
close = df["close"] if "close" in df.columns else df["Close"]
current = float(close.iloc[-1])
ret_1m = None
if len(close) >= 22:
price_1m = float(close.iloc[-22])
ret_1m = round((current - price_1m) / price_1m * 100, 2)
ret_3m = None
if len(close) >= 63:
price_3m = float(close.iloc[-63])
ret_3m = round((current - price_3m) / price_3m * 100, 2)
result[sym] = {
"return_1m": ret_1m,
"return_3m": ret_3m,
"price": current,
}
except Exception as e:
logger.warning("Alpaca ETF perf failed for %s: %s", sym, e)
return result
# ---------------------------------------------------------------------------
# News
# ---------------------------------------------------------------------------
def get_news(
symbols: Optional[List[str]] = None,
limit: int = 10,
start_date: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Fetch news articles from Alpaca News API."""
try:
from alpaca.data.requests import NewsRequest
client = _get_news_client()
kwargs: Dict[str, Any] = {"limit": limit}
if symbols:
kwargs["symbols"] = [s.upper() for s in symbols]
if start_date:
kwargs["start"] = datetime.strptime(start_date, "%Y-%m-%d")
request = NewsRequest(**kwargs)
news = client.get_news(request)
return [
{
"title": n.headline,
"summary": n.summary or "",
"url": n.url,
"source": n.source,
"created_at": str(n.created_at),
"symbols": n.symbols or [],
}
for n in news.news
]
except Exception as e:
logger.warning("Alpaca news failed: %s", e)
return []

View File

@ -1,45 +1,48 @@
from typing import Annotated from typing import Annotated
from datetime import datetime from datetime import datetime
from dateutil.relativedelta import relativedelta from dateutil.relativedelta import relativedelta
import logging
import yfinance as yf import yfinance as yf
import os import os
from .stockstats_utils import StockstatsUtils from .stockstats_utils import StockstatsUtils
_logger = logging.getLogger(__name__)
def get_YFin_data_online( def get_YFin_data_online(
symbol: Annotated[str, "ticker symbol of the company"], symbol: Annotated[str, "ticker symbol of the company"],
start_date: Annotated[str, "Start date in yyyy-mm-dd format"], start_date: Annotated[str, "Start date in yyyy-mm-dd format"],
end_date: Annotated[str, "End date in yyyy-mm-dd format"], end_date: Annotated[str, "End date in yyyy-mm-dd format"],
): ):
datetime.strptime(start_date, "%Y-%m-%d") datetime.strptime(start_date, "%Y-%m-%d")
datetime.strptime(end_date, "%Y-%m-%d") datetime.strptime(end_date, "%Y-%m-%d")
# Create ticker object # Try Alpaca first (10k calls/min, 7yr history)
ticker = yf.Ticker(symbol.upper()) try:
from .alpaca_data import alpaca_available, get_bars_csv
if alpaca_available():
result = get_bars_csv(symbol, start_date, end_date)
if not result.startswith("Error"):
return result
_logger.info("Alpaca bars failed, falling back to yfinance for %s", symbol)
except Exception as e:
_logger.debug("Alpaca unavailable: %s", e)
# Fetch historical data for the specified date range # Fallback: yfinance
ticker = yf.Ticker(symbol.upper())
data = ticker.history(start=start_date, end=end_date) data = ticker.history(start=start_date, end=end_date)
# Check if data is empty
if data.empty: if data.empty:
return ( return f"No data found for symbol '{symbol}' between {start_date} and {end_date}"
f"No data found for symbol '{symbol}' between {start_date} and {end_date}"
)
# Remove timezone info from index for cleaner output
if data.index.tz is not None: if data.index.tz is not None:
data.index = data.index.tz_localize(None) data.index = data.index.tz_localize(None)
# Round numerical values to 2 decimal places for cleaner display
numeric_columns = ["Open", "High", "Low", "Close", "Adj Close"] numeric_columns = ["Open", "High", "Low", "Close", "Adj Close"]
for col in numeric_columns: for col in numeric_columns:
if col in data.columns: if col in data.columns:
data[col] = data[col].round(2) data[col] = data[col].round(2)
# Convert DataFrame to CSV string
csv_string = data.to_csv() csv_string = data.to_csv()
# Add header information
header = f"# Stock data for {symbol.upper()} from {start_date} to {end_date}\n" header = f"# Stock data for {symbol.upper()} from {start_date} to {end_date}\n"
header += f"# Total records: {len(data)}\n" header += f"# Total records: {len(data)}\n"
header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n" header += f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
@ -526,9 +529,39 @@ def get_company_profile(ticker, curr_date=None):
return _json.dumps({"error": str(e), "ticker": ticker}) return _json.dumps({"error": str(e), "ticker": ticker})
_SECTOR_ETF_MAP = {
"Technology": "XLK",
"Financial Services": "XLF",
"Financials": "XLF",
"Energy": "XLE",
"Healthcare": "XLV",
"Health Care": "XLV",
"Industrials": "XLI",
"Consumer Cyclical": "XLY",
"Consumer Discretionary": "XLY",
"Consumer Defensive": "XLP",
"Consumer Staples": "XLP",
"Utilities": "XLU",
"Real Estate": "XLRE",
"Basic Materials": "XLB",
"Materials": "XLB",
"Communication Services": "XLC",
}
_SECTOR_ETFS = {
"SPY": "S&P 500",
"XLK": "Technology", "XLF": "Financials", "XLE": "Energy",
"XLV": "Health Care", "XLI": "Industrials", "XLY": "Consumer Discretionary",
"XLP": "Consumer Staples", "XLU": "Utilities", "XLRE": "Real Estate",
"XLB": "Materials", "XLC": "Communication Services",
}
def get_macro_indicators(curr_date=None): def get_macro_indicators(curr_date=None):
"""Get macro indicators via yfinance (plain function for interface routing).""" """Get macro indicators. VIX/TNX from yfinance (indices), sector ETFs from Alpaca."""
results = {} results = {}
# VIX and TNX are indices — yfinance only (Alpaca doesn't serve index tickers)
try: try:
vix = yf.Ticker("^VIX") vix = yf.Ticker("^VIX")
vd = vix.history(period="5d") vd = vix.history(period="5d")
@ -543,16 +576,118 @@ def get_macro_indicators(curr_date=None):
results["ten_year_yield"] = round(td["Close"].iloc[-1], 3) results["ten_year_yield"] = round(td["Close"].iloc[-1], 3)
except Exception: except Exception:
pass pass
# Sector ETF performance — Alpaca first (10k calls/min), yfinance fallback
try:
from .alpaca_data import alpaca_available, get_sector_etf_performance
if alpaca_available():
perf = get_sector_etf_performance(list(_SECTOR_ETFS.keys()))
if perf:
sector_performance = {}
for sym, data in perf.items():
sector_performance[sym] = {
"name": _SECTOR_ETFS.get(sym, sym),
"return_1m": data.get("return_1m"),
"return_3m": data.get("return_3m"),
"price": data.get("price"),
}
results["sector_performance"] = sector_performance
except Exception as e:
_logger.debug("Alpaca sector ETFs failed: %s", e)
# Fallback: yfinance for sector ETFs
if "sector_performance" not in results:
sector_performance = {}
for sym, name in _SECTOR_ETFS.items():
try:
t = yf.Ticker(sym)
hist = t.history(period="3mo")
if hist.empty or len(hist) < 5:
continue
close = hist["Close"]
current = float(close.iloc[-1])
ret_1m = round((current - float(close.iloc[-22])) / float(close.iloc[-22]) * 100, 2) if len(close) >= 22 else None
ret_3m = round((current - float(close.iloc[-63])) / float(close.iloc[-63]) * 100, 2) if len(close) >= 63 else None
sector_performance[sym] = {"name": name, "return_1m": ret_1m, "return_3m": ret_3m, "price": current}
except Exception:
pass
if sector_performance:
results["sector_performance"] = sector_performance
return _json.dumps(results, default=str) return _json.dumps(results, default=str)
def get_sector_rotation(ticker, curr_date=None): def get_sector_rotation(ticker, curr_date=None):
"""Get sector rotation data via yfinance (plain function for interface routing).""" """Get sector rotation data with relative performance vs SPY."""
try: try:
t = yf.Ticker(ticker.upper()) t = yf.Ticker(ticker.upper())
info = t.info info = t.info or {}
sector = _safe_get_yf(info, "sector", "Unknown") sector = _safe_get_yf(info, "sector", "Unknown")
return _json.dumps({"ticker": ticker.upper(), "sector": sector}, default=str) sector_etf = _SECTOR_ETF_MAP.get(sector)
result = {"ticker": ticker.upper(), "sector": sector, "sector_etf": sector_etf}
if not sector_etf:
return _json.dumps(result, default=str)
# Get sector ETF + SPY performance for relative strength
etfs_to_fetch = [sector_etf, "SPY"]
perf = {}
try:
from .alpaca_data import alpaca_available, get_sector_etf_performance
if alpaca_available():
perf = get_sector_etf_performance(etfs_to_fetch)
except Exception:
pass
# Fallback: yfinance
if not perf:
for sym in etfs_to_fetch:
try:
hist = yf.Ticker(sym).history(period="3mo")
if hist.empty or len(hist) < 5:
continue
close = hist["Close"]
current = float(close.iloc[-1])
ret_1m = round((current - float(close.iloc[-22])) / float(close.iloc[-22]) * 100, 2) if len(close) >= 22 else None
ret_3m = round((current - float(close.iloc[-63])) / float(close.iloc[-63]) * 100, 2) if len(close) >= 63 else None
perf[sym] = {"return_1m": ret_1m, "return_3m": ret_3m, "price": current}
except Exception:
pass
# Compute relative strength vs SPY
spy_data = perf.get("SPY", {})
etf_data = perf.get(sector_etf, {})
spy_1m = spy_data.get("return_1m")
spy_3m = spy_data.get("return_3m")
etf_1m = etf_data.get("return_1m")
etf_3m = etf_data.get("return_3m")
if etf_1m is not None and spy_1m is not None:
result["stock_sector_vs_spy_1m"] = round(etf_1m - spy_1m, 2)
if etf_3m is not None and spy_3m is not None:
result["stock_sector_vs_spy_3m"] = round(etf_3m - spy_3m, 2)
# Rank sector among all sector ETFs (from macro_indicators cache or fresh)
try:
macro_raw = get_macro_indicators()
macro = _json.loads(macro_raw) if isinstance(macro_raw, str) else macro_raw
sector_perf = macro.get("sector_performance", {})
# Rank by 1M return (exclude SPY from ranking)
ranked = sorted(
[(s, d.get("return_1m", -999)) for s, d in sector_perf.items() if s != "SPY"],
key=lambda x: x[1], reverse=True,
)
for i, (sym, _) in enumerate(ranked, 1):
if sym == sector_etf:
result["stock_sector_rank"] = i
result["total_sectors"] = len(ranked)
break
except Exception:
pass
return _json.dumps(result, default=str)
except Exception as e: except Exception as e:
return _json.dumps({"error": str(e)}) return _json.dumps({"error": str(e)})