"""Performance and risk metrics computed from a backtest equity curve."""

import math
from decimal import Decimal
from typing import Optional

from tradingagents.models.backtest import BacktestMetrics, EquityCurvePoint, TradeLog


class MetricsCalculator:
    """Compute return, risk, and trade statistics for a backtest.

    Returns and volatilities handed to the ratio helpers are expressed in
    percent (e.g. ``12.5`` for 12.5%); ``risk_free_rate`` is stored as an
    annual fraction and scaled by 100 where it is subtracted from a percent
    return.
    """

    TRADING_DAYS_PER_YEAR = 252

    def __init__(self, risk_free_rate: Decimal = Decimal("0.05")):
        """Store the annual risk-free rate as a fraction (0.05 means 5%)."""
        self.risk_free_rate = risk_free_rate

    def _annualization_factor(self) -> Decimal:
        """Return sqrt(TRADING_DAYS_PER_YEAR) computed in Decimal arithmetic.

        ``Decimal(math.sqrt(...))`` would copy the binary float's
        representation error into every annualized figure; ``Decimal.sqrt``
        is rounded in the active decimal context instead.
        """
        return Decimal(self.TRADING_DAYS_PER_YEAR).sqrt()

    def calculate_metrics(
        self,
        equity_curve: list[EquityCurvePoint],
        trade_log: TradeLog,
        benchmark_curve: list[EquityCurvePoint] | None = None,
    ) -> BacktestMetrics:
        """Build a ``BacktestMetrics`` summary for one backtest run.

        Args:
            equity_curve: Equity points in chronological order; one point is
                counted as one trading day.
            trade_log: Source of the trade-level statistics.
            benchmark_curve: Optional benchmark curve. It is only used when
                it has the same number of points as ``equity_curve``.

        Returns:
            The populated ``BacktestMetrics``. Benchmark-relative fields are
            ``None`` when no usable benchmark is supplied.

        Raises:
            ValueError: If ``equity_curve`` is empty or its first equity
                value is not positive (percentage returns are undefined).

        Note:
            Drawdown fields on the points of ``equity_curve`` are updated as
            a side effect of the drawdown calculation.
        """
        if not equity_curve:
            raise ValueError("Equity curve cannot be empty")

        start_equity = equity_curve[0].equity
        end_equity = equity_curve[-1].equity
        if start_equity <= 0:
            # Every percentage below divides by the starting equity.
            raise ValueError("Starting equity must be positive")
        trading_days = len(equity_curve)

        total_return = end_equity - start_equity
        total_return_percent = (total_return / start_equity) * 100

        years = Decimal(trading_days) / Decimal(self.TRADING_DAYS_PER_YEAR)
        growth = end_equity / start_equity
        if years <= 0:
            annualized_return = Decimal("0")
        elif growth <= 0:
            # A fractional power of a non-positive Decimal is undefined;
            # report a wiped-out account as a total loss.
            annualized_return = Decimal("-100")
        else:
            annualized_return = (growth ** (1 / years) - 1) * 100

        annualization = self._annualization_factor()

        daily_returns = self._calculate_daily_returns(equity_curve)
        volatility = self._calculate_volatility(daily_returns)
        annualized_volatility_pct = volatility * annualization * 100

        downside_returns = [r for r in daily_returns if r < 0]
        downside_volatility = (
            self._calculate_volatility(downside_returns)
            if downside_returns
            else Decimal("0")
        )
        annualized_downside_vol_pct = downside_volatility * annualization * 100

        max_dd, max_dd_pct, max_dd_duration, avg_dd = self._calculate_drawdown_metrics(
            equity_curve
        )

        # The ratio helpers expect return and volatility in the same unit
        # (percent), matching how calculate_rolling_metrics calls them.
        sharpe = self._calculate_sharpe_ratio(
            annualized_return, annualized_volatility_pct
        )
        sortino = self._calculate_sortino_ratio(
            annualized_return, annualized_downside_vol_pct
        )
        calmar = self._calculate_calmar_ratio(annualized_return, max_dd_pct)

        benchmark_return = None
        benchmark_return_percent = None
        alpha = None
        beta = None
        information_ratio = None

        if benchmark_curve and len(benchmark_curve) == len(equity_curve):
            benchmark_return = benchmark_curve[-1].equity - benchmark_curve[0].equity
            benchmark_return_percent = (
                benchmark_return / benchmark_curve[0].equity
            ) * 100

            # NOTE(review): _calculate_daily_returns skips points whose
            # previous equity is non-positive, so the two return series can
            # end up misaligned if either curve contains such a point.
            benchmark_daily = self._calculate_daily_returns(benchmark_curve)
            alpha, beta = self._calculate_alpha_beta(daily_returns, benchmark_daily)
            information_ratio = self._calculate_information_ratio(
                daily_returns, benchmark_daily
            )

        all_pnls = [
            t.pnl for t in trade_log.trades if t.is_closed and t.pnl is not None
        ]
        avg_trade_pnl = sum(all_pnls) / len(all_pnls) if all_pnls else None
        largest_win = max((p for p in all_pnls if p > 0), default=None)
        largest_loss = min((p for p in all_pnls if p < 0), default=None)

        return BacktestMetrics(
            total_return=total_return,
            total_return_percent=total_return_percent,
            annualized_return=annualized_return,
            benchmark_return=benchmark_return,
            benchmark_return_percent=benchmark_return_percent,
            alpha=alpha,
            beta=beta,
            volatility=volatility * 100,
            annualized_volatility=annualized_volatility_pct,
            downside_volatility=annualized_downside_vol_pct,
            sharpe_ratio=sharpe,
            sortino_ratio=sortino,
            calmar_ratio=calmar,
            information_ratio=information_ratio,
            max_drawdown=max_dd,
            max_drawdown_percent=max_dd_pct,
            max_drawdown_duration=max_dd_duration,
            avg_drawdown=avg_dd,
            total_trades=trade_log.total_trades,
            win_rate=trade_log.win_rate,
            profit_factor=trade_log.profit_factor,
            avg_trade_pnl=avg_trade_pnl,
            avg_win=trade_log.avg_win,
            avg_loss=trade_log.avg_loss,
            largest_win=largest_win,
            largest_loss=largest_loss,
            avg_holding_period_days=trade_log.avg_holding_period,
            trading_days=trading_days,
            start_equity=start_equity,
            end_equity=end_equity,
        )

    def _calculate_daily_returns(
        self,
        equity_curve: list[EquityCurvePoint],
    ) -> list[Decimal]:
        """Return point-to-point fractional returns of the curve.

        Pairs whose earlier equity is not positive are skipped, so the
        result can be shorter than ``len(equity_curve) - 1``.
        """
        returns = []
        for i in range(1, len(equity_curve)):
            prev_equity = equity_curve[i - 1].equity
            curr_equity = equity_curve[i].equity
            if prev_equity > 0:
                daily_return = (curr_equity - prev_equity) / prev_equity
                returns.append(daily_return)
        return returns

    def _calculate_volatility(self, returns: list[Decimal]) -> Decimal:
        """Return the sample standard deviation (n - 1 denominator).

        Fewer than two observations yield ``Decimal("0")``.
        """
        if len(returns) < 2:
            return Decimal("0")

        mean = sum(returns) / len(returns)
        variance = sum((r - mean) ** 2 for r in returns) / (len(returns) - 1)
        return Decimal(str(math.sqrt(float(variance))))

    def _calculate_drawdown_metrics(
        self,
        equity_curve: list[EquityCurvePoint],
    ) -> tuple[Decimal, Decimal, int | None, Decimal]:
        """Compute drawdown statistics against the running peak.

        A drawdown starts at the first point strictly below the running peak
        and ends at the first later point that is back at or above it. A
        drawdown still open at the end of the curve is measured up to the
        end of the curve.

        Side effect: for every point at or below the running peak,
        ``point.drawdown`` and ``point.drawdown_percent`` are overwritten.

        Returns:
            ``(max_drawdown, max_drawdown_percent, max_duration, avg_pct)``
            where ``max_duration`` is a number of points, or ``None`` when
            the curve never drops below its peak.
        """
        if not equity_curve:
            return Decimal("0"), Decimal("0"), None, Decimal("0")

        peak = equity_curve[0].equity
        max_drawdown = Decimal("0")
        max_drawdown_percent = Decimal("0")
        max_drawdown_duration = 0
        current_drawdown_start = 0
        in_drawdown = False
        drawdowns = []

        for i, point in enumerate(equity_curve):
            equity = point.equity

            if equity >= peak and in_drawdown:
                # Recovered to (or past) the previous high: close the episode.
                max_drawdown_duration = max(
                    max_drawdown_duration, i - current_drawdown_start
                )
                in_drawdown = False

            if equity > peak:
                peak = equity
                continue

            drawdown = peak - equity
            drawdown_pct = (drawdown / peak) * 100 if peak > 0 else Decimal("0")
            # NOTE(review): points that set a new high are not included in
            # this average while points equal to the peak contribute 0;
            # kept as-is so avg_drawdown values do not shift.
            drawdowns.append(drawdown_pct)
            point.drawdown = drawdown
            point.drawdown_percent = drawdown_pct

            if drawdown > 0:
                if not in_drawdown:
                    in_drawdown = True
                    current_drawdown_start = i
                if drawdown > max_drawdown:
                    max_drawdown = drawdown
                    max_drawdown_percent = drawdown_pct

        if in_drawdown:
            duration = len(equity_curve) - current_drawdown_start
            max_drawdown_duration = max(max_drawdown_duration, duration)

        avg_drawdown = sum(drawdowns) / len(drawdowns) if drawdowns else Decimal("0")

        return (
            max_drawdown,
            max_drawdown_percent,
            max_drawdown_duration or None,
            avg_drawdown,
        )

    def _calculate_sharpe_ratio(
        self,
        annualized_return: Decimal,
        annualized_volatility: Decimal,
    ) -> Decimal | None:
        """Return (return - risk-free) / volatility, both inputs in percent.

        Returns ``None`` when the volatility is zero.
        """
        if annualized_volatility == 0:
            return None
        excess_return = annualized_return - (self.risk_free_rate * 100)
        return excess_return / annualized_volatility

    def _calculate_sortino_ratio(
        self,
        annualized_return: Decimal,
        annualized_downside_vol: Decimal,
    ) -> Decimal | None:
        """Return (return - risk-free) / downside volatility, in percent units.

        Returns ``None`` when the downside volatility is zero.
        """
        if annualized_downside_vol == 0:
            return None
        excess_return = annualized_return - (self.risk_free_rate * 100)
        return excess_return / annualized_downside_vol

    def _calculate_calmar_ratio(
        self,
        annualized_return: Decimal,
        max_drawdown_percent: Decimal,
    ) -> Decimal | None:
        """Return annualized return / max drawdown percent, or ``None`` if zero."""
        if max_drawdown_percent == 0:
            return None
        return annualized_return / max_drawdown_percent

    def _calculate_alpha_beta(
        self,
        returns: list[Decimal],
        benchmark_returns: list[Decimal],
    ) -> tuple[Decimal | None, Decimal | None]:
        """Fit a least-squares line of ``returns`` on ``benchmark_returns``.

        Returns:
            ``(alpha, beta)`` where ``beta`` is the slope and ``alpha`` is
            the per-period intercept multiplied by ``TRADING_DAYS_PER_YEAR``.
            ``(None, None)`` when the series differ in length, hold fewer
            than two observations, or the benchmark has no variance.
        """
        if len(returns) != len(benchmark_returns) or len(returns) < 2:
            return None, None

        n = len(returns)
        sum_x = sum(benchmark_returns)
        sum_y = sum(returns)
        # Lengths were verified above, so strict pairing cannot raise.
        sum_xy = sum(r * b for r, b in zip(returns, benchmark_returns, strict=True))
        sum_xx = sum(b * b for b in benchmark_returns)

        denominator = n * sum_xx - sum_x * sum_x
        if denominator == 0:
            return None, None

        beta = (n * sum_xy - sum_x * sum_y) / denominator
        alpha = (sum_y - beta * sum_x) / n

        alpha_annualized = alpha * self.TRADING_DAYS_PER_YEAR

        return alpha_annualized, beta

    def _calculate_information_ratio(
        self,
        returns: list[Decimal],
        benchmark_returns: list[Decimal],
    ) -> Decimal | None:
        """Return annualized mean excess return over annualized tracking error.

        Returns ``None`` when the series differ in length, hold fewer than
        two observations, or the tracking error is zero.
        """
        if len(returns) != len(benchmark_returns) or len(returns) < 2:
            return None

        # Lengths were verified above, so strict pairing cannot raise.
        excess_returns = [
            r - b for r, b in zip(returns, benchmark_returns, strict=True)
        ]
        mean_excess = sum(excess_returns) / len(excess_returns)
        tracking_error = self._calculate_volatility(excess_returns)

        if tracking_error == 0:
            return None

        annualized_tracking_error = tracking_error * self._annualization_factor()
        annualized_excess = mean_excess * self.TRADING_DAYS_PER_YEAR

        return annualized_excess / annualized_tracking_error

    def calculate_rolling_metrics(
        self,
        equity_curve: list[EquityCurvePoint],
        window: int = 20,
    ) -> dict[str, list[Decimal]]:
        """Compute rolling Sharpe ratio and annualized volatility (percent).

        Args:
            equity_curve: Equity points in chronological order.
            window: Number of consecutive daily returns per window.

        Returns:
            A dict with keys ``"rolling_sharpe"`` and ``"rolling_volatility"``,
            one entry per full window. Both lists are empty when the curve
            has fewer points than ``window``. A window whose Sharpe ratio is
            undefined (zero volatility) contributes ``Decimal("0")``.

        Raises:
            ValueError: If ``window`` is smaller than 1.
        """
        if window < 1:
            raise ValueError("Window must be a positive integer")

        if len(equity_curve) < window:
            return {"rolling_sharpe": [], "rolling_volatility": []}

        rolling_sharpe = []
        rolling_volatility = []

        daily_returns = self._calculate_daily_returns(equity_curve)
        annualization = self._annualization_factor()

        for end in range(window, len(daily_returns) + 1):
            window_returns = daily_returns[end - window : end]

            annualized_vol_pct = (
                self._calculate_volatility(window_returns) * annualization * 100
            )

            mean_return = sum(window_returns) / len(window_returns)
            annualized_return = mean_return * self.TRADING_DAYS_PER_YEAR * 100

            sharpe = self._calculate_sharpe_ratio(
                annualized_return, annualized_vol_pct
            )

            rolling_sharpe.append(sharpe if sharpe is not None else Decimal("0"))
            rolling_volatility.append(annualized_vol_pct)

        return {
            "rolling_sharpe": rolling_sharpe,
            "rolling_volatility": rolling_volatility,
        }