From d7e1b93509af515ea680a9922e3fb23682108f38 Mon Sep 17 00:00:00 2001 From: Youssef Aitousarrah Date: Wed, 15 Apr 2026 12:45:04 -0700 Subject: [PATCH] feat(filter): add OHLCV cache helper methods for price/intraday/recent-move Co-Authored-By: Claude Sonnet 4.6 --- tradingagents/dataflows/discovery/filter.py | 62 +++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/tradingagents/dataflows/discovery/filter.py b/tradingagents/dataflows/discovery/filter.py index 9444f32f..e2c82e45 100644 --- a/tradingagents/dataflows/discovery/filter.py +++ b/tradingagents/dataflows/discovery/filter.py @@ -5,6 +5,7 @@ from typing import Any, Callable, Dict, List import pandas as pd +from tradingagents.dataflows.data_cache.ohlcv_cache import download_ohlcv_cached from tradingagents.dataflows.discovery.candidate import Candidate from tradingagents.dataflows.discovery.discovery_config import DiscoveryConfig from tradingagents.dataflows.discovery.utils import ( @@ -298,6 +299,67 @@ class CandidateFilter: f"Priority breakdown: {critical_priority} critical, {high_priority} high, {medium_priority} medium, {low_priority} low" ) + def _price_from_cache( + self, ticker: str, ohlcv_data: Dict[str, Any] + ) -> Any: + """Return last closing price from OHLCV cache, or None if ticker missing.""" + df = ohlcv_data.get(ticker.upper()) + if df is None or df.empty or "Close" not in df.columns: + return None + close = df["Close"].dropna() + if close.empty: + return None + return float(close.iloc[-1]) + + def _intraday_from_cache( + self, ticker: str, ohlcv_data: Dict[str, Any], threshold: float + ) -> Any: + """Compute day-over-day % change from last two daily closes. + + Returns dict with 'already_moved' (bool) and 'intraday_change_pct' (float), + or None if ticker missing from cache or insufficient data. + """ + df = ohlcv_data.get(ticker.upper()) + if df is None or df.empty or "Close" not in df.columns: + return None + close = df["Close"].dropna() + if len(close) < 2: + return None + prev_close = float(close.iloc[-2]) + last_close = float(close.iloc[-1]) + if prev_close <= 0: + return None + pct = (last_close - prev_close) / prev_close * 100 + return { + "already_moved": pct > threshold, + "intraday_change_pct": round(pct, 2), + } + + def _recent_move_from_cache( + self, ticker: str, ohlcv_data: Dict[str, Any], lookback_days: int, threshold: float + ) -> Any: + """Compute % change over last N daily closes. + + Returns dict with 'status' ('leading'|'lagging') and 'price_change_pct' (float), + or None if ticker missing from cache or insufficient data. + """ + df = ohlcv_data.get(ticker.upper()) + if df is None or df.empty or "Close" not in df.columns: + return None + close = df["Close"].dropna() + if len(close) < lookback_days + 1: + return None + price_start = float(close.iloc[-(lookback_days + 1)]) + price_end = float(close.iloc[-1]) + if price_start <= 0: + return None + pct = (price_end - price_start) / price_start * 100 + reacted = abs(pct) >= threshold + return { + "status": "lagging" if reacted else "leading", + "price_change_pct": round(pct, 2), + } + def _fetch_batch_volume( self, state: Dict[str, Any], candidates: List[Dict[str, Any]] ) -> Dict[str, Any]: