diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 71618c79..79d8d174 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -292,10 +292,25 @@ class LangGraphEngine: return "unknown" + @staticmethod + def _safe_dict(obj: object) -> Dict[str, Any]: + """Return *obj* if it is a dict, otherwise an empty dict. + + Many LangChain message objects expose dict-like metadata + properties (``usage_metadata``, ``response_metadata``) but some + providers return non-dict types (e.g. bound methods, None, or + custom objects). This helper guarantees safe ``.get()`` calls. + """ + return obj if isinstance(obj, dict) else {} + def _map_langgraph_event( self, run_id: str, event: Dict[str, Any] ) -> Dict[str, Any] | None: - """Map LangGraph v2 events to AgentOS frontend contract.""" + """Map LangGraph v2 events to AgentOS frontend contract. + + Each branch is wrapped in a ``try / except`` so that a single + unexpected object shape never crashes the whole streaming loop. + """ kind = event.get("event", "") name = event.get("name", "unknown") node_name = self._extract_node_name(event) @@ -305,179 +320,223 @@ class LangGraphEngine: # ------ LLM start ------ if kind == "on_chat_model_start": - starts[node_name] = time.monotonic() + try: + starts[node_name] = time.monotonic() - data = event.get("data") or {} + data = event.get("data") or {} - # Extract the full prompt being sent to the LLM. - # Try multiple paths observed in different LangChain versions: - # 1. data.messages (most common) - # 2. data.input.messages (newer LangGraph) - # 3. data.input (if it's a list of messages itself) - # 4. data.kwargs.messages (some providers) - full_prompt = "" - for source in ( - data.get("messages"), - (data.get("input") or {}).get("messages") if isinstance(data.get("input"), dict) else None, - data.get("input") if isinstance(data.get("input"), (list, tuple)) else None, - (data.get("kwargs") or {}).get("messages"), - ): - if source: - full_prompt = self._extract_all_messages_content(source) - if full_prompt: - break + # Extract the full prompt being sent to the LLM. + # Try multiple paths observed in different LangChain versions: + # 1. data.messages (most common) + # 2. data.input.messages (newer LangGraph) + # 3. data.input (if it's a list of messages itself) + # 4. data.kwargs.messages (some providers) + full_prompt = "" + for source in ( + data.get("messages"), + (data.get("input") or {}).get("messages") if isinstance(data.get("input"), dict) else None, + data.get("input") if isinstance(data.get("input"), (list, tuple)) else None, + (data.get("kwargs") or {}).get("messages"), + ): + if source: + full_prompt = self._extract_all_messages_content(source) + if full_prompt: + break - # If all structured extractions failed, dump a raw preview - if not full_prompt: - raw_dump = str(data)[:_MAX_FULL_LEN] - if raw_dump and raw_dump != "{}": - full_prompt = f"[raw event data] {raw_dump}" + # If all structured extractions failed, dump a raw preview + if not full_prompt: + raw_dump = str(data)[:_MAX_FULL_LEN] + if raw_dump and raw_dump != "{}": + full_prompt = f"[raw event data] {raw_dump}" - prompt_snippet = self._truncate( - full_prompt.replace("\n", " "), _MAX_CONTENT_LEN - ) if full_prompt else "" + prompt_snippet = self._truncate( + full_prompt.replace("\n", " "), _MAX_CONTENT_LEN + ) if full_prompt else "" - # Remember the full prompt so we can attach it to the result event - prompts[node_name] = full_prompt + # Remember the full prompt so we can attach it to the result event + prompts[node_name] = full_prompt - model = self._extract_model(event) + model = self._extract_model(event) - logger.info( - "LLM start node=%s model=%s run=%s", node_name, model, run_id - ) + logger.info( + "LLM start node=%s model=%s run=%s", node_name, model, run_id + ) - return { - "id": event["run_id"], - "node_id": node_name, - "parent_node_id": "start", - "type": "thought", - "agent": node_name.upper(), - "message": f"Prompting {model}…" - + (f" | {prompt_snippet}" if prompt_snippet else ""), - "prompt": full_prompt, - "metrics": {"model": model}, - } + return { + "id": event.get("run_id", f"thought_{time.time_ns()}"), + "node_id": node_name, + "parent_node_id": "start", + "type": "thought", + "agent": node_name.upper(), + "message": f"Prompting {model}…" + + (f" | {prompt_snippet}" if prompt_snippet else ""), + "prompt": full_prompt, + "metrics": {"model": model}, + } + except Exception: + logger.exception("Error mapping on_chat_model_start run=%s", run_id) + return { + "id": f"thought_err_{time.time_ns()}", + "node_id": node_name, + "type": "thought", + "agent": node_name.upper(), + "message": f"Prompting LLM… (event parse error)", + "prompt": "", + "metrics": {}, + } # ------ Tool call ------ elif kind == "on_tool_start": - full_input = "" - tool_input = "" - inp = (event.get("data") or {}).get("input") - if inp: - full_input = str(inp)[:_MAX_FULL_LEN] - tool_input = self._truncate(str(inp)) + try: + full_input = "" + tool_input = "" + inp = (event.get("data") or {}).get("input") + if inp: + full_input = str(inp)[:_MAX_FULL_LEN] + tool_input = self._truncate(str(inp)) - logger.info("Tool start tool=%s node=%s run=%s", name, node_name, run_id) + logger.info("Tool start tool=%s node=%s run=%s", name, node_name, run_id) - return { - "id": event["run_id"], - "node_id": f"tool_{name}", - "parent_node_id": node_name, - "type": "tool", - "agent": node_name.upper(), - "message": f"▶ Tool: {name}" - + (f" | {tool_input}" if tool_input else ""), - "prompt": full_input, - "metrics": {}, - } + return { + "id": event.get("run_id", f"tool_{time.time_ns()}"), + "node_id": f"tool_{name}", + "parent_node_id": node_name, + "type": "tool", + "agent": node_name.upper(), + "message": f"▶ Tool: {name}" + + (f" | {tool_input}" if tool_input else ""), + "prompt": full_input, + "metrics": {}, + } + except Exception: + logger.exception("Error mapping on_tool_start run=%s", run_id) + return None # ------ Tool result ------ elif kind == "on_tool_end": - full_output = "" - tool_output = "" - out = (event.get("data") or {}).get("output") - if out is not None: - raw = self._extract_content(out) - full_output = raw[:_MAX_FULL_LEN] - tool_output = self._truncate(raw) + try: + full_output = "" + tool_output = "" + out = (event.get("data") or {}).get("output") + if out is not None: + raw = self._extract_content(out) + full_output = raw[:_MAX_FULL_LEN] + tool_output = self._truncate(raw) - logger.info("Tool end tool=%s node=%s run=%s", name, node_name, run_id) + logger.info("Tool end tool=%s node=%s run=%s", name, node_name, run_id) - return { - "id": f"{event['run_id']}_tool_end", - "node_id": f"tool_{name}", - "parent_node_id": node_name, - "type": "tool_result", - "agent": node_name.upper(), - "message": f"✓ Tool result: {name}" - + (f" | {tool_output}" if tool_output else ""), - "response": full_output, - "metrics": {}, - } + return { + "id": f"{event.get('run_id', 'tool_end')}_{time.time_ns()}", + "node_id": f"tool_{name}", + "parent_node_id": node_name, + "type": "tool_result", + "agent": node_name.upper(), + "message": f"✓ Tool result: {name}" + + (f" | {tool_output}" if tool_output else ""), + "response": full_output, + "metrics": {}, + } + except Exception: + logger.exception("Error mapping on_tool_end run=%s", run_id) + return None # ------ LLM end ------ elif kind == "on_chat_model_end": - output = (event.get("data") or {}).get("output") - usage: Dict[str, Any] = {} - model = "unknown" - response_snippet = "" - full_response = "" + try: + output = (event.get("data") or {}).get("output") + usage: Dict[str, Any] = {} + model = "unknown" + response_snippet = "" + full_response = "" - if output is not None: - if hasattr(output, "usage_metadata") and output.usage_metadata: - usage = output.usage_metadata - if hasattr(output, "response_metadata") and output.response_metadata: - model = output.response_metadata.get("model_name") or output.response_metadata.get("model", model) + if output is not None: + # Safely extract usage & response metadata (must be dicts) + usage_raw = getattr(output, "usage_metadata", None) + usage = self._safe_dict(usage_raw) - # Extract the response text – handle both message objects and plain dicts - raw = self._extract_content(output) - # If .content was empty or the repr of the whole object, try harder - if not raw or raw.startswith("<") or raw == str(output): - # Some providers wrap in .text or .message - potential_text = getattr(output, "text", "") - if callable(potential_text): - potential_text = "" + resp_meta = getattr(output, "response_metadata", None) + resp_dict = self._safe_dict(resp_meta) + if resp_dict: + model = resp_dict.get("model_name") or resp_dict.get("model", model) - raw = ( - potential_text - or (output.get("content", "") if isinstance(output, dict) else "") - ) + # Extract the response text – handle message objects and dicts + raw = self._extract_content(output) - # Ensure raw is a string before subscripting - if not isinstance(raw, str): - raw = str(raw) + # If .content was empty or the repr of the whole object, try harder + if not raw or raw.startswith("<") or raw == str(output): + # Some providers wrap in .text or .message + potential_text = getattr(output, "text", None) + if potential_text is None or callable(potential_text): + potential_text = "" + if not isinstance(potential_text, str): + potential_text = str(potential_text) - if raw: - full_response = raw[:_MAX_FULL_LEN] - response_snippet = self._truncate(raw) + raw = ( + potential_text + or (output.get("content", "") if isinstance(output, dict) else "") + ) - # Fall back to event-level model extraction - if model == "unknown": - model = self._extract_model(event) + # Ensure raw is always a string before slicing + if not isinstance(raw, str): + raw = str(raw) if raw is not None else "" - latency_ms = 0 - start_t = starts.pop(node_name, None) - if start_t is not None: - latency_ms = round((time.monotonic() - start_t) * 1000) + if raw: + full_response = raw[:_MAX_FULL_LEN] + response_snippet = self._truncate(raw) - # Retrieve the prompt that started this LLM call - matched_prompt = prompts.pop(node_name, "") + # Fall back to event-level model extraction + if model == "unknown": + model = self._extract_model(event) - logger.info( - "LLM end node=%s model=%s tokens_in=%s tokens_out=%s latency=%dms run=%s", - node_name, - model, - usage.get("input_tokens", "?"), - usage.get("output_tokens", "?"), - latency_ms, - run_id, - ) + latency_ms = 0 + start_t = starts.pop(node_name, None) + if start_t is not None: + latency_ms = round((time.monotonic() - start_t) * 1000) - return { - "id": f"{event['run_id']}_end", - "node_id": node_name, - "type": "result", - "agent": node_name.upper(), - "message": response_snippet or "Completed.", - "prompt": matched_prompt, - "response": full_response, - "metrics": { - "model": model, - "tokens_in": usage.get("input_tokens", 0), - "tokens_out": usage.get("output_tokens", 0), - "latency_ms": latency_ms, - }, - } + # Retrieve the prompt that started this LLM call + matched_prompt = prompts.pop(node_name, "") + + tokens_in = usage.get("input_tokens", 0) + tokens_out = usage.get("output_tokens", 0) + + logger.info( + "LLM end node=%s model=%s tokens_in=%s tokens_out=%s latency=%dms run=%s", + node_name, + model, + tokens_in or "?", + tokens_out or "?", + latency_ms, + run_id, + ) + + return { + "id": f"{event.get('run_id', 'result')}_{time.time_ns()}", + "node_id": node_name, + "type": "result", + "agent": node_name.upper(), + "message": response_snippet or "Completed.", + "prompt": matched_prompt, + "response": full_response, + "metrics": { + "model": model, + "tokens_in": tokens_in if isinstance(tokens_in, (int, float)) else 0, + "tokens_out": tokens_out if isinstance(tokens_out, (int, float)) else 0, + "latency_ms": latency_ms, + }, + } + except Exception: + logger.exception("Error mapping on_chat_model_end run=%s", run_id) + matched_prompt = prompts.pop(node_name, "") + return { + "id": f"result_err_{time.time_ns()}", + "node_id": node_name, + "type": "result", + "agent": node_name.upper(), + "message": "Completed (event parse error).", + "prompt": matched_prompt, + "response": "", + "metrics": {"model": "unknown", "tokens_in": 0, "tokens_out": 0, "latency_ms": 0}, + } return None diff --git a/tests/unit/test_langgraph_engine_extraction.py b/tests/unit/test_langgraph_engine_extraction.py index f9cf1aff..4ba2b05c 100644 --- a/tests/unit/test_langgraph_engine_extraction.py +++ b/tests/unit/test_langgraph_engine_extraction.py @@ -1,16 +1,22 @@ import sys +import os import unittest from unittest.mock import MagicMock -# Add project root to sys.path -sys.path.append("/Users/Ahmet/Repo/TradingAgents") +# Ensure project root is on sys.path (works in CI and local) +_project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _project_root not in sys.path: + sys.path.insert(0, _project_root) from agent_os.backend.services.langgraph_engine import LangGraphEngine + class TestLangGraphEngineExtraction(unittest.TestCase): def setUp(self): self.engine = LangGraphEngine() + # ── _extract_content ──────────────────────────────────────────── + def test_extract_content_string(self): mock_obj = MagicMock() mock_obj.content = "hello world" @@ -18,51 +24,150 @@ class TestLangGraphEngineExtraction(unittest.TestCase): def test_extract_content_method(self): mock_obj = MagicMock() - # Mocking a method - def my_content(): - return "should not be called" - mock_obj.content = my_content - # Should fall back to str(mock_obj) + mock_obj.content = lambda: "should not be called" result = self.engine._extract_content(mock_obj) - self.assertTrue(result.startswith("