Fix all review issues

This commit is contained in:
ahmet guzererler 2026-03-23 23:58:51 +01:00
parent 57f1d561f9
commit 8461ed540e
1 changed files with 33 additions and 23 deletions

View File

@ -329,15 +329,14 @@ class LangGraphEngine:
# Save portfolio reports (Holding Reviews, Risk Metrics, PM Decision, Execution Result)
if final_state:
try:
# 1. Holding Reviews
# 1. Holding Reviews — save the raw string via ReportStore
holding_reviews_str = final_state.get("holding_reviews")
if holding_reviews_str:
try:
reviews = json.loads(holding_reviews_str) if isinstance(holding_reviews_str, str) else holding_reviews_str
# ReportStore doesn't have batch save for reviews, but we can save the string or iterate.
# For now, let's at least save the PM decision and Execution Result which are most critical.
pass
except Exception: pass
store.save_holding_reviews(date, portfolio_id, reviews)
except Exception as exc:
logger.warning("Failed to save holding_reviews run=%s: %s", run_id, exc)
# 2. Risk Metrics
risk_metrics_str = final_state.get("risk_metrics")
@ -345,7 +344,8 @@ class LangGraphEngine:
try:
metrics = json.loads(risk_metrics_str) if isinstance(risk_metrics_str, str) else risk_metrics_str
store.save_risk_metrics(date, portfolio_id, metrics)
except Exception: pass
except Exception as exc:
logger.warning("Failed to save risk_metrics run=%s: %s", run_id, exc)
# 3. PM Decision
pm_decision_str = final_state.get("pm_decision")
@ -353,7 +353,8 @@ class LangGraphEngine:
try:
decision = json.loads(pm_decision_str) if isinstance(pm_decision_str, str) else pm_decision_str
store.save_pm_decision(date, portfolio_id, decision)
except Exception: pass
except Exception as exc:
logger.warning("Failed to save pm_decision run=%s: %s", run_id, exc)
# 4. Execution Result
execution_result_str = final_state.get("execution_result")
@ -361,7 +362,8 @@ class LangGraphEngine:
try:
execution = json.loads(execution_result_str) if isinstance(execution_result_str, str) else execution_result_str
store.save_execution_result(date, portfolio_id, execution)
except Exception: pass
except Exception as exc:
logger.warning("Failed to save execution_result run=%s: %s", run_id, exc)
yield self._system_log(f"Portfolio stage reports (decision & execution) saved for {portfolio_id} on {date}")
except Exception as exc:
@ -371,7 +373,8 @@ class LangGraphEngine:
logger.info("Completed PORTFOLIO run=%s", run_id)
async def run_trade_execution(
self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict
self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict,
store: ReportStore | None = None,
) -> AsyncGenerator[Dict[str, Any], None]:
"""Manually execute a pre-computed PM decision (for resumability)."""
logger.info("Starting TRADE_EXECUTION run=%s portfolio=%s date=%s", run_id, portfolio_id, date)
@ -380,16 +383,22 @@ class LangGraphEngine:
from tradingagents.portfolio.trade_executor import TradeExecutor
from tradingagents.portfolio.repository import PortfolioRepository
if not prices:
logger.warning("TRADE_EXECUTION run=%s: no prices available — execution may produce incomplete results", run_id)
yield self._system_log(f"Warning: no prices found for {portfolio_id} on {date} — trade execution may be incomplete.")
_store = store or ReportStore()
try:
repo = PortfolioRepository(config=self.config)
executor = TradeExecutor(repo=repo, config=self.config)
# Execute decisions
result = executor.execute_decisions(portfolio_id, decision, prices, date=date)
# Save results
ReportStore().save_execution_result(date, portfolio_id, result)
# Save results using the shared store instance
_store.save_execution_result(date, portfolio_id, result)
yield self._system_log(f"Trade execution completed for {portfolio_id}. {result.get('summary', {})}")
logger.info("Completed TRADE_EXECUTION run=%s", run_id)
except Exception as exc:
@ -443,10 +452,10 @@ class LangGraphEngine:
yield self._system_log("Phase 3/3: Running portfolio manager…")
portfolio_params = {k: v for k, v in params.items() if k != "ticker"}
portfolio_id = params.get("portfolio_id", "main_portfolio")
# Check if portfolio stage is fully complete (execution result exists)
if not force and store.load_execution_result(date, portfolio_id):
yield self._system_log(f"Phase 3: Portfolio execution for {portfolio_id} on {date} already exists, skipping.")
yield self._system_log(f"Phase 3: Portfolio execution for {portfolio_id} on {date} already exists, skipping.")
else:
# Check if we can resume from a saved decision
saved_decision = store.load_pm_decision(date, portfolio_id)
@ -456,7 +465,8 @@ class LangGraphEngine:
scan_data = store.load_scan(date) or {}
prices = scan_data.get("prices") or {}
async for evt in self.run_trade_execution(
f"{run_id}_resume_trades", date, portfolio_id, saved_decision, prices
f"{run_id}_resume_trades", date, portfolio_id, saved_decision, prices,
store=store,
):
yield evt
else:
@ -537,8 +547,8 @@ class LangGraphEngine:
"""Extract ticker symbols from a ReportStore scan summary dict.
Handles two shapes from the macro synthesis LLM output:
* List of dicts: ``[{"ticker": "AAPL", ...}, ...]``
* List of strings: ``["AAPL", "TSLA", ...]``
* List of dicts: ``[{'ticker': 'AAPL', ...}, ...]``
* List of strings: ``['AAPL', 'TSLA', ...]``
Also checks both ``stocks_to_investigate`` and ``watchlist`` keys.
Returns an uppercase, deduplicated list in original order.
@ -661,7 +671,7 @@ class LangGraphEngine:
Handles several structures observed across LangChain / LangGraph versions:
- flat list of message objects ``[SystemMessage, HumanMessage, ...]``
- list-of-lists (batched) ``[[SystemMessage, HumanMessage, ...]]``
- list of plain dicts ``[{"role": "system", "content": "..."}]``
- list of plain dicts ``[{'role': 'system', 'content': '...'}]``
- tuple wrapper ``([SystemMessage, ...],)``
"""
if not messages:
@ -690,7 +700,7 @@ class LangGraphEngine:
def _extract_model(self, event: Dict[str, Any]) -> str:
"""Best-effort extraction of the model name from a LangGraph event."""
data = event.get("data") or {}
data = event.get("data") or {};
# 1. invocation_params (standard LangChain)
inv = data.get("invocation_params") or {}
@ -784,7 +794,7 @@ class LangGraphEngine:
)
return {
"id": event.get("run_id", f"thought_{time.time_ns()}"),
"id": event.get("run_id", f"thought_{time.time_ns()}").strip(),
"node_id": node_name,
"parent_node_id": "start",
"type": "thought",
@ -819,7 +829,7 @@ class LangGraphEngine:
logger.info("Tool start tool=%s node=%s run=%s", name, node_name, run_id)
return {
"id": event.get("run_id", f"tool_{time.time_ns()}"),
"id": event.get("run_id", f"tool_{time.time_ns()}").strip(),
"node_id": f"tool_{name}",
"parent_node_id": node_name,
"type": "tool",