feat: add regime awareness, smart-money tracking, theme substitution & position replacement

- MacroRegimeOutput: risk_appetite, liquidity_regime, regime_score_adjustment (-2 to +2)
- InstitutionalFlowOutput: 13F holders, insider transactions, short interest trend, smart_money_signal
- Scoring node applies regime adjustment to master score
- Theme Substitution Engine: identifies best expression of theme, ranks peers, flags overlap
- Position Replacement Agent: compares candidate to theme alternatives, flags replacements
- Pipeline: Scoring → Portfolio Analysis → Debate → Decision
- Final decision narrative includes theme context and replacement flags

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dtarkent2-sys 2026-03-09 21:46:03 +00:00
parent 7ad9e1d1ce
commit ee80a42971
12 changed files with 521 additions and 19 deletions

20
app.py
View File

@ -103,6 +103,8 @@ FIELD_AGENT_MAP = {
"crowding": ("Narrative Crowding", "tier2"),
"archetype": ("Archetype", "scoring"),
"master_score": ("Master Score", "scoring"),
"theme_substitution": ("Theme Substitution", "portfolio"),
"position_replacement": ("Position Replacement", "portfolio"),
"bull_case": ("Bull Researcher", "debate"),
"bear_case": ("Bear Researcher", "debate"),
"debate": ("Debate Referee", "debate"),
@ -111,7 +113,7 @@ FIELD_AGENT_MAP = {
}
ALL_AGENTS = [name for name, _ in FIELD_AGENT_MAP.values()]
ALL_STAGES = ["validation", "tier1", "tier2", "scoring", "debate", "decision"]
ALL_STAGES = ["validation", "tier1", "tier2", "scoring", "portfolio", "debate", "decision"]
# ---------------------------------------------------------------------------
@ -317,6 +319,22 @@ async def _update_in_progress(chunk, emitted, statuses, state, q, start_time):
state["events"].append(evt)
await q.put(evt)
# If scoring done, mark portfolio analysis in_progress
if "master_score" in emitted:
for field in ("theme_substitution", "position_replacement"):
if field not in emitted and statuses.get(field) == "pending":
statuses[field] = "in_progress"
agent_name, stage = FIELD_AGENT_MAP[field]
evt = {
"type": "agent_update",
"agent": agent_name,
"stage": stage,
"status": "in_progress",
"stats": _stats(start_time, emitted),
}
state["events"].append(evt)
await q.put(evt)
def _stats(start_time: float, emitted_fields: set) -> dict:
return {

View File

@ -24,6 +24,10 @@ from .tier3 import (
create_final_decision_node,
)
from .scoring import create_scoring_node
from .portfolio import (
create_theme_substitution_node,
create_position_replacement_node,
)
__all__ = [
"create_validation_node",
@ -44,4 +48,6 @@ __all__ = [
"create_risk_node",
"create_final_decision_node",
"create_scoring_node",
"create_theme_substitution_node",
"create_position_replacement_node",
]

View File

@ -0,0 +1,244 @@
"""Portfolio-level agents: Theme Substitution Engine, Position Replacement Agent.
These run after scoring, before the debate phase. They use the deep-thinking LLM
to evaluate the stock in context is it the best expression of its theme? Should
it replace an existing holding?
"""
from __future__ import annotations
import json
import logging
from typing import Any, Dict, List
import yfinance as yf
from tradingagents.models import (
PositionReplacementOutput,
ThemeStock,
ThemeSubstitutionOutput,
invoke_structured,
)
logger = logging.getLogger(__name__)
def _fetch_peer_basics(tickers: List[str]) -> List[dict]:
"""Fetch basic yfinance data for a list of peer tickers."""
peers = []
for sym in tickers[:8]: # cap at 8 to keep prompt manageable
try:
info = yf.Ticker(sym.upper()).info or {}
peers.append({
"ticker": sym.upper(),
"company_name": info.get("longName") or info.get("shortName") or sym,
"market_cap": info.get("marketCap"),
"current_price": info.get("currentPrice") or info.get("regularMarketPrice"),
"trailing_pe": info.get("trailingPE"),
"forward_pe": info.get("forwardPE"),
"revenue_growth": info.get("revenueGrowth"),
"profit_margins": info.get("profitMargins"),
"return_on_equity": info.get("returnOnEquity"),
"52w_range_pct": _range_pct(info),
})
except Exception:
peers.append({"ticker": sym.upper(), "error": "fetch failed"})
return peers
def _range_pct(info: dict) -> float | None:
hi = info.get("fiftyTwoWeekHigh")
lo = info.get("fiftyTwoWeekLow")
price = info.get("currentPrice") or info.get("regularMarketPrice")
if hi and lo and price and (hi - lo) > 0:
return round((price - lo) / (hi - lo) * 100, 1)
return None
def _summarize_for_theme(state: Dict[str, Any]) -> str:
"""Compact summary of the candidate stock for theme comparison."""
card = state.get("company_card") or {}
macro = state.get("macro") or {}
bq = state.get("business_quality") or {}
inst = state.get("institutional_flow") or {}
val = state.get("valuation") or {}
er = state.get("earnings_revisions") or {}
arch = state.get("archetype") or {}
return "\n".join([
f"Ticker: {card.get('ticker', '?')} | {card.get('company_name', '?')}",
f"Sector: {card.get('sector', '?')} | Industry: {card.get('industry', '?')}",
f"Market Cap: {card.get('market_cap_formatted', 'N/A')}",
f"Archetype: {arch.get('archetype', 'N/A')}",
f"Master Score: {state.get('master_score', 'N/A')}",
f"Adjusted Score: {state.get('adjusted_score', 'N/A')}",
f"Position Role: {state.get('position_role', 'N/A')}",
f"Macro Regime: {macro.get('regime_label', '?')} | Risk: {macro.get('risk_appetite', '?')} | Liq: {macro.get('liquidity_regime', '?')}",
f"Business Quality: {bq.get('score_0_to_10', 'N/A')} | Moat: {bq.get('competitive_moat', '?')}",
f"Inst Flow: {inst.get('score_0_to_10', 'N/A')} | Smart Money: {inst.get('smart_money_signal', '?')}",
f"Valuation: {val.get('score_0_to_10', 'N/A')} | Verdict: {val.get('valuation_verdict', '?')}",
f"Earnings Rev: {er.get('score_0_to_10', 'N/A')} | Direction: {er.get('eps_revision_direction', '?')}",
])
# ---------------------------------------------------------------------------
# Theme Substitution Engine
# ---------------------------------------------------------------------------
def create_theme_substitution_node(llm):
"""Identifies whether the stock is the best expression of its theme."""
def node(state: Dict[str, Any]) -> Dict[str, Any]:
ticker = state["ticker"]
card = state.get("company_card") or {}
summary = _summarize_for_theme(state)
master_score = state.get("master_score", 0)
# Use yfinance to find peers in the same industry
try:
t = yf.Ticker(ticker.upper())
info = t.info or {}
industry = info.get("industry", "")
sector = info.get("sector", "")
except Exception:
industry = card.get("industry", "")
sector = card.get("sector", "")
# Fetch peer data for comparison
# First, ask LLM to identify theme peers, then we'll fetch their data
theme_prompt = f"""You are a Theme Substitution Analyst. Your job: determine if {ticker} is the BEST
expression of its investment theme, or if better alternatives exist.
CANDIDATE STOCK:
{summary}
INSTRUCTIONS do this in order:
1. IDENTIFY THE THEME: What macro/sector theme does {ticker} express?
Examples: "AI infrastructure buildout", "GLP-1 obesity drugs", "defense spending ramp",
"EV supply chain", "cloud migration", "reshoring/nearshoring".
Name it clearly in theme_name.
2. LIST THEME PEERS: Name 3-6 other publicly traded stocks that express the SAME theme.
These should be the strongest competitors for capital allocation in this theme.
For each peer, estimate a master_score_estimate (0-10) based on your knowledge of
their fundamentals, momentum, and positioning vs {ticker}.
3. RANK WITHIN THEME: Rank all stocks (including {ticker}) by investment quality.
The stock with the best combination of: business quality, valuation, momentum,
and institutional positioning should rank #1.
4. DETERMINE BEST EXPRESSION:
- Set best_expression_of_theme=true if {ticker} is rank #1 or close (#1-2).
- Set best_expression_of_theme=false if clearly better alternatives exist.
- List stronger_alternatives (tickers that rank above {ticker}).
- Set relative_score_gap: how many score points {ticker} trails the best alternative
(0 if {ticker} is best, positive number if it trails).
5. PORTFOLIO OVERLAP: Flag if {ticker} has high correlation with common holdings.
Set portfolio_overlap_warning if this stock would add redundant exposure.
Be honest and rigorous. A stock can score well absolutely but still not be the best
way to express its theme."""
try:
result = invoke_structured(llm, ThemeSubstitutionOutput, theme_prompt)
except Exception as e:
logger.warning("ThemeSubstitution LLM failed: %s", e)
result = ThemeSubstitutionOutput(
theme_name="Unknown",
best_expression_of_theme=True,
reasoning="Theme analysis unavailable",
)
return {"theme_substitution": result.model_dump()}
return node
# ---------------------------------------------------------------------------
# Position Replacement Agent
# ---------------------------------------------------------------------------
def create_position_replacement_node(llm):
"""Identifies when a new stock is a better use of capital than alternatives."""
def node(state: Dict[str, Any]) -> Dict[str, Any]:
ticker = state["ticker"]
summary = _summarize_for_theme(state)
master_score = state.get("master_score", 0)
theme = state.get("theme_substitution") or {}
# Get the strongest alternative from theme analysis
stronger = theme.get("stronger_alternatives", [])
theme_stocks = theme.get("theme_stocks_ranked", [])
theme_name = theme.get("theme_name", "Unknown")
# If no stronger alternatives, this IS the best — skip deep comparison
if not stronger and theme.get("best_expression_of_theme", True):
result = PositionReplacementOutput(
replace_candidate=ticker,
replace_with="",
score_difference=0.0,
theme_overlap=theme_name,
replacement_reason=f"{ticker} is the best expression of the '{theme_name}' theme.",
conviction_level="high",
should_replace=False,
)
return {"position_replacement": result.model_dump()}
# Format theme peers for comparison
peer_lines = []
for ts in theme_stocks[:6]:
if isinstance(ts, dict):
peer_lines.append(
f" {ts.get('ticker', '?')}: est. score {ts.get('master_score_estimate', '?')}/10 "
f"— advantage: {ts.get('key_advantage', 'N/A')}, weakness: {ts.get('key_weakness', 'N/A')}"
)
prompt = f"""You are a Position Replacement Analyst. Determine if {ticker} should be replaced
by a stronger alternative in the same theme.
CANDIDATE STOCK:
{summary}
THEME: {theme_name}
Best expression: {'Yes' if theme.get('best_expression_of_theme') else 'No'}
Score gap vs best: {theme.get('relative_score_gap', 0):.1f}
THEME PEERS:
{chr(10).join(peer_lines) or 'No peers available'}
STRONGER ALTERNATIVES: {', '.join(stronger) if stronger else 'None'}
INSTRUCTIONS:
1. Compare {ticker} to the strongest alternative in the theme.
2. Assess on these dimensions: master score, earnings revisions, institutional flow,
risk profile, valuation, entry timing.
3. Set replace_with to the best alternative ticker (empty if none).
4. Set score_difference: how much better the replacement is (positive = replacement is stronger).
5. Set conviction_level: high / medium / low.
- high: replacement is clearly better on 3+ dimensions.
- medium: replacement is better on 1-2 dimensions, mixed on others.
- low: marginal difference, keep current.
6. Set should_replace=true only if conviction_level is high.
7. List what the replacement is stronger_on and weaker_on vs {ticker}.
Be conservative. Don't recommend replacement for marginal differences."""
try:
result = invoke_structured(llm, PositionReplacementOutput, prompt)
except Exception as e:
logger.warning("PositionReplacement LLM failed: %s", e)
result = PositionReplacementOutput(
replace_candidate=ticker,
should_replace=False,
replacement_reason="Position replacement analysis unavailable",
)
result.replace_candidate = ticker
result.theme_overlap = theme_name
return {"position_replacement": result.model_dump()}
return node

View File

@ -30,7 +30,13 @@ def create_scoring_node():
bl = (state.get("backlog") or {}).get("score_0_to_10", 5.0)
cr = (state.get("crowding") or {}).get("score_0_to_10", 5.0)
master = compute_master_score(bq, macro, inst, val, et, er, bl, cr)
# Regime adjustment from macro agent
regime_adj = (state.get("macro") or {}).get("regime_score_adjustment", 0.0)
master = compute_master_score(
bq, macro, inst, val, et, er, bl, cr,
regime_adjustment=regime_adj,
)
# Collect all data quality flags
all_flags = []

View File

@ -181,12 +181,24 @@ SECTOR PERFORMANCE (1M):
{chr(10).join(sector_lines[:12]) or 'N/A'}
INSTRUCTIONS:
1. Classify the macro regime (risk-on / risk-off / transitional).
2. Score macro_alignment_0_to_10: how well the current macro supports {ticker} specifically.
- 0 = macro is hostile to this stock; 10 = macro tailwinds are perfect.
3. Also provide the standard score_0_to_10 (overall macro health).
4. List key positives, negatives, risks for the macro environment.
5. Be concise. One sentence summary."""
1. Classify risk_appetite: "risk-on" / "risk-off" / "transitional".
- risk-on: VIX low, spreads tight, SPY up, breadth strong.
- risk-off: VIX elevated, spreads widening, SPY down, flight to safety.
- transitional: mixed signals.
2. Classify liquidity_regime: "expansion" / "contraction" / "neutral".
- expansion: falling yields, dovish Fed, credit flowing, dollar weakening.
- contraction: rising yields, hawkish Fed, tight credit, dollar strengthening.
3. Set regime_score_adjustment (-2 to +2):
- +2 = strong macro tailwind for this specific stock/sector.
- +1 = mild tailwind.
- 0 = neutral.
- -1 = mild headwind.
- -2 = severe macro headwind (risk-off + contraction + hostile sector).
This adjustment directly modifies the master score for ALL stocks.
4. Score macro_alignment_0_to_10: how well macro supports {ticker} specifically.
5. Also provide score_0_to_10 (overall macro health).
6. Set regime_label: descriptive label (e.g., "Late Cycle Risk-Off").
7. List key positives, negatives, risks. Be concise."""
try:
result = invoke_structured(llm, MacroRegimeOutput, prompt)

View File

@ -115,11 +115,21 @@ def create_institutional_flow_node(llm):
except Exception:
data = {}
# Format top holders for prompt
holders = data.get("top_institutional_holders", [])
holder_lines = []
for h in holders[:5]:
pct = h.get("pct_out")
holder_lines.append(
f" {h.get('holder', '?')}: {pct:.1f}%" if pct else f" {h.get('holder', '?')}"
)
prompt = f"""You are an Institutional Flow Analyst in a structured equity ranking pipeline.
Your job: track real smart-money movement not just static ownership percentages.
Ticker: {ticker}
FLOW DATA:
OWNERSHIP & VOLUME:
- Institutional Ownership: {data.get('held_percent_institutions', 'N/A')}%
- Insider Ownership: {data.get('held_percent_insiders', 'N/A')}%
- Volume Ratio (10d/avg): {data.get('volume_ratio', 'N/A')}
@ -127,11 +137,32 @@ FLOW DATA:
- Short Ratio (days): {data.get('short_ratio', 'N/A')}
- Float Turnover 5d: {data.get('float_turnover_5d_pct', 'N/A')}%
SHORT INTEREST TREND:
- Short Interest Change (vs prior month): {data.get('short_interest_change_pct', 'N/A')}%
- Short Interest Trend: {data.get('short_interest_trend', 'N/A')}
TOP INSTITUTIONAL HOLDERS (13F):
{chr(10).join(holder_lines) or ' No data available'}
- Total top holders tracked: {data.get('top_holders_count', 'N/A')}
INSIDER TRANSACTIONS (recent):
- Insider Buys: {data.get('insider_buys_recent', 'N/A')}
- Insider Sells: {data.get('insider_sells_recent', 'N/A')}
- Insider Signal: {data.get('insider_transaction_signal', 'N/A')}
INSTRUCTIONS:
1. Score institutional flow signal 0-10.
High ownership + rising volume + low short interest = bullish.
1. Score institutional flow signal 0-10 (this has 15% weight make it count).
High ownership + rising volume + low short interest + insider buying = bullish.
2. Classify accumulation_signal: accumulating / distributing / neutral.
3. This score has 15% weight in the master score make it count."""
3. Classify top_holders_change: increasing / decreasing / stable.
(Based on holder concentration and any visible 13F patterns.)
4. Classify fund_accumulation_pattern: accumulating / distributing / holding.
(Volume + ownership trends suggest funds are adding or reducing.)
5. Classify short_interest_trend: rising / falling / stable.
6. Classify insider_transaction_signal: buying / selling / none.
7. Classify smart_money_signal: bullish / bearish / neutral.
(Synthesize all signals: 13F, insiders, short interest, volume.)
8. Be concise."""
try:
result = invoke_structured(llm, InstitutionalFlowOutput, prompt)
@ -142,12 +173,18 @@ INSTRUCTIONS:
summary_1_sentence="Institutional flow analysis unavailable",
)
# Override with actual fetched data
result.institutional_ownership_pct = data.get("held_percent_institutions")
result.insider_ownership_pct = data.get("held_percent_insiders")
result.volume_ratio = data.get("volume_ratio")
result.short_interest_pct = data.get("short_pct_of_float")
result.short_ratio = data.get("short_ratio")
result.float_turnover_pct = data.get("float_turnover_5d_pct")
# Override trend fields with actual data when available
if data.get("short_interest_trend"):
result.short_interest_trend = data["short_interest_trend"]
if data.get("insider_transaction_signal"):
result.insider_transaction_signal = data["insider_transaction_signal"]
flags = [f.model_dump() for f in result.data_quality_flags]
return {"institutional_flow": result.model_dump(), "global_flags": flags}

View File

@ -60,8 +60,12 @@ def _summarize_tier2(state: Dict[str, Any]) -> str:
f" Liquidity: {liq.get('score_0_to_10', 'N/A')}{liq.get('summary_1_sentence', '')}",
"",
f" Macro Regime: {macro.get('regime_label', '?')} | VIX: {macro.get('vix_level', '?')}",
f" Risk Appetite: {macro.get('risk_appetite', '?')} | Liquidity Regime: {macro.get('liquidity_regime', '?')}",
f" Regime Score Adjustment: {macro.get('regime_score_adjustment', 0):+.1f}",
f" Moat: {bq.get('competitive_moat', '?')} | Valuation: {val.get('valuation_verdict', '?')}",
f" Accumulation: {inst.get('accumulation_signal', '?')} | Timing: {et.get('timing_verdict', '?')}",
f" Smart Money: {inst.get('smart_money_signal', '?')} | Accumulation: {inst.get('accumulation_signal', '?')}",
f" Short Trend: {inst.get('short_interest_trend', '?')} | Insider Signal: {inst.get('insider_transaction_signal', '?')}",
f" Timing: {et.get('timing_verdict', '?')}",
]
return "\n".join(lines)
@ -262,6 +266,8 @@ def create_final_decision_node(llm):
bear = state.get("bear_case") or {}
debate = state.get("debate") or {}
risk = state.get("risk") or {}
theme = state.get("theme_substitution") or {}
replacement = state.get("position_replacement") or {}
master_score = state.get("master_score", 0)
adjusted_score = state.get("adjusted_score", 0)
@ -284,12 +290,29 @@ def create_final_decision_node(llm):
else:
action = "AVOID"
# Theme/replacement context
theme_lines = ""
if theme.get("theme_name"):
theme_lines = (
f"\nTHEME CONTEXT:"
f"\n Theme: {theme.get('theme_name', '?')}"
f"\n Best expression: {'Yes' if theme.get('best_expression_of_theme') else 'No'}"
f"\n Stronger alternatives: {', '.join(theme.get('stronger_alternatives', [])) or 'None'}"
f"\n Score gap vs best: {theme.get('relative_score_gap', 0):.1f}"
)
if replacement.get("should_replace"):
theme_lines += (
f"\n REPLACEMENT FLAG: Consider {replacement.get('replace_with', '?')} instead"
f"\n Reason: {replacement.get('replacement_reason', '')}"
)
prompt = f"""You are the Final Decision Synthesizer for {ticker}.
{summary}
DEBATE: {debate.get('winner', '?')} won | Conviction adjustment: {conviction_adj:+.1f}
RISK: {risk.get('overall_risk_level', '?')} | Max position: {risk.get('max_position_size_pct', '?')}%
{theme_lines}
FINAL SCORES:
Master Score: {master_score}
@ -304,6 +327,8 @@ Write a concise narrative (3-5 sentences) that:
2. Highlights the top 2-3 catalysts and top 2-3 risks.
3. States the action ({action}) and position role ({final_role}).
4. Notes what would change the thesis (invalidation triggers).
5. If theme analysis found stronger alternatives, mention them and whether
this stock is still the best expression of the theme.
Also provide:
- thesis_summary (one sentence)

View File

@ -51,6 +51,10 @@ class PipelineState(TypedDict):
adjusted_score: Optional[float]
position_role: Optional[str]
# Portfolio-level
theme_substitution: Optional[dict]
position_replacement: Optional[dict]
# Tier 3
bull_case: Optional[dict]
bear_case: Optional[dict]

View File

@ -558,19 +558,89 @@ def get_sector_rotation(ticker, curr_date=None):
def get_institutional_flow(ticker):
"""Get institutional flow data via yfinance (plain function for interface routing)."""
"""Get institutional flow data via yfinance including 13F holders and insider transactions."""
try:
t = yf.Ticker(ticker.upper())
info = t.info
return _json.dumps({
info = t.info or {}
# Base metrics
result = {
"ticker": ticker.upper(),
"average_volume": _safe_get_yf(info, "averageVolume"),
"average_volume_10d": _safe_get_yf(info, "averageVolume10days"),
"float_shares": _safe_get_yf(info, "floatShares"),
"shares_short": _safe_get_yf(info, "sharesShort"),
"shares_short_prior": _safe_get_yf(info, "sharesShortPriorMonth"),
"short_ratio": _safe_get_yf(info, "shortRatio"),
"held_percent_institutions": _safe_get_yf(info, "heldPercentInstitutions"),
}, default=str)
"held_percent_insiders": _safe_get_yf(info, "heldPercentInsiders"),
}
# Volume ratio (10d vs avg)
vol_10d = _safe_get_yf(info, "averageVolume10days")
vol_avg = _safe_get_yf(info, "averageVolume")
if vol_10d and vol_avg and vol_avg > 0:
result["volume_ratio"] = round(vol_10d / vol_avg, 2)
# Short % of float
float_shares = _safe_get_yf(info, "floatShares")
shares_short = _safe_get_yf(info, "sharesShort")
if float_shares and shares_short and float_shares > 0:
result["short_pct_of_float"] = round(shares_short / float_shares * 100, 2)
# Short interest trend (current vs prior month)
prior = _safe_get_yf(info, "sharesShortPriorMonth")
if shares_short is not None and prior is not None and prior > 0:
pct_change = (shares_short - prior) / prior * 100
result["short_interest_change_pct"] = round(pct_change, 1)
if pct_change > 5:
result["short_interest_trend"] = "rising"
elif pct_change < -5:
result["short_interest_trend"] = "falling"
else:
result["short_interest_trend"] = "stable"
# Float turnover (5d volume / float)
if vol_10d and float_shares and float_shares > 0:
result["float_turnover_5d_pct"] = round(vol_10d * 5 / float_shares * 100, 2)
# Top institutional holders (13F data)
try:
holders = t.institutional_holders
if holders is not None and not holders.empty:
top = holders.head(10).to_dict("records")
result["top_institutional_holders"] = [
{
"holder": str(r.get("Holder", "")),
"shares": int(r["Shares"]) if r.get("Shares") else None,
"pct_out": float(r["% Out"]) if r.get("% Out") else None,
"value": float(r["Value"]) if r.get("Value") else None,
}
for r in top
]
result["top_holders_count"] = len(top)
except Exception:
pass
# Insider transactions
try:
insiders = t.insider_transactions
if insiders is not None and not insiders.empty:
recent = insiders.head(10).to_dict("records")
buys = sum(1 for r in recent if "Purchase" in str(r.get("Text", "")))
sells = sum(1 for r in recent if "Sale" in str(r.get("Text", "")))
result["insider_buys_recent"] = buys
result["insider_sells_recent"] = sells
if buys > sells:
result["insider_transaction_signal"] = "buying"
elif sells > buys:
result["insider_transaction_signal"] = "selling"
else:
result["insider_transaction_signal"] = "none"
except Exception:
pass
return _json.dumps(result, default=str)
except Exception as e:
return _json.dumps({"error": str(e)})

View File

@ -44,9 +44,11 @@ class StructuredGraphSetup:
create_institutional_flow_node,
create_liquidity_node,
create_macro_node,
create_position_replacement_node,
create_risk_node,
create_scoring_node,
create_sector_rotation_node,
create_theme_substitution_node,
create_validation_node,
create_valuation_node,
)
@ -69,6 +71,10 @@ class StructuredGraphSetup:
arch_fn = create_archetype_node(self.quick_llm)
score_fn = create_scoring_node()
# Portfolio-level: deep model for theme/replacement analysis
theme_fn = create_theme_substitution_node(self.deep_llm)
replace_fn = create_position_replacement_node(self.deep_llm)
# Tier 3: deep model for reasoning/debate
bull_fn = create_bull_case_node(self.deep_llm)
bear_fn = create_bear_case_node(self.deep_llm)
@ -106,6 +112,13 @@ class StructuredGraphSetup:
score_result = score_fn(merged)
return {**arch_result, **score_result}
# Theme + Replacement combined node (sequential: theme feeds replacement)
def theme_and_replacement(state):
theme_result = theme_fn(state)
merged = {**state, **theme_result}
replace_result = replace_fn(merged)
return {**theme_result, **replace_result}
# Risk + Final Decision combined node
def risk_and_decision(state):
risk_result = risk_fn(state)
@ -120,6 +133,7 @@ class StructuredGraphSetup:
workflow.add_node("Tier 1 Analysis", parallel_tier1)
workflow.add_node("Tier 2 Analysis", parallel_tier2)
workflow.add_node("Scoring", archetype_and_score)
workflow.add_node("Portfolio Analysis", theme_and_replacement)
workflow.add_node("Debate", parallel_bull_bear)
workflow.add_node("Debate Referee", debate_fn)
workflow.add_node("Decision", risk_and_decision)
@ -133,7 +147,8 @@ class StructuredGraphSetup:
)
workflow.add_edge("Tier 1 Analysis", "Tier 2 Analysis")
workflow.add_edge("Tier 2 Analysis", "Scoring")
workflow.add_edge("Scoring", "Debate")
workflow.add_edge("Scoring", "Portfolio Analysis")
workflow.add_edge("Portfolio Analysis", "Debate")
workflow.add_edge("Debate", "Debate Referee")
workflow.add_edge("Debate Referee", "Decision")
workflow.add_edge("Decision", END)

View File

@ -129,6 +129,8 @@ class TradingAgentsGraph:
"master_score": None,
"adjusted_score": None,
"position_role": None,
"theme_substitution": None,
"position_replacement": None,
"bull_case": None,
"bear_case": None,
"debate": None,
@ -161,6 +163,8 @@ class TradingAgentsGraph:
"backlog": state.get("backlog"),
"crowding": state.get("crowding"),
"archetype": state.get("archetype"),
"theme_substitution": state.get("theme_substitution"),
"position_replacement": state.get("position_replacement"),
"bull_case": state.get("bull_case"),
"bear_case": state.get("bear_case"),
"debate": state.get("debate"),

View File

@ -117,6 +117,14 @@ class MacroRegimeOutput(AgentBaseOutput):
spy_1m_return: Optional[float] = None
regime_label: str = "unknown"
macro_alignment_0_to_10: float = Field(default=5.0, ge=0, le=10)
# Regime awareness
risk_appetite: str = "neutral" # risk-on / risk-off / transitional
liquidity_regime: str = "neutral" # expansion / contraction / neutral
regime_score_adjustment: float = Field(
default=0.0, ge=-2, le=2,
description="Adjustment applied to all downstream scores. "
"+2 = strong macro tailwind, -2 = severe macro headwind.",
)
class LiquidityOutput(AgentBaseOutput):
@ -156,6 +164,12 @@ class InstitutionalFlowOutput(AgentBaseOutput):
short_ratio: Optional[float] = None
float_turnover_pct: Optional[float] = None
accumulation_signal: str = "unknown"
# Smart-money tracking
top_holders_change: str = "unknown" # increasing / decreasing / stable
fund_accumulation_pattern: str = "unknown" # accumulating / distributing / holding
short_interest_trend: str = "unknown" # rising / falling / stable
insider_transaction_signal: str = "unknown" # buying / selling / none
smart_money_signal: str = "unknown" # bullish / bearish / neutral
class ValuationOutput(AgentBaseOutput):
@ -293,6 +307,48 @@ class FinalDecisionOutput(BaseModel):
narrative: str = ""
# ---------------------------------------------------------------------------
# Theme Substitution & Position Replacement outputs
# ---------------------------------------------------------------------------
class ThemeStock(BaseModel):
"""A stock ranked within a theme."""
ticker: str
company_name: str = ""
master_score_estimate: float = Field(default=5.0, ge=0, le=10)
key_advantage: str = ""
key_weakness: str = ""
class ThemeSubstitutionOutput(BaseModel):
"""Identifies whether a stock is the best expression of its theme."""
theme_name: str = ""
theme_description: str = ""
theme_stocks_ranked: List[ThemeStock] = Field(default_factory=list)
best_expression_of_theme: bool = True
best_expression_ticker: str = ""
stronger_alternatives: List[str] = Field(default_factory=list)
relative_score_gap: float = 0.0
portfolio_overlap_warning: str = ""
reasoning: str = ""
class PositionReplacementOutput(BaseModel):
"""Identifies when a new stock is a better use of capital."""
replace_candidate: str = ""
replace_with: str = ""
score_difference: float = 0.0
theme_overlap: str = ""
replacement_reason: str = ""
conviction_level: str = "low" # low / medium / high
stronger_on: List[str] = Field(
default_factory=list,
description="Dimensions where candidate beats replacement target",
)
weaker_on: List[str] = Field(default_factory=list)
should_replace: bool = False
# ---------------------------------------------------------------------------
# Deterministic scoring functions
# ---------------------------------------------------------------------------
@ -306,6 +362,7 @@ def compute_master_score(
earnings_revisions: float,
backlog: float,
crowding: float,
regime_adjustment: float = 0.0,
) -> float:
"""Compute weighted master score (0-100).
@ -318,6 +375,8 @@ def compute_master_score(
10% earnings_revisions
5% backlog
5% crowding
regime_adjustment: -2 to +2, applied as direct offset to the 0-100 score.
"""
weighted = (
0.25 * business_quality
@ -329,7 +388,9 @@ def compute_master_score(
+ 0.05 * backlog
+ 0.05 * crowding
)
return round(weighted * 10, 2)
raw = weighted * 10
adjusted = max(0.0, min(100.0, raw + regime_adjustment))
return round(adjusted, 2)
def assign_position_role(score: float) -> str: