import json
import os
import shutil
from datetime import datetime, timedelta

import backtrader as bt
import pandas as pd
import yfinance as yf
from dotenv import load_dotenv

from tradingagents.default_config import DEFAULT_CONFIG
from tradingagents.graph.trading_graph import TradingAgentsGraph

# Load environment variables
load_dotenv()


class TradingAgentsStrategy(bt.Strategy):
    """Strategy that uses TradingAgents for decision making.

    On every bar the strategy asks ``trading_agent.propagate(ticker, date)``
    for a decision (cached per date in ``self.decisions``) and goes all-in on
    "BUY" when flat, or closes the whole position on "SELL" when invested.
    """

    def __init__(self, trading_agent, ticker, backtest_config):
        self.trading_agent = trading_agent
        self.ticker = ticker
        self.backtest_config = backtest_config
        # date string -> raw decision returned by the agent ("HOLD" on error)
        self.decisions = {}
        # number of orders submitted by this strategy
        self.trade_count = 0
        # date string -> True once verify_data_range() has run for that date
        self.data_checks = {}

    def next(self):
        """Fetch (or reuse) the decision for the current bar and act on it."""
        current_date = self.datas[0].datetime.date(0)
        date_str = current_date.strftime("%Y-%m-%d")

        # Verify data range to avoid look-ahead bias
        self.verify_data_range(date_str)

        # Get decision from TradingAgents (once per date)
        if date_str not in self.decisions:
            print(f"Processing date: {date_str}")
            try:
                _, decision = self.trading_agent.propagate(self.ticker, date_str)
                self.decisions[date_str] = decision
                print(f"Decision: {decision}")
            except Exception as e:
                # Best effort: a failed agent call must not abort the whole
                # backtest, so the day is treated as a HOLD.
                print(f"Error getting decision for {date_str}: {e}")
                self.decisions[date_str] = "HOLD"

        # Normalise only for comparison; the raw decision stays in
        # self.decisions so saved results show exactly what the agent returned.
        signal = str(self.decisions[date_str]).strip().upper()
        price = self.data.close[0]

        if signal == "BUY" and not self.position:
            # Buy with 100% of available cash
            size = int(self.broker.getcash() / price)
            if size > 0:
                self.buy(size=size)
                self.trade_count += 1
                print(f"BUY {self.ticker} on {date_str} at ${price:.2f}")
        elif signal == "SELL" and self.position:
            # Sell all positions
            self.sell(size=self.position.size)
            self.trade_count += 1
            print(f"SELL {self.ticker} on {date_str} at ${price:.2f}")

    def verify_data_range(self, date_str):
        """Warn once per date if the feed reports a date later than ``date_str``.

        Args:
            date_str: Current bar date formatted as ``YYYY-MM-DD``.
        """
        # Check if we already verified this date
        if date_str in self.data_checks:
            return

        # The feed yields datetime.date objects; compare date to date
        # (comparing a date with a datetime raises TypeError).
        current_date = datetime.strptime(date_str, "%Y-%m-%d").date()

        # NOTE(review): offset -1 is relative to the current bar, not the end
        # of the feed -- confirm this is the check that was intended.
        data_end_date = self.datas[0].datetime.date(-1)
        if data_end_date > current_date:
            print(f"⚠️ Warning: Data feed contains future data beyond {date_str}")

        self.data_checks[date_str] = True


def clean_cache():
    """Clean cache to avoid look-ahead bias.

    Removes the ``yfinance_cache`` and ``dataflows/data_cache`` directories
    (relative to the current working directory) when they exist.
    """
    print("\n=== Cleaning cache to avoid look-ahead bias ===")

    # Clean yfinance cache
    yfinance_cache = "yfinance_cache"
    if os.path.exists(yfinance_cache):
        shutil.rmtree(yfinance_cache)
        print(f"✓ Cleaned yfinance cache: {yfinance_cache}")

    # Clean dataflows cache
    dataflows_cache = "dataflows/data_cache"
    if os.path.exists(dataflows_cache):
        shutil.rmtree(dataflows_cache)
        print(f"✓ Cleaned dataflows cache: {dataflows_cache}")

    # Clean backtest results (optional)
    # backtest_results = "backtest_results"
    # if os.path.exists(backtest_results):
    #     shutil.rmtree(backtest_results)
    #     print(f"✓ Cleaned backtest results: {backtest_results}")


def _format_metric(value, spec, suffix="", scale=1):
    """Format a numeric metric, or return 'N/A' when it is missing/non-numeric."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return "N/A"
    return f"{value * scale:{spec}}{suffix}"


def run_backtest(ticker, start_date, end_date, initial_cash=100000, clean_cache_flag=True):
    """Run backtest for a given ticker and date range.

    Args:
        ticker: Symbol passed to yfinance and to the trading agent.
        start_date: Inclusive start date, ``YYYY-MM-DD``.
        end_date: End date handed to ``yf.download``, ``YYYY-MM-DD``.
        initial_cash: Starting broker cash; must be positive.
        clean_cache_flag: When true, call ``clean_cache()`` before running.

    Returns:
        The summary dict that is also written to disk by ``save_results``.

    Raises:
        ValueError: On a malformed or inconsistent date range, non-positive
            ``initial_cash``, or when no price data is returned.
    """
    # Validate inputs before any side effect (cache wipe, network, LLM setup).
    start_dt = datetime.strptime(start_date, "%Y-%m-%d")
    end_dt = datetime.strptime(end_date, "%Y-%m-%d")
    if start_dt >= end_dt:
        raise ValueError("Start date must be before end date")
    if end_dt > datetime.now():
        raise ValueError("End date cannot be in the future")
    if initial_cash <= 0:
        raise ValueError("Initial cash must be positive")

    # Clean cache to avoid look-ahead bias
    if clean_cache_flag:
        clean_cache()

    # Create Cerebro engine and set initial cash
    cerebro = bt.Cerebro()
    cerebro.broker.setcash(initial_cash)

    # Agent configuration
    config = DEFAULT_CONFIG.copy()
    config["llm_provider"] = "openrouter"
    config["deep_think_llm"] = "deepseek/deepseek-chat"
    config["quick_think_llm"] = "openai/gpt-4o-mini"
    config["max_debate_rounds"] = 2

    trading_agent = TradingAgentsGraph(debug=False, config=config)
    backtest_config = {
        "ticker": ticker,
        "start_date": start_date,
        "end_date": end_date,
        "initial_cash": initial_cash,
        "clean_cache": clean_cache_flag,
    }

    # Add strategy
    cerebro.addstrategy(
        TradingAgentsStrategy,
        trading_agent=trading_agent,
        ticker=ticker,
        backtest_config=backtest_config,
    )

    # Get historical data from yfinance
    print("\n=== Fetching historical data ===")
    data = yf.download(ticker, start=start_date, end=end_date)

    # Verify data quality
    if data is None or data.empty:
        raise ValueError(f"No data found for {ticker} between {start_date} and {end_date}")

    # Newer yfinance versions return (field, ticker) MultiIndex columns even
    # for a single ticker; keep only the field level so columns are
    # addressable by name.
    if isinstance(data.columns, pd.MultiIndex):
        data.columns = data.columns.get_level_values(0)

    print(f"✓ Data fetched: {len(data)} trading days")
    print(f"✓ Date range: {data.index.min().date()} to {data.index.max().date()}")

    # Convert to backtrader data feed. Dates live in the DataFrame index
    # (datetime=None) and columns are mapped by name, because yfinance's
    # column order differs between versions.
    data_feed = bt.feeds.PandasData(
        dataname=data,
        datetime=None,
        open="Open",
        high="High",
        low="Low",
        close="Close",
        volume="Volume",
        openinterest=-1,
    )

    # Add data feed to cerebro
    cerebro.adddata(data_feed, name=ticker)

    # Add analyzers
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name="sharpe")
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name="drawdown")
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name="trades")
    cerebro.addanalyzer(bt.analyzers.AnnualReturn, _name="annual")
    cerebro.addanalyzer(bt.analyzers.Returns, _name="returns")
    cerebro.addanalyzer(bt.analyzers.PositionsValue, _name="positions")

    # Run backtest
    print(f"\n=== Starting Backtest for {ticker} ===")
    print(f"Date range: {start_date} to {end_date}")
    print(f"Initial cash: ${initial_cash:.2f}")
    print(f"LLM Provider: {config['llm_provider']}")
    print(f"Models: Deep={config['deep_think_llm']}, Quick={config['quick_think_llm']}")

    results = cerebro.run()

    # Get results
    strategy = results[0]
    final_value = cerebro.broker.getvalue()
    total_return = (final_value - initial_cash) / initial_cash * 100

    # Get analyzer results
    sharpe = strategy.analyzers.sharpe.get_analysis()
    drawdown = strategy.analyzers.drawdown.get_analysis()
    trades = strategy.analyzers.trades.get_analysis()
    annual = strategy.analyzers.annual.get_analysis()
    returns = strategy.analyzers.returns.get_analysis()

    # Calculate additional metrics
    total_trades = trades.get("total", {}).get("total", 0)
    won_trades = trades.get("won", {}).get("total", 0)
    win_rate = won_trades / max(total_trades, 1) * 100

    # In the Returns analyzer 'ravg' is the per-period average return and
    # 'rnorm' the annualised one, so the daily figure must come from 'ravg'.
    summary = {
        "initial_cash": initial_cash,
        "final_value": final_value,
        "total_return": total_return,
        "daily_return": returns.get("ravg", 0),
        "annualized_return": returns.get("rnorm", None),
        "sharpe_ratio": sharpe.get("sharperatio", None),
        "max_drawdown": drawdown.get("max", {}).get("drawdown", None),
        "total_trades": total_trades,
        "won_trades": won_trades,
        "win_rate": win_rate,
        "average_trade_duration": trades.get("len", {}).get("average", None),
        "annual_returns": dict(annual),
        "decisions": strategy.decisions,
        "config": backtest_config,
    }

    # Print results. Metrics can be missing (e.g. no closed trades, too few
    # bars for a Sharpe ratio), so format them defensively.
    print("\n=== Backtest Results ===")
    print(f"Final portfolio value: ${final_value:.2f}")
    print(f"Total return: {total_return:.2f}%")
    print(f"Daily return: {_format_metric(summary['daily_return'], '.4f', '%', scale=100)}")
    print(f"Annualized return: {_format_metric(summary['annualized_return'], '.2f', '%', scale=100)}")
    print(f"Sharpe Ratio: {_format_metric(summary['sharpe_ratio'], '.2f')}")
    print(f"Max Drawdown: {_format_metric(summary['max_drawdown'], '.2f', '%')}")
    print(f"Total trades: {total_trades}")
    print(f"Win rate: {win_rate:.2f}%")
    print(f"Average trade duration: {_format_metric(summary['average_trade_duration'], '.1f', ' days')}")

    # Save results
    save_results(ticker, start_date, end_date, summary)

    # Plot results. Best effort: the results are already saved, so a missing
    # or headless plotting backend should not fail the run.
    print("\n=== Generating backtest chart ===")
    try:
        cerebro.plot(style="candlestick")
    except Exception as e:
        print(f"Could not generate chart: {e}")

    return summary


def save_results(ticker, start_date, end_date, results):
    """Save backtest results to file.

    Writes ``backtest_results/<ticker>/backtest_<start>_<end>.json``, creating
    the directory if needed. Values that are not JSON-serialisable are written
    via ``str``.
    """
    results_dir = f"backtest_results/{ticker}/"
    os.makedirs(results_dir, exist_ok=True)

    filename = f"backtest_{start_date}_{end_date}.json"
    filepath = os.path.join(results_dir, filename)

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=4, default=str)

    print(f"Results saved to: {filepath}")


def run_multiple_backtests(ticker_list, start_date, end_date, initial_cash=100000):
    """Run backtests for multiple tickers.

    Returns:
        Dict mapping each ticker to its ``run_backtest`` summary, or to
        ``{"error": message}`` when that ticker's backtest raised.
    """
    all_results = {}

    for index, ticker in enumerate(ticker_list):
        print(f"\n{'='*60}")
        print(f"Running backtest for {ticker}")
        print(f"{'='*60}")

        try:
            # Clean the cache only once, before the first ticker
            all_results[ticker] = run_backtest(
                ticker,
                start_date,
                end_date,
                initial_cash,
                clean_cache_flag=(index == 0),
            )
        except Exception as e:
            print(f"Error running backtest for {ticker}: {e}")
            all_results[ticker] = {"error": str(e)}

    return all_results


if __name__ == "__main__":
    # Define parameters
    ticker = "NVDA"
    start_date = "2024-01-01"
    end_date = "2024-03-29"
    initial_cash = 100000

    # Run backtest
    run_backtest(ticker, start_date, end_date, initial_cash)

    # Example: Run multiple backtests
    # tickers = ["NVDA", "AAPL", "MSFT"]
    # run_multiple_backtests(tickers, start_date, end_date, initial_cash)