fix(insider-buying): preserve transaction details, add cluster detection and smart priority
- Call get_finviz_insider_buying with return_structured=True and deduplicate=False to get all raw transaction dicts instead of parsing markdown - Group transactions by ticker for cluster detection (2+ unique insiders = CRITICAL) - Smart priority: CEO/CFO + >$100K = CRITICAL, director + >$50K = HIGH, etc. - Preserve insider_name, insider_title, transaction_value, num_insiders_buying in output - Rich context strings: "CEO John Smith purchased $250K of AAPL shares" - Update finviz_scraper alias to pass through return_structured and deduplicate params Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
2b74d298da
commit
573b756b4b
|
|
@ -14,6 +14,7 @@ class InsiderBuyingScanner(BaseScanner):
|
|||
|
||||
name = "insider_buying"
|
||||
pipeline = "edge"
|
||||
strategy = "insider_buying"
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
super().__init__(config)
|
||||
|
|
@ -24,69 +25,94 @@ class InsiderBuyingScanner(BaseScanner):
|
|||
if not self.is_enabled():
|
||||
return []
|
||||
|
||||
logger.info(f"💼 Scanning insider buying (last {self.lookback_days} days)...")
|
||||
logger.info("Scanning insider buying (OpenInsider)...")
|
||||
|
||||
try:
|
||||
# Use Finviz insider buying screener
|
||||
from tradingagents.dataflows.finviz_scraper import get_finviz_insider_buying
|
||||
|
||||
result = get_finviz_insider_buying(
|
||||
transaction_type="buy",
|
||||
transactions = get_finviz_insider_buying(
|
||||
lookback_days=self.lookback_days,
|
||||
min_value=self.min_transaction_value,
|
||||
top_n=self.limit,
|
||||
return_structured=True,
|
||||
deduplicate=False,
|
||||
)
|
||||
|
||||
if not result or not isinstance(result, str):
|
||||
logger.info("Found 0 insider purchases")
|
||||
if not transactions:
|
||||
logger.info("No insider buying transactions found")
|
||||
return []
|
||||
|
||||
# Parse the markdown result
|
||||
logger.info(f"Found {len(transactions)} insider transactions")
|
||||
|
||||
# Group by ticker for cluster detection
|
||||
by_ticker: Dict[str, list] = {}
|
||||
for txn in transactions:
|
||||
ticker = txn.get("ticker", "").upper().strip()
|
||||
if not ticker:
|
||||
continue
|
||||
by_ticker.setdefault(ticker, []).append(txn)
|
||||
|
||||
candidates = []
|
||||
seen_tickers = set()
|
||||
for ticker, txns in by_ticker.items():
|
||||
# Use the largest transaction as primary
|
||||
txns.sort(key=lambda t: t.get("value_num", 0), reverse=True)
|
||||
primary = txns[0]
|
||||
|
||||
# Extract tickers from markdown table
|
||||
import re
|
||||
insider_name = primary.get("insider", "Unknown")
|
||||
title = primary.get("title", "")
|
||||
value = primary.get("value_num", 0)
|
||||
value_str = primary.get("value_str", f"${value:,.0f}")
|
||||
num_insiders = len(set(t.get("insider", "") for t in txns))
|
||||
|
||||
lines = result.split("\n")
|
||||
for line in lines:
|
||||
if "|" not in line or "Ticker" in line or "---" in line:
|
||||
continue
|
||||
# Priority by significance
|
||||
title_lower = title.lower()
|
||||
is_c_suite = any(
|
||||
t in title_lower
|
||||
for t in ["ceo", "cfo", "coo", "cto", "president", "chairman"]
|
||||
)
|
||||
is_director = "director" in title_lower
|
||||
|
||||
parts = [p.strip() for p in line.split("|")]
|
||||
if len(parts) < 3:
|
||||
continue
|
||||
if num_insiders >= 2:
|
||||
priority = Priority.CRITICAL.value
|
||||
elif is_c_suite and value >= 100_000:
|
||||
priority = Priority.CRITICAL.value
|
||||
elif is_c_suite or (is_director and value >= 50_000):
|
||||
priority = Priority.HIGH.value
|
||||
elif value >= 50_000:
|
||||
priority = Priority.HIGH.value
|
||||
else:
|
||||
priority = Priority.MEDIUM.value
|
||||
|
||||
ticker = parts[1] if len(parts) > 1 else ""
|
||||
ticker = ticker.strip().upper()
|
||||
|
||||
if not ticker or ticker in seen_tickers:
|
||||
continue
|
||||
|
||||
# Validate ticker format
|
||||
if not re.match(r"^[A-Z]{1,5}$", ticker):
|
||||
continue
|
||||
|
||||
seen_tickers.add(ticker)
|
||||
# Build context
|
||||
if num_insiders > 1:
|
||||
context = (
|
||||
f"Cluster: {num_insiders} insiders buying {ticker}. "
|
||||
f"Largest: {title} {insider_name} purchased {value_str}"
|
||||
)
|
||||
else:
|
||||
context = f"{title} {insider_name} purchased {value_str} of {ticker}"
|
||||
|
||||
candidates.append(
|
||||
{
|
||||
"ticker": ticker,
|
||||
"source": self.name,
|
||||
"context": "Insider purchase detected (Finviz)",
|
||||
"priority": Priority.HIGH.value,
|
||||
"strategy": "insider_buying",
|
||||
"context": context,
|
||||
"priority": priority,
|
||||
"strategy": self.strategy,
|
||||
"insider_name": insider_name,
|
||||
"insider_title": title,
|
||||
"transaction_value": value,
|
||||
"num_insiders_buying": num_insiders,
|
||||
}
|
||||
)
|
||||
|
||||
if len(candidates) >= self.limit:
|
||||
break
|
||||
|
||||
logger.info(f"Found {len(candidates)} insider purchases")
|
||||
logger.info(f"Insider buying: {len(candidates)} candidates")
|
||||
return candidates
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"⚠️ Insider buying failed: {e}")
|
||||
logger.error(f"Insider buying scan failed: {e}", exc_info=True)
|
||||
return []
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -251,6 +251,7 @@ def get_insider_buying_screener(
|
|||
min_value: Annotated[int, "Minimum transaction value in dollars"] = 25000,
|
||||
top_n: Annotated[int, "Number of top results to return"] = 20,
|
||||
return_structured: Annotated[bool, "Return list of dicts instead of markdown"] = False,
|
||||
deduplicate: Annotated[bool, "If False, return all transactions without deduplication"] = True,
|
||||
):
|
||||
"""
|
||||
Discover stocks with recent insider buying/selling using OpenInsider.
|
||||
|
|
@ -389,6 +390,13 @@ def get_insider_buying_screener(
|
|||
# Sort by value (largest first)
|
||||
transactions.sort(key=lambda x: x["value_num"], reverse=True)
|
||||
|
||||
# Return all transactions without deduplication if requested
|
||||
if return_structured and not deduplicate:
|
||||
logger.info(
|
||||
f"Returning all {len(transactions)} {filter_desc} transactions (no dedup)"
|
||||
)
|
||||
return transactions
|
||||
|
||||
# Deduplicate by ticker, keeping the largest transaction per ticker
|
||||
seen_tickers = set()
|
||||
unique_transactions = []
|
||||
|
|
@ -442,11 +450,25 @@ def get_finviz_insider_buying(
|
|||
lookback_days: int = 7,
|
||||
min_value: int = 25000,
|
||||
top_n: int = 20,
|
||||
) -> str:
|
||||
"""Alias for get_insider_buying_screener to match registry naming convention"""
|
||||
return_structured: bool = False,
|
||||
deduplicate: bool = True,
|
||||
):
|
||||
"""Alias for get_insider_buying_screener to match registry naming convention.
|
||||
|
||||
Args:
|
||||
transaction_type: "buy" for purchases, "sell" for sales
|
||||
lookback_days: Days to look back (default 7)
|
||||
min_value: Minimum transaction value in dollars
|
||||
top_n: Number of top results to return
|
||||
return_structured: If True, returns list of dicts instead of markdown
|
||||
deduplicate: If False and return_structured=True, returns all transactions
|
||||
(not deduplicated by ticker). Useful for cluster detection.
|
||||
"""
|
||||
return get_insider_buying_screener(
|
||||
transaction_type=transaction_type,
|
||||
lookback_days=lookback_days,
|
||||
min_value=min_value,
|
||||
top_n=top_n,
|
||||
return_structured=return_structured,
|
||||
deduplicate=deduplicate,
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue