"""Minervini Trend Template scanner — Stage 2 uptrend identification.

Identifies stocks in a confirmed Stage 2 uptrend using Mark Minervini's
6-condition trend template, then ranks survivors by an IBD-style Relative
Strength (RS) Rating computed within the scanned universe.

All computation is pure OHLCV math — zero per-ticker API calls during scan.
"""

from bisect import bisect_right
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

from tradingagents.dataflows.discovery.scanner_registry import SCANNER_REGISTRY, BaseScanner
from tradingagents.dataflows.discovery.utils import Priority
from tradingagents.utils.logger import get_logger

logger = get_logger(__name__)

DEFAULT_TICKER_FILE = "data/tickers.txt"

# Window sizes, in trading days, shared by the trend checks and the return math.
_SMA_LONG_WINDOW = 200
_TRADING_DAYS_PER_YEAR = 252
_TRADING_DAYS_PER_QUARTER = 63


def _load_tickers_from_file(path: str) -> List[str]:
    """Load ticker symbols from a text file.

    Blank lines and lines starting with ``#`` are skipped. Symbols are
    upper-cased and de-duplicated (first occurrence wins) so that a symbol
    listed twice cannot yield two identical candidates.

    Args:
        path: Location of the ticker file, one symbol per line.

    Returns:
        The symbols in file order, or an empty list when the file is missing,
        unreadable, or contains no symbols.
    """
    try:
        with open(path, encoding="utf-8") as f:
            symbols = [
                line.strip().upper()
                for line in f
                if line.strip() and not line.strip().startswith("#")
            ]
        # dict preserves insertion order, so this de-duplicates without reordering.
        tickers = list(dict.fromkeys(symbols))
        if tickers:
            logger.info(f"Minervini scanner: loaded {len(tickers)} tickers from {path}")
            return tickers
    except FileNotFoundError:
        logger.warning(f"Ticker file not found: {path}")
    except Exception as e:
        # Best effort: a bad ticker file disables the scan rather than crashing it.
        logger.warning(f"Failed to load ticker file {path}: {e}")
    return []


class MinerviniScanner(BaseScanner):
    """Scan for stocks in a confirmed Minervini Stage 2 uptrend.

    Applies Mark Minervini's 6-condition trend template to identify stocks
    with healthy price structure (above rising SMAs, well off lows, near
    highs), then ranks by an IBD-style RS Rating computed within the scanned
    universe.

    Data requirement: ~200+ trading days of OHLCV (uses 1y lookback by default).
    Cost: single batch yfinance download, zero per-ticker API calls.
    """

    name = "minervini"
    pipeline = "momentum"
    strategy = "minervini"

    def __init__(self, config: Dict[str, Any]):
        """Read scanner settings, falling back to the built-in defaults.

        Args:
            config: Global configuration mapping. ``tickers_file`` is read
                from it as a fallback for the scanner-level ``ticker_file``.
        """
        super().__init__(config)
        # NOTE(review): ``self.scanner_config`` is not defined in this file; it is
        # presumably populated by ``BaseScanner.__init__`` — confirm.
        self.ticker_file = self.scanner_config.get(
            "ticker_file",
            config.get("tickers_file", DEFAULT_TICKER_FILE),
        )
        self.min_rs_rating = self.scanner_config.get("min_rs_rating", 70)
        self.lookback_period = self.scanner_config.get("lookback_period", "1y")
        self.sma_200_slope_days = self.scanner_config.get("sma_200_slope_days", 20)
        self.min_pct_off_low = self.scanner_config.get("min_pct_off_low", 30)
        self.max_pct_from_high = self.scanner_config.get("max_pct_from_high", 25)
        self.max_tickers = self.scanner_config.get("max_tickers", 50)

    def scan(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Run the scan and return ranked Stage 2 candidates.

        Args:
            state: Pipeline state; not read by this scanner.

        Returns:
            Candidate dicts sorted by RS Rating (highest first) and truncated
            to ``self.limit``. Empty when the scanner is disabled, no tickers
            are available, or the batch download fails or returns nothing.
        """
        if not self.is_enabled():
            return []

        logger.info("📊 Scanning for Minervini Stage 2 uptrends...")

        tickers = _load_tickers_from_file(self.ticker_file)
        if not tickers:
            logger.warning("No tickers loaded for Minervini scan")
            return []

        if self.max_tickers and len(tickers) > self.max_tickers:
            logger.info(f"Limiting Minervini scan to {self.max_tickers}/{len(tickers)} tickers")
            tickers = tickers[: self.max_tickers]

        # Batch download OHLCV — 1y needed for SMA200.
        # Imported lazily so the module can be loaded without yfinance installed.
        import yfinance as yf

        try:
            logger.info(f"Batch-downloading {len(tickers)} tickers ({self.lookback_period})...")
            raw = yf.download(
                tickers,
                period=self.lookback_period,
                interval="1d",
                auto_adjust=True,
                progress=False,
            )
        except Exception as e:
            logger.error(f"Batch download failed: {e}")
            return []

        if raw is None or raw.empty:
            logger.warning("Minervini scanner: batch download returned empty data")
            return []

        # RS Rating is a percentile within the whole universe, so returns are
        # collected for every ticker, not only those passing the template.
        universe_returns: Dict[str, float] = {}
        passing_tickers: List[Tuple[str, Dict[str, Any]]] = []

        for ticker in tickers:
            result = self._check_minervini(ticker, raw)
            if result is not None:
                ticker_df, metrics = result
                ret = self._compute_return(ticker_df)
                if ret is not None:
                    universe_returns[ticker] = ret
                    passing_tickers.append((ticker, metrics))
                continue

            ret = self._universe_return(ticker, raw)
            if ret is not None:
                universe_returns[ticker] = ret

        rs_ratings = self._rank_returns(universe_returns)

        # Build final candidates: pass RS filter, sort, limit.
        candidates = []
        for ticker, metrics in passing_tickers:
            rs_rating = rs_ratings.get(ticker, 0)
            if rs_rating < self.min_rs_rating:
                continue

            pct_off_low = metrics["pct_off_low"]
            pct_from_high = metrics["pct_from_high"]

            # Priority based on RS Rating.
            if rs_rating >= 90:
                priority = Priority.CRITICAL.value
            elif rs_rating >= 80:
                priority = Priority.HIGH.value
            else:
                priority = Priority.MEDIUM.value

            context = (
                f"Minervini Stage 2: P>SMA50>SMA150>SMA200, "
                f"+{pct_off_low:.0f}% off 52w low, "
                f"within {pct_from_high:.0f}% of 52w high, "
                f"RS Rating {rs_rating:.0f}/100"
            )

            candidates.append(
                {
                    "ticker": ticker,
                    "source": self.name,
                    "context": context,
                    "priority": priority,
                    "strategy": self.strategy,
                    "rs_rating": rs_rating,
                    "pct_off_low": round(pct_off_low, 1),
                    "pct_from_high": round(pct_from_high, 1),
                    "sma_50": round(metrics["sma_50"], 2),
                    "sma_150": round(metrics["sma_150"], 2),
                    "sma_200": round(metrics["sma_200"], 2),
                }
            )

        # Sort by RS Rating descending, then limit.
        # NOTE(review): ``self.limit`` is not set in this file; presumably
        # provided by ``BaseScanner`` — confirm.
        candidates.sort(key=lambda c: c.get("rs_rating", 0), reverse=True)
        candidates = candidates[: self.limit]

        logger.info(
            f"Minervini scanner: {len(candidates)} Stage 2 candidates "
            f"(RS >= {self.min_rs_rating}) from {len(tickers)} tickers"
        )
        return candidates

    @staticmethod
    def _extract_ticker_frame(raw: pd.DataFrame, ticker: str) -> Optional[pd.DataFrame]:
        """Return the rows for one ticker with incomplete rows dropped.

        When ``raw`` has MultiIndex columns the ticker is looked up on column
        level 1; a ticker absent from that level yields ``None``. With flat
        columns the whole frame is treated as the single ticker's data.
        """
        if isinstance(raw.columns, pd.MultiIndex):
            if ticker not in raw.columns.get_level_values(1):
                return None
            return raw.xs(ticker, axis=1, level=1).dropna()
        return raw.dropna()

    def _universe_return(self, ticker: str, raw: pd.DataFrame) -> Optional[float]:
        """Compute the RS return for a ticker that did not pass the template."""
        try:
            df = self._extract_ticker_frame(raw, ticker)
        except Exception as e:
            # Best effort: one malformed ticker must not abort the ranking.
            logger.debug(f"Minervini return extraction failed for {ticker}: {e}")
            return None
        if df is None:
            return None
        return self._compute_return(df)

    @staticmethod
    def _rank_returns(universe_returns: Dict[str, float]) -> Dict[str, float]:
        """Convert returns into percentile ranks (0-100, one decimal).

        A ticker's rank is the share of universe returns less than or equal
        to its own, so the best performer always scores 100.
        """
        if not universe_returns:
            return {}
        ordered = sorted(universe_returns.values())
        total = len(ordered)
        # bisect_right on the sorted list counts the values <= ret.
        return {
            ticker: round((bisect_right(ordered, ret) / total) * 100, 1)
            for ticker, ret in universe_returns.items()
        }

    def _check_minervini(
        self, ticker: str, raw: pd.DataFrame
    ) -> Optional[Tuple[pd.DataFrame, Dict[str, Any]]]:
        """Apply the 6-condition Minervini trend template to one ticker.

        Args:
            ticker: Symbol to evaluate.
            raw: Batch OHLCV frame as returned by the download in ``scan``.

        Returns:
            ``(df, metrics)`` if all conditions pass, ``None`` otherwise
            (including missing data, too little history, or any error).
        """
        try:
            df = self._extract_ticker_frame(raw, ticker)
            if df is None:
                return None

            # SMA200 must be defined both today and ``sma_200_slope_days`` ago,
            # otherwise the slope comparison would be made against NaN.
            min_rows = max(_SMA_LONG_WINDOW, _SMA_LONG_WINDOW + self.sma_200_slope_days)
            if len(df) < min_rows:
                return None

            close = df["Close"]
            sma_200_series = close.rolling(_SMA_LONG_WINDOW).mean()

            sma_50 = float(close.rolling(50).mean().iloc[-1])
            sma_150 = float(close.rolling(150).mean().iloc[-1])
            sma_200 = float(sma_200_series.iloc[-1])
            sma_200_prev = float(sma_200_series.iloc[-self.sma_200_slope_days - 1])

            price = float(close.iloc[-1])
            # A negative-start slice returns the whole series when it is shorter
            # than the window, so no length check is needed.
            window_52w = close.iloc[-_TRADING_DAYS_PER_YEAR:]
            low_52w = float(window_52w.min())
            high_52w = float(window_52w.max())

            if low_52w <= 0 or sma_50 <= 0 or sma_150 <= 0 or sma_200 <= 0:
                return None

            pct_off_low = ((price - low_52w) / low_52w) * 100
            pct_from_high = ((high_52w - price) / high_52w) * 100

            # Minervini's 6 conditions (all must pass). Together they enforce
            # the P > SMA50 > SMA150 > SMA200 stack reported in the context.
            conditions = [
                price > sma_150 and price > sma_200,  # 1. Price above SMA150 and SMA200
                sma_50 > sma_150 > sma_200,  # 2. SMA50 > SMA150 > SMA200
                sma_200 > sma_200_prev,  # 3. SMA200 slope is rising
                price > sma_50,  # 4. Price above SMA50
                pct_off_low >= self.min_pct_off_low,  # 5. At least 30% off 52w low
                pct_from_high <= self.max_pct_from_high,  # 6. Within 25% of 52w high
            ]

            if not all(conditions):
                return None

            return df, {
                "sma_50": sma_50,
                "sma_150": sma_150,
                "sma_200": sma_200,
                "pct_off_low": pct_off_low,
                "pct_from_high": pct_from_high,
            }

        except Exception as e:
            logger.debug(f"Minervini check failed for {ticker}: {e}")
            return None

    def _compute_return(self, df: pd.DataFrame) -> Optional[float]:
        """Compute the blended return used for the RS Rating.

        Formula: ``(full_year_return * 2 + last_quarter_return) / 3``.
        The last quarter is already part of the 12-month return and is then
        added once more on its own, giving recent momentum extra weight.

        The 12-month baseline is the close 252 trading days back, or the
        first available close when less history exists, so a lookback longer
        than one year does not stretch the measurement window.

        Args:
            df: Single-ticker frame; the ``Close`` column is used when
                present, otherwise the first column.

        Returns:
            The blended return as a fraction, or ``None`` when fewer than two
            closes are available, a baseline price is not positive, or any
            error occurs.
        """
        try:
            close = df["Close"] if "Close" in df.columns else df.iloc[:, 0]
            close = close.dropna()
            if len(close) < 2:
                return None

            count = len(close)
            latest = float(close.iloc[-1])
            year_ago = float(close.iloc[max(0, count - _TRADING_DAYS_PER_YEAR)])
            quarter_ago = float(close.iloc[max(0, count - _TRADING_DAYS_PER_QUARTER)])

            if year_ago <= 0 or quarter_ago <= 0:
                return None

            full_year_ret = (latest - year_ago) / year_ago
            quarter_ret = (latest - quarter_ago) / quarter_ago

            # Year return weighted 2x, plus the quarter counted again on top.
            return (full_year_ret * 2 + quarter_ret) / 3

        except Exception:
            return None


SCANNER_REGISTRY.register(MinerviniScanner)