TradingAgents/tradingagents/portfolio/risk_metrics.py

328 lines
13 KiB
Python

"""Pure-Python risk metrics computation for the Portfolio Manager.
This module computes portfolio-level risk metrics from a time series of
NAV (Net Asset Value) snapshots. It is intentionally **LLM-free** — all
calculations are deterministic Python / NumPy.
Metrics computed
----------------
- **Sharpe ratio** — annualised, risk-free rate = 0
- **Sortino ratio** — like Sharpe but denominator uses downside deviation only
- **95 % VaR** — historical simulation (5th percentile of daily returns),
expressed as a *positive* fraction (e.g. 0.02 = 2 % expected max loss)
- **Max drawdown** — worst peak-to-trough decline as a fraction (negative)
- **Beta** — portfolio vs. an optional benchmark return series
- **Sector concentration** — weight per GICS sector (%) from the most-recent
snapshot's ``holdings_snapshot`` field
Usage::
from tradingagents.portfolio import compute_risk_metrics, PortfolioSnapshot
metrics = compute_risk_metrics(snapshots, benchmark_returns=spy_returns)
# {
# "sharpe": 1.23,
# "sortino": 1.87,
# "var_95": 0.018,
# "max_drawdown": -0.142,
# "beta": 0.91,
# "sector_concentration": {"Technology": 35.4, "Healthcare": 18.2, ...},
# "return_stats": {"mean_daily": 0.0008, "std_daily": 0.011, "n_days": 90},
# }
See ``docs/portfolio/00_overview.md`` — Phase 3 for the full specification.
"""
from __future__ import annotations
import heapq
import math
from typing import Any
from tradingagents.portfolio.models import PortfolioSnapshot
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
TRADING_DAYS_PER_YEAR: int = 252
MIN_PERIODS_SHARPE: int = 2 # minimum data points for Sharpe / Sortino
MIN_PERIODS_VAR: int = 5 # minimum data points for VaR
MIN_PERIODS_DRAWDOWN: int = 2 # minimum data points for max drawdown
MIN_PERIODS_BETA: int = 2 # minimum data points for beta
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _daily_returns(nav_series: list[float]) -> list[float]:
"""Compute daily percentage returns from an ordered NAV series.
Returns a list one element shorter than the input. Each element is
``(nav[t] - nav[t-1]) / nav[t-1]``. Periods where the previous NAV
is zero are skipped (appended as 0.0 to avoid division by zero).
"""
returns: list[float] = []
for i in range(1, len(nav_series)):
prev = nav_series[i - 1]
if prev == 0.0:
returns.append(0.0)
else:
returns.append((nav_series[i] - prev) / prev)
return returns
def _mean(values: list[float]) -> float:
"""Arithmetic mean of a list. Raises ValueError on empty input."""
if not values:
raise ValueError("Cannot compute mean of empty list")
return sum(values) / len(values)
def _std(values: list[float], ddof: int = 1) -> float:
"""Sample standard deviation.
Args:
values: List of floats.
ddof: Degrees of freedom adjustment (1 = sample std, 0 = population).
Returns:
Standard deviation, or 0.0 when insufficient data.
"""
n = len(values)
if n <= ddof:
return 0.0
mu = _mean(values)
variance = sum((x - mu) ** 2 for x in values) / (n - ddof)
return math.sqrt(variance)
def _percentile(values: list[float], pct: float) -> float:
"""Return the *pct*-th percentile of *values* using linear interpolation.
Args:
values: Non-empty list of floats.
pct: Percentile in [0, 100].
Returns:
Interpolated percentile value.
"""
if not values:
raise ValueError("Cannot compute percentile of empty list")
n = len(values)
# Linear interpolation index
index = (pct / 100.0) * (n - 1)
lower = int(index)
upper = lower + 1
frac = index - lower
k_lower = lower + 1
k_upper = n - lower
# Small lists or k is too large to use nsmallest/nlargest efficiently -> sort
if min(k_lower, k_upper) > 50 and min(k_lower, k_upper) > n * 0.05:
sorted_vals = sorted(values)
if upper >= n:
return sorted_vals[-1]
return sorted_vals[lower] * (1.0 - frac) + sorted_vals[upper] * frac
if k_lower <= k_upper:
# Use nsmallest
k = k_lower + 1
k_smallest = heapq.nsmallest(k, values)
if upper >= n:
return k_smallest[-1]
return k_smallest[-2] * (1.0 - frac) + k_smallest[-1] * frac
else:
# Use nlargest
k = k_upper
if k == 0:
return max(values)
k_largest = heapq.nlargest(k, values)
if upper >= n:
return k_largest[-1]
return k_largest[-1] * (1.0 - frac) + k_largest[-2] * frac
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def compute_risk_metrics(
snapshots: list[PortfolioSnapshot],
benchmark_returns: list[float] | None = None,
trading_days_per_year: int = TRADING_DAYS_PER_YEAR,
) -> dict[str, Any]:
"""Compute portfolio risk metrics from a NAV time series.
Args:
snapshots: Ordered list of :class:`~tradingagents.portfolio.models.PortfolioSnapshot`
objects (oldest first). Each snapshot contributes one NAV data
point (``snapshot.total_value``). At least 2 snapshots are
required; fewer than that returns ``None`` for all rate metrics.
benchmark_returns: Optional list of daily returns for a benchmark
(e.g. SPY) aligned 1-to-1 with the *portfolio* daily returns
derived from ``snapshots``. Must be the same length as
``len(snapshots) - 1``. When provided, beta is computed.
trading_days_per_year: Number of trading days used to annualise
Sharpe and Sortino ratios. Defaults to 252.
Returns:
A dict with keys:
- ``sharpe`` (:class:`float` | ``None``) — annualised Sharpe ratio
- ``sortino`` (:class:`float` | ``None``) — annualised Sortino ratio
- ``var_95`` (:class:`float` | ``None``) — 95 % historical VaR
(positive = expected max loss as a fraction of portfolio value)
- ``max_drawdown`` (:class:`float` | ``None``) — worst peak-to-trough
as a fraction (negative value)
- ``beta`` (:class:`float` | ``None``) — portfolio beta vs. benchmark
- ``sector_concentration`` (:class:`dict[str, float]`) — sector weights
in % from the most-recent snapshot, or ``{}`` when not available
- ``return_stats`` (:class:`dict`) — summary stats:
``mean_daily``, ``std_daily``, ``n_days``
Raises:
TypeError: If any element of *snapshots* is not a ``PortfolioSnapshot``.
"""
# ------------------------------------------------------------------
# Validate input
# ------------------------------------------------------------------
for i, snap in enumerate(snapshots):
if not isinstance(snap, PortfolioSnapshot):
raise TypeError(
f"snapshots[{i}] must be a PortfolioSnapshot, got {type(snap).__name__}"
)
# ------------------------------------------------------------------
# Extract NAV series and compute daily returns
# ------------------------------------------------------------------
nav_series = [s.total_value for s in snapshots]
returns = _daily_returns(nav_series)
n_days = len(returns)
return_stats: dict[str, Any] = {
"mean_daily": _mean(returns) if returns else None,
"std_daily": _std(returns) if n_days >= 2 else None,
"n_days": n_days,
}
# Pre-compute mean once for reuse in Sharpe and Sortino
mu: float | None = _mean(returns) if n_days >= MIN_PERIODS_SHARPE else None
# ------------------------------------------------------------------
# Sharpe ratio (annualised, rf = 0)
# ------------------------------------------------------------------
sharpe: float | None = None
if mu is not None:
sigma = _std(returns)
if sigma > 0.0:
sharpe = mu / sigma * math.sqrt(trading_days_per_year)
# ------------------------------------------------------------------
# Sortino ratio (downside deviation denominator)
# ------------------------------------------------------------------
sortino: float | None = None
if mu is not None:
downside = [r for r in returns if r < 0.0]
sigma_down = _std(downside) if len(downside) >= 2 else 0.0
if sigma_down > 0.0:
sortino = mu / sigma_down * math.sqrt(trading_days_per_year)
# ------------------------------------------------------------------
# 95 % Value at Risk (historical simulation — 5th percentile)
# ------------------------------------------------------------------
var_95: float | None = None
if n_days >= MIN_PERIODS_VAR:
# 5th-percentile return (worst end of distribution)
fifth_pct = _percentile(returns, 5.0)
# Express as a *positive* loss fraction
var_95 = -fifth_pct if fifth_pct < 0.0 else 0.0
# ------------------------------------------------------------------
# Max drawdown (peak-to-trough over the full window)
# ------------------------------------------------------------------
max_drawdown: float | None = None
if len(nav_series) >= MIN_PERIODS_DRAWDOWN:
peak = nav_series[0]
worst = 0.0
for nav in nav_series[1:]:
if nav > peak:
peak = nav
if peak > 0.0:
drawdown = (nav - peak) / peak
if drawdown < worst:
worst = drawdown
max_drawdown = worst # 0.0 when no drawdown occurred
# ------------------------------------------------------------------
# Beta (vs. benchmark, when provided)
# ------------------------------------------------------------------
beta: float | None = None
if benchmark_returns is not None and len(benchmark_returns) >= MIN_PERIODS_BETA:
# Align lengths
min_len = min(len(returns), len(benchmark_returns))
r_p = returns[-min_len:]
r_b = benchmark_returns[-min_len:]
if min_len >= MIN_PERIODS_BETA:
mu_p = _mean(r_p)
mu_b = _mean(r_b)
covariance = sum(
(r_p[i] - mu_p) * (r_b[i] - mu_b) for i in range(min_len)
) / (min_len - 1)
var_b = _std(r_b) ** 2
if var_b > 0.0:
beta = covariance / var_b
# ------------------------------------------------------------------
# Sector concentration (from last snapshot's holdings_snapshot)
# ------------------------------------------------------------------
sector_concentration: dict[str, float] = {}
if snapshots:
last_snap = snapshots[-1]
holdings = last_snap.holdings_snapshot or []
total_value = last_snap.total_value
if holdings and total_value and total_value > 0.0:
sector_totals: dict[str, float] = {}
for h in holdings:
sector = h.get("sector") or "Unknown"
shares = float(h.get("shares", 0.0))
# Use current_value if available; fall back to shares * avg_cost
current_value = h.get("current_value")
if current_value is not None:
value = float(current_value)
else:
avg_cost = float(h.get("avg_cost", 0.0))
value = shares * avg_cost
sector_totals[sector] = sector_totals.get(sector, 0.0) + value
sector_concentration = {
sector: round(total / total_value * 100.0, 2)
for sector, total in sector_totals.items()
}
return {
"sharpe": round(sharpe, 4) if sharpe is not None else None,
"sortino": round(sortino, 4) if sortino is not None else None,
"var_95": round(var_95, 6) if var_95 is not None else None,
"max_drawdown": round(max_drawdown, 6) if max_drawdown is not None else None,
"beta": round(beta, 4) if beta is not None else None,
"sector_concentration": sector_concentration,
"return_stats": {
"mean_daily": round(return_stats["mean_daily"], 6)
if return_stats["mean_daily"] is not None
else None,
"std_daily": round(return_stats["std_daily"], 6)
if return_stats["std_daily"] is not None
else None,
"n_days": n_days,
},
}