feat: wire chief_analyst into streaming, logging, and verdict extraction

This commit is contained in:
Ali AL OGAILI 2026-03-24 14:07:46 +01:00
parent 5420ccb360
commit 1d7273d765
1 changed files with 24 additions and 11 deletions

View File

@ -59,6 +59,7 @@ _NODE_TO_STEP = {
"Conservative Analyst": "conservative_analyst",
"Neutral Analyst": "neutral_analyst",
"Risk Judge": "risk_judge",
"Chief Analyst": "chief_analyst",
}
_SKIP_NODES = {"tools_market", "tools_news", "tools_fundamentals", "tools_social"}
@ -304,8 +305,13 @@ class TradingAgentsGraph:
# Log state
self._log_state(trade_date, final_state)
# Return decision and processed signal
return final_state, self.process_signal(final_state["final_trade_decision"])
# Return decision and processed signal (prefer structured chief_analyst_report verdict)
chief_report = (final_state.get("chief_analyst_report") or {})
if chief_report.get("verdict") in {"BUY", "SELL", "HOLD"}:
decision = chief_report["verdict"]
else:
decision = self.process_signal(final_state.get("final_trade_decision", ""))
return final_state, decision
@staticmethod
def _extract_report(step_key: str, update: dict) -> str:
@ -323,6 +329,7 @@ class TradingAgentsGraph:
"conservative_analyst": lambda u: (u.get("risk_debate_state") or {}).get("current_conservative_response", ""),
"neutral_analyst": lambda u: (u.get("risk_debate_state") or {}).get("current_neutral_response", ""),
"risk_judge": lambda u: (u.get("risk_debate_state") or {}).get("judge_decision", ""),
"chief_analyst": lambda u: json.dumps(u.get("chief_analyst_report") or {}),
}
return extractors[step_key](update) or ""
@ -383,15 +390,20 @@ class TradingAgentsGraph:
final_snap = self.graph.get_state(thread_config)
final_state = final_snap.values if hasattr(final_snap, "values") else {}
raw_signal = final_state.get("final_trade_decision", "")
try:
raw_decision = self.process_signal(raw_signal)
decision = raw_decision.strip().upper()
if decision not in {"BUY", "SELL", "HOLD"}:
logger.warning("stream_propagate: unexpected decision '%s' — defaulting to HOLD", decision)
decision = "HOLD"
except Exception:
raise # propagate to run_service for run:error handling
chief_report = final_state.get("chief_analyst_report") or {}
if chief_report.get("verdict") in {"BUY", "SELL", "HOLD"}:
decision = chief_report["verdict"]
else:
# Fallback for partial runs or old checkpoints without chief_analyst_report
try:
raw_signal = final_state.get("final_trade_decision", "")
raw_decision = self.process_signal(raw_signal)
decision = raw_decision.strip().upper()
if decision not in {"BUY", "SELL", "HOLD"}:
logger.warning("stream_propagate: unexpected decision '%s' — defaulting to HOLD", decision)
decision = "HOLD"
except Exception:
raise # propagate to run_service for run:error handling
self._last_decision = decision
self._log_state(trade_date, final_state)
@ -426,6 +438,7 @@ class TradingAgentsGraph:
},
"investment_plan": final_state["investment_plan"],
"final_trade_decision": final_state["final_trade_decision"],
"chief_analyst_report": final_state.get("chief_analyst_report"),
}
# Save to file