feat(filter): add OHLCV cache helper methods for price/intraday/recent-move

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Youssef Aitousarrah 2026-04-15 12:45:04 -07:00
parent b03ab60f32
commit d7e1b93509
1 changed files with 62 additions and 0 deletions

View File

@ -5,6 +5,7 @@ from typing import Any, Callable, Dict, List
import pandas as pd
from tradingagents.dataflows.data_cache.ohlcv_cache import download_ohlcv_cached
from tradingagents.dataflows.discovery.candidate import Candidate
from tradingagents.dataflows.discovery.discovery_config import DiscoveryConfig
from tradingagents.dataflows.discovery.utils import (
@ -298,6 +299,67 @@ class CandidateFilter:
f"Priority breakdown: {critical_priority} critical, {high_priority} high, {medium_priority} medium, {low_priority} low"
)
def _price_from_cache(
self, ticker: str, ohlcv_data: Dict[str, Any]
) -> Any:
"""Return last closing price from OHLCV cache, or None if ticker missing."""
df = ohlcv_data.get(ticker.upper())
if df is None or df.empty or "Close" not in df.columns:
return None
close = df["Close"].dropna()
if close.empty:
return None
return float(close.iloc[-1])
def _intraday_from_cache(
self, ticker: str, ohlcv_data: Dict[str, Any], threshold: float
) -> Any:
"""Compute day-over-day % change from last two daily closes.
Returns dict with 'already_moved' (bool) and 'intraday_change_pct' (float),
or None if ticker missing from cache or insufficient data.
"""
df = ohlcv_data.get(ticker.upper())
if df is None or df.empty or "Close" not in df.columns:
return None
close = df["Close"].dropna()
if len(close) < 2:
return None
prev_close = float(close.iloc[-2])
last_close = float(close.iloc[-1])
if prev_close <= 0:
return None
pct = (last_close - prev_close) / prev_close * 100
return {
"already_moved": pct > threshold,
"intraday_change_pct": round(pct, 2),
}
def _recent_move_from_cache(
self, ticker: str, ohlcv_data: Dict[str, Any], lookback_days: int, threshold: float
) -> Any:
"""Compute % change over last N daily closes.
Returns dict with 'status' ('leading'|'lagging') and 'price_change_pct' (float),
or None if ticker missing from cache or insufficient data.
"""
df = ohlcv_data.get(ticker.upper())
if df is None or df.empty or "Close" not in df.columns:
return None
close = df["Close"].dropna()
if len(close) < lookback_days + 1:
return None
price_start = float(close.iloc[-(lookback_days + 1)])
price_end = float(close.iloc[-1])
if price_start <= 0:
return None
pct = (price_end - price_start) / price_start * 100
reacted = abs(pct) >= threshold
return {
"status": "lagging" if reacted else "leading",
"price_change_pct": round(pct, 2),
}
def _fetch_batch_volume(
self, state: Dict[str, Any], candidates: List[Dict[str, Any]]
) -> Dict[str, Any]: