feat(filter): wire OHLCV cache into filter stage, replace yfinance call sites

This commit is contained in:
Youssef Aitousarrah 2026-04-15 12:50:52 -07:00
parent 7d48ad67bc
commit c3ccfd5500
1 changed files with 22 additions and 30 deletions

View File

@ -5,7 +5,6 @@ from typing import Any, Callable, Dict, List
import pandas as pd import pandas as pd
from tradingagents.dataflows.data_cache.ohlcv_cache import download_ohlcv_cached
from tradingagents.dataflows.discovery.candidate import Candidate from tradingagents.dataflows.discovery.candidate import Candidate
from tradingagents.dataflows.discovery.discovery_config import DiscoveryConfig from tradingagents.dataflows.discovery.discovery_config import DiscoveryConfig
from tradingagents.dataflows.discovery.utils import ( from tradingagents.dataflows.discovery.utils import (
@ -172,7 +171,13 @@ class CandidateFilter:
volume_by_ticker = self._fetch_batch_volume(state, candidates) volume_by_ticker = self._fetch_batch_volume(state, candidates)
news_by_ticker = self._fetch_batch_news(start_date, end_date, candidates) news_by_ticker = self._fetch_batch_news(start_date, end_date, candidates)
price_by_ticker = self._fetch_batch_prices(candidates)
# Load OHLCV cache for candidate tickers — replaces per-ticker yfinance calls
cache_dir = self.config.get("discovery", {}).get("ohlcv_cache_dir", "data/ohlcv_cache")
candidate_tickers = [c["ticker"].upper() for c in candidates if c.get("ticker")]
logger.info(f"Loading OHLCV cache for {len(candidate_tickers)} candidate tickers...")
ohlcv_data = download_ohlcv_cached(candidate_tickers, period="1y", cache_dir=cache_dir)
logger.info(f"OHLCV cache loaded for {len(ohlcv_data)}/{len(candidate_tickers)} tickers")
( (
filtered_candidates, filtered_candidates,
@ -184,7 +189,7 @@ class CandidateFilter:
candidates=candidates, candidates=candidates,
volume_by_ticker=volume_by_ticker, volume_by_ticker=volume_by_ticker,
news_by_ticker=news_by_ticker, news_by_ticker=news_by_ticker,
price_by_ticker=price_by_ticker, ohlcv_data=ohlcv_data,
end_date=end_date, end_date=end_date,
) )
@ -472,7 +477,7 @@ class CandidateFilter:
candidates: List[Dict[str, Any]], candidates: List[Dict[str, Any]],
volume_by_ticker: Dict[str, Any], volume_by_ticker: Dict[str, Any],
news_by_ticker: Dict[str, Any], news_by_ticker: Dict[str, Any],
price_by_ticker: Dict[str, float], ohlcv_data: Dict[str, Any],
end_date: str, end_date: str,
): ):
filtered_candidates = [] filtered_candidates = []
@ -498,14 +503,10 @@ class CandidateFilter:
try: try:
# Same-day mover filter (check intraday movement first) # Same-day mover filter (check intraday movement first)
if self.filter_same_day_movers: if self.filter_same_day_movers:
from tradingagents.dataflows.y_finance import check_intraday_movement intraday_check = self._intraday_from_cache(
ticker, ohlcv_data, self.intraday_movement_threshold
try: )
intraday_check = check_intraday_movement( if intraday_check is not None:
ticker=ticker, movement_threshold=self.intraday_movement_threshold
)
# Skip if already moved significantly today
if intraday_check.get("already_moved"): if intraday_check.get("already_moved"):
filtered_reasons["intraday_moved"] += 1 filtered_reasons["intraday_moved"] += 1
intraday_pct = intraday_check.get("intraday_change_pct", 0) intraday_pct = intraday_check.get("intraday_change_pct", 0)
@ -513,24 +514,17 @@ class CandidateFilter:
f"Filtered {ticker}: Already moved {intraday_pct:+.1f}% today (stale)" f"Filtered {ticker}: Already moved {intraday_pct:+.1f}% today (stale)"
) )
continue continue
# Add intraday data to candidate metadata for ranking
cand["intraday_change_pct"] = intraday_check.get("intraday_change_pct", 0) cand["intraday_change_pct"] = intraday_check.get("intraday_change_pct", 0)
except Exception as e:
# Don't filter out if check fails, just log
logger.warning(f"Could not check intraday movement for {ticker}: {e}")
# Recent multi-day mover filter (avoid stocks that already ran) # Recent multi-day mover filter (avoid stocks that already ran)
if self.filter_recent_movers: if self.filter_recent_movers:
from tradingagents.dataflows.y_finance import check_if_price_reacted reaction = self._recent_move_from_cache(
ticker,
try: ohlcv_data,
reaction = check_if_price_reacted( self.recent_movement_lookback_days,
ticker=ticker, self.recent_movement_threshold,
lookback_days=self.recent_movement_lookback_days, )
reaction_threshold=self.recent_movement_threshold, if reaction is not None:
)
cand["recent_change_pct"] = reaction.get("price_change_pct") cand["recent_change_pct"] = reaction.get("price_change_pct")
cand["recent_move_status"] = reaction.get("status") cand["recent_move_status"] = reaction.get("status")
@ -551,8 +545,6 @@ class CandidateFilter:
f"{existing_context} | ⚠️ Recent move: {change_pct:+.1f}% " f"{existing_context} | ⚠️ Recent move: {change_pct:+.1f}% "
f"over {self.recent_movement_lookback_days}d" f"over {self.recent_movement_lookback_days}d"
) )
except Exception as e:
logger.warning(f"Could not check recent movement for {ticker}: {e}")
# Liquidity filter based on average volume # Liquidity filter based on average volume
if self.min_average_volume: if self.min_average_volume:
@ -580,8 +572,8 @@ class CandidateFilter:
try: try:
from tradingagents.dataflows.y_finance import get_fundamentals, get_stock_price from tradingagents.dataflows.y_finance import get_fundamentals, get_stock_price
# Get current price — prefer batch result, fall back to per-ticker # Get current price — prefer OHLCV cache, fall back to per-ticker yfinance
current_price = price_by_ticker.get(ticker.upper()) current_price = self._price_from_cache(ticker, ohlcv_data)
if current_price is None: if current_price is None:
current_price = get_stock_price(ticker) current_price = get_stock_price(ticker)
cand["current_price"] = current_price cand["current_price"] = current_price