
Enhanced Volume Analysis Tool Design

Created: 2026-02-05
Status: Design Complete - Ready for Implementation

Overview

Goal: Transform get_unusual_volume from a simple volume-threshold detector into a multi-signal volume analysis tool that adds pattern recognition, sector-relative comparison, and price-volume divergence detection, with a target of roughly 30-40% better signal quality than the threshold-only baseline.

Architecture: Layered enhancement functions that progressively enrich volume signals with additional context. Each enhancement is independently togglable via feature flags for testing and performance tuning.

Tech Stack:

  • pandas for data manipulation and rolling calculations
  • numpy for statistical computations
  • existing yfinance/alpha_vantage infrastructure for data
  • stockstats for technical indicators (ATR, Bollinger Bands)
  • scipy (stats.linregress) for trend-slope regression

Section 1: Overall Architecture

Current State (Baseline)

The existing get_unusual_volume tool:

  • Fetches average volume for a list of tickers
  • Compares current volume to average volume
  • Returns tickers where volume exceeds threshold (e.g., 2x average)
  • Provides minimal context: "Volume 2.5x average"
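For reference, a minimal pandas sketch of this baseline screen. It is illustrative rather than the existing implementation: it assumes daily volumes arrive as a DataFrame with one column per ticker, and `screen_unusual_volume` is a hypothetical name. The trailing average deliberately excludes the latest session so a spike does not inflate its own baseline.

```python
import pandas as pd


def screen_unusual_volume(
    volume_df: pd.DataFrame, lookback_days: int = 20, threshold: float = 2.0
) -> dict:
    """Return {ticker: volume_multiple} for tickers whose latest volume
    exceeds `threshold` x their trailing average (hypothetical helper)."""
    flagged = {}
    for ticker in volume_df.columns:
        series = volume_df[ticker].dropna()
        if len(series) < lookback_days + 1:
            continue  # not enough history for a trailing average
        # Average over the `lookback_days` sessions *before* the latest one
        avg_volume = series.iloc[-(lookback_days + 1):-1].mean()
        if avg_volume <= 0:
            continue
        multiple = series.iloc[-1] / avg_volume
        if multiple > threshold:
            flagged[ticker] = round(float(multiple), 2)
    return flagged


volumes = pd.DataFrame({
    "AAA": [100] * 20 + [250],  # latest day is 2.5x its trailing average
    "BBB": [100] * 20 + [150],  # 1.5x: below the 2x threshold
})
print(screen_unusual_volume(volumes))  # {'AAA': 2.5}
```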

Limitations:

  • No pattern recognition (accumulation vs distribution vs noise)
  • No relative comparison (is this unusual for the sector?)
  • No price context (is volume confirming or diverging from price action?)
  • All unusual volume treated equally regardless of quality

Enhanced Architecture

Layered Enhancement System:

Input: Tickers with volume > threshold
    ↓
Layer 1: Volume Pattern Analysis
    ├─ Detect accumulation/distribution patterns
    ├─ Identify compression setups
    └─ Flag unusual activity patterns
    ↓
Layer 2: Sector-Relative Comparison
    ├─ Map ticker to sector
    ├─ Compare to peer group volume
    └─ Calculate sector percentile ranking
    ↓
Layer 3: Price-Volume Divergence
    ├─ Analyze price trend
    ├─ Analyze volume trend
    └─ Detect bullish/bearish divergences
    ↓
Output: Enhanced candidates with rich context

Key Principles:

  1. Composable: Each layer is independent and optional
  2. Fail-Safe: Graceful degradation when data is unavailable (skip the layer and continue)
  3. Configurable: Feature flags to enable/disable layers
  4. Testable: Each layer can be unit tested separately
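The composable and fail-safe principles can be sketched as a small runner that calls each enabled layer independently and turns a failure into a skipped layer. This is a sketch under assumptions: `run_enhancement_layers` and the layer names are hypothetical, and each layer is modeled as a plain function from ticker to a result dict.

```python
import logging
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)

Layer = Callable[[str], Dict[str, Any]]


def run_enhancement_layers(
    ticker: str,
    layers: Dict[str, Layer],
    flags: Dict[str, bool],
) -> Dict[str, Optional[Dict[str, Any]]]:
    """Run each enabled layer independently; a disabled or failing layer
    yields None instead of aborting the whole candidate."""
    results: Dict[str, Optional[Dict[str, Any]]] = {}
    for name, layer in layers.items():
        if not flags.get(name, False):
            results[name] = None  # disabled via feature flag
            continue
        try:
            results[name] = layer(ticker)
        except Exception as exc:  # fail-safe: skip this layer, keep going
            logger.warning("Layer %s failed for %s: %s", name, ticker, exc)
            results[name] = None
    return results


def broken_sector_layer(ticker: str) -> Dict[str, Any]:
    raise ConnectionError("sector API unavailable")


out = run_enhancement_layers(
    "AAPL",
    layers={
        "pattern": lambda t: {"pattern": "compression"},
        "sector": broken_sector_layer,
        "divergence": lambda t: {"has_divergence": False},
    },
    flags={"pattern": True, "sector": True, "divergence": False},
)
```

Here the pattern layer's result survives, the failing sector layer becomes None, and the disabled divergence layer is never called.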

Data Flow

# Step 1: Baseline volume screening (existing)
# (current_volume / avg_volume stand in for the existing data helpers)
candidates = [
    {'ticker': ticker,
     'volume_multiple': current_volume(ticker) / avg_volume(ticker)}
    for ticker in tickers
    if current_volume(ticker) > avg_volume(ticker) * threshold
]

# Step 2: Enrich each candidate (new); data-fetching arguments omitted
for candidate in candidates:
    ticker = candidate['ticker']

    # Layer 1: Pattern analysis
    pattern_info = analyze_volume_pattern(ticker)

    # Layer 2: Sector comparison
    sector_info = compare_to_sector(ticker)

    # Layer 3: Divergence detection
    divergence_info = analyze_price_volume_divergence(ticker)

    # Combine into rich context
    candidate['context'] = build_context_string(
        candidate['volume_multiple'], pattern_info, sector_info, divergence_info
    )
    candidate['priority'] = assign_priority(
        candidate['volume_multiple'], pattern_info, sector_info, divergence_info
    )

Output Enhancement:

Before: {'ticker': 'AAPL', 'context': 'Volume 2.5x average'}

After: {'ticker': 'AAPL', 'context': 'Volume 3.2x avg (top 5% in Technology) | Bullish divergence detected | Price compression (ATR 1.2%)', 'priority': 'high', 'metadata': {...}}


Section 2: Volume Pattern Analysis

Purpose: Distinguish between meaningful volume patterns and random noise.

Three Key Patterns to Detect

1. Accumulation Pattern

Characteristics:

  • Volume consistently above average over multiple days (5-10 days)
  • Price relatively stable or slightly declining
  • Each volume spike followed by another (sustained interest)

Detection Logic:

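def detect_accumulation(
    volume_series: pd.Series,
    lookback_days: int = 10,
    min_elevated_days: int = 7,
    spike_ratio: float = 1.5,
) -> bool:
    """
    Returns True if volume shows accumulation pattern:
    - 7+ of the last `lookback_days` days with volume > 1.5x the baseline
      average (mean volume of the days *before* the lookback window)
    - Volume trend is increasing (positive slope)
    Filtering out pumps (extreme price moves) is left to the caller,
    since this function only sees volume.
    """
    recent = volume_series.iloc[-lookback_days:]
    baseline = volume_series.iloc[:-lookback_days]
    if len(recent) < lookback_days or baseline.empty:
        return False  # Not enough history for a baseline

    # Baseline excludes the window itself; a rolling mean that includes
    # the spike days would mask them
    above_threshold_days = int((recent > baseline.mean() * spike_ratio).sum())

    # Linear regression on recent volume to detect trend
    volume_slope = calculate_trend_slope(recent)

    return bool(above_threshold_days >= min_elevated_days and volume_slope > 0)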

Signal Strength: High - Indicates smart money accumulating a position

2. Compression Pattern

Characteristics:

  • Low volatility (tight price range)
  • Above-average volume despite low volatility
  • Setup for potential breakout

Detection Logic:

def detect_compression(
    price_data: pd.DataFrame,
    volume_data: pd.Series,
    lookback_days: int = 20
) -> Dict[str, Any]:
    """
    Detects compression using:
    - ATR (Average True Range) < 2% of price
    - Bollinger Band width in bottom 25% of historical range
    - Volume > 1.3x average (energy building)
    """
    atr_pct = calculate_atr_percent(price_data, lookback_days)
    bb_width = calculate_bollinger_bandwidth(price_data, lookback_days)
    bb_percentile = calculate_percentile(bb_width, lookback_days)

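    # Compare scalars: comparing a value against the full rolling Series
    # would produce a Series, and `and` on a Series raises ValueError
    latest_volume = volume_data.iloc[-1]
    avg_volume = volume_data.rolling(lookback_days).mean().iloc[-1]

    # bool() so callers get a plain Python bool rather than numpy.bool_
    is_compressed = bool(
        atr_pct < 2.0 and
        bb_percentile < 25 and
        latest_volume > avg_volume * 1.3
    )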

    return {
        'is_compressed': is_compressed,
        'atr_pct': atr_pct,
        'bb_percentile': bb_percentile
    }

Signal Strength: Very High - Compression + volume = high-probability setup
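The helpers `calculate_atr_percent`, `calculate_bollinger_bandwidth` and `calculate_percentile` are not defined above. One possible pure-pandas sketch follows, assuming lowercase `high`/`low`/`close` columns as in the unit tests later in this document. It uses a simple rolling mean of true range rather than Wilder's smoothing, and it interprets `calculate_percentile` as the share of recent bandwidth values strictly below the latest one; stockstats could be substituted for the ATR and Bollinger calculations.

```python
import pandas as pd


def calculate_atr_percent(price_data: pd.DataFrame, lookback_days: int) -> float:
    """Latest ATR as a percent of the latest close (simple-average ATR)."""
    high, low, close = price_data["high"], price_data["low"], price_data["close"]
    prev_close = close.shift(1)
    # True range: the largest of the three candidate ranges for each day
    true_range = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()], axis=1
    ).max(axis=1)
    atr = true_range.rolling(lookback_days, min_periods=1).mean().iloc[-1]
    return float(atr / close.iloc[-1] * 100)


def calculate_bollinger_bandwidth(
    price_data: pd.DataFrame, lookback_days: int, num_std: float = 2.0
) -> pd.Series:
    """Bollinger bandwidth series: (upper - lower) / middle * 100."""
    close = price_data["close"]
    middle = close.rolling(lookback_days).mean()
    std = close.rolling(lookback_days).std()
    return (2 * num_std * std) / middle * 100


def calculate_percentile(series: pd.Series, lookback_days: int) -> float:
    """Percent of the prior `lookback_days` values strictly below the latest."""
    history = series.dropna().iloc[-(lookback_days + 1):]
    if len(history) < 2:
        return 50.0  # not enough history: treat as neutral
    latest, prior = history.iloc[-1], history.iloc[:-1]
    return float((prior < latest).mean() * 100)


# 20 volatile sessions followed by 20 tight-range sessions
closes = [95, 105] * 10 + [99.9, 100.1] * 10
df = pd.DataFrame({
    "close": closes,
    "high": [c + 0.5 for c in closes],
    "low": [c - 0.5 for c in closes],
})
bandwidth = calculate_bollinger_bandwidth(df, 20)
print(calculate_atr_percent(df, 20), calculate_percentile(bandwidth, 20))
```

Because the bandwidth needs `lookback_days` rows before its first value and the percentile then compares against `lookback_days` more, roughly `2 * lookback_days` rows of price history are needed before the percentile means anything.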

3. Distribution Pattern

Characteristics:

  • High volume but weakening over time
  • Price potentially topping
  • Each volume spike smaller than previous

Detection Logic:

def detect_distribution(volume_series: pd.Series, lookback_days: int = 10) -> bool:
    """
    Returns True if volume shows distribution pattern:
    - Recent volume still elevated (> 1.3x the preceding window's average)
    - Volume trend decreasing (negative slope)
    Needs at least 2 * lookback_days of history.
    """
    recent = volume_series.iloc[-lookback_days:]
    historical = volume_series.iloc[-lookback_days * 2:-lookback_days]
    if len(recent) < lookback_days or len(historical) < lookback_days:
        return False  # Not enough history to compare the two windows

    volume_slope = calculate_trend_slope(recent)
    return bool(volume_slope < 0 and recent.mean() > historical.mean() * 1.3)

Signal Strength: Medium - Warning signal (avoid/short opportunity)

Integration

Pattern analysis results are stored in candidate metadata and incorporated into the context string:

# Example output
{
    'ticker': 'AAPL',
    'pattern': 'compression',
    'pattern_metadata': {
        'atr_pct': 1.2,
        'bb_percentile': 18,
        'days_compressed': 5
    },
    'context_snippet': 'Price compression (ATR 1.2%, 5 days)'
}
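When more than one detector fires, something has to decide which pattern is reported. A minimal sketch, assuming the precedence follows the signal strengths listed above (compression, then accumulation, then distribution); `select_volume_pattern` is a hypothetical name, and the detector results are passed in pre-computed so the sketch stays self-contained.

```python
from typing import Any, Dict, Optional


def select_volume_pattern(
    is_accumulation: bool,
    compression: Dict[str, Any],
    is_distribution: bool,
) -> Dict[str, Optional[Any]]:
    """Pick one pattern by signal strength and build its context snippet."""
    if compression.get("is_compressed"):
        metadata = {k: v for k, v in compression.items() if k != "is_compressed"}
        return {
            "pattern": "compression",
            "pattern_metadata": metadata,
            "context_snippet": f"Price compression (ATR {compression['atr_pct']:.1f}%)",
        }
    if is_accumulation:
        return {"pattern": "accumulation", "pattern_metadata": {},
                "context_snippet": "Accumulation pattern"}
    if is_distribution:
        return {"pattern": "distribution", "pattern_metadata": {},
                "context_snippet": "Distribution pattern (volume fading)"}
    return {"pattern": None, "pattern_metadata": {}, "context_snippet": None}


result = select_volume_pattern(
    is_accumulation=True,
    compression={"is_compressed": True, "atr_pct": 1.23, "bb_percentile": 18},
    is_distribution=False,
)
print(result["context_snippet"])  # Price compression (ATR 1.2%)
```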

Section 3: Sector-Relative Volume Comparison

Purpose: Determine whether unusual volume is a ticker-specific or a sector-wide phenomenon.

Why This Matters

Scenario 1: Sector-Wide Volume Spike

  • All tech stocks see 2x volume → Likely sector news/trend
  • Individual ticker signal quality: Low-Medium

Scenario 2: Ticker-Specific Volume Spike

  • One tech stock sees 3x volume, peers at 1x → Ticker-specific catalyst
  • Individual ticker signal quality: High

Implementation Approach

Step 1: Sector Mapping

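import yfinance as yf

SECTOR_CACHE: dict = {}  # In-memory cache for the session

def get_ticker_sector(ticker: str) -> str:
    """
    Fetch sector from yfinance or cache.
    Returns: 'Technology', 'Healthcare', etc., or 'Unknown' if unavailable.

    Uses caching to avoid repeated API calls:
    - In-memory dict for session (below)
    - File-based cache for persistence across runs (see Section 7)
    """
    if ticker in SECTOR_CACHE:
        return SECTOR_CACHE[ticker]

    try:
        sector = yf.Ticker(ticker).info.get('sector', 'Unknown')
    except Exception:
        return 'Unknown'  # Fail-safe: don't cache transient API errors
    SECTOR_CACHE[ticker] = sector
    return sector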

Step 2: Sector Percentile Calculation

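from typing import List, Optional

def calculate_sector_volume_percentile(
    ticker: str,
    sector: str,
    volume_multiple: float,
    all_tickers: List[str]
) -> Optional[float]:
    """
    Calculate where this ticker's volume ranks within its sector.

    Returns: Percentile 0-100, the share of sector peers with a strictly
    lower volume multiple (95 = top 5% in sector), or None if the sector
    has no peers in all_tickers.
    """
    # Get all tickers in same sector
    sector_tickers = [t for t in all_tickers if get_ticker_sector(t) == sector]
    if not sector_tickers:
        return None  # Fail-safe: caller skips the sector layer

    # Get volume multiples for all sector peers
    peer_multiples = [get_volume_multiple(t) for t in sector_tickers]

    # Count peers strictly below this ticker. Same result as the index of the
    # value in the sorted list, but no ValueError if volume_multiple is not an
    # exact member of it (e.g. after float rounding).
    lower = sum(1 for v in peer_multiples if v < volume_multiple)
    return lower / len(peer_multiples) * 100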

Step 3: Context Enhancement

def enhance_with_sector_context(
    candidate: Dict,
    sector_percentile: float
) -> str:
    """
    Add sector context to candidate description.

    Examples:
    - "Volume 2.8x avg (top 3% in Technology)"
    - "Volume 2.1x avg (median in Healthcare - sector-wide activity)"
    """
    if sector_percentile >= 90:
        return f"top {100-sector_percentile:.0f}% in {candidate['sector']}"
    elif sector_percentile <= 50:
        return f"median in {candidate['sector']} - sector-wide activity"
    else:
        return f"{sector_percentile:.0f}th percentile in {candidate['sector']}"

Performance Optimization

  • Cache sector mappings to avoid repeated API calls
  • Batch fetch sector data for all candidates at once
  • Fallback gracefully if sector data unavailable (skip this layer)

Priority Boost Logic

PRIORITY_LEVELS = ['low', 'medium', 'high']

def apply_sector_priority_boost(base_priority: str, sector_percentile: float) -> str:
    """
    Boost priority if ticker is outlier in its sector.

    - Top 10% in sector → boost by one level (low → medium, medium → high)
    - Median or below → no boost (possibly reduce)
    """
    if sector_percentile >= 90:
        level = PRIORITY_LEVELS.index(base_priority)
        return PRIORITY_LEVELS[min(level + 1, len(PRIORITY_LEVELS) - 1)]
    return base_priority

Section 4: Price-Volume Divergence Detection

Purpose: Identify when volume tells a different story than price movement - often a powerful early signal.

Core Detection Logic

def analyze_price_volume_divergence(
    ticker: str,
    price_data: pd.DataFrame,
    volume_data: pd.Series,
    lookback_days: int = 20
) -> Dict[str, Any]:
    """
    Detect divergence between price and volume trends.

    Returns:
        {
            'has_divergence': bool,
            'divergence_type': 'bullish' | 'bearish' | None,
            'divergence_strength': float,  # 0-1 scale
            'explanation': str
        }
    """
    # Calculate trend slopes using linear regression (positional slicing via .iloc)
    price_slope = calculate_trend_slope(price_data['close'].iloc[-lookback_days:])
    volume_slope = calculate_trend_slope(volume_data.iloc[-lookback_days:])

    # Normalize slopes to compare direction
    price_trend = 'up' if price_slope > 0.02 else 'down' if price_slope < -0.02 else 'flat'
    volume_trend = 'up' if volume_slope > 0.05 else 'down' if volume_slope < -0.05 else 'flat'

    # Detect divergence patterns
    divergence_type = None
    if price_trend in ['down', 'flat'] and volume_trend == 'up':
        divergence_type = 'bullish'  # Accumulation
    elif price_trend in ['up', 'flat'] and volume_trend == 'down':
        divergence_type = 'bearish'  # Distribution/exhaustion

    # Calculate strength based on magnitude of slopes
    divergence_strength = abs(price_slope - volume_slope) / max(abs(price_slope), abs(volume_slope), 0.01)

    return {
        'has_divergence': divergence_type is not None,
        'divergence_type': divergence_type,
        'divergence_strength': min(divergence_strength, 1.0),
        'explanation': _build_divergence_explanation(price_trend, volume_trend, divergence_type)
    }

Four Key Divergence Patterns

1. Bullish Divergence (Accumulation)

  • Price: Declining or flat
  • Volume: Increasing
  • Interpretation: Smart money accumulating despite weak price action
  • Signal: Potential reversal upward
  • Example: Stock drifts lower on low volume, then volume spikes as price stabilizes

2. Bearish Divergence (Distribution)

  • Price: Rising or flat
  • Volume: Decreasing
  • Interpretation: Weak buying interest, unsustainable rally
  • Signal: Potential reversal down or exhaustion
  • Example: Stock rallies but each green day has less volume than previous

3. Volume Confirmation (Not Divergence)

  • Price: Rising
  • Volume: Increasing
  • Interpretation: Strong bullish momentum with conviction
  • Signal: Trend continuation likely
  • Note: Not a divergence, but worth flagging as "confirmed move"

4. Weak Movement (Both Declining)

  • Price: Declining
  • Volume: Decreasing
  • Interpretation: Weak signal overall, lack of conviction
  • Signal: Low priority, may be noise
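`analyze_price_volume_divergence` only labels the two divergences and leaves `_build_divergence_explanation` undefined. A possible sketch of a helper that maps the trend labels ('up', 'down', 'flat') onto all four patterns above, with explanation text modeled on the example output that follows; the function name, pattern keys, and wording are assumptions.

```python
from typing import Optional, Tuple


def classify_price_volume(
    price_trend: str, volume_trend: str
) -> Tuple[str, Optional[str], str]:
    """Return (pattern, divergence_type, explanation) for two trend labels."""
    if price_trend in ("down", "flat") and volume_trend == "up":
        return ("bullish_divergence", "bullish",
                f"Price {price_trend} while volume increasing - potential accumulation")
    if price_trend in ("up", "flat") and volume_trend == "down":
        return ("bearish_divergence", "bearish",
                f"Price {price_trend} while volume decreasing - possible distribution")
    if price_trend == "up" and volume_trend == "up":
        return ("confirmation", None, "Price and volume both rising - confirmed move")
    if price_trend == "down" and volume_trend == "down":
        return ("weak_movement", None, "Price and volume both declining - low conviction")
    return ("none", None, "No clear price-volume relationship")


print(classify_price_volume("flat", "up")[2])
# Price flat while volume increasing - potential accumulation
```

Only the first two branches set a divergence type, so confirmation and weak movement can still be surfaced in the context string without being counted as divergences.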

Implementation Approach

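import numpy as np
import pandas as pd
from scipy import stats

def calculate_trend_slope(series: pd.Series) -> float:
    """
    Calculate linear regression slope for time series,
    normalized to percent of the series mean per period
    (i.e. % change per day for daily data).
    Returns 0.0 for fewer than 2 points or a zero mean.
    """
    values = series.dropna().to_numpy(dtype=float)
    if len(values) < 2 or values.mean() == 0:
        return 0.0

    x = np.arange(len(values))
    result = stats.linregress(x, values)

    # Normalize to percentage of mean so price and volume slopes are comparable
    return float(result.slope / values.mean() * 100)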
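If pulling in SciPy only for `linregress` is undesirable, a degree-1 least-squares fit with NumPy's `np.polyfit` yields the same slope. The sketch below is a swapped-in alternative, not the approach this design specifies, and `trend_slope_pct` is a hypothetical name; it also shows the normalization on a concrete series.

```python
import numpy as np
import pandas as pd


def trend_slope_pct(series: pd.Series) -> float:
    """Least-squares slope as a percent of the series mean per period."""
    values = series.dropna().to_numpy(dtype=float)
    if len(values) < 2 or values.mean() == 0:
        return 0.0
    slope, _intercept = np.polyfit(np.arange(len(values)), values, deg=1)
    return float(slope / values.mean() * 100)


volume = pd.Series([100, 110, 120, 130, 140, 150, 160, 170, 180, 190])
# Slope is 10 shares/day on a mean of 145, i.e. about 6.9% of the mean per day
print(round(trend_slope_pct(volume), 2))  # 6.9
```

In these units the 0.02 and 0.05 cut-offs in `analyze_price_volume_divergence` mean 0.02% and 0.05% of the mean per day, so even a very gentle drift is classified as a trend.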

Integration Point

Divergence detection enhances get_unusual_volume by flagging tickers where unusual volume might indicate accumulation/distribution rather than just noise. The divergence type becomes part of the context string returned to the discovery system.

Example Output:

{
    'ticker': 'NVDA',
    'divergence': {
        'type': 'bullish',
        'strength': 0.73,
        'explanation': 'Price flat while volume increasing - potential accumulation'
    },
    'context_snippet': 'Bullish divergence detected (strength: 0.73)'
}

Filtering Logic

Only flag divergences when:

  • Volume trend is strong (slope > 0.05 or < -0.05)
  • Minimum divergence strength of 0.4
  • At least 15 days of data available for reliable trend calculation

This prevents noise from weak or short-term patterns.
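The three rules can be expressed as a post-filter on the dict returned by `analyze_price_volume_divergence`. A sketch, assuming the caller passes the volume slope and the number of data points separately (that function does not return them); `should_flag_divergence` and the constant names are hypothetical.

```python
from typing import Any, Dict

MIN_DATA_POINTS = 15
MIN_VOLUME_SLOPE = 0.05  # percent of mean per day, as in the trend labels
MIN_STRENGTH = 0.4


def should_flag_divergence(
    divergence: Dict[str, Any], volume_slope: float, n_points: int
) -> bool:
    """Apply the filtering rules: enough data, strong volume trend, strong divergence."""
    return bool(
        divergence.get("has_divergence", False)
        and n_points >= MIN_DATA_POINTS
        and abs(volume_slope) > MIN_VOLUME_SLOPE
        and divergence.get("divergence_strength", 0.0) >= MIN_STRENGTH
    )


candidate = {"has_divergence": True, "divergence_type": "bullish", "divergence_strength": 0.73}
print(should_flag_divergence(candidate, volume_slope=2.0, n_points=20))  # True
print(should_flag_divergence(candidate, volume_slope=2.0, n_points=10))  # False
```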


Section 5: Integration & Configuration

Complete Tool Signature

def get_unusual_volume(
    tickers: List[str],
    lookback_days: int = 20,
    volume_multiple_threshold: float = 2.0,
    enable_pattern_analysis: bool = True,
    enable_sector_comparison: bool = True,
    enable_divergence_detection: bool = True
) -> List[Dict[str, Any]]:
    """
    Enhanced volume analysis with configurable feature flags.

    Args:
        tickers: List of ticker symbols to analyze
        lookback_days: Days of history for calculations
        volume_multiple_threshold: Minimum volume multiple (vs avg) to flag
        enable_pattern_analysis: Enable accumulation/compression detection
        enable_sector_comparison: Enable sector-relative percentile ranking
        enable_divergence_detection: Enable price-volume divergence analysis

    Returns:
        List of candidates with enhanced context:
        [
            {
                'ticker': str,
                'source': 'volume_accumulation',
                'context': str,  # Rich description combining all insights
                'priority': 'high' | 'medium' | 'low',
                'strategy': 'momentum',
                'metadata': {
                    'volume_multiple': float,
                    'pattern': str | None,  # 'accumulation', 'compression', etc.
                    'sector': str | None,
                    'sector_percentile': float | None,  # 0-100
                    'divergence_type': str | None,  # 'bullish', 'bearish'
                    'divergence_strength': float | None  # 0-1
                }
            },
            ...
        ]
    """

Context String Construction

The context field always opens with the baseline volume multiple; further insights are then added in priority order:

Priority Order:

  1. Sector comparison (appended in parentheses to the volume multiple, only if top/bottom tier)
  2. Divergence type (if present)
  3. Pattern type (if detected)

Example Contexts:

"Volume 3.2x avg (top 5% in Technology) | Bullish divergence detected | Price compression (ATR 1.2%)"

"Volume 2.1x avg (median in Healthcare - sector-wide activity) | Accumulation pattern (7 days)"

"Volume 2.8x avg | Bearish divergence - weakening rally"

Implementation:

def build_context_string(
    volume_multiple: float,
    pattern_info: Optional[Dict] = None,
    sector_info: Optional[Dict] = None,
    divergence_info: Optional[Dict] = None
) -> str:
    """
    Build rich context string from all enhancement layers.
    Any info dict may be None when its layer is disabled or failed.
    """
    parts = []

    # Start with baseline volume
    base = f"Volume {volume_multiple:.1f}x avg"

    # Add sector context if available and notable (percentile may be None)
    percentile = sector_info.get('percentile') if sector_info else None
    if percentile is not None and percentile >= 85:
        base += f" (top {100 - percentile:.0f}% in {sector_info['sector']})"
    elif percentile is not None and percentile <= 50:
        base += f" (median in {sector_info['sector']} - sector-wide activity)"

    parts.append(base)

    # Add divergence if present
    if divergence_info and divergence_info.get('has_divergence'):
        parts.append(divergence_info.get('explanation', 'Divergence detected'))

    # Add pattern if detected
    if pattern_info and pattern_info.get('pattern'):
        parts.append(pattern_info.get('context_snippet') or f"{pattern_info['pattern']} pattern")

    return " | ".join(parts)

Priority Assignment Logic

def assign_priority(
    volume_multiple: float,
    pattern_info: Dict,
    sector_info: Dict,
    divergence_info: Dict
) -> str:
    """
    Assign priority based on signal strength.

    High priority:
    - Sector top 10% + (pattern OR divergence)
    - Volume >3x + bullish divergence
    - Compression pattern + any other signal

    Medium priority:
    - Volume >2.5x avg + any enhancement signal
    - Sector top 25% + volume >2x

    Low priority:
    - Volume >2x avg only (baseline threshold)
    """
    # Any info dict may be None when its layer is disabled
    has_pattern = bool(pattern_info and pattern_info.get('pattern'))
    has_divergence = bool(divergence_info and divergence_info.get('has_divergence'))
    divergence_type = divergence_info.get('divergence_type') if divergence_info else None
    sector_percentile = (sector_info or {}).get('percentile')
    if sector_percentile is None:
        sector_percentile = 50  # Neutral when sector data is missing
    is_compression = bool(pattern_info and pattern_info.get('pattern') == 'compression')

    # High priority conditions
    if sector_percentile >= 90 and (has_pattern or has_divergence):
        return 'high'
    if volume_multiple >= 3.0 and divergence_type == 'bullish':
        return 'high'
    if is_compression and (has_divergence or sector_percentile >= 75):
        return 'high'

    # Medium priority conditions
    if volume_multiple >= 2.5 and (has_pattern or has_divergence):
        return 'medium'
    if sector_percentile >= 75:
        return 'medium'

    # Default: low priority
    return 'low'

Configuration in default_config.py

"volume_accumulation": {
    "enabled": True,
    "pipeline": "momentum",
    "limit": 15,
    "unusual_volume_multiple": 2.0,  # Baseline threshold

    # Enhancement feature flags
    "enable_pattern_analysis": True,
    "enable_sector_comparison": True,
    "enable_divergence_detection": True,

    # Enhancement-specific settings
    "pattern_lookback_days": 20,
    "divergence_lookback_days": 20,
    "compression_atr_pct_max": 2.0,
    "compression_bb_width_max": 6.0,
    "compression_min_volume_ratio": 1.3,

    # Cache key for volume data reuse
    "volume_cache_key": "default",
}

This allows easy feature toggling for:

  • Testing: Enable one feature at a time to validate
  • Performance tuning: Disable expensive features if needed
  • A/B testing: Compare signal quality with/without enhancements
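The config keys do not match the tool's parameter names one-to-one (`unusual_volume_multiple` vs `volume_multiple_threshold`). A sketch of the translation, assuming `pattern_lookback_days` drives the tool's `lookback_days` and that missing flags default to enabled as in the tool signature; `tool_kwargs_from_config` is hypothetical.

```python
from typing import Any, Dict

FLAG_KEYS = (
    "enable_pattern_analysis",
    "enable_sector_comparison",
    "enable_divergence_detection",
)


def tool_kwargs_from_config(scanner_config: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a 'volume_accumulation' config entry into get_unusual_volume kwargs."""
    kwargs: Dict[str, Any] = {
        "volume_multiple_threshold": scanner_config.get("unusual_volume_multiple", 2.0),
        "lookback_days": scanner_config.get("pattern_lookback_days", 20),
    }
    # Missing flags default to enabled, matching the tool signature defaults
    for key in FLAG_KEYS:
        kwargs[key] = bool(scanner_config.get(key, True))
    return kwargs


config = {"enabled": True, "unusual_volume_multiple": 2.5, "enable_sector_comparison": False}
kwargs = tool_kwargs_from_config(config)
```

Scanner-level keys such as `enabled`, `pipeline` and `limit` are deliberately not forwarded; they control the discovery pipeline, not the tool call.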

Section 6: Testing Strategy

Test Structure

Tests are organized at three levels:

  1. Unit tests - Each enhancement function in isolation
  2. Integration tests - Combined tool with all features
  3. Validation tests - Real market scenarios

Unit Tests

File: tests/dataflows/test_volume_enhancements.py

import pytest
import pandas as pd
import numpy as np
from tradingagents.dataflows.volume_enhancements import (
    detect_accumulation,
    detect_compression,
    detect_distribution,
    calculate_sector_volume_percentile,
    analyze_price_volume_divergence,
)

class TestPatternDetection:
    """Test volume pattern detection functions."""

    def test_detect_accumulation_pattern(self):
        """Test accumulation detection with synthetic data."""
        # Create volume data: 20 baseline days at 100, then 10 days of
        # rising volume, all above 1.5x the baseline
        volume_series = pd.Series(
            [100] * 20 + [160, 170, 165, 180, 190, 185, 200, 210, 220, 230]
        )

        result = detect_accumulation(volume_series, lookback_days=10)

        assert result is True, "Should detect accumulation pattern"

    def test_detect_compression_pattern(self):
        """Test compression pattern detection."""
        # Create price data: 20 volatile days followed by 20 tight-range days,
        # so the latest Bollinger bandwidth is the narrowest in its history
        closes = [95, 105] * 10 + [99.9, 100.1] * 10
        price_data = pd.DataFrame({
            'high': [c + 0.5 for c in closes],
            'low': [c - 0.5 for c in closes],
            'close': closes
        })

        # Create volume data: flat, then a spike on the latest day
        # (> 1.3x the 20-day mean)
        volume_data = pd.Series([100] * 39 + [200])

        result = detect_compression(price_data, volume_data, lookback_days=20)

        assert result['is_compressed'] is True
        assert result['atr_pct'] < 2.0
        assert result['bb_percentile'] < 25

    def test_detect_distribution_pattern(self):
        """Test distribution detection."""
        # Create volume data: 10 baseline days at 100, then 10 elevated
        # but declining days (window mean 155 > 1.3x baseline)
        volume_series = pd.Series(
            [100] * 10 + [200, 190, 180, 170, 160, 150, 140, 130, 120, 110]
        )

        result = detect_distribution(volume_series, lookback_days=10)

        assert result is True, "Should detect distribution pattern"

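class TestSectorComparison:
    """Test sector-relative volume analysis."""

    @pytest.fixture
    def mock_sector_data(self, monkeypatch):
        """Stub sector lookups and volume multiples so no API calls are made."""
        from tradingagents.dataflows import volume_enhancements

        def install(volume_multiples: dict, sector: str):
            monkeypatch.setattr(volume_enhancements, 'get_ticker_sector', lambda t: sector)
            monkeypatch.setattr(volume_enhancements, 'get_volume_multiple',
                                lambda t: volume_multiples[t])

        return install

    def test_sector_percentile_calculation(self, mock_sector_data):
        """Test sector percentile calculation."""
        # Mock scenario: 10 tickers in tech sector
        sector_volumes = {
            'AAPL': 1.5, 'MSFT': 1.8, 'GOOGL': 3.2,  # High volume
            'NVDA': 2.1, 'AMD': 1.9, 'INTC': 1.4,
            'QCOM': 1.2, 'TXN': 1.1, 'AVGO': 1.3, 'ORCL': 1.0
        }
        mock_sector_data(sector_volumes, 'Technology')

        # GOOGL (3.2x) beats 9 of 10 peers -> 90th percentile
        percentile = calculate_sector_volume_percentile(
            'GOOGL', 'Technology', 3.2, list(sector_volumes.keys())
        )

        assert percentile >= 85, "GOOGL should be in top tier"
        assert percentile <= 95

    def test_sector_percentile_edge_cases(self, mock_sector_data):
        """Test edge cases: top ticker, bottom ticker."""
        sector_volumes = {'A': 1.0, 'B': 2.0, 'C': 3.0}
        mock_sector_data(sector_volumes, 'Tech')

        # Top ticker: 2 of 3 peers are lower -> ~66.7 ("top 33%")
        top_pct = calculate_sector_volume_percentile('C', 'Tech', 3.0, ['A', 'B', 'C'])
        assert top_pct == pytest.approx(200 / 3)

        # Bottom ticker: no peers are lower
        bot_pct = calculate_sector_volume_percentile('A', 'Tech', 1.0, ['A', 'B', 'C'])
        assert bot_pct == 0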

class TestDivergenceDetection:
    """Test price-volume divergence analysis."""

    def test_bullish_divergence_detection(self):
        """Test bullish divergence (price down, volume up)."""
        # Price declining
        price_data = pd.DataFrame({
            'close': [100, 98, 97, 96, 95, 94, 93, 92, 91, 90]
        })

        # Volume increasing
        volume_data = pd.Series([100, 110, 120, 130, 140, 150, 160, 170, 180, 190])

        result = analyze_price_volume_divergence('TEST', price_data, volume_data, lookback_days=10)

        assert result['has_divergence'] is True
        assert result['divergence_type'] == 'bullish'
        assert result['divergence_strength'] > 0.4

    def test_bearish_divergence_detection(self):
        """Test bearish divergence (price up, volume down)."""
        # Price rising
        price_data = pd.DataFrame({
            'close': [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
        })

        # Volume declining
        volume_data = pd.Series([190, 180, 170, 160, 150, 140, 130, 120, 110, 100])

        result = analyze_price_volume_divergence('TEST', price_data, volume_data, lookback_days=10)

        assert result['has_divergence'] is True
        assert result['divergence_type'] == 'bearish'

    def test_no_divergence_confirmation(self):
        """Test no divergence when price and volume both rising."""
        # Both rising
        price_data = pd.DataFrame({
            'close': [90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
        })
        volume_data = pd.Series([100, 110, 120, 130, 140, 150, 160, 170, 180, 190])

        result = analyze_price_volume_divergence('TEST', price_data, volume_data, lookback_days=10)

        assert result['has_divergence'] is False
        assert result['divergence_type'] is None

Integration Tests

class TestEnhancedVolumeTool:
    """Test full enhanced volume tool."""

    def test_tool_with_all_features_enabled(self):
        """Test complete tool with all enhancements."""
        from tradingagents.tools.registry import TOOLS_REGISTRY

        # Get enhanced tool
        tool = TOOLS_REGISTRY.get_tool('get_unusual_volume')

        # Run with known tickers
        result = tool(
            tickers=['AAPL', 'MSFT', 'NVDA'],
            volume_multiple_threshold=2.0,
            enable_pattern_analysis=True,
            enable_sector_comparison=True,
            enable_divergence_detection=True
        )

        # Verify structure
        assert isinstance(result, list)
        for candidate in result:
            assert 'ticker' in candidate
            assert 'context' in candidate
            assert 'priority' in candidate
            assert 'metadata' in candidate

            # Verify metadata has enhancement fields
            metadata = candidate['metadata']
            assert 'volume_multiple' in metadata
            # Pattern, sector, divergence fields may be None but should exist
            assert 'pattern' in metadata
            assert 'sector_percentile' in metadata
            assert 'divergence_type' in metadata

    def test_feature_flag_toggling(self):
        """Test that feature flags disable features correctly."""
        from tradingagents.tools.registry import TOOLS_REGISTRY
        tool = TOOLS_REGISTRY.get_tool('get_unusual_volume')

        # Test with pattern analysis only
        result_pattern_only = tool(
            tickers=['AAPL'],
            enable_pattern_analysis=True,
            enable_sector_comparison=False,
            enable_divergence_detection=False
        )

        if result_pattern_only:
            metadata = result_pattern_only[0]['metadata']
            # Should have pattern (value may be None) but not sector/divergence
            assert 'pattern' in metadata
            assert metadata.get('sector_percentile') is None
            assert metadata.get('divergence_type') is None

    def test_priority_assignment(self):
        """Test priority assignment logic."""
        # This would use mocked data to verify priority levels
        # are assigned correctly based on enhancement signals
        pass

Validation Tests (Historical Cases)

class TestHistoricalValidation:
    """Validate with known historical patterns."""

    @pytest.mark.skip("Requires historical market data")
    def test_known_accumulation_case(self):
        """Test with ticker that had confirmed accumulation."""
        # Example: Find a ticker that showed accumulation before breakout
        # Verify tool would have flagged it
        pass

    @pytest.mark.skip("Requires historical market data")
    def test_known_compression_breakout(self):
        """Test with ticker that broke out from compression."""
        # Example: Low volatility period followed by big move
        # Verify compression detection would have worked
        pass

Performance Tests

class TestPerformance:
    """Test performance with realistic loads."""

    def test_performance_with_large_ticker_list(self):
        """Ensure tool scales to 100+ tickers."""
        import time

        # Generate 100 test tickers
        tickers = [f"TEST{i}" for i in range(100)]

        from tradingagents.tools.registry import TOOLS_REGISTRY
        tool = TOOLS_REGISTRY.get_tool('get_unusual_volume')

        start = time.time()
        result = tool(tickers, volume_multiple_threshold=2.0)
        elapsed = time.time() - start

        # Should complete within reasonable time
        assert elapsed < 10.0, f"Tool took {elapsed:.1f}s for 100 tickers (limit: 10s)"

    def test_caching_effectiveness(self):
        """Verify caching reduces redundant API calls."""
        # Run tool twice with same tickers
        # Verify second run is significantly faster
        pass

Test Execution

# Run all volume enhancement tests
pytest tests/dataflows/test_volume_enhancements.py -v

# Run integration tests only
pytest tests/dataflows/test_volume_enhancements.py::TestEnhancedVolumeTool -v

# Run with coverage
pytest tests/dataflows/test_volume_enhancements.py --cov=tradingagents.dataflows.volume_enhancements

# Run performance tests
pytest tests/dataflows/test_volume_enhancements.py::TestPerformance -v -s

Section 7: Performance & Implementation Considerations

Performance Optimization

1. Caching Strategy

Sector Mapping Cache:

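import json
import os

import yfinance as yf

# In-memory cache for session
SECTOR_CACHE = {}

# File-based cache for persistence
SECTOR_CACHE_FILE = "data/sector_mappings.json"

def _load_file_cache() -> dict:
    """Read the JSON cache file, or return {} if it does not exist yet."""
    if not os.path.exists(SECTOR_CACHE_FILE):
        return {}
    with open(SECTOR_CACHE_FILE) as f:
        return json.load(f)

def _update_file_cache(ticker: str, sector: str) -> None:
    """Read-modify-write the JSON cache (not safe for concurrent writers)."""
    file_cache = _load_file_cache()
    file_cache[ticker] = sector
    os.makedirs(os.path.dirname(SECTOR_CACHE_FILE), exist_ok=True)
    with open(SECTOR_CACHE_FILE, "w") as f:
        json.dump(file_cache, f, indent=2)

def get_ticker_sector_cached(ticker: str) -> str:
    """Get sector with two-tier caching (memory, then file, then yfinance)."""
    # Check memory cache first
    if ticker in SECTOR_CACHE:
        return SECTOR_CACHE[ticker]

    # Check file cache
    file_cache = _load_file_cache()
    if ticker in file_cache:
        SECTOR_CACHE[ticker] = file_cache[ticker]
        return file_cache[ticker]

    # Fetch from API and cache in both tiers
    sector = yf.Ticker(ticker).info.get('sector', 'Unknown')
    SECTOR_CACHE[ticker] = sector
    _update_file_cache(ticker, sector)

    return sector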

Volume Data Cache:

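from datetime import date

import pandas as pd

VOLUME_CACHE: dict = {}  # In-memory, keyed by ticker, day and lookback

# Reuse existing volume cache infrastructure
def get_volume_data_cached(ticker: str, lookback_days: int) -> pd.Series: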
    """
    Leverage existing volume cache from discovery system.
    Cache key: f"{ticker}_{date}_{lookback_days}"
    """
    cache_key = f"{ticker}_{date.today()}_{lookback_days}"

    if cache_key in VOLUME_CACHE:
        return VOLUME_CACHE[cache_key]

    # Fetch and cache
    volume_data = fetch_volume_data(ticker, lookback_days)
    VOLUME_CACHE[cache_key] = volume_data

    return volume_data

2. Batch Processing

from typing import Dict, List

import pandas as pd

def get_unusual_volume_enhanced(
    tickers: List[str],
    lookback_days: int = 20,
    volume_multiple_threshold: float = 2.0,
    **kwargs
) -> List[Dict]:
    """
    Enhanced version with batch processing optimization.
    Feature flags (enable_*) are passed through **kwargs.
    """
    # Step 1: Batch fetch volume data for all tickers
    volume_data_batch = fetch_volume_batch(tickers, lookback_days)

    # Step 2: Filter to candidates (latest volume > threshold x average)
    candidates = [
        ticker for ticker, vol in volume_data_batch.items()
        if len(vol) > 1 and vol.iloc[-1] > vol.mean() * volume_multiple_threshold
    ]

    # Step 3: Batch fetch enhancement data (only for candidates).
    # Default to empty dicts so the .get() lookups below work when a
    # feature is disabled.
    sectors_batch: Dict[str, str] = {}
    price_data_batch: Dict[str, pd.DataFrame] = {}
    if kwargs.get('enable_sector_comparison'):
        sectors_batch = fetch_sectors_batch(candidates)

    # Both pattern analysis and divergence detection need price data
    if kwargs.get('enable_pattern_analysis') or kwargs.get('enable_divergence_detection'):
        price_data_batch = fetch_price_batch(candidates, lookback_days)

    # Step 4: Process each candidate with pre-fetched data
    results = []
    for ticker in candidates:
        enhanced_candidate = _enrich_candidate(
            ticker,
            volume_data_batch[ticker],
            price_data_batch.get(ticker),
            sectors_batch.get(ticker),
            lookback_days=lookback_days,
            **kwargs
        )
        results.append(enhanced_candidate)

    return results

Batch Fetching Functions:

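from datetime import date, timedelta
from typing import Dict, List

import pandas as pd
import yfinance as yf

def fetch_volume_batch(tickers: List[str], lookback_days: int) -> Dict[str, pd.Series]:
    """Fetch volume data for multiple tickers in one call."""
    # Use yfinance's multi-ticker support. yfinance documents only fixed period
    # values ("5d", "1mo", ...), so request a calendar-day window with headroom
    # for weekends/holidays and keep the last `lookback_days` trading days.
    start = date.today() - timedelta(days=lookback_days * 2 + 7)
    data = yf.download(tickers, start=start.isoformat(), progress=False)
    volume = data['Volume']
    if isinstance(volume, pd.Series):  # single ticker without per-ticker columns
        volume = volume.to_frame(name=tickers[0])
    return {
        ticker: volume[ticker].dropna().tail(lookback_days)
        for ticker in tickers
        if ticker in volume.columns  # skip tickers that failed to download
    }

def fetch_sectors_batch(tickers: List[str]) -> Dict[str, str]:
    """Fetch sector info for multiple tickers, checking the cache first."""
    results = {}

    for ticker in tickers:
        if ticker in SECTOR_CACHE:
            results[ticker] = SECTOR_CACHE[ticker]
            continue
        # .info is a per-ticker request, so each uncached ticker costs one call
        try:
            sector = yf.Ticker(ticker).info.get('sector', 'Unknown')
        except Exception:
            results[ticker] = 'Unknown'  # don't cache failures; retry next run
            continue
        SECTOR_CACHE[ticker] = sector
        results[ticker] = sector

    return results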
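Since `fetch_sectors_batch` issues one `.info` request per uncached ticker, its cost is driven by cache hits. The sketch below injects the lookup as a parameter (hypothetical `fetch_sector` callable, stubbed with a fake) so the cache-first behaviour can be tested offline. Not caching failed lookups is an assumption that lets a later run retry them.

```python
from typing import Callable, Dict, List


def fetch_sectors_cached(
    tickers: List[str],
    fetch_sector: Callable[[str], str],
    cache: Dict[str, str],
) -> Dict[str, str]:
    """Cache-first sector lookup; only uncached tickers reach fetch_sector."""
    results: Dict[str, str] = {}
    for ticker in tickers:
        if ticker in cache:
            results[ticker] = cache[ticker]
            continue
        try:
            sector = fetch_sector(ticker)
        except Exception:
            # Don't cache failures, so a later run can retry this ticker
            results[ticker] = "Unknown"
            continue
        cache[ticker] = sector
        results[ticker] = sector
    return results


calls: List[str] = []


def fake_fetch(ticker: str) -> str:
    """Stand-in for a real lookup; records every call it receives."""
    calls.append(ticker)
    if ticker == "BAD":
        raise RuntimeError("lookup failed")
    return {"AAPL": "Technology", "XOM": "Energy"}.get(ticker, "Unknown")


cache: Dict[str, str] = {}
first = fetch_sectors_cached(["AAPL", "XOM"], fake_fetch, cache)
second = fetch_sectors_cached(["AAPL", "BAD"], fake_fetch, cache)
print(calls)  # ['AAPL', 'XOM', 'BAD']: AAPL was served from the cache the second time
```

In production the callable could be `lambda t: yf.Ticker(t).info.get("sector", "Unknown")`.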

3. Lazy Evaluation

def _enrich_candidate(
    ticker: str,
    volume_data: pd.Series,
    price_data: Optional[pd.DataFrame] = None,
    sector: Optional[str] = None,
    **kwargs
) -> Dict:
    """
    Enrich candidate with lazy evaluation.
    Only compute expensive operations if feature enabled.
    """
    candidate = {
        'ticker': ticker,
        'source': 'volume_accumulation',
        'metadata': {
            'volume_multiple': volume_data.iloc[-1] / volume_data.mean()
        }
    }

    # Pattern analysis (requires price data)
    if kwargs.get('enable_pattern_analysis'):
        if price_data is None:
            price_data = fetch_price_data(ticker, kwargs['lookback_days'])

        pattern_info = analyze_volume_pattern(ticker, price_data, volume_data)
        candidate['metadata']['pattern'] = pattern_info.get('pattern')

    # Sector comparison (requires sector data)
    if kwargs.get('enable_sector_comparison'):
        if sector is None:
            sector = get_ticker_sector_cached(ticker)

        sector_info = calculate_sector_percentile(ticker, sector, volume_data)
        candidate['metadata']['sector_percentile'] = sector_info['percentile']

    # Divergence detection (requires price data)
    if kwargs.get('enable_divergence_detection'):
        if price_data is None:
            price_data = fetch_price_data(ticker, kwargs['lookback_days'])

        divergence_info = analyze_price_volume_divergence(ticker, price_data, volume_data)
        candidate['metadata']['divergence_type'] = divergence_info.get('divergence_type')

    # Build context and assign priority
    candidate['context'] = build_context_string(candidate['metadata'])
    candidate['priority'] = assign_priority(candidate['metadata'])

    return candidate
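`_enrich_candidate` gets its laziness by reassigning `price_data` inside whichever feature block runs first, so the second block reuses the fetch. The same idea can be factored into a small wrapper. In this sketch `Lazy` and `enrich` are hypothetical names, and `len(...)` is a placeholder standing in for the real enhancement functions.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class Lazy(Generic[T]):
    """Compute a value on first access, then reuse it."""

    def __init__(self, loader: Callable[[], T]) -> None:
        self._loader = loader
        self._value: Optional[T] = None
        self._loaded = False

    def get(self) -> T:
        if not self._loaded:
            self._value = self._loader()
            self._loaded = True
        return self._value  # type: ignore[return-value]


def enrich(flags: dict, fetch_price: Callable[[], List[float]]) -> dict:
    price = Lazy(fetch_price)  # nothing is fetched yet
    metadata = {}
    if flags.get("enable_pattern_analysis"):
        metadata["pattern_bars"] = len(price.get())  # first access triggers the fetch
    if flags.get("enable_divergence_detection"):
        metadata["divergence_bars"] = len(price.get())  # reuses the cached result
    return metadata


fetches: List[int] = []


def fetch_price() -> List[float]:
    fetches.append(1)  # count how often the "API" is hit
    return [1.0, 2.0, 3.0]


enrich({"enable_pattern_analysis": True, "enable_divergence_detection": True}, fetch_price)
print(len(fetches))  # 1: both features shared a single fetch
enrich({}, fetch_price)
print(len(fetches))  # still 1: no feature enabled, no fetch
```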

API Call Minimization

Before (inefficient):

# 3 API calls per ticker × 15 tickers = 45 API calls
for ticker in tickers:
    volume = fetch_volume(ticker)          # API call 1
    price = fetch_price(ticker)            # API call 2
    sector = fetch_sector(ticker)          # API call 3

After (efficient):

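# 2 batched downloads, plus one sector lookup per uncached candidate
volumes = fetch_volume_batch(tickers, lookback_days)    # call 1 (all tickers)
prices = fetch_price_batch(candidates, lookback_days)   # call 2 (only candidates)
sectors = fetch_sectors_batch(candidates)               # cache hits are free; each miss costs one .info call

Savings: ~90% reduction in API calls (45 → 3-5 calls when most sectors are already cached). These counts are call sites in our code; yfinance may still split a multi-ticker download into per-ticker HTTP requests internally.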
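The before/after counts can be reproduced with a simple model. It assumes each batched download counts as one call and each uncached sector costs one `.info` request (matching `fetch_sectors_batch`); `api_calls` and the example inputs are illustrative, not measured.

```python
from typing import Tuple


def api_calls(n_tickers: int, n_candidates: int, n_uncached_sectors: int) -> Tuple[int, int]:
    """Compare call-site counts for per-ticker vs batched fetching."""
    before = 3 * n_tickers  # volume + price + sector for every ticker
    after = 1  # one batched volume download for all tickers
    if n_candidates:
        after += 1  # one batched price download for the candidates
    after += n_uncached_sectors  # .info is per ticker; cached sectors cost nothing
    return before, after


before, after = api_calls(n_tickers=15, n_candidates=5, n_uncached_sectors=3)
print(before, after, f"{1 - after / before:.0%}")  # 45 5 89%
```

With a warm sector cache the "after" count drops to 2, so the range quoted above depends mainly on how many candidates are new to the cache.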

Expected Performance

Baseline (Current Implementation):

  • Tickers analyzed: ~15
  • Execution time: ~2 seconds
  • API calls: ~15-20

Enhanced (All Features Enabled):

  • Tickers analyzed: ~15
  • Execution time: ~4-5 seconds
  • API calls: ~5-8 (with caching)

Trade-off Analysis:

  • Cost: ~2-2.5x slower execution (~2s → ~4-5s)
  • Benefit: 30-40% better signal quality (estimated, not yet measured)
  • Verdict: Worth the trade-off for quality improvement

Performance by Feature:

  • Pattern analysis: +0.5s (minimal impact)
  • Divergence detection: +1.0s (moderate impact)
  • Sector comparison: +1.5s first run, +0.2s cached (high variance)
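The per-feature figures above are estimates. One way to check them during benchmarking is to accumulate wall-clock time per enhancement. `timed` is a hypothetical helper, and the `time.sleep` calls stand in for the real enhancement functions.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator


@contextmanager
def timed(timings: Dict[str, float], label: str) -> Iterator[None]:
    """Add the wall-clock seconds spent inside the block to timings[label]."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[label] = timings.get(label, 0.0) + time.perf_counter() - start


timings: Dict[str, float] = {}
for ticker in ["AAA", "BBB"]:
    with timed(timings, "pattern_analysis"):
        time.sleep(0.01)  # stand-in for analyze_volume_pattern(...)
    with timed(timings, "divergence_detection"):
        time.sleep(0.02)  # stand-in for analyze_price_volume_divergence(...)

for label, secs in sorted(timings.items()):
    print(f"{label}: {secs:.3f}s")
```

Using `try/finally` means a failing enhancement still records the time it consumed before raising.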

Fallback Handling

Graceful Degradation Strategy:

import logging

logger = logging.getLogger(__name__)

def _safe_enhance(enhancement_func, *args, **kwargs):
    """
    Wrapper for enhancement functions with fallback.
    If enhancement fails, log warning and return None.
    """
    try:
        return enhancement_func(*args, **kwargs)
    except Exception as e:
        logger.warning("Enhancement failed: %s - %s", enhancement_func.__name__, e)
        return None

# Usage
pattern_info = _safe_enhance(analyze_volume_pattern, ticker, price_data, volume_data)
if pattern_info is not None:
    candidate['metadata']['pattern'] = pattern_info.get('pattern')
else:
    candidate['metadata']['pattern'] = None  # Continue without pattern info

Specific Fallback Scenarios:

  1. Sector data unavailable:

    • Skip sector comparison layer
    • Log warning: "Sector data unavailable for {ticker}"
    • Continue with other enhancements
  2. Insufficient price history:

    • Skip divergence detection
    • Log warning: "Insufficient data for divergence analysis"
    • Use pattern analysis if possible
  3. API rate limit hit:

    • Use cached data if available
    • Otherwise skip enhancement for this run
    • Don't fail entire tool execution
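Scenario 3 (use the cache, otherwise skip) can be expressed as one small wrapper. A minimal sketch, assuming results are cached under a string key such as `"AAPL:sector"`; `enhance_with_fallback` and the key format are hypothetical, and the rate-limit error is simulated.

```python
import logging
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger(__name__)


def enhance_with_fallback(
    key: str, compute: Callable[[], Any], cache: Dict[str, Any]
) -> Optional[Any]:
    """Try a fresh computation; on failure fall back to the cache, else skip (None)."""
    try:
        value = compute()
    except Exception as exc:  # e.g. a rate-limit error from the data provider
        if key in cache:
            logger.warning("Using cached value for %s after error: %s", key, exc)
            return cache[key]
        logger.warning("Skipping enhancement %s: %s", key, exc)
        return None
    cache[key] = value  # refresh the cache on success
    return value


def rate_limited() -> str:
    raise RuntimeError("429 Too Many Requests")


cache: Dict[str, Any] = {}
print(enhance_with_fallback("AAPL:sector", lambda: "Technology", cache))  # Technology
print(enhance_with_fallback("AAPL:sector", rate_limited, cache))          # Technology (cached)
print(enhance_with_fallback("MSFT:sector", rate_limited, cache))          # None (skipped)
```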

Result: Enhancement failures never fail the tool; as long as the baseline volume fetch succeeds, it always returns at least the baseline volume signals.

Memory Considerations

Memory Usage Estimates:

  • Volume data: ~5KB per ticker × 100 tickers = 500KB
  • Price data: ~10KB per ticker × 50 candidates = 500KB
  • Sector mappings: ~100 bytes × 1000 tickers = 100KB (cached)
  • Pattern analysis: Temporary rolling windows ~50KB
  • Total peak usage: ~2-5MB
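These per-ticker sizes are rough estimates. pandas can report a frame's actual footprint via `DataFrame.memory_usage(deep=True)`. The frame below is synthetic (30 business days of OHLCV, an assumed lookback), so it indicates the order of magnitude only; real frames returned by yfinance may carry extra columns.

```python
import numpy as np
import pandas as pd


def frame_bytes(df: pd.DataFrame) -> int:
    """Total bytes held by a DataFrame, including its index."""
    return int(df.memory_usage(index=True, deep=True).sum())


# Hypothetical 30-trading-day OHLCV frame for one ticker
days = pd.bdate_range("2026-01-02", periods=30)
rng = np.random.default_rng(0)
prices = pd.DataFrame(
    {
        "Open": rng.uniform(90, 110, 30),
        "High": rng.uniform(110, 120, 30),
        "Low": rng.uniform(80, 90, 30),
        "Close": rng.uniform(90, 110, 30),
        "Volume": rng.integers(1_000_000, 5_000_000, 30),
    },
    index=days,
)

per_ticker = frame_bytes(prices)
print(per_ticker, "bytes per ticker")
print(round(per_ticker * 50 / 1024, 1), "KB for 50 candidates")
```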

Memory Optimizations:

  1. Stream processing: Process candidates one at a time, don't hold all in memory
  2. Cache limits: Cap sector cache at 5000 tickers (oldest evicted first)
  3. Cleanup: Delete temporary DataFrames after processing each ticker

# Memory-efficient processing: hold one candidate's data at a time
def process_candidates(candidates):
    results = []
    for ticker in candidates:
        # Fetch data for this ticker only
        data = fetch_ticker_data(ticker)

        # Process and keep only the (small) result
        result = process_candidate(ticker, data)
        results.append(result)

        # Drop the reference so the DataFrame can be freed before the next fetch
        del data

    return results

Memory footprint: <50MB for a typical run; the data itself peaks at a few MB (see the estimates above).
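Optimization 2 caps the sector cache at 5000 tickers with oldest-first eviction. A minimal sketch, assuming a dict-like wrapper (hypothetical `BoundedCache`) is acceptable as a drop-in for `SECTOR_CACHE`: `collections.OrderedDict` keeps insertion order, so `popitem(last=False)` removes the oldest entry.

```python
from collections import OrderedDict
from typing import Any, Hashable


class BoundedCache:
    """Dict-like cache that evicts the oldest insertion once max_size is exceeded."""

    def __init__(self, max_size: int = 5000) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()

    def __getitem__(self, key: Hashable) -> Any:
        return self._data[key]

    def __setitem__(self, key: Hashable, value: Any) -> None:
        if key in self._data:
            self._data.move_to_end(key)  # treat an update as a fresh insertion
        self._data[key] = value
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # drop the oldest entry

    def __contains__(self, key: Hashable) -> bool:
        return key in self._data

    def __len__(self) -> int:
        return len(self._data)


cache = BoundedCache(max_size=2)
cache["AAPL"] = "Technology"
cache["XOM"] = "Energy"
cache["JPM"] = "Financial Services"  # exceeds max_size, so AAPL is evicted
print("AAPL" in cache, len(cache))  # False 2
```

Calling `move_to_end` on reads as well would turn this into an LRU cache, which keeps frequently queried tickers resident longer.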

Implementation Order

Recommended Phased Approach:

Phase 1: Pattern Analysis

  • Complexity: Low (self-contained, uses existing data)
  • Value: High (compression detection is a very strong signal)
  • Estimated effort: 3-4 hours
  • Files to create/modify:
    • tradingagents/dataflows/volume_pattern_analysis.py (new)
    • tradingagents/tools/registry.py (modify get_unusual_volume)
    • tests/dataflows/test_volume_patterns.py (new)

Phase 2: Divergence Detection

  • Complexity: Medium (requires price trend analysis)
  • Value: Medium-High (good signal; depends on the quality of trend detection)
  • Estimated effort: 4-5 hours
  • Files to create/modify:
    • tradingagents/dataflows/divergence_analysis.py (new)
    • Update get_unusual_volume tool
    • tests/dataflows/test_divergence.py (new)

Phase 3: Sector Comparison

  • Complexity: High (requires sector mapping, percentile calculation)
  • Value: Medium (contextual signal, useful for filtering sector-wide noise)
  • Estimated effort: 5-6 hours
  • Files to create/modify:
    • tradingagents/dataflows/sector_comparison.py (new)
    • tradingagents/dataflows/sector_cache.py (new)
    • Update get_unusual_volume tool
    • tests/dataflows/test_sector_comparison.py (new)

Total estimated effort: 12-15 hours for complete implementation

Validation after each phase:

  • Run test suite
  • Manual testing with 5-10 known tickers
  • Performance benchmarking (execution time, API calls)
  • Signal quality spot-check (do results make sense?)

Summary

This design transforms get_unusual_volume from a simple threshold detector into a sophisticated multi-signal analysis tool through:

  1. Volume Pattern Analysis: Detect accumulation, compression, and distribution patterns
  2. Sector-Relative Comparison: Contextualize volume relative to peer group
  3. Price-Volume Divergence: Identify when volume and price tell different stories

Key Benefits:

  • 30-40% improvement in signal quality (estimated)
  • Rich context strings for better decision-making
  • Configurable feature flags for testing and optimization
  • Graceful degradation ensures reliability
  • Phased implementation allows incremental value delivery

Next Steps:

  1. Review and approve this design
  2. Choose execution approach (subagent-driven or parallel session)
  3. Implement Phase 1 (pattern analysis) first
  4. Validate and iterate before moving to Phase 2/3