fix(discovery): fix three scanner hang/validation bugs found in ranker_debug.log

1. executor.shutdown(wait=True) still blocked after global timeout (critical)
   The previous fix added timeout= to as_completed() but used `with
   ThreadPoolExecutor() as executor`, whose __exit__ calls shutdown(wait=True).
   This meant the process still hung waiting for stuck threads (ml_signal) even
   after the TimeoutError was caught.  Fixed by creating the executor explicitly
   and calling shutdown(wait=False) in a finally block.

2. ml_signal hangs on every run — "Batch-downloading 592 tickers (1y)..." never
   completes. Root cause: a single yfinance request for 592 tickers × 1 year of
   daily OHLCV is a very large payload that regularly times out at the network
   layer. Fixed by:
   - Reducing default lookback from "1y" to "6mo" (halves download size)
   - Splitting downloads into 150-ticker chunks so a slow chunk doesn't kill
     the whole scan (partial results are still returned)

3. C (Citigroup) and other single-letter NYSE tickers rejected as invalid.
   validate_ticker_format used ^[A-Z]{2,5}$ requiring at least 2 letters.
   Real tickers like C, A, F, T, X, M are 1 letter. Fixed to ^[A-Z]{1,5}$.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Youssef Aitousarrah 2026-02-20 22:35:42 -08:00
parent ce2a6ef8fa
commit c792b17ab6
3 changed files with 65 additions and 48 deletions

View File

@@ -138,12 +138,13 @@ def validate_ticker_format(ticker: str) -> bool:
ticker: Ticker symbol to validate ticker: Ticker symbol to validate
Returns: Returns:
True if ticker matches expected format (2-5 uppercase letters) True if ticker matches expected format (1-5 uppercase letters).
Single-letter tickers (C, A, F, T, X, M, etc.) are valid NYSE symbols.
""" """
if not ticker or not isinstance(ticker, str): if not ticker or not isinstance(ticker, str):
return False return False
return bool(re.match(r"^[A-Z]{2,5}$", ticker.strip().upper())) return bool(re.match(r"^[A-Z]{1,5}$", ticker.strip().upper()))
def validate_candidate_structure(candidate: dict) -> bool: def validate_candidate_structure(candidate: dict) -> bool:

View File

@@ -58,7 +58,9 @@ class MLSignalScanner(BaseScanner):
def __init__(self, config: Dict[str, Any]): def __init__(self, config: Dict[str, Any]):
super().__init__(config) super().__init__(config)
self.min_win_prob = self.scanner_config.get("min_win_prob", 0.50) self.min_win_prob = self.scanner_config.get("min_win_prob", 0.50)
self.lookback_period = self.scanner_config.get("lookback_period", "1y") # "6mo" instead of "1y" — halves the yfinance payload for 500+ tickers,
# which prevents the batch download from hanging on slow connections.
self.lookback_period = self.scanner_config.get("lookback_period", "6mo")
self.max_workers = self.scanner_config.get("max_workers", 8) self.max_workers = self.scanner_config.get("max_workers", 8)
self.fetch_market_cap = self.scanner_config.get("fetch_market_cap", False) self.fetch_market_cap = self.scanner_config.get("fetch_market_cap", False)
@@ -137,54 +139,57 @@ class MLSignalScanner(BaseScanner):
return None return None
def _fetch_universe_ohlcv(self) -> Dict[str, pd.DataFrame]: def _fetch_universe_ohlcv(self) -> Dict[str, pd.DataFrame]:
"""Batch-fetch OHLCV data for the entire ticker universe. """Batch-fetch OHLCV data for the entire ticker universe in chunks.
Uses yfinance batch download — a single HTTP request regardless of Downloads in chunks of 150 tickers so a single slow/failed chunk doesn't
universe size. This is the key optimization for large universes. block the whole scanner. This replaces the previous single-request approach
which would hang on large universes (500+ tickers × 1y of data).
""" """
try: from tradingagents.dataflows.y_finance import download_history
from tradingagents.dataflows.y_finance import download_history
logger.info( chunk_size = 150
f"Batch-downloading {len(self.universe)} tickers ({self.lookback_period})..." universe = self.universe
) result: Dict[str, pd.DataFrame] = {}
# yfinance batch download — single HTTP request for all tickers chunks = [universe[i : i + chunk_size] for i in range(0, len(universe), chunk_size)]
raw = download_history( logger.info(
" ".join(self.universe), f"Batch-downloading {len(universe)} tickers ({self.lookback_period}) "
period=self.lookback_period, f"in {len(chunks)} chunks..."
auto_adjust=True, )
progress=False,
)
if raw.empty: for idx, chunk in enumerate(chunks):
return {} try:
raw = download_history(
" ".join(chunk),
period=self.lookback_period,
auto_adjust=True,
progress=False,
)
# Handle multi-level columns from batch download if raw is None or raw.empty:
result = {} continue
if isinstance(raw.columns, pd.MultiIndex):
# Multi-ticker: columns are (Price, Ticker)
tickers_in_data = raw.columns.get_level_values(1).unique()
for ticker in tickers_in_data:
try:
ticker_df = raw.xs(ticker, level=1, axis=1).copy()
ticker_df = ticker_df.reset_index()
if len(ticker_df) > 0:
result[ticker] = ticker_df
except (KeyError, ValueError):
continue
else:
# Single ticker fallback
raw = raw.reset_index()
if len(self.universe) == 1:
result[self.universe[0]] = raw
logger.info(f"Fetched OHLCV for {len(result)} tickers") if isinstance(raw.columns, pd.MultiIndex):
return result tickers_in_data = raw.columns.get_level_values(1).unique()
for ticker in tickers_in_data:
try:
ticker_df = raw.xs(ticker, level=1, axis=1).copy().reset_index()
if len(ticker_df) > 0:
result[ticker] = ticker_df
except (KeyError, ValueError):
continue
else:
# Single-ticker fallback
raw = raw.reset_index()
if chunk:
result[chunk[0]] = raw
except Exception as e: except Exception as e:
logger.warning(f"OHLCV batch fetch failed: {e}") logger.warning(f"Chunk {idx + 1}/{len(chunks)} download failed: {e}")
return {} continue
logger.info(f"Fetched OHLCV for {len(result)} tickers")
return result
def _predict_universe( def _predict_universe(
self, predictor, ohlcv_by_ticker: Dict[str, pd.DataFrame] self, predictor, ohlcv_by_ticker: Dict[str, pd.DataFrame]

View File

@@ -483,17 +483,22 @@ class DiscoveryGraph:
logger.error(f"Scanner {name} failed: {e}", exc_info=True) logger.error(f"Scanner {name} failed: {e}", exc_info=True)
return (name, pipeline, [], str(e), []) return (name, pipeline, [], str(e), [])
# Submit all scanner tasks # Submit all scanner tasks.
with ThreadPoolExecutor(max_workers=max_workers) as executor: # NOTE: Do NOT use `with ThreadPoolExecutor() as executor` here — that
# form calls shutdown(wait=True) on exit, which blocks until every thread
# finishes even after as_completed() has already timed out. We call
# shutdown(wait=False) explicitly so stuck threads are abandoned.
executor = ThreadPoolExecutor(max_workers=max_workers)
try:
future_to_scanner = { future_to_scanner = {
executor.submit(run_scanner, scanner_info): scanner_info[1] executor.submit(run_scanner, scanner_info): scanner_info[1]
for scanner_info in enabled_scanners for scanner_info in enabled_scanners
} }
# Collect results as they complete. # Collect results as they complete.
# The global_timeout passed to as_completed() ensures that if any # global_timeout is the wall-clock budget for ALL scanners together.
# scanner thread blocks indefinitely (e.g. waiting on a hung network # If any thread blocks indefinitely (e.g. a hung yfinance download),
# call), we raise TimeoutError and continue rather than hanging forever. # as_completed() raises TimeoutError so we continue immediately.
completed_count = 0 completed_count = 0
try: try:
for future in as_completed(future_to_scanner, timeout=global_timeout): for future in as_completed(future_to_scanner, timeout=global_timeout):
@@ -535,6 +540,12 @@ class DiscoveryGraph:
if completed_count < len(enabled_scanners): if completed_count < len(enabled_scanners):
logger.warning(f"Only {completed_count}/{len(enabled_scanners)} scanners completed") logger.warning(f"Only {completed_count}/{len(enabled_scanners)} scanners completed")
finally:
# wait=False: don't block on threads that are still running (e.g. a
# hung ml_signal download). The daemon threads will be cleaned up
# when the process exits.
executor.shutdown(wait=False)
return pipeline_candidates return pipeline_candidates
def filter_node(self, state: DiscoveryState) -> Dict[str, Any]: def filter_node(self, state: DiscoveryState) -> Dict[str, Any]: