diff --git a/agent_os/backend/routes/websocket.py b/agent_os/backend/routes/websocket.py index 05e4b86c..5b4c6904 100644 --- a/agent_os/backend/routes/websocket.py +++ b/agent_os/backend/routes/websocket.py @@ -38,7 +38,10 @@ async def websocket_endpoint( stream_gen = engine.run_scan(run_id, params) elif run_type == "pipeline": stream_gen = engine.run_pipeline(run_id, params) - # Add other types as they are implemented in LangGraphEngine + elif run_type == "portfolio": + stream_gen = engine.run_portfolio(run_id, params) + elif run_type == "auto": + stream_gen = engine.run_auto(run_id, params) if stream_gen: async for payload in stream_gen: diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 853a4586..d12e5b54 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -4,13 +4,17 @@ import time from typing import Dict, Any, AsyncGenerator from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.graph.scanner_graph import ScannerGraph +from tradingagents.graph.portfolio_graph import PortfolioGraph from tradingagents.default_config import DEFAULT_CONFIG logger = logging.getLogger("agent_os.engine") -# Maximum characters of prompt/response content to include in streamed events +# Maximum characters of prompt/response content to include in the short message _MAX_CONTENT_LEN = 300 +# Maximum characters of prompt/response for the full fields (generous limit) +_MAX_FULL_LEN = 50_000 + class LangGraphEngine: """Orchestrates LangGraph pipeline executions and streams events.""" @@ -20,6 +24,8 @@ class LangGraphEngine: self.active_runs: Dict[str, Dict[str, Any]] = {} # Track node start times per run so we can compute latency self._node_start_times: Dict[str, Dict[str, float]] = {} + # Track the last prompt per node so we can attach it to result events + self._node_prompts: Dict[str, Dict[str, str]] = {} # ------------------------------------------------------------------ # Run helpers @@ -55,6 +61,7 @@ class LangGraphEngine: yield mapped self._node_start_times.pop(run_id, None) + self._node_prompts.pop(run_id, None) logger.info("Completed SCAN run=%s", run_id) async def run_pipeline( @@ -88,8 +95,76 @@ class LangGraphEngine: yield mapped self._node_start_times.pop(run_id, None) + self._node_prompts.pop(run_id, None) logger.info("Completed PIPELINE run=%s", run_id) + async def run_portfolio( + self, run_id: str, params: Dict[str, Any] + ) -> AsyncGenerator[Dict[str, Any], None]: + """Run the portfolio manager workflow and stream events.""" + date = params.get("date", time.strftime("%Y-%m-%d")) + portfolio_id = params.get("portfolio_id", "main_portfolio") + + logger.info( + "Starting PORTFOLIO run=%s portfolio=%s date=%s", + run_id, portfolio_id, date, + ) + yield self._system_log( + f"Starting portfolio manager for {portfolio_id} on {date}" + ) + + portfolio_graph = PortfolioGraph(config=self.config) + + initial_state = { + "portfolio_id": portfolio_id, + "scan_date": date, + "messages": [], + } + + self._node_start_times[run_id] = {} + + async for event in portfolio_graph.graph.astream_events( + initial_state, version="v2" + ): + mapped = self._map_langgraph_event(run_id, event) + if mapped: + yield mapped + + self._node_start_times.pop(run_id, None) + self._node_prompts.pop(run_id, None) + logger.info("Completed PORTFOLIO run=%s", run_id) + + async def run_auto( + self, run_id: str, params: Dict[str, Any] + ) -> AsyncGenerator[Dict[str, Any], None]: + """Run the full auto pipeline: scan → pipeline → portfolio.""" + date = params.get("date", time.strftime("%Y-%m-%d")) + + logger.info("Starting AUTO run=%s date=%s", run_id, date) + yield self._system_log(f"Starting full auto workflow for {date}") + + # Phase 1: Market scan + yield self._system_log("Phase 1/3: Running market scan…") + async for evt in self.run_scan(f"{run_id}_scan", {"date": date}): + yield evt + + # Phase 2: Pipeline analysis (default ticker for now) + ticker = params.get("ticker", "AAPL") + yield self._system_log(f"Phase 2/3: Running analysis pipeline for {ticker}…") + async for evt in self.run_pipeline( + f"{run_id}_pipeline", {"ticker": ticker, "date": date} + ): + yield evt + + # Phase 3: Portfolio management + yield self._system_log("Phase 3/3: Running portfolio manager…") + async for evt in self.run_portfolio( + f"{run_id}_portfolio", {"date": date, **params} + ): + yield evt + + logger.info("Completed AUTO run=%s", run_id) + # ------------------------------------------------------------------ # Event mapping # ------------------------------------------------------------------ @@ -153,6 +228,51 @@ class LangGraphEngine: content = getattr(first_item, "content", None) return str(content) if content is not None else str(first_item) + def _extract_all_messages_content(self, messages: Any) -> str: + """Extract text from ALL messages in a LangGraph messages payload. + + Returns the concatenated content of every message so the user can + inspect the full prompt that was sent to the LLM. + """ + if not isinstance(messages, list) or not messages: + return "" + parts: list[str] = [] + items = messages + # Handle list-of-lists + if isinstance(items[0], list): + items = items[0] + for msg in items: + content = getattr(msg, "content", None) + role = getattr(msg, "type", "unknown") + text = str(content) if content is not None else str(msg) + parts.append(f"[{role}] {text}") + return "\n\n".join(parts) + + def _extract_model(self, event: Dict[str, Any]) -> str: + """Best-effort extraction of the model name from a LangGraph event.""" + data = event.get("data") or {} + + # 1. invocation_params (standard LangChain) + inv = data.get("invocation_params") or {} + model = inv.get("model_name") or inv.get("model") or "" + if model: + return model + + # 2. Serialized kwargs (OpenRouter / ChatOpenAI) + serialized = event.get("serialized") or data.get("serialized") or {} + kwargs = serialized.get("kwargs") or {} + model = kwargs.get("model_name") or kwargs.get("model") or "" + if model: + return model + + # 3. metadata.ls_model_name (LangSmith tracing) + metadata = event.get("metadata") or {} + model = metadata.get("ls_model_name") or "" + if model: + return model + + return "unknown" + def _map_langgraph_event( self, run_id: str, event: Dict[str, Any] ) -> Dict[str, Any] | None: @@ -162,22 +282,24 @@ class LangGraphEngine: node_name = self._extract_node_name(event) starts = self._node_start_times.get(run_id, {}) + prompts = self._node_prompts.setdefault(run_id, {}) # ------ LLM start ------ if kind == "on_chat_model_start": starts[node_name] = time.monotonic() - # Extract the prompt being sent to the LLM + # Extract the full prompt being sent to the LLM + full_prompt = "" prompt_snippet = "" messages = (event.get("data") or {}).get("messages") if messages: - raw = self._first_message_content(messages) - if raw: - prompt_snippet = self._truncate(raw) + full_prompt = self._extract_all_messages_content(messages) + prompt_snippet = self._truncate(full_prompt.replace("\n", " ")) - model = "unknown" - inv_params = (event.get("data") or {}).get("invocation_params") or {} - model = inv_params.get("model_name") or inv_params.get("model") or "unknown" + # Remember the full prompt so we can attach it to the result event + prompts[node_name] = full_prompt + + model = self._extract_model(event) logger.info( "LLM start node=%s model=%s run=%s", node_name, model, run_id @@ -191,14 +313,17 @@ class LangGraphEngine: "agent": node_name.upper(), "message": f"Prompting {model}…" + (f" | {prompt_snippet}" if prompt_snippet else ""), + "prompt": full_prompt, "metrics": {"model": model}, } # ------ Tool call ------ elif kind == "on_tool_start": + full_input = "" tool_input = "" inp = (event.get("data") or {}).get("input") if inp: + full_input = str(inp)[:_MAX_FULL_LEN] tool_input = self._truncate(str(inp)) logger.info("Tool start tool=%s node=%s run=%s", name, node_name, run_id) @@ -211,15 +336,19 @@ class LangGraphEngine: "agent": node_name.upper(), "message": f"▶ Tool: {name}" + (f" | {tool_input}" if tool_input else ""), + "prompt": full_input, "metrics": {}, } # ------ Tool result ------ elif kind == "on_tool_end": + full_output = "" tool_output = "" out = (event.get("data") or {}).get("output") if out is not None: - tool_output = self._truncate(self._extract_content(out)) + raw = self._extract_content(out) + full_output = raw[:_MAX_FULL_LEN] + tool_output = self._truncate(raw) logger.info("Tool end tool=%s node=%s run=%s", name, node_name, run_id) @@ -227,10 +356,11 @@ class LangGraphEngine: "id": f"{event['run_id']}_tool_end", "node_id": f"tool_{name}", "parent_node_id": node_name, - "type": "tool", + "type": "tool_result", "agent": node_name.upper(), "message": f"✓ Tool result: {name}" + (f" | {tool_output}" if tool_output else ""), + "response": full_output, "metrics": {}, } @@ -240,21 +370,30 @@ class LangGraphEngine: usage: Dict[str, Any] = {} model = "unknown" response_snippet = "" + full_response = "" if output is not None: if hasattr(output, "usage_metadata") and output.usage_metadata: usage = output.usage_metadata if hasattr(output, "response_metadata") and output.response_metadata: - model = output.response_metadata.get("model_name", model) + model = output.response_metadata.get("model_name") or output.response_metadata.get("model", model) raw = self._extract_content(output) if raw: + full_response = raw[:_MAX_FULL_LEN] response_snippet = self._truncate(raw) + # Fall back to event-level model extraction + if model == "unknown": + model = self._extract_model(event) + latency_ms = 0 start_t = starts.pop(node_name, None) if start_t is not None: latency_ms = round((time.monotonic() - start_t) * 1000) + # Retrieve the prompt that started this LLM call + matched_prompt = prompts.pop(node_name, "") + logger.info( "LLM end node=%s model=%s tokens_in=%s tokens_out=%s latency=%dms run=%s", node_name, @@ -271,6 +410,8 @@ class LangGraphEngine: "type": "result", "agent": node_name.upper(), "message": response_snippet or "Completed.", + "prompt": matched_prompt, + "response": full_response, "metrics": { "model": model, "tokens_in": usage.get("input_tokens", 0), @@ -292,3 +433,11 @@ class LangGraphEngine: async def run_pipeline_background(self, run_id: str, params: Dict[str, Any]): async for _ in self.run_pipeline(run_id, params): pass + + async def run_portfolio_background(self, run_id: str, params: Dict[str, Any]): + async for _ in self.run_portfolio(run_id, params): + pass + + async def run_auto_background(self, run_id: str, params: Dict[str, Any]): + async for _ in self.run_auto(run_id, params): + pass diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index f16a3309..101eac22 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -18,8 +18,19 @@ import { Tag, Code, Badge, + Modal, + ModalOverlay, + ModalContent, + ModalHeader, + ModalBody, + ModalCloseButton, + Tabs, + TabList, + TabPanels, + Tab, + TabPanel, } from '@chakra-ui/react'; -import { LayoutDashboard, Wallet, Settings, Play, Terminal as TerminalIcon, ChevronRight, Eye } from 'lucide-react'; +import { LayoutDashboard, Wallet, Settings, Play, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot } from 'lucide-react'; import { MetricHeader } from './components/MetricHeader'; import { AgentGraph } from './components/AgentGraph'; import { useAgentStream, AgentEvent } from './hooks/useAgentStream'; @@ -30,42 +41,131 @@ const API_BASE = 'http://127.0.0.1:8088/api'; /** Return the colour token for a given event type. */ const eventColor = (type: AgentEvent['type']): string => { switch (type) { - case 'tool': return 'purple.400'; - case 'result': return 'green.400'; - case 'log': return 'yellow.300'; - default: return 'cyan.400'; + case 'tool': return 'purple.400'; + case 'tool_result': return 'purple.300'; + case 'result': return 'green.400'; + case 'log': return 'yellow.300'; + default: return 'cyan.400'; } }; /** Return a short label badge for the event type. */ const eventLabel = (type: AgentEvent['type']): string => { switch (type) { - case 'thought': return '💭'; - case 'tool': return '🔧'; - case 'result': return '✅'; - case 'log': return 'ℹ️'; - default: return '●'; + case 'thought': return '💭'; + case 'tool': return '🔧'; + case 'tool_result': return '✅🔧'; + case 'result': return '✅'; + case 'log': return 'ℹ️'; + default: return '●'; } }; /** Short summary for terminal — no inline prompts, just agent + type. */ const eventSummary = (evt: AgentEvent): string => { switch (evt.type) { - case 'thought': return `Thinking… (${evt.metrics?.model || 'LLM'})`; - case 'tool': return evt.message.startsWith('✓') ? 'Tool result received' : `Tool call: ${evt.message.replace(/^▶ Tool: /, '').split(' | ')[0]}`; - case 'result': return 'Completed'; - case 'log': return evt.message; - default: return evt.type; + case 'thought': return `Thinking… (${evt.metrics?.model || 'LLM'})`; + case 'tool': return evt.message.startsWith('✓') ? 'Tool result received' : `Tool call: ${evt.message.replace(/^▶ Tool: /, '').split(' | ')[0]}`; + case 'tool_result': return `Tool done: ${evt.message.replace(/^✓ Tool result: /, '').split(' | ')[0]}`; + case 'result': return 'Completed'; + case 'log': return evt.message; + default: return evt.type; } }; -// ─── Detail drawer for a single event ───────────────────────────────── -const EventDetail: React.FC<{ event: AgentEvent }> = ({ event }) => ( +// ─── Full Event Detail Modal ───────────────────────────────────────── +const EventDetailModal: React.FC<{ event: AgentEvent | null; isOpen: boolean; onClose: () => void }> = ({ event, isOpen, onClose }) => { + if (!event) return null; + + return ( + + + + + + + + {event.type.toUpperCase()} + + {event.agent} + {event.timestamp} + + + + + + {event.prompt && Prompt / Request} + {(event.response || (event.type === 'result' && event.message)) && Response} + Summary + {event.metrics && Metrics} + + + {event.prompt && ( + + + + {event.prompt} + + + + )} + {(event.response || (event.type === 'result' && event.message)) && ( + + + + {event.response || event.message} + + + + )} + + + + {event.message} + + + + {event.metrics && ( + + + {event.metrics.model && event.metrics.model !== 'unknown' && ( + Model:{event.metrics.model} + )} + {event.metrics.tokens_in != null && event.metrics.tokens_in > 0 && ( + Tokens In:{event.metrics.tokens_in} + )} + {event.metrics.tokens_out != null && event.metrics.tokens_out > 0 && ( + Tokens Out:{event.metrics.tokens_out} + )} + {event.metrics.latency_ms != null && event.metrics.latency_ms > 0 && ( + Latency:{event.metrics.latency_ms}ms + )} + {event.node_id && ( + Node ID:{event.node_id} + )} + + + )} + + + + + + ); +}; + +// ─── Detail card for a single event in the drawer ───────────────────── +const EventDetail: React.FC<{ event: AgentEvent; onOpenModal?: (evt: AgentEvent) => void }> = ({ event, onOpenModal }) => ( - {event.type.toUpperCase()} + {event.type.toUpperCase()} {event.agent} {event.timestamp} + {onOpenModal && ( + + )} {event.metrics?.model && event.metrics.model !== 'unknown' && ( @@ -79,7 +179,7 @@ const EventDetail: React.FC<{ event: AgentEvent }> = ({ event }) => ( Metrics - {event.metrics.tokens_in != null && ( + {event.metrics.tokens_in != null && event.metrics.tokens_in > 0 && ( Tokens: {event.metrics.tokens_in} in / {event.metrics.tokens_out} out )} {event.metrics.latency_ms != null && event.metrics.latency_ms > 0 && ( @@ -89,16 +189,41 @@ const EventDetail: React.FC<{ event: AgentEvent }> = ({ event }) => ( )} - - - {event.type === 'thought' ? 'Request / Prompt' : event.type === 'result' ? 'Response' : 'Message'} - - - - {event.message} - + {/* Show prompt if available */} + {event.prompt && ( + + Request / Prompt + + + {event.prompt.length > 1000 ? event.prompt.substring(0, 1000) + '…' : event.prompt} + + - + )} + + {/* Show response if available (result events) */} + {event.response && ( + + Response + + + {event.response.length > 1000 ? event.response.substring(0, 1000) + '…' : event.response} + + + + )} + + {/* Fallback: show message if no prompt/response */} + {!event.prompt && !event.response && ( + + Message + + + {event.message} + + + + )} {event.node_id && ( @@ -110,7 +235,7 @@ const EventDetail: React.FC<{ event: AgentEvent }> = ({ event }) => ( ); // ─── Detail drawer showing all events for a given graph node ────────── -const NodeEventsDetail: React.FC<{ nodeId: string; events: AgentEvent[] }> = ({ nodeId, events }) => { +const NodeEventsDetail: React.FC<{ nodeId: string; events: AgentEvent[]; onOpenModal: (evt: AgentEvent) => void }> = ({ nodeId, events, onOpenModal }) => { const nodeEvents = useMemo( () => events.filter((e) => e.node_id === nodeId), [events, nodeId], @@ -124,7 +249,7 @@ const NodeEventsDetail: React.FC<{ nodeId: string; events: AgentEvent[] }> = ({ {nodeEvents.map((evt) => ( - + ))} @@ -138,6 +263,10 @@ export const Dashboard: React.FC = () => { const { events, status, clearEvents } = useAgentStream(activeRunId); const { isOpen, onOpen, onClose } = useDisclosure(); + // Event detail modal state + const { isOpen: isModalOpen, onOpen: onModalOpen, onClose: onModalClose } = useDisclosure(); + const [modalEvent, setModalEvent] = useState(null); + // What's shown in the drawer: either a single event or all events for a node const [drawerMode, setDrawerMode] = useState<'event' | 'node'>('event'); const [selectedEvent, setSelectedEvent] = useState(null); @@ -150,8 +279,10 @@ export const Dashboard: React.FC = () => { terminalEndRef.current?.scrollIntoView({ behavior: 'smooth' }); }, [events.length]); + const isRunning = isTriggering || status === 'streaming' || status === 'connecting'; + const startRun = async (type: string) => { - if (isTriggering || status === 'streaming' || status === 'connecting') return; + if (isRunning) return; setIsTriggering(true); try { @@ -168,6 +299,12 @@ export const Dashboard: React.FC = () => { } }; + /** Open the full-screen event detail modal */ + const openModal = useCallback((evt: AgentEvent) => { + setModalEvent(evt); + onModalOpen(); + }, [onModalOpen]); + /** Open the drawer for a single event (terminal click). */ const openEventDetail = useCallback((evt: AgentEvent) => { setDrawerMode('event'); @@ -211,16 +348,50 @@ export const Dashboard: React.FC = () => { {/* Floating Control Panel */} - + + + + @@ -290,14 +461,17 @@ export const Dashboard: React.FC = () => { {drawerMode === 'event' && selectedEvent && ( - + )} {drawerMode === 'node' && selectedNodeId && ( - + )} + + {/* Full event detail modal */} + ); }; diff --git a/agent_os/frontend/src/components/AgentGraph.tsx b/agent_os/frontend/src/components/AgentGraph.tsx index 09c851d4..1205eaa0 100644 --- a/agent_os/frontend/src/components/AgentGraph.tsx +++ b/agent_os/frontend/src/components/AgentGraph.tsx @@ -127,6 +127,9 @@ export const AgentGraph: React.FC = ({ events, onNodeClick }) = for (const evt of newEvents) { if (!evt.node_id || evt.node_id === '__system__') continue; + // Determine if this event means the node is completed + const isCompleted = evt.type === 'result' || evt.type === 'tool_result'; + if (!seenNodeIds.current.has(evt.node_id)) { // New node — create it seenNodeIds.current.add(evt.node_id); @@ -138,7 +141,7 @@ export const AgentGraph: React.FC = ({ events, onNodeClick }) = position: { x: 250, y: nodeCount.current * 150 + 50 }, data: { agent: evt.agent, - status: evt.type === 'result' ? 'completed' : 'running', + status: isCompleted ? 'completed' : 'running', metrics: evt.metrics, }, }); @@ -159,8 +162,11 @@ export const AgentGraph: React.FC = ({ events, onNodeClick }) = } } else { // Existing node — queue a status/metrics update + // Never revert a completed node back to running + const prev = updatedNodeData.get(evt.node_id); + const currentlyCompleted = prev?.status === 'completed'; updatedNodeData.set(evt.node_id, { - status: evt.type === 'result' ? 'completed' : 'running', + status: currentlyCompleted || isCompleted ? 'completed' : 'running', metrics: evt.metrics, }); } @@ -178,9 +184,11 @@ export const AgentGraph: React.FC = ({ events, onNodeClick }) = prev.map((n) => { const patch = updatedNodeData.get(n.id); if (!patch) return n; + // Never revert a completed node back to running + const finalStatus = n.data.status === 'completed' ? 'completed' : patch.status; return { ...n, - data: { ...n.data, ...patch, metrics: patch.metrics ?? n.data.metrics }, + data: { ...n.data, ...patch, status: finalStatus, metrics: patch.metrics ?? n.data.metrics }, }; }), ); diff --git a/agent_os/frontend/src/hooks/useAgentStream.ts b/agent_os/frontend/src/hooks/useAgentStream.ts index a4017952..6ce3216c 100644 --- a/agent_os/frontend/src/hooks/useAgentStream.ts +++ b/agent_os/frontend/src/hooks/useAgentStream.ts @@ -5,8 +5,12 @@ export interface AgentEvent { timestamp: string; agent: string; tier: 'quick' | 'mid' | 'deep'; - type: 'thought' | 'tool' | 'result' | 'log' | 'system'; + type: 'thought' | 'tool' | 'tool_result' | 'result' | 'log' | 'system'; message: string; + /** Full prompt text (available on thought & result events). */ + prompt?: string; + /** Full response text (available on result & tool_result events). */ + response?: string; node_id?: string; parent_node_id?: string; metrics?: {