# TradingAgents/tradingagents/dataflows/discovery/scanners.py
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional
from langchain_core.messages import HumanMessage
from tradingagents.dataflows.discovery.utils import (
Priority,
append_llm_log,
is_valid_ticker,
resolve_llm_name,
resolve_trade_date,
resolve_trade_date_str,
)
from tradingagents.schemas import RedditTickerList
from tradingagents.utils.logger import get_logger
logger = get_logger(__name__)
@dataclass
class ScannerSpec:
    """Declarative description of one scanner source.

    Consumed by TraditionalScanner: `scan()` checks `enabled_key`, invokes
    `handler`, and backfills missing "priority"/"source" tags on results.
    """

    # Source name; also used as the default "source" tag on candidates.
    name: str
    # Called with the pipeline state; returns a list of candidate dicts.
    handler: Callable[[Dict[str, Any]], List[Dict[str, Any]]]
    # Priority assigned to candidates whose handler did not set one.
    default_priority: str = Priority.UNKNOWN.value
    # Discovery-config key that toggles this scanner; falsy/None = always on.
    enabled_key: Optional[str] = None
class TraditionalScanner:
    """
    Handles traditional market scanning strategies (Reddit, technicals, earnings, market moves).

    Each strategy is registered as a ScannerSpec; `scan()` runs every enabled
    scanner, tags the candidates it yields, and batch-validates the tickers.
    """
def __init__(self, config: Dict[str, Any], llm: Any, tool_executor: Callable):
    """
    Initialize the scanner.

    Args:
        config: Configuration dictionary; the "discovery" sub-dict holds
            per-source limits and enable/disable flags.
        llm: Quick thinking LLM for extracting tickers from text.
        tool_executor: Callback function to execute tools with logging.
    """
    self.config = config
    self.llm = llm
    self.execute_tool = tool_executor

    # Discovery settings drive all per-source limits below.
    disco = config.get("discovery", {})
    self.discovery_config = disco
    self.reddit_trending_limit = disco.get("reddit_trending_limit", 15)
    self.market_movers_limit = disco.get("market_movers_limit", 10)
    self.max_earnings_candidates = disco.get("max_earnings_candidates", 50)
    self.max_days_until_earnings = disco.get("max_days_until_earnings", 7)
    self.min_market_cap = disco.get("min_market_cap", 0)

    self.scanner_registry = self._build_scanner_registry()
def scan(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Run all traditional scanner sources and return candidates."""
candidates: List[Dict[str, Any]] = []
for spec in self.scanner_registry:
if not self._scanner_enabled(spec):
continue
results = self._safe_scan(spec, state)
if not results:
continue
for item in results:
if not item.get("priority"):
item["priority"] = spec.default_priority
if not item.get("source"):
item["source"] = spec.name
candidates.extend(results)
return self._batch_validate(state, candidates)
def _build_scanner_registry(self) -> List[ScannerSpec]:
    """Assemble the list of scanner sources.

    Each row: (name, handler, default priority, discovery-config toggle key).
    """
    rows = [
        ("reddit", self._scan_reddit, Priority.LOW.value, "enable_scanner_reddit"),
        ("market_movers", self._scan_market_movers, Priority.LOW.value, "enable_scanner_market_movers"),
        ("earnings", self._scan_earnings, Priority.MEDIUM.value, "enable_scanner_earnings"),
        ("ipo", self._scan_ipo, Priority.MEDIUM.value, "enable_scanner_ipo"),
        ("short_interest", self._scan_short_interest, Priority.MEDIUM.value, "enable_scanner_short_interest"),
        ("unusual_volume", self._scan_unusual_volume, Priority.HIGH.value, "enable_scanner_unusual_volume"),
        ("analyst_ratings", self._scan_analyst_ratings, Priority.MEDIUM.value, "enable_scanner_analyst_ratings"),
        ("insider_buying", self._scan_insider_buying, Priority.HIGH.value, "enable_scanner_insider_buying"),
    ]
    return [
        ScannerSpec(name=name, handler=handler, default_priority=prio, enabled_key=key)
        for name, handler, prio, key in rows
    ]
def _scanner_enabled(self, spec: ScannerSpec) -> bool:
if not spec.enabled_key:
return True
return bool(self.discovery_config.get(spec.enabled_key, True))
def _safe_scan(self, spec: ScannerSpec, state: Dict[str, Any]) -> List[Dict[str, Any]]:
try:
return spec.handler(state)
except Exception as e:
logger.error(f"Error running scanner '{spec.name}': {e}")
return []
def _run_tool(
self,
state: Dict[str, Any],
step: str,
tool_name: str,
default: Any = None,
**params: Any,
) -> Any:
try:
return self.execute_tool(
state,
node="scanner",
step=step,
tool_name=tool_name,
**params,
)
except Exception as e:
logger.error(f"Error during {step}: {e}")
return default
def _run_call(
self,
label: str,
func: Callable,
default: Any = None,
**kwargs: Any,
) -> Any:
try:
return func(**kwargs)
except Exception as e:
logger.error(f"Error {label}: {e}")
return default
def _scan_reddit(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Fetch Reddit sources and extract tickers in a single LLM pass.

    Pulls (a) trending tickers via the tool executor and (b) undiscovered
    due-diligence posts via a direct reddit_api call, then runs ONE batched
    structured-output LLM call to extract tickers from both reports.

    Args:
        state: Pipeline state dict; "tool_logs" is read and written back.

    Returns:
        Candidate dicts tagged "reddit_dd_undiscovered" (high priority,
        leading) or "social_trending" (low priority, lagging).
    """
    candidates: List[Dict[str, Any]] = []
    reddit_trending_report = None
    reddit_dd_report = None
    # 1a. Get Reddit Trending (Social Sentiment). Errors are absorbed inside
    # _run_tool, leaving the report as None.
    reddit_trending_report = self._run_tool(
        state,
        step="Get Reddit trending tickers",
        tool_name="get_trending_tickers",
        limit=self.reddit_trending_limit,
    )
    # 1b. Get Undiscovered Reddit DD (LEADING INDICATOR)
    try:
        from tradingagents.dataflows.reddit_api import get_reddit_undiscovered_dd
        logger.info("🔍 Scanning Reddit for undiscovered DD...")
        # Note: get_reddit_undiscovered_dd is not a tool in the strict sense
        # but a direct function call that itself uses an LLM.
        reddit_dd_report = self._run_call(
            "fetching undiscovered DD",
            get_reddit_undiscovered_dd,
            lookback_hours=24,
            scan_limit=100,
            top_n=15,
            llm_evaluator=self.llm,  # Use fast LLM for evaluation
        )
    except Exception as e:
        # Only the import can realistically raise here; _run_call already
        # swallows errors from the fetch itself.
        logger.error(f"Error fetching undiscovered DD: {e}")
    # BATCHED LLM CALL: Extract tickers from both Reddit sources in ONE call.
    # Uses proper Pydantic structured output for clean, validated results.
    if reddit_trending_report or reddit_dd_report:
        try:
            combined_prompt = """Extract stock tickers from these Reddit reports.
IMPORTANT RULES:
1. Only extract valid US stock tickers (1-5 uppercase letters, e.g., AAPL, NVDA, TSLA)
2. Do NOT include crypto (BTC, ETH), indices (SPY, QQQ), or gibberish
3. Classify each as 'trending' (social mentions) or 'dd' (due diligence research)
4. Set confidence to 'low' if you're unsure it's a real stock ticker
"""
            if reddit_trending_report:
                combined_prompt += f"""=== REDDIT TRENDING TICKERS ===
{reddit_trending_report}
"""
            if reddit_dd_report:
                combined_prompt += f"""=== REDDIT UNDISCOVERED DD ===
{reddit_dd_report}
"""
            combined_prompt += (
                """Extract ALL mentioned stock tickers with their source and context."""
            )
            # Use proper Pydantic structured output (not raw JSON schema).
            structured_llm = self.llm.with_structured_output(RedditTickerList)
            response: RedditTickerList = structured_llm.invoke(
                [HumanMessage(content=combined_prompt)]
            )
            tool_logs = state.get("tool_logs", [])
            append_llm_log(
                tool_logs,
                node="scanner",
                step="Extract Reddit tickers",
                model=resolve_llm_name(self.llm),
                prompt=combined_prompt,
                output=response.model_dump() if hasattr(response, "model_dump") else response,
            )
            state["tool_logs"] = tool_logs
            trending_count = 0
            dd_count = 0
            skipped_low_confidence = 0
            for extracted in response.tickers:
                ticker = extracted.ticker.upper().strip()
                source_type = extracted.source
                context = extracted.context
                confidence = extracted.confidence
                # Skip low-confidence extractions (likely gibberish or crypto)
                if confidence == "low":
                    skipped_low_confidence += 1
                    continue
                if is_valid_ticker(ticker):
                    if source_type == "dd":
                        candidates.append(
                            {
                                "ticker": ticker,
                                "source": "reddit_dd_undiscovered",
                                "context": f"💎 Undiscovered DD: {context}",
                                "priority": "high",  # LEADING - quality DD before hype
                            }
                        )
                        dd_count += 1
                    else:
                        candidates.append(
                            {
                                "ticker": ticker,
                                "source": "social_trending",
                                "context": context,
                                "priority": "low",  # LAGGING - already trending
                            }
                        )
                        trending_count += 1
            logger.info(
                f"Found {trending_count} trending + {dd_count} DD tickers from Reddit "
                f"(skipped {skipped_low_confidence} low-confidence)"
            )
        except Exception as e:
            # Record the failed extraction in tool_logs (combined_prompt is
            # the first statement of the try, so it is always bound here).
            tool_logs = state.get("tool_logs", [])
            append_llm_log(
                tool_logs,
                node="scanner",
                step="Extract Reddit tickers",
                model=resolve_llm_name(self.llm),
                prompt=combined_prompt,
                output="",
                error=str(e),
            )
            state["tool_logs"] = tool_logs
            logger.error(f"Error extracting Reddit tickers: {e}")
    return candidates
def _scan_market_movers(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect top gainers and losers as candidates.

    Gainers are lagging (already moved, low priority); losers are tagged
    medium as potential bounce plays.
    """
    from tradingagents.dataflows.alpha_vantage_stock import get_top_gainers_losers

    results: List[Dict[str, Any]] = []
    logger.info("📊 Fetching market movers (direct parsing)...")
    movers = self._run_call(
        "fetching market movers",
        get_top_gainers_losers,
        limit=self.market_movers_limit,
        return_structured=True,
    )
    if not isinstance(movers, dict) or movers.get("error"):
        logger.warning("Market movers returned error or empty")
        return results

    count = 0
    # (bucket key, source tag, context label, priority)
    buckets = (
        ("gainers", "gainer", "Top gainer", "low"),
        ("losers", "loser", "Top loser", "medium"),
    )
    for bucket, source_tag, label, prio in buckets:
        for row in movers.get(bucket, []):
            raw = row.get("ticker") or ""
            symbol = raw.upper().strip() if raw else ""
            if not is_valid_ticker(symbol):
                continue
            pct = row.get("change_percentage") or "N/A"
            results.append(
                {
                    "ticker": symbol,
                    "source": source_tag,
                    "context": f"{label}: {pct} change",
                    "priority": prio,
                }
            )
            count += 1
    logger.info(f"Found {count} market movers (direct)")
    return results
def _scan_earnings(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Fetch earnings calendar and pre-earnings accumulation signals.

    Collects tickers reporting within `max_days_until_earnings` days,
    enriches each with a pre-earnings accumulation check (a LEADING
    indicator), sorts accumulating names first (then by proximity to the
    report date), and caps the list at `max_earnings_candidates`.

    Args:
        state: Pipeline state dict used for trade-date resolution.

    Returns:
        Candidate dicts sourced "earnings_accumulation" (high priority) or
        "earnings_catalyst" (medium priority).
    """
    candidates: List[Dict[str, Any]] = []
    # BUGFIX: the original imported only `timedelta`, so `datetime.strptime`
    # below raised NameError on every entry; the bare except swallowed it
    # and every candidate sorted as if its date were unknown (999 days).
    from datetime import datetime, timedelta

    from tradingagents.dataflows.finnhub_api import get_earnings_calendar
    from tradingagents.dataflows.y_finance import get_pre_earnings_accumulation_signal

    today = resolve_trade_date(state)
    from_date = today.strftime("%Y-%m-%d")
    to_date = (today + timedelta(days=self.max_days_until_earnings)).strftime("%Y-%m-%d")
    logger.info(f"📅 Fetching earnings calendar (next {self.max_days_until_earnings} days)...")
    earnings_data = self._run_call(
        "fetching earnings calendar",
        get_earnings_calendar,
        from_date=from_date,
        to_date=to_date,
        return_structured=True,
    )
    if isinstance(earnings_data, list):
        # First pass: collect all candidates with sort metadata.
        earnings_candidates = []
        for entry in earnings_data:
            symbol = entry.get("symbol") or ""
            ticker = symbol.upper().strip() if symbol else ""
            if not is_valid_ticker(ticker):
                continue
            # Days until earnings; None (unparseable) sorts last via 999.
            earnings_date_str = entry.get("date")
            days_until = None
            if earnings_date_str:
                try:
                    earnings_date = datetime.strptime(earnings_date_str, "%Y-%m-%d")
                    # NOTE(review): assumes resolve_trade_date returns a
                    # datetime; a plain date would make this subtraction
                    # raise TypeError (swallowed below) — confirm.
                    days_until = (earnings_date - today).days
                except (ValueError, TypeError):
                    pass
            # Build context from structured data.
            eps_est = entry.get("epsEstimate")
            date = earnings_date_str or "upcoming"
            hour = entry.get("hour") or ""
            context = f"Earnings {date}"
            if hour:
                context += f" ({hour})"
            if eps_est is not None:
                context += (
                    f", EPS est: ${eps_est:.2f}"
                    if isinstance(eps_est, (int, float))
                    else f", EPS est: {eps_est}"
                )
            # Check for pre-earnings accumulation (LEADING indicator).
            has_accumulation = False
            accumulation_data = None
            accumulation = self._run_call(
                "checking pre-earnings accumulation",
                get_pre_earnings_accumulation_signal,
                ticker=ticker,
                lookback_days=10,
            )
            if isinstance(accumulation, dict) and accumulation.get("signal"):
                has_accumulation = True
                accumulation_data = accumulation
            earnings_candidates.append(
                {
                    "ticker": ticker,
                    "context": context,
                    "days_until": days_until if days_until is not None else 999,
                    "has_accumulation": has_accumulation,
                    "accumulation_data": accumulation_data,
                }
            )
        # Sort by priority: accumulation first, then by proximity to earnings.
        earnings_candidates.sort(
            key=lambda x: (
                0 if x["has_accumulation"] else 1,
                x["days_until"],
            )
        )
        # Apply hard cap.
        earnings_candidates = earnings_candidates[: self.max_earnings_candidates]
        # Promote accumulating names; the rest are plain catalyst plays.
        for ec in earnings_candidates:
            if ec["has_accumulation"]:
                enhanced_context = (
                    f"{ec['context']} | "
                    f"🔥 PRE-EARNINGS ACCUMULATION: "
                    f"Vol {ec['accumulation_data']['volume_ratio']}x avg, "
                    f"Price {ec['accumulation_data']['price_change_pct']:+.1f}%"
                )
                candidates.append(
                    {
                        "ticker": ec["ticker"],
                        "source": "earnings_accumulation",
                        "context": enhanced_context,
                        "priority": "high",
                    }
                )
            else:
                candidates.append(
                    {
                        "ticker": ec["ticker"],
                        "source": "earnings_catalyst",
                        "context": ec["context"],
                        "priority": "medium",
                    }
                )
        logger.info(
            f"Found {len(earnings_candidates)} earnings candidates (filtered from {len(earnings_data)} total, cap: {self.max_earnings_candidates})"
        )
    return candidates
def _scan_ipo(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect recent and upcoming IPO listings (window: -7 to +14 days).

    IPO candidates carry `allow_invalid=True` so batch validation keeps
    names that are not listed yet.
    """
    from datetime import timedelta

    from tradingagents.dataflows.finnhub_api import get_ipo_calendar

    results: List[Dict[str, Any]] = []
    today = resolve_trade_date(state)
    window_start = (today - timedelta(days=7)).strftime("%Y-%m-%d")
    window_end = (today + timedelta(days=14)).strftime("%Y-%m-%d")
    logger.info("🆕 Fetching IPO calendar (direct parsing)...")
    ipo_rows = self._run_call(
        "fetching IPO calendar",
        get_ipo_calendar,
        from_date=window_start,
        to_date=window_end,
        return_structured=True,
    )
    if not isinstance(ipo_rows, list):
        return results

    found = 0
    for row in ipo_rows:
        raw = row.get("symbol") or ""
        symbol = raw.upper().strip() if raw else ""
        if not (symbol and is_valid_ticker(symbol)):
            continue
        company = row.get("name") or ""
        listing_date = row.get("date", "upcoming")
        offer_price = row.get("price")
        context = f"IPO {listing_date}: {company}"
        if offer_price:
            context += f" @ ${offer_price}"
        results.append(
            {
                "ticker": symbol,
                "source": "ipo_listing",
                "context": context,
                "priority": "medium",
                "allow_invalid": True,  # IPOs may not be listed yet
            }
        )
        found += 1
    logger.info(f"Found {found} IPO candidates (direct)")
    return results
def _scan_short_interest(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect heavily-shorted names as squeeze candidates."""
    from tradingagents.dataflows.finviz_scraper import get_short_interest

    results: List[Dict[str, Any]] = []
    logger.info("🩳 Fetching short interest (direct parsing)...")
    rows = self._run_call(
        "fetching short interest",
        get_short_interest,
        min_short_interest_pct=15.0,
        min_days_to_cover=5.0,
        top_n=15,
        return_structured=True,
    )
    if not isinstance(rows, list):
        return results

    found = 0
    for row in rows:
        raw = row.get("ticker") or ""
        symbol = raw.upper().strip() if raw else ""
        if not is_valid_ticker(symbol):
            continue
        short_pct = row.get("short_interest_pct") or 0
        signal = row.get("signal") or "squeeze_potential"
        results.append(
            {
                "ticker": symbol,
                "source": "short_squeeze",
                "context": f"Short interest: {short_pct:.1f}%, Signal: {signal}",
                "priority": "medium",
            }
        )
        found += 1
    logger.info(f"Found {found} short squeeze candidates (direct)")
    return results
def _scan_unusual_volume(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect unusual-volume accumulation signals (LEADING indicator).

    Quiet accumulation (big volume, small price move) ranks high;
    "strong_accumulation" signals rank critical.
    """
    from tradingagents.dataflows.alpha_vantage_volume import get_unusual_volume

    results: List[Dict[str, Any]] = []
    trade_date = resolve_trade_date_str(state)
    logger.info("📈 Fetching unusual volume (direct parsing)...")
    rows = self._run_call(
        "fetching unusual volume",
        get_unusual_volume,
        date=trade_date,
        min_volume_multiple=2.0,
        max_price_change=5.0,
        top_n=15,
        max_tickers_to_scan=3000,
        use_cache=True,
        return_structured=True,
    )
    if not isinstance(rows, list):
        return results

    found = 0
    for row in rows:
        raw = row.get("ticker") or ""
        symbol = raw.upper().strip() if raw else ""
        if not is_valid_ticker(symbol):
            continue
        vol_ratio = row.get("volume_ratio") or 0
        price_change = row.get("price_change_pct") or 0
        intraday_change = row.get("intraday_change_pct") or 0
        direction = row.get("direction") or "neutral"
        signal = row.get("signal") or "accumulation"
        marker = "🟢" if direction == "bullish" else ""
        context = (
            f"Volume: {vol_ratio}x avg, Price: {price_change:+.1f}%, "
            f"Intraday: {intraday_change:+.1f}% {marker}, Signal: {signal}"
        )
        results.append(
            {
                "ticker": symbol,
                "source": "unusual_volume",
                "context": context,
                # Strong accumulation outranks everything else.
                "priority": "critical" if signal == "strong_accumulation" else "high",
            }
        )
        found += 1
    logger.info(
        f"Found {found} unusual volume candidates (direct, distribution filtered)"
    )
    return results
def _scan_analyst_ratings(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect recent analyst upgrades/initiations.

    Each hit is cross-checked against recent price action: names whose
    price has not yet reacted are promoted, already-moved names demoted.
    """
    from tradingagents.dataflows.alpha_vantage_analysts import get_analyst_rating_changes
    from tradingagents.dataflows.y_finance import check_if_price_reacted

    results: List[Dict[str, Any]] = []
    logger.info("📊 Fetching analyst rating changes (direct parsing)...")
    ratings = self._run_call(
        "fetching analyst rating changes",
        get_analyst_rating_changes,
        lookback_days=7,
        change_types=["upgrade", "initiated"],
        top_n=15,
        return_structured=True,
    )
    if not isinstance(ratings, list):
        return results

    found = 0
    for row in ratings:
        raw = row.get("ticker") or ""
        symbol = raw.upper().strip() if raw else ""
        if not is_valid_ticker(symbol):
            continue
        action = row.get("action") or "rating_change"
        rating_source = row.get("source") or "Unknown"
        hours_old = row.get("hours_old") or 0
        if hours_old < 24:
            freshness = "🔥 FRESH"
        elif hours_old < 72:
            freshness = "🟢 Recent"
        else:
            freshness = "Older"
        context = f"{action.upper()} from {rating_source} ({freshness}, {hours_old}h ago)"
        # Check whether the price has already reacted to the rating change;
        # any failure here falls back to medium priority.
        try:
            reaction = check_if_price_reacted(
                symbol, lookback_days=3, reaction_threshold=10.0
            )
            if reaction["status"] == "leading":
                context += f" | 💎 EARLY: Price {reaction['price_change_pct']:+.1f}%"
                priority = "high"
            elif reaction["status"] == "lagging":
                context += (
                    f" | ⚠️ LATE: Already moved {reaction['price_change_pct']:+.1f}%"
                )
                priority = "low"
            else:
                priority = "medium"
        except Exception:
            priority = "medium"
        results.append(
            {
                "ticker": symbol,
                "source": "analyst_upgrade",
                "context": context,
                "priority": priority,
            }
        )
        found += 1
    logger.info(f"Found {found} analyst upgrade candidates (direct)")
    return results
def _scan_insider_buying(self, state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect recent insider purchases (LEADING indicator)."""
    from tradingagents.dataflows.finviz_scraper import get_insider_buying_screener

    results: List[Dict[str, Any]] = []
    logger.info("💰 Fetching insider buying (direct parsing)...")
    rows = self._run_call(
        "fetching insider buying",
        get_insider_buying_screener,
        transaction_type="buy",
        lookback_days=2,
        min_value=50000,
        top_n=15,
        return_structured=True,
    )
    if not isinstance(rows, list):
        return results

    found = 0
    for row in rows:
        raw = row.get("ticker") or ""
        symbol = raw.upper().strip() if raw else ""
        if not is_valid_ticker(symbol):
            continue
        # Truncate free-text fields so contexts stay compact.
        company = (row.get("company") or "")[:30]
        buyer = (row.get("insider") or "")[:20]
        title = row.get("title") or ""
        amount = row.get("value_str") or ""
        context = f"💰 Insider Buying: {buyer} ({title}) bought {amount}"
        if company:
            context = f"{company} - {context}"
        results.append(
            {
                "ticker": symbol,
                "source": "insider_buying",
                "context": context,
                "priority": "high",  # LEADING - insiders know before market
            }
        )
        found += 1
    logger.info(f"Found {found} insider buying candidates (direct)")
    return results
def _batch_validate(
self, state: Dict[str, Any], candidates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Batch validate tickers (keep IPOs even if not yet listed)."""
if not candidates:
return candidates
try:
validation = self.execute_tool(
state,
node="scanner",
step="Batch validate tickers",
tool_name="validate_tickers_batch",
symbols=list({c.get("ticker", "") for c in candidates}),
)
if isinstance(validation, dict) and not validation.get("error"):
valid_set = {t.upper() for t in validation.get("valid", [])}
invalid_list = validation.get("invalid", [])
if valid_set or len(invalid_list) < len(candidates):
before_count = len(candidates)
candidates = [
c
for c in candidates
if c.get("allow_invalid") or c.get("ticker", "").upper() in valid_set
]
removed = before_count - len(candidates)
if removed:
logger.info(f"Removed {removed} invalid tickers after batch validation.")
else:
logger.warning("Batch validation returned no valid tickers; skipping filter.")
except Exception as e:
logger.error(f"Error during batch validation: {e}")
return candidates