From 8461ed540e19ff547d726faef94668480336c3d9 Mon Sep 17 00:00:00 2001 From: ahmet guzererler Date: Mon, 23 Mar 2026 23:58:51 +0100 Subject: [PATCH] Fix all review issues --- agent_os/backend/services/langgraph_engine.py | 56 +++++++++++-------- 1 file changed, 33 insertions(+), 23 deletions(-) diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index edf3877e..9daacfd2 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -329,15 +329,14 @@ class LangGraphEngine: # Save portfolio reports (Holding Reviews, Risk Metrics, PM Decision, Execution Result) if final_state: try: - # 1. Holding Reviews + # 1. Holding Reviews — save the raw string via ReportStore holding_reviews_str = final_state.get("holding_reviews") if holding_reviews_str: try: reviews = json.loads(holding_reviews_str) if isinstance(holding_reviews_str, str) else holding_reviews_str - # ReportStore doesn't have batch save for reviews, but we can save the string or iterate. - # For now, let's at least save the PM decision and Execution Result which are most critical. - pass - except Exception: pass + store.save_holding_reviews(date, portfolio_id, reviews) + except Exception as exc: + logger.warning("Failed to save holding_reviews run=%s: %s", run_id, exc) # 2. Risk Metrics risk_metrics_str = final_state.get("risk_metrics") @@ -345,7 +344,8 @@ class LangGraphEngine: try: metrics = json.loads(risk_metrics_str) if isinstance(risk_metrics_str, str) else risk_metrics_str store.save_risk_metrics(date, portfolio_id, metrics) - except Exception: pass + except Exception as exc: + logger.warning("Failed to save risk_metrics run=%s: %s", run_id, exc) # 3. PM Decision pm_decision_str = final_state.get("pm_decision") @@ -353,7 +353,8 @@ class LangGraphEngine: try: decision = json.loads(pm_decision_str) if isinstance(pm_decision_str, str) else pm_decision_str store.save_pm_decision(date, portfolio_id, decision) - except Exception: pass + except Exception as exc: + logger.warning("Failed to save pm_decision run=%s: %s", run_id, exc) # 4. Execution Result execution_result_str = final_state.get("execution_result") @@ -361,7 +362,8 @@ class LangGraphEngine: try: execution = json.loads(execution_result_str) if isinstance(execution_result_str, str) else execution_result_str store.save_execution_result(date, portfolio_id, execution) - except Exception: pass + except Exception as exc: + logger.warning("Failed to save execution_result run=%s: %s", run_id, exc) yield self._system_log(f"Portfolio stage reports (decision & execution) saved for {portfolio_id} on {date}") except Exception as exc: @@ -371,7 +373,8 @@ class LangGraphEngine: logger.info("Completed PORTFOLIO run=%s", run_id) async def run_trade_execution( - self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict + self, run_id: str, date: str, portfolio_id: str, decision: dict, prices: dict, + store: ReportStore | None = None, ) -> AsyncGenerator[Dict[str, Any], None]: """Manually execute a pre-computed PM decision (for resumability).""" logger.info("Starting TRADE_EXECUTION run=%s portfolio=%s date=%s", run_id, portfolio_id, date) @@ -380,16 +383,22 @@ class LangGraphEngine: from tradingagents.portfolio.trade_executor import TradeExecutor from tradingagents.portfolio.repository import PortfolioRepository + if not prices: + logger.warning("TRADE_EXECUTION run=%s: no prices available — execution may produce incomplete results", run_id) + yield self._system_log(f"Warning: no prices found for {portfolio_id} on {date} — trade execution may be incomplete.") + + _store = store or ReportStore() + try: repo = PortfolioRepository(config=self.config) executor = TradeExecutor(repo=repo, config=self.config) - + # Execute decisions result = executor.execute_decisions(portfolio_id, decision, prices, date=date) - - # Save results - ReportStore().save_execution_result(date, portfolio_id, result) - + + # Save results using the shared store instance + _store.save_execution_result(date, portfolio_id, result) + yield self._system_log(f"Trade execution completed for {portfolio_id}. {result.get('summary', {})}") logger.info("Completed TRADE_EXECUTION run=%s", run_id) except Exception as exc: @@ -443,10 +452,10 @@ class LangGraphEngine: yield self._system_log("Phase 3/3: Running portfolio manager…") portfolio_params = {k: v for k, v in params.items() if k != "ticker"} portfolio_id = params.get("portfolio_id", "main_portfolio") - + # Check if portfolio stage is fully complete (execution result exists) if not force and store.load_execution_result(date, portfolio_id): - yield self._system_log(f"Phase 3: Portfolio execution for {portfolio_id} on {date} already exists, skipping.") + yield self._system_log(f"Phase 3: Portfolio execution for {portfolio_id} on {date} already exists, skipping.") else: # Check if we can resume from a saved decision saved_decision = store.load_pm_decision(date, portfolio_id) @@ -456,7 +465,8 @@ class LangGraphEngine: scan_data = store.load_scan(date) or {} prices = scan_data.get("prices") or {} async for evt in self.run_trade_execution( - f"{run_id}_resume_trades", date, portfolio_id, saved_decision, prices + f"{run_id}_resume_trades", date, portfolio_id, saved_decision, prices, + store=store, ): yield evt else: @@ -537,8 +547,8 @@ class LangGraphEngine: """Extract ticker symbols from a ReportStore scan summary dict. Handles two shapes from the macro synthesis LLM output: - * List of dicts: ``[{"ticker": "AAPL", ...}, ...]`` - * List of strings: ``["AAPL", "TSLA", ...]`` + * List of dicts: ``[{'ticker': 'AAPL', ...}, ...]`` + * List of strings: ``['AAPL', 'TSLA', ...]`` Also checks both ``stocks_to_investigate`` and ``watchlist`` keys. Returns an uppercase, deduplicated list in original order. @@ -661,7 +671,7 @@ class LangGraphEngine: Handles several structures observed across LangChain / LangGraph versions: - flat list of message objects ``[SystemMessage, HumanMessage, ...]`` - list-of-lists (batched) ``[[SystemMessage, HumanMessage, ...]]`` - - list of plain dicts ``[{"role": "system", "content": "..."}]`` + - list of plain dicts ``[{'role': 'system', 'content': '...'}]`` - tuple wrapper ``([SystemMessage, ...],)`` """ if not messages: @@ -690,7 +700,7 @@ class LangGraphEngine: def _extract_model(self, event: Dict[str, Any]) -> str: """Best-effort extraction of the model name from a LangGraph event.""" - data = event.get("data") or {} + data = event.get("data") or {}; # 1. invocation_params (standard LangChain) inv = data.get("invocation_params") or {} @@ -784,7 +794,7 @@ class LangGraphEngine: ) return { - "id": event.get("run_id", f"thought_{time.time_ns()}"), + "id": event.get("run_id", f"thought_{time.time_ns()}").strip(), "node_id": node_name, "parent_node_id": "start", "type": "thought", @@ -819,7 +829,7 @@ class LangGraphEngine: logger.info("Tool start tool=%s node=%s run=%s", name, node_name, run_id) return { - "id": event.get("run_id", f"tool_{time.time_ns()}"), + "id": event.get("run_id", f"tool_{time.time_ns()}").strip(), "node_id": f"tool_{name}", "parent_node_id": node_name, "type": "tool",