feat: improve all 9 scanners and add 3 new scanners

Phase 1 - Fix existing scanners:
- Options flow: apply min_premium filter, scan 3 expirations
- Volume accumulation: distinguish accumulation vs distribution
- Reddit DD: use LLM quality score for priority (skip <60)
- Reddit trending: add mention counts, scale priority by volume
- Semantic news: include headlines, add catalyst classification
- Earnings calendar: add pre-earnings accumulation + EPS estimates
- Market movers: add price ($5) and volume (500K) validation
- ML signal: raise min_win_prob from 35% to 50%

Phase 2 - New scanners:
- Analyst upgrades: monitors rating changes via Alpha Vantage
- Technical breakout: volume-confirmed breakouts above 20d high
- Sector rotation: finds laggards in accelerating sectors

All 12 scanners register with valid Strategy enum values.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Youssef Aitousarrah 2026-02-18 15:44:56 -08:00
parent 573b756b4b
commit 1c20dc8c90
15 changed files with 2238 additions and 83 deletions

File diff suppressed because it is too large Load Diff

View File

@ -2,6 +2,7 @@
# Import all scanners to trigger registration
from . import (
analyst_upgrades, # noqa: F401
earnings_calendar, # noqa: F401
insider_buying, # noqa: F401
market_movers, # noqa: F401
@ -9,6 +10,8 @@ from . import (
options_flow, # noqa: F401
reddit_dd, # noqa: F401
reddit_trending, # noqa: F401
sector_rotation, # noqa: F401
semantic_news, # noqa: F401
technical_breakout, # noqa: F401
volume_accumulation, # noqa: F401
)

View File

@ -0,0 +1,99 @@
"""Analyst upgrade and initiation scanner."""
from typing import Any, Dict, List
from tradingagents.dataflows.discovery.scanner_registry import SCANNER_REGISTRY, BaseScanner
from tradingagents.dataflows.discovery.utils import Priority
from tradingagents.utils.logger import get_logger
logger = get_logger(__name__)
class AnalystUpgradeScanner(BaseScanner):
    """Scan for recent analyst upgrades and coverage initiations.

    Pulls structured rating changes from Alpha Vantage, drops stale items,
    and ranks candidates by freshness: fresh upgrades/initiations rank HIGH,
    older ones decay to MEDIUM and LOW.
    """

    name = "analyst_upgrades"
    pipeline = "edge"
    strategy = "analyst_upgrade"

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # Window (days) requested from the rating-changes feed.
        self.lookback_days = self.scanner_config.get("lookback_days", 3)
        # Hard per-item freshness cutoff (hours).
        self.max_hours_old = self.scanner_config.get("max_hours_old", 72)

    def scan(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return up to ``self.limit`` candidate dicts for fresh analyst actions."""
        if not self.is_enabled():
            return []
        logger.info("📊 Scanning analyst upgrades and initiations...")
        try:
            from tradingagents.dataflows.alpha_vantage_analysts import (
                get_analyst_rating_changes,
            )

            changes = get_analyst_rating_changes(
                lookback_days=self.lookback_days,
                change_types=["upgrade", "initiated"],
                top_n=self.limit * 2,  # over-fetch; staleness filter trims below
                return_structured=True,
            )
            if not changes:
                logger.info("No analyst upgrades found")
                return []
            candidates = []
            for change in changes:
                ticker = change.get("ticker", "").upper().strip()
                if not ticker:
                    continue
                action = change.get("action", "unknown")
                # Missing timestamps default to 999h so they fail the cutoff.
                hours_old = change.get("hours_old", 999)
                headline = change.get("headline", "")
                source = change.get("source", "")
                if hours_old > self.max_hours_old:
                    continue
                # Priority by freshness and action type; upgrades and
                # initiations are treated identically when fresh (<=24h).
                if action in ("upgrade", "initiated") and hours_old <= 24:
                    priority = Priority.HIGH.value
                elif hours_old <= 48:
                    priority = Priority.MEDIUM.value
                else:
                    priority = Priority.LOW.value
                context = (
                    f"Analyst {action}: {headline}"
                    if headline
                    else f"Analyst {action} ({source})"
                )
                candidates.append(
                    {
                        "ticker": ticker,
                        "source": self.name,
                        "context": context,
                        "priority": priority,
                        "strategy": self.strategy,
                        "analyst_action": action,
                        "hours_old": hours_old,
                    }
                )
                if len(candidates) >= self.limit:
                    break
            logger.info(f"Analyst upgrades: {len(candidates)} candidates")
            return candidates
        except Exception as e:
            logger.error(f"Analyst upgrades scan failed: {e}", exc_info=True)
            return []


SCANNER_REGISTRY.register(AnalystUpgradeScanner)

View File

@ -16,6 +16,7 @@ class EarningsCalendarScanner(BaseScanner):
name = "earnings_calendar"
pipeline = "events"
strategy = "earnings_play"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
@ -60,6 +61,12 @@ class EarningsCalendarScanner(BaseScanner):
# Sort by days until earnings (sooner = higher priority)
candidates.sort(key=lambda x: x.get("days_until", 999))
# Enrich top candidates with accumulation signal and EPS estimates
for cand in candidates[:10]:
days_until = cand.get("days_until", 999)
if 2 <= days_until <= 7:
self._enrich_earnings_candidate(cand)
# Apply limit
candidates = candidates[: self.limit]
@ -70,6 +77,37 @@ class EarningsCalendarScanner(BaseScanner):
logger.warning(f"⚠️ Earnings calendar failed: {e}")
return []
def _enrich_earnings_candidate(self, cand: Dict[str, Any]) -> None:
"""Enrich earnings candidate with accumulation signal and estimates (in-place)."""
ticker = cand["ticker"]
# Check pre-earnings volume accumulation
try:
from tradingagents.dataflows.y_finance import get_pre_earnings_accumulation_signal
signal = get_pre_earnings_accumulation_signal(ticker)
if signal and signal.get("signal"):
vol_ratio = signal.get("volume_ratio", 0)
cand["has_accumulation"] = True
cand["accumulation_volume_ratio"] = vol_ratio
cand["context"] += f" | Pre-earnings accumulation: {vol_ratio:.1f}x volume"
cand["priority"] = Priority.CRITICAL.value
except Exception:
pass
# Add earnings estimates
try:
from tradingagents.dataflows.finnhub_api import get_ticker_earnings_estimate
est = get_ticker_earnings_estimate(ticker)
if est and est.get("has_upcoming_earnings"):
eps = est.get("eps_estimate")
if eps is not None:
cand["eps_estimate"] = eps
cand["context"] += f" | EPS est: ${eps:.2f}"
except Exception:
pass
def _parse_structured_earnings(
self, earnings_list: List[Dict], seen_tickers: set
) -> List[Dict[str, Any]]:

View File

@ -66,8 +66,7 @@ class InsiderBuyingScanner(BaseScanner):
# Priority by significance
title_lower = title.lower()
is_c_suite = any(
t in title_lower
for t in ["ceo", "cfo", "coo", "cto", "president", "chairman"]
t in title_lower for t in ["ceo", "cfo", "coo", "cto", "president", "chairman"]
)
is_director = "director" in title_lower

View File

@ -14,9 +14,12 @@ class MarketMoversScanner(BaseScanner):
name = "market_movers"
pipeline = "momentum"
strategy = "momentum"
def __init__(self, config: Dict[str, Any]):
    """Read mover-validation thresholds from scanner config with defaults."""
    super().__init__(config)
    cfg = self.scanner_config
    # Price floor filters penny stocks; volume floor filters illiquid names.
    self.min_price = cfg.get("min_price", 5.0)
    self.min_volume = cfg.get("min_volume", 500_000)
def scan(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
if not self.is_enabled():
@ -39,36 +42,61 @@ class MarketMoversScanner(BaseScanner):
candidates = []
# Process gainers
for gainer in result.get("gainers", [])[: self.limit // 2]:
for gainer in result.get("gainers", [])[: self.limit]:
ticker = gainer.get("ticker", "").upper()
if not ticker:
continue
if not self._validate_mover(ticker):
continue
change_pct = gainer.get("change_percentage", 0)
price = gainer.get("price", "")
volume = gainer.get("volume", "")
context = f"Top gainer: {change_pct} change"
if price:
context += f" (${price})"
if volume:
context += f" vol: {volume}"
candidates.append(
{
"ticker": ticker,
"source": self.name,
"context": f"Top gainer: {gainer.get('change_percentage', 0)} change",
"context": context,
"priority": Priority.MEDIUM.value,
"strategy": "momentum",
"strategy": self.strategy,
}
)
if len(candidates) >= self.limit // 2:
break
# Process losers (potential reversal plays)
for loser in result.get("losers", [])[: self.limit // 2]:
loser_count = 0
for loser in result.get("losers", [])[: self.limit]:
ticker = loser.get("ticker", "").upper()
if not ticker:
continue
if not self._validate_mover(ticker):
continue
change_pct = loser.get("change_percentage", 0)
candidates.append(
{
"ticker": ticker,
"source": self.name,
"context": f"Top loser: {loser.get('change_percentage', 0)} change (reversal play)",
"context": f"Top loser: {change_pct} change (reversal play)",
"priority": Priority.LOW.value,
"strategy": "oversold_reversal",
"strategy": self.strategy,
}
)
loser_count += 1
if loser_count >= self.limit // 2:
break
logger.info(f"Found {len(candidates)} market movers")
return candidates
@ -77,5 +105,23 @@ class MarketMoversScanner(BaseScanner):
logger.warning(f"⚠️ Market movers failed: {e}")
return []
def _validate_mover(self, ticker: str) -> bool:
"""Quick validation: price and volume check to filter penny/illiquid stocks."""
try:
from tradingagents.dataflows.y_finance import get_stock_price, get_ticker_info
price = get_stock_price(ticker)
if price is not None and price < self.min_price:
return False
info = get_ticker_info(ticker)
avg_vol = info.get("averageVolume", 0) if info else 0
if avg_vol and avg_vol < self.min_volume:
return False
return True
except Exception:
return True # Don't filter on errors
SCANNER_REGISTRY.register(MarketMoversScanner)

View File

@ -53,10 +53,11 @@ class MLSignalScanner(BaseScanner):
name = "ml_signal"
pipeline = "momentum"
strategy = "ml_signal"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.min_win_prob = self.scanner_config.get("min_win_prob", 0.35)
self.min_win_prob = self.scanner_config.get("min_win_prob", 0.50)
self.lookback_period = self.scanner_config.get("lookback_period", "1y")
self.max_workers = self.scanner_config.get("max_workers", 8)
self.fetch_market_cap = self.scanner_config.get("fetch_market_cap", False)
@ -249,9 +250,9 @@ class MLSignalScanner(BaseScanner):
return None
# Determine priority from P(WIN)
if win_prob >= 0.50:
if win_prob >= 0.65:
priority = Priority.CRITICAL.value
elif win_prob >= 0.40:
elif win_prob >= 0.55:
priority = Priority.HIGH.value
else:
priority = Priority.MEDIUM.value
@ -265,7 +266,7 @@ class MLSignalScanner(BaseScanner):
f"({prediction.get('prediction', 'N/A')})"
),
"priority": priority,
"strategy": "ml_signal",
"strategy": self.strategy,
"ml_win_prob": win_prob,
"ml_loss_prob": loss_prob,
"ml_prediction": prediction.get("prediction", "N/A"),

View File

@ -9,6 +9,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
from tradingagents.dataflows.discovery.scanner_registry import SCANNER_REGISTRY, BaseScanner
from tradingagents.dataflows.discovery.utils import Priority
from tradingagents.dataflows.y_finance import get_option_chain, get_ticker_options
from tradingagents.utils.logger import get_logger
@ -41,6 +42,7 @@ class OptionsFlowScanner(BaseScanner):
name = "options_flow"
pipeline = "edge"
strategy = "options_flow"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
@ -95,48 +97,116 @@ class OptionsFlowScanner(BaseScanner):
return candidates
def _analyze_ticker_options(self, ticker: str) -> Optional[Dict[str, Any]]:
    """Scan a single ticker for unusual options activity across multiple expirations.

    Examines up to the three nearest expirations.  A strike is "unusual" when
    its volume clears ``min_volume``, its notional premium (volume * price *
    100 shares/contract) clears ``min_premium``, and volume/open-interest
    exceeds ``min_volume_oi_ratio``.  Returns a candidate dict, or None when
    nothing unusual is found.
    """
    try:
        expirations = get_ticker_options(ticker)
        if not expirations:
            return None

        # Scan up to 3 nearest expirations.
        max_expirations = min(3, len(expirations))
        total_unusual_calls = 0
        total_unusual_puts = 0
        total_call_vol = 0
        total_put_vol = 0
        best_expiration = None
        best_unusual_count = 0

        for exp in expirations[:max_expirations]:
            try:
                options = get_option_chain(ticker, exp)
            except Exception:
                continue
            if options is None:
                continue

            # get_option_chain may return either a (calls, puts) tuple or an
            # object exposing .calls/.puts — support both shapes.
            calls_df, puts_df = (None, None)
            if isinstance(options, tuple) and len(options) == 2:
                calls_df, puts_df = options
            elif hasattr(options, "calls") and hasattr(options, "puts"):
                calls_df, puts_df = options.calls, options.puts
            else:
                continue

            exp_unusual_calls = 0
            exp_unusual_puts = 0

            # Analyze calls
            if calls_df is not None and not calls_df.empty:
                for _, opt in calls_df.iterrows():
                    vol = opt.get("volume", 0) or 0
                    oi = opt.get("openInterest", 0) or 0
                    price = opt.get("lastPrice", 0) or 0
                    if vol < self.min_volume:
                        continue
                    # Premium filter (volume * price * 100 shares per contract)
                    if (vol * price * 100) < self.min_premium:
                        continue
                    if oi > 0 and (vol / oi) >= self.min_volume_oi_ratio:
                        exp_unusual_calls += 1
                        # NOTE: only volume from unusual strikes feeds the
                        # put/call ratio below — it is a ratio of unusual flow,
                        # not total chain volume.
                        total_call_vol += vol

            # Analyze puts (same filters as calls)
            if puts_df is not None and not puts_df.empty:
                for _, opt in puts_df.iterrows():
                    vol = opt.get("volume", 0) or 0
                    oi = opt.get("openInterest", 0) or 0
                    price = opt.get("lastPrice", 0) or 0
                    if vol < self.min_volume:
                        continue
                    if (vol * price * 100) < self.min_premium:
                        continue
                    if oi > 0 and (vol / oi) >= self.min_volume_oi_ratio:
                        exp_unusual_puts += 1
                        total_put_vol += vol

            total_unusual_calls += exp_unusual_calls
            total_unusual_puts += exp_unusual_puts
            exp_total = exp_unusual_calls + exp_unusual_puts
            if exp_total > best_unusual_count:
                best_unusual_count = exp_total
                best_expiration = exp

        total_unusual = total_unusual_calls + total_unusual_puts
        if total_unusual == 0:
            return None

        # Calculate put/call ratio; 999 sentinel when there is no call volume
        # (reads as extremely bearish below).
        pc_ratio = total_put_vol / total_call_vol if total_call_vol > 0 else 999
        if pc_ratio < 0.7:
            sentiment = "bullish"
        elif pc_ratio > 1.3:
            sentiment = "bearish"
        else:
            sentiment = "neutral"
        priority = Priority.HIGH.value if sentiment == "bullish" else Priority.MEDIUM.value

        context = (
            f"Unusual options: {total_unusual} strikes across {max_expirations} exp, "
            f"P/C={pc_ratio:.2f} ({sentiment}), "
            f"{total_unusual_calls} unusual calls / {total_unusual_puts} unusual puts"
        )
        return {
            "ticker": ticker,
            "source": self.name,
            "context": context,
            "priority": priority,
            "strategy": self.strategy,
            "put_call_ratio": round(pc_ratio, 2),
            "unusual_calls": total_unusual_calls,
            "unusual_puts": total_unusual_puts,
            "best_expiration": best_expiration,
        }
    except Exception as e:
        logger.debug(f"Error scanning {ticker}: {e}")
        return None

View File

@ -15,6 +15,7 @@ class RedditDDScanner(BaseScanner):
name = "reddit_dd"
pipeline = "social"
strategy = "social_dd"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
@ -38,25 +39,34 @@ class RedditDDScanner(BaseScanner):
# Handle different result formats
if isinstance(result, list):
# Structured result with DD posts
for post in result[: self.limit]:
for post in result[: self.limit * 2]:
ticker = post.get("ticker", "").upper()
if not ticker:
continue
title = post.get("title", "")
score = post.get("score", 0)
title = post.get("title", "")[:100]
# Use LLM quality score (0-100) for priority, not Reddit upvotes
dd_score = post.get("quality_score", post.get("score", 0))
# Higher score = higher priority
priority = Priority.HIGH.value if score > 1000 else Priority.MEDIUM.value
if dd_score >= 80:
priority = Priority.HIGH.value
elif dd_score >= 60:
priority = Priority.MEDIUM.value
else:
# Skip low-quality posts
continue
context = f"Reddit DD (score: {dd_score}/100): {title}"
candidates.append(
{
"ticker": ticker,
"source": self.name,
"context": f"Reddit DD: {title[:80]}... (score: {score})",
"context": context,
"priority": priority,
"strategy": "undiscovered_dd",
"dd_score": score,
"strategy": self.strategy,
"dd_quality_score": dd_score,
"dd_title": title,
}
)
@ -67,13 +77,24 @@ class RedditDDScanner(BaseScanner):
if not ticker:
continue
title = ticker_data.get("title", "")[:100]
dd_score = ticker_data.get("quality_score", ticker_data.get("score", 0))
if dd_score >= 80:
priority = Priority.HIGH.value
elif dd_score >= 60:
priority = Priority.MEDIUM.value
else:
continue
candidates.append(
{
"ticker": ticker,
"source": self.name,
"context": "Reddit DD post",
"priority": Priority.MEDIUM.value,
"strategy": "undiscovered_dd",
"context": f"Reddit DD (score: {dd_score}/100): {title}" if title else "Reddit DD post",
"priority": priority,
"strategy": self.strategy,
"dd_quality_score": dd_score,
}
)
@ -123,7 +144,7 @@ class RedditDDScanner(BaseScanner):
"source": self.name,
"context": f"Reddit DD: {submission.title[:80]}...",
"priority": Priority.MEDIUM.value,
"strategy": "undiscovered_dd",
"strategy": self.strategy,
}
)
@ -151,7 +172,7 @@ class RedditDDScanner(BaseScanner):
"source": self.name,
"context": "Reddit DD post",
"priority": Priority.MEDIUM.value,
"strategy": "undiscovered_dd",
"strategy": self.strategy,
}
)

View File

@ -14,6 +14,7 @@ class RedditTrendingScanner(BaseScanner):
name = "reddit_trending"
pipeline = "social"
strategy = "social_hype"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
@ -37,19 +38,48 @@ class RedditTrendingScanner(BaseScanner):
return []
# Extract tickers using common utility
from collections import Counter
from tradingagents.dataflows.discovery.common_utils import extract_tickers_from_text
tickers_found = extract_tickers_from_text(result)
# Count ticker mentions in the raw text for priority scaling
ticker_counts = Counter()
for ticker in tickers_found:
# Count occurrences in the original text
count = result.upper().count(ticker)
ticker_counts[ticker] = max(count, 1)
# Deduplicate while preserving order
seen = set()
unique_tickers = []
for t in tickers_found:
if t not in seen:
seen.add(t)
unique_tickers.append(t)
candidates = []
for ticker in tickers_found[: self.limit]:
for ticker in unique_tickers[: self.limit]:
count = ticker_counts.get(ticker, 1)
if count >= 50:
priority = Priority.HIGH.value
elif count >= 20:
priority = Priority.MEDIUM.value
else:
priority = Priority.LOW.value
context = f"Trending on Reddit: ~{count} mentions"
candidates.append(
{
"ticker": ticker,
"source": self.name,
"context": "Reddit trending discussion",
"priority": Priority.MEDIUM.value,
"strategy": "social_hype",
"context": context,
"priority": priority,
"strategy": self.strategy,
"mention_count": count,
}
)

View File

@ -0,0 +1,209 @@
"""Sector rotation scanner — finds laggards in accelerating sectors."""
from typing import Any, Dict, List, Optional
import pandas as pd
from tradingagents.dataflows.discovery.scanner_registry import SCANNER_REGISTRY, BaseScanner
from tradingagents.dataflows.discovery.utils import Priority
from tradingagents.utils.logger import get_logger
logger = get_logger(__name__)
# SPDR Select Sector ETFs
SECTOR_ETFS = {
"XLK": "Technology",
"XLF": "Financials",
"XLE": "Energy",
"XLV": "Healthcare",
"XLI": "Industrials",
"XLY": "Consumer Discretionary",
"XLP": "Consumer Staples",
"XLU": "Utilities",
"XLB": "Materials",
"XLRE": "Real Estate",
"XLC": "Communication Services",
}
DEFAULT_TICKER_FILE = "data/tickers.txt"
def _load_tickers_from_file(path: str) -> List[str]:
"""Load ticker symbols from a text file."""
try:
with open(path) as f:
return [
line.strip().upper()
for line in f
if line.strip() and not line.strip().startswith("#")
]
except Exception:
return []
class SectorRotationScanner(BaseScanner):
    """Detect sector momentum shifts and find laggards in accelerating sectors.

    Two-step scan:
      1. Download ~2 months of daily data for the 11 SPDR sector ETFs and
         flag sectors whose 5-day return rate meaningfully exceeds their
         20-day trend ("acceleration").
      2. Walk a local ticker universe and surface stocks that belong to an
         accelerating sector but have themselves barely moved over 5 days.
    """

    # Registry metadata: unique scanner name, pipeline bucket, strategy tag.
    name = "sector_rotation"
    pipeline = "momentum"
    strategy = "sector_rotation"

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # Ticker universe file: scanner config wins, then global config, then default.
        self.ticker_file = self.scanner_config.get(
            "ticker_file",
            config.get("tickers_file", DEFAULT_TICKER_FILE),
        )
        # Cap on how many universe tickers are checked per scan.
        self.max_tickers = self.scanner_config.get("max_tickers", 100)
        # Minimum 5d-vs-20d daily-rate ratio for a sector to count as accelerating.
        self.min_sector_accel = self.scanner_config.get("min_sector_acceleration", 2.0)

    def scan(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return laggard candidates in accelerating sectors, up to ``self.limit``."""
        if not self.is_enabled():
            return []
        logger.info("🔄 Scanning sector rotation...")
        from tradingagents.dataflows.y_finance import download_history, get_ticker_info

        # Step 1: Identify accelerating sectors
        try:
            etf_symbols = list(SECTOR_ETFS.keys())
            etf_data = download_history(
                etf_symbols, period="2mo", interval="1d", auto_adjust=True, progress=False
            )
        except Exception as e:
            logger.error(f"Failed to download sector ETF data: {e}")
            return []
        if etf_data is None or etf_data.empty:
            return []
        accelerating_sectors = self._find_accelerating_sectors(etf_data)
        if not accelerating_sectors:
            logger.info("No accelerating sectors detected")
            return []
        sector_names = [SECTOR_ETFS.get(etf, etf) for etf in accelerating_sectors]
        logger.info(f"Accelerating sectors: {', '.join(sector_names)}")

        # Step 2: Find laggard stocks in those sectors
        tickers = _load_tickers_from_file(self.ticker_file)
        if not tickers:
            return []
        tickers = tickers[: self.max_tickers]
        candidates = []
        for ticker in tickers:
            result = self._check_sector_laggard(ticker, accelerating_sectors, get_ticker_info)
            if result:
                candidates.append(result)
                # Stop early once enough candidates are collected.
                if len(candidates) >= self.limit:
                    break
        logger.info(f"Sector rotation: {len(candidates)} candidates")
        return candidates

    def _find_accelerating_sectors(self, data: pd.DataFrame) -> List[str]:
        """Find sectors where 5-day return is accelerating vs 20-day trend."""
        accelerating = []
        for etf in SECTOR_ETFS:
            try:
                if isinstance(data.columns, pd.MultiIndex):
                    if etf not in data.columns.get_level_values(1):
                        continue
                    close = data.xs(etf, axis=1, level=1)["Close"].dropna()
                else:
                    # NOTE(review): a non-MultiIndex frame carries a single
                    # symbol's columns, so this branch uses the same series
                    # for every ETF in the loop — confirm upstream always
                    # returns a MultiIndex for multi-symbol downloads.
                    close = data["Close"].dropna()
                # Need at least 21 closes for the 20-day comparison below.
                if len(close) < 21:
                    continue
                ret_5d = (float(close.iloc[-1]) / float(close.iloc[-6]) - 1) * 100
                ret_20d = (float(close.iloc[-1]) / float(close.iloc[-21]) - 1) * 100
                # Acceleration: 5-day daily return rate significantly beats 20-day
                daily_rate_5d = ret_5d / 5
                daily_rate_20d = ret_20d / 20
                if daily_rate_20d != 0:
                    acceleration = daily_rate_5d / daily_rate_20d
                elif daily_rate_5d > 0:
                    acceleration = 10.0  # Strong acceleration from flat
                else:
                    acceleration = 0
                # ret_5d > 0 guard keeps purely-negative "acceleration"
                # (both rates falling) from being flagged.
                if acceleration >= self.min_sector_accel and ret_5d > 0:
                    accelerating.append(etf)
                    logger.debug(
                        f"{etf} ({SECTOR_ETFS[etf]}): 5d={ret_5d:+.1f}%, "
                        f"20d={ret_20d:+.1f}%, accel={acceleration:.1f}x"
                    )
            except Exception as e:
                logger.debug(f"Error analyzing {etf}: {e}")
        return accelerating

    def _check_sector_laggard(
        self, ticker: str, accelerating_sectors: List[str], get_info_fn
    ) -> Optional[Dict[str, Any]]:
        """Check if stock is in an accelerating sector but hasn't moved yet."""
        try:
            info = get_info_fn(ticker)
            if not info:
                return None
            stock_sector = info.get("sector", "")
            # Map the provider's sector label to its SPDR ETF symbol.
            sector_to_etf = {v: k for k, v in SECTOR_ETFS.items()}
            sector_etf = sector_to_etf.get(stock_sector)
            if not sector_etf or sector_etf not in accelerating_sectors:
                return None
            # Check if stock is lagging its sector
            from tradingagents.dataflows.y_finance import download_history

            hist = download_history(
                ticker, period="1mo", interval="1d", auto_adjust=True, progress=False
            )
            if hist is None or hist.empty or len(hist) < 6:
                return None
            # Handle MultiIndex columns from single-symbol batch downloads.
            if isinstance(hist.columns, pd.MultiIndex):
                tickers_in_data = hist.columns.get_level_values(1).unique()
                target = ticker if ticker in tickers_in_data else tickers_in_data[0]
                hist = hist.xs(target, level=1, axis=1)
            close = hist["Close"] if "Close" in hist.columns else hist.iloc[:, 0]
            ret_5d = (float(close.iloc[-1]) / float(close.iloc[-6]) - 1) * 100
            # Stock is a laggard if it moved less than 2% while sector is accelerating
            if ret_5d > 2.0:
                return None  # Already moved, not a laggard
            context = (
                f"Sector rotation: {stock_sector} sector accelerating, "
                f"{ticker} lagging at {ret_5d:+.1f}% (5d)"
            )
            return {
                "ticker": ticker,
                "source": self.name,
                "context": context,
                "priority": Priority.MEDIUM.value,
                "strategy": self.strategy,
                "sector": stock_sector,
                "sector_etf": sector_etf,
                "stock_5d_return": round(ret_5d, 2),
            }
        except Exception as e:
            logger.debug(f"Sector check failed for {ticker}: {e}")
            return None


SCANNER_REGISTRY.register(SectorRotationScanner)

View File

@ -8,12 +8,29 @@ from tradingagents.utils.logger import get_logger
logger = get_logger(__name__)
# Catalyst keywords for priority classification.
# Maps a Priority value -> lowercase keyword/phrase list; _classify_catalyst
# returns the first priority whose keyword matches the headline, checked in
# CRITICAL -> HIGH -> MEDIUM insertion order.
CATALYST_KEYWORDS = {
    Priority.CRITICAL.value: [
        "fda approval", "acquisition", "merger", "buyout", "takeover",
        "breakthrough", "approved",
    ],
    Priority.HIGH.value: [
        "upgrade", "initiated", "beat", "surprise", "contract win",
        "patent", "revenue growth", "guidance raise", "price target",
    ],
    # Negative/caution catalysts still surface, at reduced priority.
    Priority.MEDIUM.value: [
        "downgrade", "miss", "lawsuit", "investigation", "recall",
        "warning", "delayed",
    ],
}
class SemanticNewsScanner(BaseScanner):
"""Scan news for early catalysts using semantic analysis."""
name = "semantic_news"
pipeline = "news"
strategy = "news_catalyst"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
@ -40,37 +57,49 @@ class SemanticNewsScanner(BaseScanner):
if not result or not isinstance(result, str):
return []
# Extract tickers mentioned in news
# Split into individual news items (lines or paragraphs)
import re
ticker_pattern = r"\b([A-Z]{2,5})\b|\$([A-Z]{2,5})"
matches = re.findall(ticker_pattern, result)
# Each news item typically starts with a headline or bullet point
news_lines = [line.strip() for line in result.split("\n") if line.strip()]
tickers = list(set([t[0] or t[1] for t in matches if t[0] or t[1]]))
stop_words = {
"NYSE",
"NASDAQ",
"CEO",
"CFO",
"IPO",
"ETF",
"USA",
"SEC",
"NEWS",
"STOCK",
"MARKET",
"NYSE", "NASDAQ", "CEO", "CFO", "IPO", "ETF", "USA",
"SEC", "NEWS", "STOCK", "MARKET", "GDP", "CPI", "FED",
"THE", "FOR", "AND", "ARE", "NOT", "BUT", "HAS", "WAS",
"INC", "LTD", "LLC", "EST", "PDT",
}
tickers = [t for t in tickers if t not in stop_words]
# Extract tickers from each line along with the headline context
ticker_headlines: dict = {} # ticker -> best headline
ticker_pattern = r"\$([A-Z]{2,5})\b|\b([A-Z]{2,5})\b"
for line in news_lines:
matches = re.findall(ticker_pattern, line)
for match in matches:
ticker = (match[0] or match[1]).upper()
if ticker in stop_words or len(ticker) < 2:
continue
# Keep the first (most relevant) headline per ticker
if ticker not in ticker_headlines:
# Clean headline: strip markdown/bullets
headline = re.sub(r"^[-*•]\s*", "", line).strip()[:150]
ticker_headlines[ticker] = headline
candidates = []
for ticker in tickers[: self.limit]:
for ticker, headline in list(ticker_headlines.items())[: self.limit]:
priority = self._classify_catalyst(headline)
context = f"News catalyst: {headline}" if headline else "Mentioned in recent market news"
candidates.append(
{
"ticker": ticker,
"source": self.name,
"context": "Mentioned in recent market news",
"priority": Priority.MEDIUM.value,
"strategy": "news_catalyst",
"context": context,
"priority": priority,
"strategy": self.strategy,
"news_headline": headline,
}
)
@ -81,5 +110,13 @@ class SemanticNewsScanner(BaseScanner):
logger.warning(f"⚠️ News scan failed: {e}")
return []
def _classify_catalyst(self, headline: str) -> str:
    """Classify news headline by catalyst type and return a priority value.

    Fix: keywords are matched on word boundaries rather than raw substrings,
    so HIGH's "upgrade" no longer matches "downgrade" headlines (which should
    fall through to MEDIUM) and CRITICAL's "approved" no longer matches
    "disapproved".  Falls back to MEDIUM when nothing matches.
    """
    import re

    headline_lower = headline.lower()
    for priority, keywords in CATALYST_KEYWORDS.items():
        for kw in keywords:
            # (?<!\w)/(?!\w) anchors keep multi-word phrases matchable while
            # rejecting matches embedded inside a longer word.
            if re.search(r"(?<!\w)" + re.escape(kw) + r"(?!\w)", headline_lower):
                return priority
    return Priority.MEDIUM.value
SCANNER_REGISTRY.register(SemanticNewsScanner)

View File

@ -0,0 +1,177 @@
"""Technical breakout scanner — volume-confirmed price breakouts."""
from typing import Any, Dict, List, Optional
import pandas as pd
from tradingagents.dataflows.discovery.scanner_registry import SCANNER_REGISTRY, BaseScanner
from tradingagents.dataflows.discovery.utils import Priority
from tradingagents.utils.logger import get_logger
logger = get_logger(__name__)
DEFAULT_TICKER_FILE = "data/tickers.txt"
def _load_tickers_from_file(path: str) -> List[str]:
"""Load ticker symbols from a text file."""
try:
with open(path) as f:
tickers = [
line.strip().upper()
for line in f
if line.strip() and not line.strip().startswith("#")
]
if tickers:
logger.info(f"Breakout scanner: loaded {len(tickers)} tickers from {path}")
return tickers
except FileNotFoundError:
logger.warning(f"Ticker file not found: {path}")
except Exception as e:
logger.warning(f"Failed to load ticker file {path}: {e}")
return []
class TechnicalBreakoutScanner(BaseScanner):
    """Scan for volume-confirmed technical breakouts.

    Batch-downloads 3 months of daily OHLCV for a local ticker universe and
    flags names whose latest close exceeds the prior N-day high on a volume
    spike.  Candidates are ranked by volume confirmation.
    """

    # Registry metadata: unique scanner name, pipeline bucket, strategy tag.
    name = "technical_breakout"
    pipeline = "momentum"
    strategy = "technical_breakout"

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        # Ticker universe file: scanner config wins, then global config, then default.
        self.ticker_file = self.scanner_config.get(
            "ticker_file",
            config.get("tickers_file", DEFAULT_TICKER_FILE),
        )
        # Cap on universe size per scan.
        self.max_tickers = self.scanner_config.get("max_tickers", 150)
        # Latest volume must be at least this multiple of the lookback average.
        self.min_volume_multiple = self.scanner_config.get("min_volume_multiple", 2.0)
        # Resistance window (trading days) defining the breakout level.
        self.lookback_days = self.scanner_config.get("lookback_days", 20)

    def scan(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Return up to ``self.limit`` breakout candidates, strongest volume first."""
        if not self.is_enabled():
            return []
        logger.info("📈 Scanning for technical breakouts...")
        tickers = _load_tickers_from_file(self.ticker_file)
        if not tickers:
            logger.warning("No tickers loaded for breakout scan")
            return []
        tickers = tickers[: self.max_tickers]
        # Batch download OHLCV
        from tradingagents.dataflows.y_finance import download_history

        try:
            data = download_history(
                tickers,
                period="3mo",
                interval="1d",
                auto_adjust=True,
                progress=False,
            )
        except Exception as e:
            logger.error(f"Batch download failed: {e}")
            return []
        if data is None or data.empty:
            return []
        candidates = []
        for ticker in tickers:
            result = self._check_breakout(ticker, data)
            if result:
                candidates.append(result)
                # Collect up to 2x the limit, then keep the strongest below.
                if len(candidates) >= self.limit * 2:
                    break
        # Rank by volume confirmation, strongest first.
        candidates.sort(key=lambda c: c.get("volume_multiple", 0), reverse=True)
        logger.info(f"Technical breakouts: {len(candidates)} candidates")
        return candidates[: self.limit]

    def _check_breakout(self, ticker: str, data: pd.DataFrame) -> Optional[Dict[str, Any]]:
        """Check if ticker has a volume-confirmed breakout."""
        try:
            # Extract single-ticker data from multi-ticker download
            if isinstance(data.columns, pd.MultiIndex):
                if ticker not in data.columns.get_level_values(1):
                    return None
                df = data.xs(ticker, axis=1, level=1).dropna()
            else:
                df = data.dropna()
            # Need the lookback window plus a few extra bars to compare against.
            if len(df) < self.lookback_days + 5:
                return None
            close = df["Close"]
            volume = df["Volume"]
            high = df["High"]
            latest_close = float(close.iloc[-1])
            latest_vol = float(volume.iloc[-1])
            # 20-day lookback resistance (excluding last day)
            lookback_high = float(high.iloc[-(self.lookback_days + 1) : -1].max())
            # Average volume over lookback period
            avg_vol = float(volume.iloc[-(self.lookback_days + 1) : -1].mean())
            if avg_vol <= 0:
                return None
            vol_multiple = latest_vol / avg_vol
            # Breakout conditions:
            # 1. Price closed above the lookback-period high
            # 2. Volume is at least min_volume_multiple times average
            is_breakout = (
                latest_close > lookback_high and vol_multiple >= self.min_volume_multiple
            )
            if not is_breakout:
                return None
            # Check if near 52-week high for bonus
            # NOTE(review): scan() downloads only 3 months of history, so the
            # len >= 252 branch never triggers there and "52-week high" is
            # really the highest high of the downloaded window — confirm
            # intent or widen the download period.
            if len(df) >= 252:
                high_52w = float(high.iloc[-252:].max())
            else:
                high_52w = float(high.max())
            near_52w_high = latest_close >= high_52w * 0.95
            # Priority: both signals -> CRITICAL, either -> HIGH, else MEDIUM.
            if vol_multiple >= 3.0 and near_52w_high:
                priority = Priority.CRITICAL.value
            elif vol_multiple >= 3.0 or near_52w_high:
                priority = Priority.HIGH.value
            else:
                priority = Priority.MEDIUM.value
            breakout_pct = ((latest_close - lookback_high) / lookback_high) * 100
            context = (
                f"Breakout: closed {breakout_pct:+.1f}% above {self.lookback_days}d high "
                f"on {vol_multiple:.1f}x volume"
            )
            if near_52w_high:
                context += " | Near 52-week high"
            return {
                "ticker": ticker,
                "source": self.name,
                "context": context,
                "priority": priority,
                "strategy": self.strategy,
                "volume_multiple": round(vol_multiple, 2),
                "breakout_pct": round(breakout_pct, 2),
                "near_52w_high": near_52w_high,
            }
        except Exception as e:
            logger.debug(f"Breakout check failed for {ticker}: {e}")
            return None


SCANNER_REGISTRY.register(TechnicalBreakoutScanner)

View File

@ -15,6 +15,7 @@ class VolumeAccumulationScanner(BaseScanner):
name = "volume_accumulation"
pipeline = "momentum"
strategy = "early_accumulation"
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
@ -39,15 +40,15 @@ class VolumeAccumulationScanner(BaseScanner):
logger.info("Found 0 volume accumulation candidates")
return []
candidates = []
raw_candidates = []
# Handle different result formats
if isinstance(result, str):
# Parse markdown/text result
candidates = self._parse_text_result(result)
raw_candidates = self._parse_text_result(result)
elif isinstance(result, list):
# Structured result
for item in result[: self.limit]:
for item in result[: self.limit * 2]:
ticker = item.get("ticker", "").upper()
if not ticker:
continue
@ -55,7 +56,7 @@ class VolumeAccumulationScanner(BaseScanner):
volume_ratio = item.get("volume_ratio", 0)
avg_volume = item.get("avg_volume", 0)
candidates.append(
raw_candidates.append(
{
"ticker": ticker,
"source": self.name,
@ -63,22 +64,32 @@ class VolumeAccumulationScanner(BaseScanner):
"priority": (
Priority.MEDIUM.value if volume_ratio < 3.0 else Priority.HIGH.value
),
"strategy": "volume_accumulation",
"strategy": self.strategy,
}
)
elif isinstance(result, dict):
# Dict with tickers list
for ticker in result.get("tickers", [])[: self.limit]:
candidates.append(
for ticker in result.get("tickers", [])[: self.limit * 2]:
raw_candidates.append(
{
"ticker": ticker.upper(),
"source": self.name,
"context": "Unusual volume accumulation",
"priority": Priority.MEDIUM.value,
"strategy": "volume_accumulation",
"strategy": self.strategy,
}
)
# Enrich with price-change context and filter distribution
candidates = []
for cand in raw_candidates:
cand = self._enrich_volume_candidate(cand["ticker"], cand)
if cand.get("volume_signal") == "distribution":
continue
candidates.append(cand)
if len(candidates) >= self.limit:
break
logger.info(f"Found {len(candidates)} volume accumulation candidates")
return candidates
@ -86,6 +97,65 @@ class VolumeAccumulationScanner(BaseScanner):
logger.warning(f"⚠️ Volume accumulation failed: {e}")
return []
def _enrich_volume_candidate(self, ticker: str, cand: Dict[str, Any]) -> Dict[str, Any]:
    """Add price-change context to distinguish accumulation from distribution.

    Mutates and returns ``cand``: sets ``day_change_pct``, optionally
    ``high_vol_days_5d`` (count of 1.5x-average-volume days in the last 5),
    and ``volume_signal`` ("accumulation" | "distribution" | "momentum").
    Distribution candidates are demoted to LOW priority so the caller can
    filter them out. Best-effort: on any failure (network, missing data)
    the candidate is returned unchanged.

    Args:
        ticker: Symbol to fetch recent daily history for.
        cand: Candidate dict; must already contain a ``context`` string.

    Returns:
        The same ``cand`` dict, possibly enriched.
    """
    try:
        # Local imports keep scanner modules importable even when the
        # data stack is unavailable; errors fall through to the except.
        import pandas as pd

        from tradingagents.dataflows.y_finance import download_history

        hist = download_history(
            ticker, period="10d", interval="1d", auto_adjust=True, progress=False
        )
        if hist is None or hist.empty or len(hist) < 2:
            return cand
        # yfinance may return MultiIndex columns of (field, ticker);
        # collapse to a single-ticker frame before column indexing.
        if isinstance(hist.columns, pd.MultiIndex):
            tickers = hist.columns.get_level_values(1).unique()
            target = ticker if ticker in tickers else tickers[0]
            hist = hist.xs(target, level=1, axis=1)
        # Today's price change vs. prior close.
        latest_close = float(hist["Close"].iloc[-1])
        prev_close = float(hist["Close"].iloc[-2])
        if prev_close == 0:
            return cand
        day_change_pct = ((latest_close - prev_close) / prev_close) * 100
        cand["day_change_pct"] = round(day_change_pct, 2)
        # Multi-day volume pattern: count days with >1.5x avg volume in
        # the last 5 days. len(hist) >= 6 guarantees iloc[:-5] is
        # non-empty, so the baseline always excludes the recent spike days.
        if len(hist) >= 6:
            avg_vol = float(hist["Volume"].iloc[:-5].mean())
            if avg_vol > 0:
                recent_high_vol_days = sum(
                    1 for v in hist["Volume"].iloc[-5:] if float(v) > avg_vol * 1.5
                )
                cand["high_vol_days_5d"] = recent_high_vol_days
                if recent_high_vol_days >= 3:
                    cand["context"] += (
                        f" | Sustained: {recent_high_vol_days}/5 days above 1.5x avg"
                    )
        # Classify the signal: heavy volume on a flat price reads as quiet
        # accumulation; heavy volume on a >5% drop reads as distribution;
        # anything in between is plain momentum.
        if abs(day_change_pct) < 3:
            cand["volume_signal"] = "accumulation"
            cand["context"] += f" | Price flat ({day_change_pct:+.1f}%) — quiet accumulation"
        elif day_change_pct < -5:
            cand["volume_signal"] = "distribution"
            cand["priority"] = Priority.LOW.value
            cand["context"] += (
                f" | Price dropped {day_change_pct:+.1f}% — possible distribution"
            )
        else:
            cand["volume_signal"] = "momentum"
    except Exception as e:
        logger.debug(f"Volume enrichment failed for {ticker}: {e}")
    return cand
def _parse_text_result(self, text: str) -> List[Dict[str, Any]]:
"""Parse tickers from text result."""
from tradingagents.dataflows.discovery.common_utils import extract_tickers_from_text
@ -100,7 +170,7 @@ class VolumeAccumulationScanner(BaseScanner):
"source": self.name,
"context": "Unusual volume detected",
"priority": Priority.MEDIUM.value,
"strategy": "volume_accumulation",
"strategy": self.strategy,
}
)

View File

@ -45,6 +45,12 @@ class Strategy(str, Enum):
CONTRARIAN_VALUE = "contrarian_value"
MOMENTUM_CHASE = "momentum_chase"
SOCIAL_HYPE = "social_hype"
INSIDER_BUYING = "insider_buying"
OPTIONS_FLOW = "options_flow"
ML_SIGNAL = "ml_signal"
SOCIAL_DD = "social_dd"
SECTOR_ROTATION = "sector_rotation"
TECHNICAL_BREAKOUT = "technical_breakout"
PRIORITY_ORDER = {