From 573b756b4b9351e7fdb35570ef4c3df6dcda030c Mon Sep 17 00:00:00 2001 From: Youssef Aitousarrah Date: Wed, 18 Feb 2026 08:15:32 -0800 Subject: [PATCH] fix(insider-buying): preserve transaction details, add cluster detection and smart priority - Call get_finviz_insider_buying with return_structured=True and deduplicate=False to get all raw transaction dicts instead of parsing markdown - Group transactions by ticker for cluster detection (2+ unique insiders = CRITICAL) - Smart priority: CEO/CFO + >$100K = CRITICAL, director + >$50K = HIGH, etc. - Preserve insider_name, insider_title, transaction_value, num_insiders_buying in output - Rich context strings: "CEO John Smith purchased $250K of AAPL shares" - Update finviz_scraper alias to pass through return_structured and deduplicate params Co-Authored-By: Claude Opus 4.6 --- .../discovery/scanners/insider_buying.py | 94 ++++++++++++------- tradingagents/dataflows/finviz_scraper.py | 26 ++++- 2 files changed, 84 insertions(+), 36 deletions(-) diff --git a/tradingagents/dataflows/discovery/scanners/insider_buying.py b/tradingagents/dataflows/discovery/scanners/insider_buying.py index 000bbb82..21eecd9d 100644 --- a/tradingagents/dataflows/discovery/scanners/insider_buying.py +++ b/tradingagents/dataflows/discovery/scanners/insider_buying.py @@ -14,6 +14,7 @@ class InsiderBuyingScanner(BaseScanner): name = "insider_buying" pipeline = "edge" + strategy = "insider_buying" def __init__(self, config: Dict[str, Any]): super().__init__(config) @@ -24,69 +25,94 @@ class InsiderBuyingScanner(BaseScanner): if not self.is_enabled(): return [] - logger.info(f"💼 Scanning insider buying (last {self.lookback_days} days)...") + logger.info("Scanning insider buying (OpenInsider)...") try: - # Use Finviz insider buying screener from tradingagents.dataflows.finviz_scraper import get_finviz_insider_buying - result = get_finviz_insider_buying( - transaction_type="buy", + transactions = get_finviz_insider_buying( lookback_days=self.lookback_days, min_value=self.min_transaction_value, - top_n=self.limit, + return_structured=True, + deduplicate=False, ) - if not result or not isinstance(result, str): - logger.info("Found 0 insider purchases") + if not transactions: + logger.info("No insider buying transactions found") return [] - # Parse the markdown result + logger.info(f"Found {len(transactions)} insider transactions") + + # Group by ticker for cluster detection + by_ticker: Dict[str, list] = {} + for txn in transactions: + ticker = txn.get("ticker", "").upper().strip() + if not ticker: + continue + by_ticker.setdefault(ticker, []).append(txn) + candidates = [] - seen_tickers = set() + for ticker, txns in by_ticker.items(): + # Use the largest transaction as primary + txns.sort(key=lambda t: t.get("value_num", 0), reverse=True) + primary = txns[0] - # Extract tickers from markdown table - import re + insider_name = primary.get("insider", "Unknown") + title = primary.get("title", "") + value = primary.get("value_num", 0) + value_str = primary.get("value_str", f"${value:,.0f}") + num_insiders = len(set(t.get("insider", "") for t in txns)) - lines = result.split("\n") - for line in lines: - if "|" not in line or "Ticker" in line or "---" in line: - continue + # Priority by significance + title_lower = title.lower() + is_c_suite = any( + t in title_lower + for t in ["ceo", "cfo", "coo", "cto", "president", "chairman"] + ) + is_director = "director" in title_lower - parts = [p.strip() for p in line.split("|")] - if len(parts) < 3: - continue + if num_insiders >= 2: + priority = Priority.CRITICAL.value + elif is_c_suite and value >= 100_000: + priority = Priority.CRITICAL.value + elif is_c_suite or (is_director and value >= 50_000): + priority = Priority.HIGH.value + elif value >= 50_000: + priority = Priority.HIGH.value + else: + priority = Priority.MEDIUM.value - ticker = parts[1] if len(parts) > 1 else "" - ticker = ticker.strip().upper() - - if not ticker or ticker in seen_tickers: - continue - - # Validate ticker format - if not re.match(r"^[A-Z]{1,5}$", ticker): - continue - - seen_tickers.add(ticker) + # Build context + if num_insiders > 1: + context = ( + f"Cluster: {num_insiders} insiders buying {ticker}. " + f"Largest: {title} {insider_name} purchased {value_str}" + ) + else: + context = f"{title} {insider_name} purchased {value_str} of {ticker}" candidates.append( { "ticker": ticker, "source": self.name, - "context": "Insider purchase detected (Finviz)", - "priority": Priority.HIGH.value, - "strategy": "insider_buying", + "context": context, + "priority": priority, + "strategy": self.strategy, + "insider_name": insider_name, + "insider_title": title, + "transaction_value": value, + "num_insiders_buying": num_insiders, } ) if len(candidates) >= self.limit: break - logger.info(f"Found {len(candidates)} insider purchases") + logger.info(f"Insider buying: {len(candidates)} candidates") return candidates except Exception as e: - logger.warning(f"⚠️ Insider buying failed: {e}") + logger.error(f"Insider buying scan failed: {e}", exc_info=True) return [] diff --git a/tradingagents/dataflows/finviz_scraper.py b/tradingagents/dataflows/finviz_scraper.py index ed661d3f..a49554e8 100644 --- a/tradingagents/dataflows/finviz_scraper.py +++ b/tradingagents/dataflows/finviz_scraper.py @@ -251,6 +251,7 @@ def get_insider_buying_screener( min_value: Annotated[int, "Minimum transaction value in dollars"] = 25000, top_n: Annotated[int, "Number of top results to return"] = 20, return_structured: Annotated[bool, "Return list of dicts instead of markdown"] = False, + deduplicate: Annotated[bool, "If False, return all transactions without deduplication"] = True, ): """ Discover stocks with recent insider buying/selling using OpenInsider. @@ -389,6 +390,13 @@ def get_insider_buying_screener( # Sort by value (largest first) transactions.sort(key=lambda x: x["value_num"], reverse=True) + # Return all transactions without deduplication if requested + if return_structured and not deduplicate: + logger.info( + f"Returning all {len(transactions)} {filter_desc} transactions (no dedup)" + ) + return transactions + # Deduplicate by ticker, keeping the largest transaction per ticker seen_tickers = set() unique_transactions = [] @@ -442,11 +450,25 @@ def get_finviz_insider_buying( lookback_days: int = 7, min_value: int = 25000, top_n: int = 20, -) -> str: - """Alias for get_insider_buying_screener to match registry naming convention""" + return_structured: bool = False, + deduplicate: bool = True, +): + """Alias for get_insider_buying_screener to match registry naming convention. + + Args: + transaction_type: "buy" for purchases, "sell" for sales + lookback_days: Days to look back (default 7) + min_value: Minimum transaction value in dollars + top_n: Number of top results to return + return_structured: If True, returns list of dicts instead of markdown + deduplicate: If False and return_structured=True, returns all transactions + (not deduplicated by ticker). Useful for cluster detection. + """ return get_insider_buying_screener( transaction_type=transaction_type, lookback_days=lookback_days, min_value=min_value, top_n=top_n, + return_structured=return_structured, + deduplicate=deduplicate, )