From 80a54b2411adb96c427651718182ae0a1b2ed844 Mon Sep 17 00:00:00 2001 From: Ahmet Guzererler Date: Tue, 24 Mar 2026 00:25:51 +0100 Subject: [PATCH] fix: fetch live prices from yfinance when none available for trade execution The scan summary never contained a 'prices' key, so trade execution always ran with an empty prices dict. Now prices are fetched via yfinance: - run_trade_execution: fetches prices for all tickers in the PM decision (sells/buys/holds) when prices arg is empty - run_auto resume path: fetches prices for decision tickers instead of reading from scan data - run_portfolio: fetches prices for current holdings + scan candidates before invoking the portfolio graph Co-Authored-By: Claude Sonnet 4.6 --- agent_os/backend/services/langgraph_engine.py | 71 +++++++++++++++++-- 1 file changed, 64 insertions(+), 7 deletions(-) diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 6ce52360..dfb27a89 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -19,6 +19,45 @@ logger = logging.getLogger("agent_os.engine") # Maximum characters of prompt/response content to include in the short message _MAX_CONTENT_LEN = 300 + +def _fetch_prices(tickers: list[str]) -> dict[str, float]: + """Fetch the latest closing price for each ticker via yfinance. + + Returns a dict of {ticker: price}. Tickers that fail are silently skipped. + """ + if not tickers: + return {} + try: + import yfinance as yf + data = yf.download(tickers, period="2d", auto_adjust=True, progress=False, threads=True) + if data.empty: + return {} + close = data["Close"] if "Close" in data.columns else data + # Take the last available row + last_row = close.iloc[-1] + return { + t: float(last_row[t]) + for t in tickers + if t in last_row.index and not __import__("math").isnan(last_row[t]) + } + except Exception as exc: + logger.warning("_fetch_prices failed: %s", exc) + return {} + + +def _tickers_from_decision(decision: dict) -> list[str]: + """Extract all ticker symbols referenced in a PM decision dict.""" + tickers = set() + for key in ("sells", "buys", "holds"): + for item in decision.get(key) or []: + if isinstance(item, dict): + t = item.get("ticker") or item.get("symbol") + else: + t = str(item) + if t: + tickers.add(t.upper()) + return list(tickers) + # Maximum characters of prompt/response for the full fields (generous limit) _MAX_FULL_LEN = 50_000 @@ -280,8 +319,21 @@ class LangGraphEngine: if ticker_analyses: scan_summary["ticker_analyses"] = ticker_analyses - # Fetch prices from scan_summary if available, else default to empty dict - prices = scan_summary.get("prices") or {} + # Collect tickers: current holdings + scan candidates, then fetch live prices + holding_tickers: list[str] = [] + try: + from tradingagents.portfolio.repository import PortfolioRepository + _repo = PortfolioRepository() + _, holdings = _repo.get_portfolio_with_holdings(portfolio_id) + holding_tickers = [h.ticker for h in holdings] + except Exception as exc: + logger.warning("run_portfolio: could not load holdings for price fetch: %s", exc) + candidate_tickers = [ + c if isinstance(c, str) else (c.get("ticker") or c.get("symbol") or "") + for c in (scan_summary.get("stocks_to_investigate") or []) + ] + all_tickers = list({t.upper() for t in holding_tickers + candidate_tickers if t}) + prices = _fetch_prices(all_tickers) if all_tickers else {} initial_state = { "portfolio_id": portfolio_id, @@ -384,8 +436,14 @@ class LangGraphEngine: from tradingagents.portfolio.repository import PortfolioRepository if not prices: - logger.warning("TRADE_EXECUTION run=%s: no prices available — execution may produce incomplete results", run_id) - yield self._system_log(f"Warning: no prices found for {portfolio_id} on {date} — trade execution may be incomplete.") + tickers = _tickers_from_decision(decision) + if tickers: + yield self._system_log(f"Fetching live prices for {tickers} from yfinance…") + prices = _fetch_prices(tickers) + logger.info("TRADE_EXECUTION run=%s: fetched prices for %s", run_id, list(prices.keys())) + if not prices: + logger.warning("TRADE_EXECUTION run=%s: no prices available — execution may produce incomplete results", run_id) + yield self._system_log(f"Warning: no prices found for {portfolio_id} on {date} — trade execution may be incomplete.") _store = store or ReportStore() @@ -461,9 +519,8 @@ class LangGraphEngine: saved_decision = store.load_pm_decision(date, portfolio_id) if not force and saved_decision: yield self._system_log(f"Phase 3: Found saved PM decision for {portfolio_id}, resuming trade execution…") - # Need prices for execution - scan_data = store.load_scan(date) or {} - prices = scan_data.get("prices") or {} + # Fetch live prices for all tickers referenced in the decision + prices = _fetch_prices(_tickers_from_decision(saved_decision)) async for evt in self.run_trade_execution( f"{run_id}_resume_trades", date, portfolio_id, saved_decision, prices, store=store,