TradingAgents/tradingagents/dataflows/macro_regime.py

380 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Macro regime classifier: risk-on / transition / risk-off."""
from __future__ import annotations
from datetime import datetime
from typing import Optional
import pandas as pd
import yfinance as yf
# ---------------------------------------------------------------------------
# Signal thresholds
# ---------------------------------------------------------------------------
VIX_RISK_ON_THRESHOLD = 16.0 # VIX < 16 → risk-on
VIX_RISK_OFF_THRESHOLD = 25.0 # VIX > 25 → risk-off
REGIME_RISK_ON_THRESHOLD = 3 # score ≥ 3 → risk-on
REGIME_RISK_OFF_THRESHOLD = -3 # score ≤ -3 → risk-off
# Sector ETFs used for rotation signal
_DEFENSIVE_ETFS = ["XLU", "XLP", "XLV"] # Utilities, Staples, Health Care
_CYCLICAL_ETFS = ["XLY", "XLK", "XLI"] # Discretionary, Technology, Industrials
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _download(symbols: list[str], period: str = "3mo") -> Optional[pd.DataFrame]:
"""Download closing prices, returning None on failure."""
try:
hist = yf.download(symbols, period=period, auto_adjust=True, progress=False, threads=True)
if hist.empty:
return None
if len(symbols) == 1:
closes = hist["Close"]
if isinstance(closes, pd.DataFrame):
closes = closes.iloc[:, 0]
return closes.to_frame(name=symbols[0]).dropna()
return hist["Close"].dropna(how="all")
except Exception:
return None
def _latest(series: pd.Series) -> Optional[float]:
if series is None or series.empty:
return None
v = series.dropna()
return float(v.iloc[-1]) if len(v) > 0 else None
def _sma(series: pd.Series, window: int) -> Optional[float]:
if series is None or len(series.dropna()) < window:
return None
return float(series.dropna().rolling(window).mean().iloc[-1])
def _pct_change_n(series: pd.Series, n: int) -> Optional[float]:
s = series.dropna()
if len(s) < n + 1:
return None
base = float(s.iloc[-(n + 1)])
current = float(s.iloc[-1])
if base == 0:
return None
return (current - base) / base * 100
def _fmt_pct(val: Optional[float]) -> str:
if val is None:
return "N/A"
sign = "+" if val >= 0 else ""
return f"{sign}{val:.2f}%"
# ---------------------------------------------------------------------------
# Individual signal evaluators (each returns +1, 0, or -1)
# ---------------------------------------------------------------------------
def _signal_vix_level(vix_price: Optional[float]) -> tuple[int, str]:
"""VIX level: <16 risk-on (+1), >25 risk-off (-1), else transition (0)."""
if vix_price is None:
return 0, "VIX level: unavailable (neutral)"
if vix_price < VIX_RISK_ON_THRESHOLD:
return 1, f"VIX level: {vix_price:.1f} < {VIX_RISK_ON_THRESHOLD} → risk-on"
if vix_price > VIX_RISK_OFF_THRESHOLD:
return -1, f"VIX level: {vix_price:.1f} > {VIX_RISK_OFF_THRESHOLD} → risk-off"
return 0, f"VIX level: {vix_price:.1f} (neutral zone {VIX_RISK_ON_THRESHOLD}{VIX_RISK_OFF_THRESHOLD})"
def _signal_vix_trend(vix_series: Optional[pd.Series]) -> tuple[int, str]:
"""VIX 5-day SMA vs 20-day SMA: rising VIX = risk-off."""
if vix_series is None or len(vix_series) < 21:
return 0, "VIX trend: insufficient history (neutral)"
sma5 = _sma(vix_series, 5)
sma20 = _sma(vix_series, 20)
if sma5 is None or sma20 is None:
return 0, "VIX trend: insufficient history (neutral)"
if sma5 < sma20:
return 1, f"VIX trend: declining (SMA5={sma5:.1f} < SMA20={sma20:.1f}) → risk-on"
if sma5 > sma20:
return -1, f"VIX trend: rising (SMA5={sma5:.1f} > SMA20={sma20:.1f}) → risk-off"
return 0, f"VIX trend: flat (SMA5={sma5:.1f} ≈ SMA20={sma20:.1f}) → neutral"
def _signal_credit_spread(hyg_series: Optional[pd.Series], lqd_series: Optional[pd.Series]) -> tuple[int, str]:
"""HYG/LQD ratio: declining ratio = credit spreads widening = risk-off."""
if hyg_series is None or lqd_series is None:
return 0, "Credit spread proxy (HYG/LQD): unavailable (neutral)"
# Align on common dates
hyg = hyg_series.dropna()
lqd = lqd_series.dropna()
common = hyg.index.intersection(lqd.index)
if len(common) < 22:
return 0, "Credit spread proxy: insufficient history (neutral)"
hyg_c = hyg.loc[common]
lqd_c = lqd.loc[common]
ratio = hyg_c / lqd_c
ratio_1m = _pct_change_n(ratio, 21)
if ratio_1m is None:
return 0, "Credit spread proxy: cannot compute 1-month change (neutral)"
if ratio_1m > 0.5:
return 1, f"Credit spread (HYG/LQD) 1M: {_fmt_pct(ratio_1m)} → improving (risk-on)"
if ratio_1m < -0.5:
return -1, f"Credit spread (HYG/LQD) 1M: {_fmt_pct(ratio_1m)} → deteriorating (risk-off)"
return 0, f"Credit spread (HYG/LQD) 1M: {_fmt_pct(ratio_1m)} → stable (neutral)"
def _signal_yield_curve(tlt_series: Optional[pd.Series], shy_series: Optional[pd.Series]) -> tuple[int, str]:
"""TLT (20yr) vs SHY (1-3yr): TLT outperforming = flight to safety = risk-off."""
if tlt_series is None or shy_series is None:
return 0, "Yield curve proxy (TLT vs SHY): unavailable (neutral)"
tlt = tlt_series.dropna()
shy = shy_series.dropna()
tlt_1m = _pct_change_n(tlt, 21)
shy_1m = _pct_change_n(shy, 21)
if tlt_1m is None or shy_1m is None:
return 0, "Yield curve proxy: insufficient history (neutral)"
spread = tlt_1m - shy_1m
if spread > 1.0:
return -1, f"Yield curve: TLT {_fmt_pct(tlt_1m)} vs SHY {_fmt_pct(shy_1m)} → flight to safety (risk-off)"
if spread < -1.0:
return 1, f"Yield curve: TLT {_fmt_pct(tlt_1m)} vs SHY {_fmt_pct(shy_1m)} → risk appetite (risk-on)"
return 0, f"Yield curve: TLT {_fmt_pct(tlt_1m)} vs SHY {_fmt_pct(shy_1m)} → neutral"
def _signal_market_breadth(spx_series: Optional[pd.Series]) -> tuple[int, str]:
"""S&P 500 above/below 200-day SMA."""
if spx_series is None:
return 0, "Market breadth (SPX vs 200 SMA): unavailable (neutral)"
spx = spx_series.dropna()
sma200 = _sma(spx, 200)
current = _latest(spx)
if sma200 is None or current is None:
return 0, "Market breadth: insufficient history (neutral)"
pct_from_sma = (current - sma200) / sma200 * 100
if current > sma200:
return 1, f"Market breadth: SPX {pct_from_sma:+.1f}% above 200-SMA → risk-on"
return -1, f"Market breadth: SPX {pct_from_sma:+.1f}% below 200-SMA → risk-off"
def _signal_sector_rotation(
defensive_closes: dict[str, pd.Series],
cyclical_closes: dict[str, pd.Series],
) -> tuple[int, str]:
"""Defensive vs cyclical sector rotation over 1 month."""
def avg_return(closes_dict: dict[str, pd.Series], days: int) -> Optional[float]:
returns = []
for sym, s in closes_dict.items():
pct = _pct_change_n(s.dropna(), days)
if pct is not None:
returns.append(pct)
return sum(returns) / len(returns) if returns else None
def_ret = avg_return(defensive_closes, 21)
cyc_ret = avg_return(cyclical_closes, 21)
if def_ret is None or cyc_ret is None:
return 0, "Sector rotation: unavailable (neutral)"
spread = def_ret - cyc_ret
if spread > 1.0:
return -1, (
f"Sector rotation: defensives {_fmt_pct(def_ret)} vs cyclicals {_fmt_pct(cyc_ret)} "
f"(defensives leading → risk-off)"
)
if spread < -1.0:
return 1, (
f"Sector rotation: cyclicals {_fmt_pct(cyc_ret)} vs defensives {_fmt_pct(def_ret)} "
f"(cyclicals leading → risk-on)"
)
return 0, (
f"Sector rotation: defensives {_fmt_pct(def_ret)} vs cyclicals {_fmt_pct(cyc_ret)} → neutral"
)
# ---------------------------------------------------------------------------
# Main classifier
# ---------------------------------------------------------------------------
def classify_macro_regime(curr_date: str = None) -> dict:
"""
Classify current macro regime using 6 market signals.
Args:
curr_date: Optional reference date (informational only; always uses latest data)
Returns:
dict with keys:
regime (str): "risk-on" | "transition" | "risk-off"
score (int): Sum of signal scores (-6 to +6)
confidence (str): "high" | "medium" | "low"
signals (list[dict]): Per-signal breakdowns
summary (str): Human-readable summary
"""
signals = []
total_score = 0
# --- Download all required data ---
vix_data = _download(["^VIX"], period="3mo")
market_data = _download(["^GSPC"], period="14mo") # 14mo for 200-SMA
hyg_lqd_data = _download(["HYG", "LQD"], period="3mo")
tlt_shy_data = _download(["TLT", "SHY"], period="3mo")
sector_data = _download(_DEFENSIVE_ETFS + _CYCLICAL_ETFS, period="3mo")
# Extract series
vix_series = vix_data["^VIX"] if vix_data is not None and "^VIX" in vix_data.columns else None
spx_series = market_data["^GSPC"] if market_data is not None and "^GSPC" in market_data.columns else None
hyg_series = (hyg_lqd_data["HYG"] if hyg_lqd_data is not None and "HYG" in hyg_lqd_data.columns else None)
lqd_series = (hyg_lqd_data["LQD"] if hyg_lqd_data is not None and "LQD" in hyg_lqd_data.columns else None)
tlt_series = (tlt_shy_data["TLT"] if tlt_shy_data is not None and "TLT" in tlt_shy_data.columns else None)
shy_series = (tlt_shy_data["SHY"] if tlt_shy_data is not None and "SHY" in tlt_shy_data.columns else None)
defensive_closes: dict[str, pd.Series] = {}
cyclical_closes: dict[str, pd.Series] = {}
if sector_data is not None:
for sym in _DEFENSIVE_ETFS:
if sym in sector_data.columns:
defensive_closes[sym] = sector_data[sym]
for sym in _CYCLICAL_ETFS:
if sym in sector_data.columns:
cyclical_closes[sym] = sector_data[sym]
vix_price = _latest(vix_series)
# --- Evaluate each signal ---
evaluators = [
_signal_vix_level(vix_price),
_signal_vix_trend(vix_series),
_signal_credit_spread(hyg_series, lqd_series),
_signal_yield_curve(tlt_series, shy_series),
_signal_market_breadth(spx_series),
_signal_sector_rotation(defensive_closes, cyclical_closes),
]
signal_names = [
"vix_level", "vix_trend", "credit_spread",
"yield_curve", "market_breadth", "sector_rotation",
]
for name, (score, description) in zip(signal_names, evaluators):
signals.append({"name": name, "score": score, "description": description})
total_score += score
# --- Classify regime ---
if total_score >= REGIME_RISK_ON_THRESHOLD:
regime = "risk-on"
elif total_score <= REGIME_RISK_OFF_THRESHOLD:
regime = "risk-off"
else:
regime = "transition"
# Confidence based on how decisive the score is
abs_score = abs(total_score)
if abs_score >= 4:
confidence = "high"
elif abs_score >= 2:
confidence = "medium"
else:
confidence = "low"
risk_on_count = sum(1 for s in signals if s["score"] > 0)
risk_off_count = sum(1 for s in signals if s["score"] < 0)
neutral_count = sum(1 for s in signals if s["score"] == 0)
summary = (
f"Macro regime: **{regime.upper()}** "
f"(score {total_score:+d}/6, confidence: {confidence}). "
f"{risk_on_count} risk-on signals, {risk_off_count} risk-off signals, {neutral_count} neutral. "
f"VIX: {vix_price:.1f}" if vix_price else
f"Macro regime: **{regime.upper()}** "
f"(score {total_score:+d}/6, confidence: {confidence}). "
f"{risk_on_count} risk-on signals, {risk_off_count} risk-off signals, {neutral_count} neutral."
)
return {
"regime": regime,
"score": total_score,
"confidence": confidence,
"vix": vix_price,
"signals": signals,
"summary": summary,
}
def format_macro_report(regime_data: dict) -> str:
"""Format classify_macro_regime output as a Markdown report."""
regime = regime_data.get("regime", "unknown")
score = regime_data.get("score", 0)
confidence = regime_data.get("confidence", "unknown")
vix = regime_data.get("vix")
signals = regime_data.get("signals", [])
summary = regime_data.get("summary", "")
# Emoji-free regime indicator
regime_display = regime.upper()
lines = [
"# Macro Regime Classification",
f"# Data retrieved on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"",
f"## Regime: {regime_display}",
"",
f"| Attribute | Value |",
f"|-----------|-------|",
f"| Regime | **{regime_display}** |",
f"| Composite Score | {score:+d} / 6 |",
f"| Confidence | {confidence.title()} |",
f"| VIX | {f'{vix:.2f}' if vix is not None else 'N/A'} |",
"",
"## Signal Breakdown",
"",
"| Signal | Score | Assessment |",
"|--------|-------|------------|",
]
score_labels = {1: "+1 (risk-on)", 0: " 0 (neutral)", -1: "-1 (risk-off)"}
for sig in signals:
score_label = score_labels.get(sig["score"], str(sig["score"]))
lines.append(f"| {sig['name'].replace('_', ' ').title()} | {score_label} | {sig['description']} |")
lines += [
"",
"## Interpretation",
"",
summary,
"",
"### What This Means for Trading",
"",
]
if regime == "risk-on":
lines += [
"- **Prefer:** Growth, cyclicals, small-caps, high-beta equities",
"- **Reduce:** Defensive sectors, cash, long-duration bonds",
"- **Technicals:** Favour breakout entries; momentum strategies work well",
]
elif regime == "risk-off":
lines += [
"- **Prefer:** Defensive sectors (utilities, staples, healthcare), quality, low-beta",
"- **Reduce:** Cyclicals, high-beta names, speculative positions",
"- **Technicals:** Tighten stop-losses; favour mean-reversion over momentum",
]
else: # transition
lines += [
"- **Mixed signals:** No strong directional bias — size positions conservatively",
"- **Watch:** Upcoming catalysts (FOMC, earnings, geopolitical events) may resolve direction",
"- **Technicals:** Use wider stops; avoid overconfident entries",
]
return "\n".join(lines)