# TradingAgents/tradingagents/dataflows/time_series_cache.py

"""
Time Series Cache System for Financial Data
Handles intelligent caching of financial API data with time series optimization
"""
import os
import sqlite3
import pandas as pd
import json
import hashlib
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional, Any, Union
from pathlib import Path
import pickle
from dataclasses import dataclass
from enum import Enum
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class DataType(Enum):
    """Supported data types for caching"""
    OHLCV = "ohlcv"                # Open, High, Low, Close, Volume data
    NEWS = "news"                  # News articles
    FUNDAMENTALS = "fundamentals"  # Financial statements
    INDICATORS = "indicators"      # Technical indicators
    INSIDER = "insider"            # Insider transactions
    SENTIMENT = "sentiment"        # Sentiment data
    ECONOMIC = "economic"          # Economic indicators


@dataclass
class CacheEntry:
    """Represents a cached data entry"""
    symbol: str
    data_type: DataType
    start_date: datetime
    end_date: datetime
    cache_path: str
    last_updated: datetime
    metadata: Dict[str, Any]


class TimeSeriesCache:
    """
    Intelligent time series cache for financial data

    Features:
    - Detects overlapping date ranges to minimize API calls
    - Handles multiple data types (OHLCV, news, fundamentals, etc.)
    - Stores data in efficient time-indexed Parquet files
    - Tracks coverage in a SQLite index database
    - Provides cache statistics and management
    """

    def __init__(self, cache_dir: Optional[str] = None):
        """Initialize the time series cache"""
        if cache_dir is None:
            from .config import get_config
            config = get_config()
            cache_dir = os.path.join(config.get("data_cache_dir", "data_cache"), "time_series")
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # Initialize cache index database
        self.db_path = self.cache_dir / "cache_index.db"
        self._init_database()

        # Cache statistics
        self.stats = {
            "cache_hits": 0,
            "cache_misses": 0,
            "api_calls_saved": 0,
            "data_merged": 0,
        }

    def _init_database(self):
        """Initialize SQLite database for cache management"""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS cache_entries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    symbol TEXT NOT NULL,
                    data_type TEXT NOT NULL,
                    start_date TEXT NOT NULL,
                    end_date TEXT NOT NULL,
                    cache_path TEXT NOT NULL,
                    last_updated TEXT NOT NULL,
                    metadata TEXT,
                    UNIQUE(symbol, data_type, start_date, end_date)
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_symbol_type_date
                ON cache_entries(symbol, data_type, start_date, end_date)
            """)

    def _generate_cache_key(self, symbol: str, data_type: DataType,
                            start_date: datetime, end_date: datetime, **kwargs) -> str:
        """Generate unique cache key for data"""
        key_data = f"{symbol}_{data_type.value}_{start_date.date()}_{end_date.date()}"
        if kwargs:
            key_data += "_" + "_".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
        return hashlib.md5(key_data.encode()).hexdigest()[:16]

    def _get_cache_path(self, symbol: str, data_type: DataType, cache_key: str) -> Path:
        """Get cache file path"""
        type_dir = self.cache_dir / data_type.value
        type_dir.mkdir(exist_ok=True)
        return type_dir / f"{symbol}_{cache_key}.parquet"

    def check_cache_coverage(self, symbol: str, data_type: DataType,
                             start_date: datetime, end_date: datetime
                             ) -> Tuple[List[Tuple[datetime, datetime]], List[CacheEntry]]:
        """
        Check which parts of the requested range are already cached and which
        gaps still need to be fetched.

        Returns:
            - List of (start, end) date ranges that must be fetched from the API
            - List of existing cache entries that cover parts of the requested range

        Example: if 2024-01-01..2024-01-31 is requested and only
        2024-01-10..2024-01-20 is cached, the gaps returned are
        [(2024-01-01, 2024-01-09), (2024-01-21, 2024-01-31)].
        """
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("""
                SELECT symbol, data_type, start_date, end_date, cache_path, last_updated, metadata
                FROM cache_entries
                WHERE symbol = ? AND data_type = ?
                  AND end_date >= ? AND start_date <= ?
                ORDER BY start_date
            """, (symbol, data_type.value, start_date.isoformat(), end_date.isoformat()))
            cached_entries = []
            for row in cursor.fetchall():
                entry = CacheEntry(
                    symbol=row[0],
                    data_type=DataType(row[1]),
                    start_date=datetime.fromisoformat(row[2]),
                    end_date=datetime.fromisoformat(row[3]),
                    cache_path=row[4],
                    last_updated=datetime.fromisoformat(row[5]),
                    metadata=json.loads(row[6]) if row[6] else {}
                )
                cached_entries.append(entry)

        if not cached_entries:
            return [(start_date, end_date)], []

        # Find gaps in coverage (daily granularity: ranges are split on day boundaries)
        gaps = []
        current_start = start_date
        for entry in cached_entries:
            entry_start = max(entry.start_date, start_date)
            entry_end = min(entry.end_date, end_date)
            # Gap before this entry
            if current_start < entry_start:
                gaps.append((current_start, entry_start - timedelta(days=1)))
            current_start = max(current_start, entry_end + timedelta(days=1))
        # Gap after the last entry
        if current_start <= end_date:
            gaps.append((current_start, end_date))
        return gaps, cached_entries

    def get_cached_data(self, symbol: str, data_type: DataType,
                        start_date: datetime, end_date: datetime) -> Optional[pd.DataFrame]:
        """Retrieve cached data for the specified date range"""
        gaps, cached_entries = self.check_cache_coverage(symbol, data_type, start_date, end_date)
        if gaps:  # Coverage has gaps, so a complete cached result is impossible
            return None
        if not cached_entries:
            return None

        # Load and combine all relevant cached data
        dfs = []
        for entry in cached_entries:
            try:
                cache_path = Path(entry.cache_path)
                if cache_path.exists():
                    df = pd.read_parquet(cache_path)
                    # Filter to the requested date range
                    if 'date' in df.columns:
                        df['date'] = pd.to_datetime(df['date'])
                        df = df[(df['date'] >= start_date) & (df['date'] <= end_date)]
                    elif 'timestamp' in df.columns:
                        df['timestamp'] = pd.to_datetime(df['timestamp'])
                        df = df[(df['timestamp'] >= start_date) & (df['timestamp'] <= end_date)]
                    dfs.append(df)
            except Exception as e:
                logger.warning(f"Failed to load cached data from {entry.cache_path}: {e}")
                continue
        if not dfs:
            return None

        # Combine dataframes and deduplicate on the date/timestamp column
        combined_df = pd.concat(dfs, ignore_index=True)
        date_col = 'date' if 'date' in combined_df.columns else 'timestamp'
        if date_col in combined_df.columns:
            combined_df = combined_df.drop_duplicates(subset=[date_col]).sort_values(date_col)
        self.stats["cache_hits"] += 1
        return combined_df

    def cache_data(self, symbol: str, data_type: DataType, data: pd.DataFrame,
                   start_date: datetime, end_date: datetime, **metadata) -> str:
        """Cache data with time series optimization"""
        # Ensure data has a proper date column
        date_col = None
        for col in ['date', 'timestamp', 'Date', 'Timestamp']:
            if col in data.columns:
                date_col = col
                break
        if date_col is None:
            raise ValueError("Data must have a date/timestamp column")

        # Standardize the date column on a copy so the caller's frame is not mutated
        data = data.copy()
        data[date_col] = pd.to_datetime(data[date_col])

        # Generate cache key and path
        cache_key = self._generate_cache_key(symbol, data_type, start_date, end_date, **metadata)
        cache_path = self._get_cache_path(symbol, data_type, cache_key)

        # Save data to Parquet for efficiency
        try:
            data.to_parquet(cache_path, index=False)
            # Update the cache index database
            with sqlite3.connect(self.db_path) as conn:
                conn.execute("""
                    INSERT OR REPLACE INTO cache_entries
                    (symbol, data_type, start_date, end_date, cache_path, last_updated, metadata)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                """, (
                    symbol,
                    data_type.value,
                    start_date.isoformat(),
                    end_date.isoformat(),
                    str(cache_path),
                    datetime.now().isoformat(),
                    json.dumps(metadata)
                ))
            logger.info(f"Cached {len(data)} records for {symbol} {data_type.value} "
                        f"({start_date.date()} to {end_date.date()})")
            return str(cache_path)
        except Exception as e:
            logger.error(f"Failed to cache data: {e}")
            raise

    def fetch_with_cache(self, symbol: str, data_type: DataType,
                         start_date: datetime, end_date: datetime,
                         fetch_function, **fetch_kwargs) -> pd.DataFrame:
        """
        Fetch data with intelligent caching

        Args:
            symbol: Symbol to fetch
            data_type: Type of data
            start_date, end_date: Date range
            fetch_function: Callable invoked as
                fetch_function(symbol, gap_start, gap_end, **fetch_kwargs);
                must return a DataFrame with a date/timestamp column
            **fetch_kwargs: Additional arguments for the fetch function
        """
        # Check what's already cached
        gaps, cached_entries = self.check_cache_coverage(symbol, data_type, start_date, end_date)
        if not gaps:
            # Everything is cached
            cached_data = self.get_cached_data(symbol, data_type, start_date, end_date)
            if cached_data is not None:
                logger.info(f"Cache hit: {symbol} {data_type.value} "
                            f"({start_date.date()} to {end_date.date()})")
                return cached_data

        # Need to fetch some data
        self.stats["cache_misses"] += 1

        # Fetch missing date ranges
        new_data_frames = []
        for gap_start, gap_end in gaps:
            logger.info(f"Fetching {symbol} {data_type.value} from API: "
                        f"{gap_start.date()} to {gap_end.date()}")
            try:
                # Call the provided fetch function
                gap_data = fetch_function(symbol, gap_start, gap_end, **fetch_kwargs)
                if gap_data is not None and not gap_data.empty:
                    new_data_frames.append(gap_data)
                    # Cache the new data
                    self.cache_data(symbol, data_type, gap_data, gap_start, gap_end, **fetch_kwargs)
            except Exception as e:
                logger.error(f"Failed to fetch data for gap {gap_start} to {gap_end}: {e}")
                continue

        # Combine cached and new data
        all_data_frames = []
        # Add cached data, filtered to the requested range
        for entry in cached_entries:
            try:
                cached_df = pd.read_parquet(entry.cache_path)
                date_col = 'date' if 'date' in cached_df.columns else 'timestamp'
                if date_col in cached_df.columns:
                    cached_df[date_col] = pd.to_datetime(cached_df[date_col])
                    cached_df = cached_df[
                        (cached_df[date_col] >= start_date) &
                        (cached_df[date_col] <= end_date)
                    ]
                all_data_frames.append(cached_df)
            except Exception as e:
                logger.warning(f"Failed to load cached data: {e}")

        # Add new data
        all_data_frames.extend(new_data_frames)
        if not all_data_frames:
            return pd.DataFrame()
        if cached_entries and new_data_frames:
            self.stats["data_merged"] += 1

        # Combine and deduplicate
        result_df = pd.concat(all_data_frames, ignore_index=True)
        date_col = 'date' if 'date' in result_df.columns else 'timestamp'
        if date_col in result_df.columns:
            result_df = result_df.drop_duplicates(subset=[date_col]).sort_values(date_col)
        self.stats["api_calls_saved"] += len(cached_entries)
        return result_df

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache performance statistics"""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("SELECT COUNT(*) FROM cache_entries")
            total_entries = cursor.fetchone()[0]
            cursor = conn.execute("SELECT data_type, COUNT(*) FROM cache_entries GROUP BY data_type")
            by_type = dict(cursor.fetchall())

        # Calculate cache directory size
        total_size = sum(f.stat().st_size for f in self.cache_dir.rglob("*") if f.is_file())
        return {
            "total_cache_entries": total_entries,
            "entries_by_type": by_type,
            "cache_size_mb": total_size / (1024 * 1024),
            "cache_hits": self.stats["cache_hits"],
            "cache_misses": self.stats["cache_misses"],
            "hit_ratio": self.stats["cache_hits"] / max(1, self.stats["cache_hits"] + self.stats["cache_misses"]),
            "api_calls_saved": self.stats["api_calls_saved"]
        }

    def clear_cache(self, symbol: Optional[str] = None, data_type: Optional[DataType] = None,
                    older_than_days: Optional[int] = None) -> int:
        """Clear cache entries matching the given criteria"""
        conditions = []
        params = []
        if symbol:
            conditions.append("symbol = ?")
            params.append(symbol)
        if data_type:
            conditions.append("data_type = ?")
            params.append(data_type.value)
        if older_than_days:
            cutoff_date = datetime.now() - timedelta(days=older_than_days)
            conditions.append("last_updated < ?")
            params.append(cutoff_date.isoformat())
        where_clause = " AND ".join(conditions) if conditions else "1=1"

        with sqlite3.connect(self.db_path) as conn:
            # Get paths of files to delete
            cursor = conn.execute(f"SELECT cache_path FROM cache_entries WHERE {where_clause}", params)
            paths_to_delete = [row[0] for row in cursor.fetchall()]
            # Delete cached files
            for path in paths_to_delete:
                try:
                    Path(path).unlink(missing_ok=True)
                except Exception as e:
                    logger.warning(f"Failed to delete cache file {path}: {e}")
            # Delete database entries
            cursor = conn.execute(f"DELETE FROM cache_entries WHERE {where_clause}", params)
            deleted_count = cursor.rowcount
        logger.info(f"Cleared {deleted_count} cache entries")
        return deleted_count


# Global cache instance
_cache_instance = None


def get_cache() -> TimeSeriesCache:
    """Get or create the global cache instance"""
    global _cache_instance
    if _cache_instance is None:
        _cache_instance = TimeSeriesCache()
    return _cache_instance


# Convenience functions for different data types
def fetch_ohlcv_with_cache(symbol: str, start_date: datetime, end_date: datetime,
                           fetch_function, **kwargs) -> pd.DataFrame:
    """Fetch OHLCV data with caching"""
    cache = get_cache()
    return cache.fetch_with_cache(symbol, DataType.OHLCV, start_date, end_date, fetch_function, **kwargs)


def fetch_news_with_cache(symbol: str, start_date: datetime, end_date: datetime,
                          fetch_function, **kwargs) -> pd.DataFrame:
    """Fetch news data with caching"""
    cache = get_cache()
    return cache.fetch_with_cache(symbol, DataType.NEWS, start_date, end_date, fetch_function, **kwargs)


def fetch_fundamentals_with_cache(symbol: str, start_date: datetime, end_date: datetime,
                                  fetch_function, **kwargs) -> pd.DataFrame:
    """Fetch fundamentals data with caching"""
    cache = get_cache()
    return cache.fetch_with_cache(symbol, DataType.FUNDAMENTALS, start_date, end_date, fetch_function, **kwargs)


if __name__ == "__main__":
    # Example usage and testing
    cache = TimeSeriesCache()
    print("Cache statistics:", cache.get_cache_stats())