From 5d5bd4a3cbae8bad677fdc73e4b75e4c9d61c7f3 Mon Sep 17 00:00:00 2001 From: ahmet guzererler Date: Tue, 24 Mar 2026 10:03:16 +0100 Subject: [PATCH] feat(ui): scoped graph nodes per ticker + MockEngine for LLM-free UI testing (#100) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(ui): scoped graph nodes per ticker + MockEngine for LLM-free UI testing ## Summary Adds a MockEngine that streams scripted agent events with zero real LLM calls, enabling full UI testing (graph, terminal, drawer, metrics) without API keys or network. Also fixes the ReactFlow graph so that each ticker/identifier gets its own visual node — previously an auto run with 5 tickers collapsed all pipelines into the same node IDs, overwriting each other. ## Changes - **MockEngine** (`agent_os/backend/services/mock_engine.py`): new class that generates realistic scripted events for pipeline, scan, and auto run types. Supports configurable speed divisor (1× realistic → 10× instant). Auto mock accepts a `tickers` list for multi-ticker runs. - **POST /api/run/mock** (`runs.py`): new endpoint wiring MockEngine into the BackgroundTasks + store pattern identical to real run endpoints. - **WebSocket routing** (`websocket.py`): added `mock` run-type branch so the WS executor path also dispatches to MockEngine when the background task hasn't started yet. - **LangGraphEngine** (`langgraph_engine.py`): added `_run_identifiers` dict to track ticker/MARKET/portfolio_id per run; all emitted events now carry an `identifier` field so the frontend can scope them. - **AgentGraph.tsx**: ReactFlow nodes now keyed by `node_id:identifier` (e.g. `news_analyst:AAPL`, `news_analyst:NVDA`). Edges scoped to same identifier. `onNodeClick` passes raw `node_id` + `identifier` separately so the event drawer can filter without parsing the scoped key. - **Dashboard.tsx**: Mock button + type/speed controls added. 
`openNodeDetail` accepts identifier; `NodeEventsDetail` filters by both `node_id` and `identifier`. Comma-separated ticker input for mock auto runs (e.g. `AAPL,NVDA,TSLA`). - **useAgentStream.ts**: `AgentEvent` interface extended with `identifier?` field. ## Decision Context - Scoped node ID format chosen as `node_id:identifier` (colon separator) rather than embedding identifier in the agent display name — keeps node labels clean and identifier visible as a coloured badge, not label text. - Raw `node_id` and `identifier` stored separately in `node.data` so the drawer filtering (`events.filter(e => e.node_id === nodeId && e.identifier === id)`) does not need to parse/split the scoped key. - Parent edges are scoped to the same identifier as the child, assuming intra-ticker chains. Cross-run topology edges (e.g. scan → pipeline) are implicit via log events, not ReactFlow edges. - MockEngine uses `asyncio.sleep` with a speed divisor — higher speed values give faster replays for rapid iteration during UI development. ## Considerations for Future Agents - Re-run button on graph nodes already uses `identifier` to dispatch `startRun('pipeline', { ticker: identifier })` or `startRun('scan')` — no further changes needed for per-node re-runs to be correctly scoped. - The `_run_identifiers` dict in LangGraphEngine is keyed by `run_id`; it is cleaned up after each run. If parallel runs are ever supported per engine instance, this dict handles them correctly already. - For run_auto, each sub-run (scan, per-ticker pipeline) calls its own `run_scan`/`run_pipeline` which sets `_run_identifiers[run_id]`. The outer `run_auto` does not set it — this is intentional. - `uv.lock` changes reflect dependency tree after Chainlit removal in the previous commit; no new runtime dependencies were added by this PR. 
--- 🤖 Commit Agent | Session: mock-engine + scoped-graph-nodes * feat(graph): two-phase column layout — scan top, ticker columns below ## Summary Redesigns the ReactFlow graph layout engine so scan nodes form a centred funnel at the top and each ticker gets its own vertical column below, matching the agreed design. Ticker header cards (bold ticker symbol + pulse dot + progress counter) act as column anchors; agent cards stack beneath each one. Fan-out dashed edges connect macro_synthesis → each ticker header. ## Changes - SCAN phase: geopolitical/market-movers/sector scanners placed on the same horizontal row at x = [0, COL_WIDTH, 2×COL_WIDTH] (aligns with first 3 ticker columns); industry_deep_dive and macro_synthesis centered below. - TICKER columns: new identifiers get a TickerHeaderNode at tickerStartY; agent nodes stack beneath using column-based parent tracking (header → agent0 → agent1 → …) independent of evt.parent_node_id. - TickerHeaderNode: wide card, bold ticker symbol, animated pulse status dot, completedCount/agentCount counter updated live as results arrive. - Tool nodes (node_id starts with "tool_") skipped from graph — visible in terminal/drawer, not cluttering the column layout. - Portfolio nodes centred below all ticker columns. - Layout state extracted into LayoutState ref + freshLayout() for clean resets. - Node labels use toLabel() (snake_case → Title Case). - Metrics row shows total tokens (in+out) instead of just latency. ## Decision Context - Column-based parent edges chosen over evt.parent_node_id because mock engine emits parent_node_id="start" for all agents; column ordering is reliable. - Scan phase X positions reuse COL_WIDTH so phase-1 scanners visually align above first three ticker columns — no arbitrary magic numbers. - Tool nodes removed from graph (not hidden) — they add noise to column layout with no actionable meaning; the drawer already shows them per node. 
## Considerations for Future Agents - identifierLastNode tracks scoped ID of previous agent per ticker column — used for sequential edge chaining; do not remove without replacing edge logic. - tickerStartY is set once on first ticker arrival; subsequent tickers share the same Y baseline — only colCount and identifierAgentRow differ per ticker. - TickerHeaderNode clicks pass node_id='header' + identifier to onNodeClick; Dashboard NodeEventsDetail filters all events by identifier when node_id is 'header' (shows the full ticker run timeline in the drawer). --- 🤖 Commit Agent | Session: two-phase column graph layout --- agent_os/backend/routes/runs.py | 36 ++ agent_os/backend/routes/websocket.py | 6 +- agent_os/backend/services/langgraph_engine.py | 15 + agent_os/backend/services/mock_engine.py | 304 ++++++++++ agent_os/frontend/src/Dashboard.tsx | 134 ++++- .../frontend/src/components/AgentGraph.tsx | 553 +++++++++++++----- agent_os/frontend/src/hooks/useAgentStream.ts | 2 + 7 files changed, 878 insertions(+), 172 deletions(-) create mode 100644 agent_os/backend/services/mock_engine.py diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index 7c763c08..8b114f0c 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -6,12 +6,14 @@ import time from agent_os.backend.store import runs from agent_os.backend.dependencies import get_current_user from agent_os.backend.services.langgraph_engine import LangGraphEngine +from agent_os.backend.services.mock_engine import MockEngine logger = logging.getLogger("agent_os.runs") router = APIRouter(prefix="/api/run", tags=["runs"]) engine = LangGraphEngine() +mock_engine = MockEngine() async def _run_and_store(run_id: str, gen: AsyncGenerator[Dict[str, Any], None]) -> None: @@ -104,6 +106,40 @@ async def trigger_auto( background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, params or {})) return {"run_id": run_id, "status": "queued"} +@router.post("/mock") 
+async def trigger_mock( + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, + user: dict = Depends(get_current_user), +): + """Start a mock run that streams scripted events — no real LLM calls. + + Accepted params: + mock_type : "pipeline" | "scan" | "auto" (default: "pipeline") + ticker : ticker symbol for pipeline / auto (default: "AAPL") + tickers : list of tickers for auto mock + date : analysis date (default: today) + speed : delay divisor — 1.0 = realistic, 5.0 = fast (default: 1.0) + """ + p = params or {} + run_id = str(uuid.uuid4()) + runs[run_id] = { + "id": run_id, + "type": "mock", + "status": "queued", + "created_at": time.time(), + "user_id": user["user_id"], + "params": p, + } + logger.info( + "Queued MOCK run=%s mock_type=%s user=%s", + run_id, p.get("mock_type", "pipeline"), user["user_id"], + ) + background_tasks.add_task( + _run_and_store, run_id, mock_engine.run_mock(run_id, p) + ) + return {"run_id": run_id, "status": "queued"} + @router.delete("/portfolio-stage") async def reset_portfolio_stage( params: Dict[str, Any], diff --git a/agent_os/backend/routes/websocket.py b/agent_os/backend/routes/websocket.py index f9ee63e7..81291639 100644 --- a/agent_os/backend/routes/websocket.py +++ b/agent_os/backend/routes/websocket.py @@ -7,6 +7,7 @@ from typing import Dict, Any from agent_os.backend.dependencies import get_current_user from agent_os.backend.store import runs from agent_os.backend.services.langgraph_engine import LangGraphEngine +from agent_os.backend.services.mock_engine import MockEngine logger = logging.getLogger("agent_os.websocket") @@ -16,6 +17,7 @@ router = APIRouter(prefix="/ws", tags=["websocket"]) _EVENT_POLL_INTERVAL_SECONDS = 0.05 engine = LangGraphEngine() +_mock_engine = MockEngine() @router.websocket("/stream/{run_id}") async def websocket_endpoint( @@ -66,7 +68,9 @@ async def websocket_endpoint( else: # status == "queued" — WebSocket is the executor (background task didn't start yet) stream_gen = None - 
if run_type == "scan": + if run_type == "mock": + stream_gen = _mock_engine.run_mock(run_id, params) + elif run_type == "scan": stream_gen = engine.run_scan(run_id, params) elif run_type == "pipeline": stream_gen = engine.run_pipeline(run_id, params) diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index dfb27a89..268329ca 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -72,6 +72,8 @@ class LangGraphEngine: self._node_start_times: Dict[str, Dict[str, float]] = {} # Track the last prompt per node so we can attach it to result events self._node_prompts: Dict[str, Dict[str, str]] = {} + # Track the human-readable identifier (ticker / "MARKET" / portfolio_id) per run + self._run_identifiers: Dict[str, str] = {} # ------------------------------------------------------------------ # Run helpers @@ -100,6 +102,7 @@ class LangGraphEngine: } self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = "MARKET" final_state: Dict[str, Any] = {} async for event in scanner.graph.astream_events(initial_state, version="v2"): @@ -116,6 +119,7 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) # Fallback: if the root on_chain_end event was never captured (can happen # with deeply nested sub-graphs), re-invoke to get the complete final state. 
@@ -205,6 +209,7 @@ class LangGraphEngine: initial_state = graph_wrapper.propagator.create_initial_state(ticker, date) self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = ticker.upper() final_state: Dict[str, Any] = {} async for event in graph_wrapper.graph.astream_events( @@ -223,6 +228,7 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) # Fallback: if the root on_chain_end event was never captured (can happen # with deeply nested sub-graphs), re-invoke to get the complete final state. @@ -351,6 +357,7 @@ class LangGraphEngine: } self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = portfolio_id final_state: Dict[str, Any] = {} async for event in portfolio_graph.graph.astream_events( @@ -366,6 +373,7 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) # Fallback: if the root on_chain_end event was never captured, re-invoke. 
if not final_state: @@ -805,6 +813,7 @@ class LangGraphEngine: starts = self._node_start_times.get(run_id, {}) prompts = self._node_prompts.setdefault(run_id, {}) + identifier = self._run_identifiers.get(run_id, "") # ------ LLM start ------ if kind == "on_chat_model_start": @@ -856,6 +865,7 @@ class LangGraphEngine: "parent_node_id": "start", "type": "thought", "agent": node_name.upper(), + "identifier": identifier, "message": f"Prompting {model}…" + (f" | {prompt_snippet}" if prompt_snippet else ""), "prompt": full_prompt, @@ -868,6 +878,7 @@ class LangGraphEngine: "node_id": node_name, "type": "thought", "agent": node_name.upper(), + "identifier": identifier, "message": f"Prompting LLM… (event parse error)", "prompt": "", "metrics": {}, @@ -891,6 +902,7 @@ class LangGraphEngine: "parent_node_id": node_name, "type": "tool", "agent": node_name.upper(), + "identifier": identifier, "message": f"▶ Tool: {name}" + (f" | {tool_input}" if tool_input else ""), "prompt": full_input, @@ -919,6 +931,7 @@ class LangGraphEngine: "parent_node_id": node_name, "type": "tool_result", "agent": node_name.upper(), + "identifier": identifier, "message": f"✓ Tool result: {name}" + (f" | {tool_output}" if tool_output else ""), "response": full_output, @@ -1002,6 +1015,7 @@ class LangGraphEngine: "node_id": node_name, "type": "result", "agent": node_name.upper(), + "identifier": identifier, "message": response_snippet or "Completed.", "prompt": matched_prompt, "response": full_response, @@ -1020,6 +1034,7 @@ class LangGraphEngine: "node_id": node_name, "type": "result", "agent": node_name.upper(), + "identifier": identifier, "message": "Completed (event parse error).", "prompt": matched_prompt, "response": "", diff --git a/agent_os/backend/services/mock_engine.py b/agent_os/backend/services/mock_engine.py new file mode 100644 index 00000000..8a704f95 --- /dev/null +++ b/agent_os/backend/services/mock_engine.py @@ -0,0 +1,304 @@ +"""MockEngine — streams scripted events for UI testing 
without real LLM calls. + +Usage (via POST /api/run/mock): + params = { + "mock_type": "pipeline" | "scan" | "auto", + "ticker": "AAPL", # used for pipeline / auto + "tickers": ["AAPL","NVDA"], # used for auto (overrides ticker list) + "date": "2026-03-24", + "speed": 2.0, # delay divisor — higher = faster + } +""" + +import asyncio +import time +from typing import AsyncGenerator, Dict, Any + + +class MockEngine: + """Generates scripted AgentOS events without calling real LLMs.""" + + # ------------------------------------------------------------------ + # Public entry points + # ------------------------------------------------------------------ + + async def run_mock( + self, run_id: str, params: Dict[str, Any] + ) -> AsyncGenerator[Dict[str, Any], None]: + mock_type = params.get("mock_type", "pipeline") + speed = max(float(params.get("speed", 1.0)), 0.1) + + if mock_type == "scan": + async for evt in self._run_scan(run_id, params, speed): + yield evt + elif mock_type == "auto": + async for evt in self._run_auto(run_id, params, speed): + yield evt + else: + async for evt in self._run_pipeline(run_id, params, speed): + yield evt + + # ------------------------------------------------------------------ + # Pipeline mock + # ------------------------------------------------------------------ + + async def _run_pipeline( + self, run_id: str, params: Dict[str, Any], speed: float + ) -> AsyncGenerator[Dict[str, Any], None]: + ticker = params.get("ticker", "AAPL").upper() + date = params.get("date", time.strftime("%Y-%m-%d")) + + yield self._log(f"[MOCK] Starting pipeline for {ticker} on {date}") + await self._sleep(0.3, speed) + + # Analysts (sequential for simplicity in mock) + analysts = [ + ("news_analyst", "gpt-4o-mini", 1.4, 480, 310), + ("market_analyst", "gpt-4o-mini", 1.2, 390, 240), + ("fundamentals_analyst", "gpt-4o", 2.1, 620, 430), + ("social_analyst", "gpt-4o-mini", 0.9, 310, 190), + ] + for node, model, latency, tok_in, tok_out in analysts: + async for evt 
in self._agent_with_tool( + run_id, node, ticker, model, latency, tok_in, tok_out, speed, + tool_name=f"get_{node.split('_')[0]}_data", + ): + yield evt + + # Research debate + for node, model, latency, tok_in, tok_out in [ + ("bull_researcher", "gpt-4o", 1.8, 540, 360), + ("bear_researcher", "gpt-4o", 1.7, 510, 340), + ("research_manager","gpt-4o", 2.3, 680, 480), + ]: + async for evt in self._agent_no_tool( + run_id, node, ticker, model, latency, tok_in, tok_out, speed + ): + yield evt + + # Trading decision + for node, model, latency, tok_in, tok_out in [ + ("trader", "gpt-4o", 2.0, 600, 420), + ("risk_manager", "gpt-4o", 1.5, 450, 310), + ("risk_judge", "gpt-4o", 1.1, 380, 260), + ]: + async for evt in self._agent_no_tool( + run_id, node, ticker, model, latency, tok_in, tok_out, speed + ): + yield evt + + yield self._log(f"[MOCK] Pipeline for {ticker} completed.") + + # ------------------------------------------------------------------ + # Scan mock + # ------------------------------------------------------------------ + + async def _run_scan( + self, run_id: str, params: Dict[str, Any], speed: float + ) -> AsyncGenerator[Dict[str, Any], None]: + date = params.get("date", time.strftime("%Y-%m-%d")) + identifier = "MARKET" + + yield self._log(f"[MOCK] Starting market scan for {date}") + await self._sleep(0.3, speed) + + # Phase 1 — three scanners in "parallel" (interleaved) + yield self._log("[MOCK] Phase 1: Running geopolitical, market-movers, sector scanners…") + phase1 = [ + ("geopolitical_scanner", "gpt-4o-mini", 1.5, 420, 280), + ("market_movers_scanner", "gpt-4o-mini", 1.3, 380, 250), + ("sector_scanner", "gpt-4o-mini", 1.4, 400, 265), + ] + # Emit thought events for all three before any complete + for node, model, _, _, _ in phase1: + yield self._thought(node, identifier, model, f"[MOCK] Scanning {node.replace('_', ' ')}…") + await self._sleep(0.1, speed) + + # Then complete them in order + for node, model, latency, tok_in, tok_out in phase1: + await 
self._sleep(latency, speed) + yield self._result(node, identifier, model, tok_in, tok_out, round(latency * 1000), + f"[MOCK] {node.replace('_', ' ').title()} report ready.") + + # Phase 2 — industry deep dive + yield self._log("[MOCK] Phase 2: Industry deep dive…") + async for evt in self._agent_no_tool( + run_id, "industry_deep_dive", identifier, "gpt-4o", 2.2, 680, 460, speed + ): + yield evt + + # Phase 3 — macro synthesis + yield self._log("[MOCK] Phase 3: Macro synthesis + watchlist generation…") + async for evt in self._agent_no_tool( + run_id, "macro_synthesis", identifier, "gpt-4o", 2.8, 820, 590, speed + ): + yield evt + + yield self._log("[MOCK] Scan completed. Top-10 watchlist generated.") + + # ------------------------------------------------------------------ + # Auto mock (scan → pipeline per ticker → portfolio) + # ------------------------------------------------------------------ + + async def _run_auto( + self, run_id: str, params: Dict[str, Any], speed: float + ) -> AsyncGenerator[Dict[str, Any], None]: + tickers = params.get("tickers") or [params.get("ticker", "AAPL").upper()] + + yield self._log(f"[MOCK] Starting auto run — scan + {len(tickers)} pipeline(s) + portfolio") + await self._sleep(0.2, speed) + + # Phase 1: Scan + yield self._log("[MOCK] Phase 1/3: Market scan…") + async for evt in self._run_scan(run_id, params, speed): + yield evt + + # Phase 2: Per-ticker pipeline + for ticker in tickers: + yield self._log(f"[MOCK] Phase 2/3: Pipeline for {ticker}…") + async for evt in self._run_pipeline(run_id, {**params, "ticker": ticker}, speed): + yield evt + + # Phase 3: Portfolio + yield self._log("[MOCK] Phase 3/3: Portfolio manager…") + async for evt in self._agent_no_tool( + run_id, "portfolio_manager", "PORTFOLIO", "gpt-4o", 2.5, 740, 520, speed + ): + yield evt + + yield self._log("[MOCK] Auto run completed.") + + # ------------------------------------------------------------------ + # Building blocks + # 
------------------------------------------------------------------ + + async def _agent_with_tool( + self, + run_id: str, + node: str, + identifier: str, + model: str, + latency: float, + tok_in: int, + tok_out: int, + speed: float, + tool_name: str, + ) -> AsyncGenerator[Dict[str, Any], None]: + yield self._thought(node, identifier, model, f"[MOCK] {node} analysing {identifier}…") + await self._sleep(0.4, speed) + + yield self._tool_call(node, identifier, tool_name, f'{{"ticker": "{identifier}"}}') + await self._sleep(0.6, speed) + + yield self._tool_result(node, identifier, tool_name, + f"[MOCK] Retrieved {tool_name} data for {identifier}.") + await self._sleep(latency - 1.0, speed) + + yield self._result(node, identifier, model, tok_in, tok_out, round(latency * 1000), + f"[MOCK] {node.replace('_', ' ').title()} analysis complete for {identifier}.") + + async def _agent_no_tool( + self, + run_id: str, + node: str, + identifier: str, + model: str, + latency: float, + tok_in: int, + tok_out: int, + speed: float, + ) -> AsyncGenerator[Dict[str, Any], None]: + yield self._thought(node, identifier, model, f"[MOCK] {node} processing {identifier}…") + await self._sleep(latency, speed) + + yield self._result(node, identifier, model, tok_in, tok_out, round(latency * 1000), + f"[MOCK] {node.replace('_', ' ').title()} decision for {identifier}.") + + # ------------------------------------------------------------------ + # Event constructors + # ------------------------------------------------------------------ + + @staticmethod + def _ns() -> str: + return str(time.time_ns()) + + def _log(self, message: str) -> Dict[str, Any]: + return { + "id": f"log_{self._ns()}", + "node_id": "__system__", + "type": "log", + "agent": "SYSTEM", + "identifier": "", + "message": message, + "metrics": {}, + } + + def _thought(self, node: str, identifier: str, model: str, message: str) -> Dict[str, Any]: + return { + "id": f"thought_{self._ns()}", + "node_id": node, + "parent_node_id": 
"start", + "type": "thought", + "agent": node.upper(), + "identifier": identifier, + "message": message, + "prompt": f"[MOCK PROMPT] Analyse {identifier} using available data.", + "metrics": {"model": model}, + } + + def _tool_call(self, node: str, identifier: str, tool: str, inp: str) -> Dict[str, Any]: + return { + "id": f"tool_{self._ns()}", + "node_id": f"tool_{tool}", + "parent_node_id": node, + "type": "tool", + "agent": node.upper(), + "identifier": identifier, + "message": f"▶ Tool: {tool} | {inp}", + "prompt": inp, + "metrics": {}, + } + + def _tool_result(self, node: str, identifier: str, tool: str, output: str) -> Dict[str, Any]: + return { + "id": f"tool_res_{self._ns()}", + "node_id": f"tool_{tool}", + "parent_node_id": node, + "type": "tool_result", + "agent": node.upper(), + "identifier": identifier, + "message": f"✓ Tool result: {tool} | {output}", + "response": output, + "metrics": {}, + } + + def _result( + self, + node: str, + identifier: str, + model: str, + tok_in: int, + tok_out: int, + latency_ms: int, + message: str, + ) -> Dict[str, Any]: + return { + "id": f"result_{self._ns()}", + "node_id": node, + "type": "result", + "agent": node.upper(), + "identifier": identifier, + "message": message, + "response": f"[MOCK RESPONSE] {message}", + "metrics": { + "model": model, + "tokens_in": tok_in, + "tokens_out": tok_out, + "latency_ms": latency_ms, + }, + } + + @staticmethod + async def _sleep(seconds: float, speed: float) -> None: + delay = max(seconds / speed, 0.01) + await asyncio.sleep(delay) diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index 80317f14..408fe38b 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -35,7 +35,7 @@ import { Collapse, useToast, } from '@chakra-ui/react'; -import { LayoutDashboard, Wallet, Settings, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot, ChevronDown, ChevronUp, Trash2 } from 'lucide-react'; +import { 
LayoutDashboard, Wallet, Settings, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot, ChevronDown, ChevronUp, FlaskConical, Trash2 } from 'lucide-react'; import { MetricHeader } from './components/MetricHeader'; import { AgentGraph } from './components/AgentGraph'; import { PortfolioViewer } from './components/PortfolioViewer'; @@ -45,12 +45,17 @@ import axios from 'axios'; const API_BASE = 'http://127.0.0.1:8088/api'; // ─── Run type definitions with required parameters ──────────────────── -type RunType = 'scan' | 'pipeline' | 'portfolio' | 'auto'; +type RunType = 'scan' | 'pipeline' | 'portfolio' | 'auto' | 'mock'; + +/** Mock-specific sub-type. */ +type MockType = 'pipeline' | 'scan' | 'auto'; interface RunParams { date: string; ticker: string; portfolio_id: string; + mock_type: MockType; + speed: string; force: boolean; } @@ -59,6 +64,7 @@ const RUN_TYPE_LABELS: Record = { pipeline: 'Pipeline', portfolio: 'Portfolio', auto: 'Auto', + mock: 'Mock', }; /** Which params each run type needs. */ @@ -67,6 +73,7 @@ const REQUIRED_PARAMS: Record = { pipeline: ['ticker', 'date'], portfolio: ['date', 'portfolio_id'], auto: ['date', 'portfolio_id'], + mock: [], }; /** Return the colour token for a given event type. 
*/ @@ -266,10 +273,13 @@ const EventDetail: React.FC<{ event: AgentEvent; onOpenModal?: (evt: AgentEvent) ); // ─── Detail drawer showing all events for a given graph node ────────── -const NodeEventsDetail: React.FC<{ nodeId: string; events: AgentEvent[]; onOpenModal: (evt: AgentEvent) => void }> = ({ nodeId, events, onOpenModal }) => { +const NodeEventsDetail: React.FC<{ nodeId: string; identifier?: string | null; events: AgentEvent[]; onOpenModal: (evt: AgentEvent) => void }> = ({ nodeId, identifier, events, onOpenModal }) => { const nodeEvents = useMemo( - () => events.filter((e) => e.node_id === nodeId), - [events, nodeId], + () => events.filter((e) => + e.node_id === nodeId && + (!identifier || e.identifier === identifier) + ), + [events, nodeId, identifier], ); if (nodeEvents.length === 0) { @@ -307,6 +317,7 @@ export const Dashboard: React.FC = () => { const [drawerMode, setDrawerMode] = useState<'event' | 'node'>('event'); const [selectedEvent, setSelectedEvent] = useState(null); const [selectedNodeId, setSelectedNodeId] = useState(null); + const [selectedNodeIdentifier, setSelectedNodeIdentifier] = useState(null); // Parameter inputs const [showParams, setShowParams] = useState(false); @@ -314,6 +325,8 @@ export const Dashboard: React.FC = () => { date: new Date().toISOString().split('T')[0], ticker: 'AAPL', portfolio_id: 'main_portfolio', + mock_type: 'pipeline', + speed: '3', force: false, }); @@ -333,12 +346,14 @@ export const Dashboard: React.FC = () => { const isRunning = isTriggering || status === 'streaming' || status === 'connecting'; - const startRun = async (type: RunType) => { + const startRun = async (type: RunType, overrideParams?: Partial) => { if (isRunning) return; + const effectiveParams = { ...params, ...overrideParams }; + // Validate required params const required = REQUIRED_PARAMS[type]; - const missing = required.filter((k) => { const v = params[k]; return typeof v === 'string' ? 
!v.trim() : !v; }); + const missing = required.filter((k) => { const v = effectiveParams[k]; return typeof v === 'string' ? !v.trim() : !v; }); if (missing.length > 0) { toast({ title: `Missing required fields for ${RUN_TYPE_LABELS[type]}`, @@ -351,17 +366,30 @@ export const Dashboard: React.FC = () => { setShowParams(true); return; } - + setIsTriggering(true); setActiveRunType(type); try { clearEvents(); - const res = await axios.post(`${API_BASE}/run/${type}`, { - portfolio_id: params.portfolio_id, - date: params.date, - ticker: params.ticker, - force: params.force, - }); + // For mock auto runs, parse comma-separated tickers into an array + const mockTickers = effectiveParams.mock_type === 'auto' + ? effectiveParams.ticker.split(',').map((t) => t.trim().toUpperCase()).filter(Boolean) + : undefined; + const body = type === 'mock' + ? { + mock_type: effectiveParams.mock_type, + ticker: effectiveParams.ticker.split(',')[0].trim().toUpperCase(), + ...(mockTickers && mockTickers.length > 1 ? { tickers: mockTickers } : {}), + date: effectiveParams.date, + speed: parseFloat(effectiveParams.speed) || 3, + } + : { + portfolio_id: effectiveParams.portfolio_id, + date: effectiveParams.date, + ticker: effectiveParams.ticker, + force: effectiveParams.force, + }; + const res = await axios.post(`${API_BASE}/run/${type}`, body); setActiveRunId(res.data.run_id); } catch (err) { console.error("Failed to start run:", err); @@ -371,6 +399,16 @@ export const Dashboard: React.FC = () => { } }; + /** Re-run triggered from a graph node's Re-run button. 
*/ + const handleNodeRerun = useCallback((identifier: string, _nodeId: string) => { + if (identifier === 'MARKET' || identifier === '') { + startRun('scan'); + } else { + startRun('pipeline', { ticker: identifier }); + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [isRunning, params]); + const resetPortfolioStage = async () => { if (!params.date || !params.portfolio_id) { toast({ title: 'Date and Portfolio ID are required', status: 'warning', duration: 3000, isClosable: true, position: 'top' }); @@ -407,9 +445,10 @@ export const Dashboard: React.FC = () => { }, [onOpen]); /** Open the drawer showing all events for a graph node (node click). */ - const openNodeDetail = useCallback((nodeId: string) => { + const openNodeDetail = useCallback((nodeId: string, identifier?: string) => { setDrawerMode('node'); setSelectedNodeId(nodeId); + setSelectedNodeIdentifier(identifier || null); setSelectedEvent(null); onOpen(); }, [onOpen]); @@ -417,7 +456,7 @@ export const Dashboard: React.FC = () => { // Derive a readable drawer title const drawerTitle = drawerMode === 'event' ? `Event: ${selectedEvent?.agent ?? ''} — ${selectedEvent?.type ?? ''}` - : `Node: ${selectedNodeId ?? ''}`; + : `Node: ${selectedNodeId ?? ''}${selectedNodeIdentifier ? 
` · ${selectedNodeIdentifier}` : ''}`; return ( @@ -466,7 +505,7 @@ export const Dashboard: React.FC = () => { {/* Left Side: Graph Area */} - + {/* Floating Control Panel */} @@ -475,13 +514,13 @@ export const Dashboard: React.FC = () => { {(['scan', 'pipeline', 'portfolio', 'auto'] as RunType[]).map((type) => { const isThisRunning = isRunning && activeRunType === type; const isOtherRunning = isRunning && activeRunType !== type; - const icons: Record = { + const icons: Record = { scan: , pipeline: , portfolio: , auto: , }; - const colors: Record = { + const colors: Record = { scan: 'cyan', pipeline: 'blue', portfolio: 'purple', @@ -504,6 +543,21 @@ export const Dashboard: React.FC = () => { ); })} + {/* Mock run button — no LLM calls */} + + + + ))} + + + + Speed: + + {[['1×', '1'], ['3×', '3'], ['5×', '5'], ['10×', '10']].map(([label, val]) => ( + + ))} + + + { - Required: Scan → date · Pipeline → ticker, date · Portfolio → date, portfolio · Auto → date, portfolio + Required: Scan → date · Pipeline → ticker, date · Portfolio → date, portfolio · Auto → date, portfolio · Mock → no API calls @@ -651,7 +741,7 @@ export const Dashboard: React.FC = () => { )} {drawerMode === 'node' && selectedNodeId && ( - + )} diff --git a/agent_os/frontend/src/components/AgentGraph.tsx b/agent_os/frontend/src/components/AgentGraph.tsx index 1205eaa0..c51a5d80 100644 --- a/agent_os/frontend/src/components/AgentGraph.tsx +++ b/agent_os/frontend/src/components/AgentGraph.tsx @@ -1,8 +1,8 @@ import React, { useEffect, useRef, useCallback } from 'react'; -import ReactFlow, { - Background, - Controls, - Node, +import ReactFlow, { + Background, + Controls, + Node, Edge, Handle, Position, @@ -11,208 +11,462 @@ import ReactFlow, { useEdgesState, } from 'reactflow'; import 'reactflow/dist/style.css'; -import { Box, Text, Flex, Icon, Badge } from '@chakra-ui/react'; -import { Cpu, Settings, Database, TrendingUp, Clock } from 'lucide-react'; +import { Box, Text, Flex, Icon, Badge, IconButton, 
Tooltip } from '@chakra-ui/react'; +import { Cpu, Settings, Database, TrendingUp, Clock, RefreshCw } from 'lucide-react'; import { AgentEvent } from '../hooks/useAgentStream'; -// --- Custom Agent Node Component --- +// ─── Layout constants ───────────────────────────────────────────────────────── +const COL_WIDTH = 230; // horizontal space per ticker column / scan slot +const ROW_HEIGHT = 148; // vertical space per agent row within a column +const SCAN_TOP_Y = 40; // y offset for the first scan row +const SCAN_ROW_H = 148; // vertical gap between scan rows +// Phase-1 scanners align with the first 3 ticker columns (x = 0, COL_WIDTH, COL_WIDTH*2) +const SCAN_CENTER_X = COL_WIDTH; // x for phase-2/3 scan nodes (centre of 3) +const TICKER_GAP = 80; // vertical gap between scan phase and ticker headers +const TICKER_HDR_H = 108; // height reserved for the ticker header card + +// ─── Node classification ────────────────────────────────────────────────────── +const SCAN_PHASE1 = ['geopolitical_scanner', 'market_movers_scanner', 'sector_scanner']; +const SCAN_PHASE2 = new Set(['industry_deep_dive']); +const SCAN_PHASE3 = new Set(['macro_synthesis']); +const SCAN_ALL = new Set([...SCAN_PHASE1, ...SCAN_PHASE2, ...SCAN_PHASE3]); + +type NodeKind = 'scan' | 'ticker' | 'portfolio' | 'skip'; + +function classifyNode(nodeId: string, identifier: string): NodeKind { + if (nodeId.startsWith('tool_')) return 'skip'; + if (identifier === 'MARKET' || SCAN_ALL.has(nodeId)) return 'scan'; + if (identifier === 'PORTFOLIO' || nodeId === 'portfolio_manager' || + nodeId === 'make_pm_decision') return 'portfolio'; + if (identifier) return 'ticker'; + return 'skip'; +} + +// ─── Colour helpers ─────────────────────────────────────────────────────────── +const STATUS_COLORS: Record = { + running: '#4fd1c5', + completed: '#68d391', + error: '#fc8181', +}; +const DEFAULT_COLOR = 'rgba(255,255,255,0.25)'; + +function statusColor(status: string): string { + return STATUS_COLORS[status] ?? 
DEFAULT_COLOR; +} + +const ID_PALETTE = [ + '#63b3ed', '#9f7aea', '#f6ad55', '#4fd1c5', + '#f687b3', '#f6e05e', '#68d391', '#fc8181', +]; +function identifierColor(id: string): string { + if (!id || id === 'MARKET') return '#4fd1c5'; + if (id === 'PORTFOLIO') return '#9f7aea'; + let h = 0; + for (let i = 0; i < id.length; i++) h = (h * 31 + id.charCodeAt(i)) & 0xffff; + return ID_PALETTE[h % ID_PALETTE.length]; +} + +function toLabel(nodeId: string): string { + return nodeId.replace(/_/g, ' ').replace(/\b\w/g, c => c.toUpperCase()); +} + +// ─── Agent Node ─────────────────────────────────────────────────────────────── const AgentNode = ({ data }: NodeProps) => { - const getIcon = (agent: string) => { - switch (agent.toUpperCase()) { - case 'ANALYST': return Cpu; - case 'RESEARCHER': return Database; - case 'TRADER': return TrendingUp; - default: return Settings; - } + const getIcon = (agent = '') => { + const a = agent.toUpperCase(); + if (a.includes('ANALYST') || a.includes('SCANNER')) return Cpu; + if (a.includes('RESEARCHER') || a.includes('MANAGER') || a.includes('SYNTHESIS') || a.includes('DIVE')) return Database; + if (a.includes('TRADER') || a.includes('RISK') || a.includes('JUDGE')) return TrendingUp; + return Settings; }; - const getStatusColor = (status: string) => { - switch (status) { - case 'running': return 'cyan.400'; - case 'completed': return 'green.400'; - case 'error': return 'red.400'; - default: return 'whiteAlpha.500'; - } - }; + const sc = statusColor(data.status); + const canRerun = data.status === 'completed' || data.status === 'error'; + const totalTok = (data.metrics?.tokens_in ?? 0) + (data.metrics?.tokens_out ?? 0); return ( - - - - - - - {data.agent} - {data.status === 'completed' && ( - Done - )} + + + + {/* Header row */} + + + + {data.label} + + {data.status === 'completed' && } + {data.status === 'error' && } - - - + + + + {/* Metrics row */} - - {data.metrics?.latency_ms || 0}ms + + + {data.metrics?.latency_ms ? 
`${data.metrics.latency_ms}ms` : '—'} + - {data.metrics?.model && data.metrics.model !== 'unknown' && ( - {data.metrics.model} + {totalTok > 0 && ( + + {totalTok.toLocaleString()} tok + )} - + + {data.metrics?.model && data.metrics.model !== 'unknown' && ( + + {data.metrics.model} + + )} + + {/* Running shimmer */} {data.status === 'running' && ( - - - + + + + )} + + {/* Re-run */} + {canRerun && data.onRerun && ( + + } + size="xs" variant="ghost" colorScheme="cyan" alignSelf="flex-end" + onClick={(e) => { e.stopPropagation(); data.onRerun(); }} + /> + )} - + ); }; -const nodeTypes = { - agentNode: AgentNode, +// ─── Ticker Header Node ─────────────────────────────────────────────────────── +const TickerHeaderNode = ({ data }: NodeProps) => { + const color = identifierColor(data.ticker); + const sc = statusColor(data.status ?? 'running'); + const done = data.completedCount ?? 0; + const total = data.agentCount ?? 0; + + return ( + + + + + + + {data.ticker} + + {/* Status pulse dot */} + + + + + + + Pipeline + {total > 0 && ( + + {done}/{total} done + + )} + + + + + + ); }; -interface AgentGraphProps { - events: AgentEvent[]; - onNodeClick?: (nodeId: string) => void; +const nodeTypes = { agentNode: AgentNode, tickerHeader: TickerHeaderNode }; + +// ─── Layout state ───────────────────────────────────────────────────────────── +interface LayoutState { + // Scan phase + scanPhase1Count: number; + scanLastY: number; + hasScan: boolean; + lastScanNodeId: string | null; + // Ticker columns + identifierToCol: Map; + identifierLastNode: Map; + identifierAgentRow: Map; + identifierAgentCount: Map; + identifierDoneCount: Map; + colCount: number; + tickerStartY: number; + maxTickerY: number; + // Tracking + seenNodeIds: Set; + seenEdgeIds: Set; + processedCount: number; } -export const AgentGraph: React.FC = ({ events, onNodeClick }) => { +function freshLayout(): LayoutState { + return { + scanPhase1Count: 0, + scanLastY: 0, + hasScan: false, + lastScanNodeId: null, + 
identifierToCol: new Map(), + identifierLastNode: new Map(), + identifierAgentRow: new Map(), + identifierAgentCount: new Map(), + identifierDoneCount: new Map(), + colCount: 0, + tickerStartY: 0, + maxTickerY: 0, + seenNodeIds: new Set(), + seenEdgeIds: new Set(), + processedCount: 0, + }; +} + +// ─── Props ──────────────────────────────────────────────────────────────────── +interface AgentGraphProps { + events: AgentEvent[]; + onNodeClick?: (nodeId: string, identifier?: string) => void; + onNodeRerun?: (identifier: string, nodeId: string) => void; +} + +// ─── Component ──────────────────────────────────────────────────────────────── +export const AgentGraph: React.FC = ({ events, onNodeClick, onNodeRerun }) => { const [nodes, setNodes, onNodesChange] = useNodesState([]); const [edges, setEdges, onEdgesChange] = useEdgesState([]); - // Track which node_ids we have already added so we never duplicate - const seenNodeIds = useRef(new Set()); - const seenEdgeIds = useRef(new Set()); - // Track how many unique nodes exist for vertical layout - const nodeCount = useRef(0); - // Track the last processed event index to only process new events - const processedCount = useRef(0); + const ls = useRef(freshLayout()); useEffect(() => { - // Only process newly arrived events - const newEvents = events.slice(processedCount.current); + const newEvents = events.slice(ls.current.processedCount); if (newEvents.length === 0) return; - processedCount.current = events.length; + ls.current.processedCount = events.length; - const addedNodes: Node[] = []; - const addedEdges: Edge[] = []; - const updatedNodeData: Map> = new Map(); + const addedNodes: Node[] = []; + const addedEdges: Edge[] = []; + const patchMap: Map> = new Map(); for (const evt of newEvents) { - if (!evt.node_id || evt.node_id === '__system__') continue; + const nodeId = evt.node_id; + if (!nodeId || nodeId === '__system__') continue; - // Determine if this event means the node is completed - const isCompleted = 
evt.type === 'result' || evt.type === 'tool_result'; + const identifier = evt.identifier ?? ''; + const kind = classifyNode(nodeId, identifier); + if (kind === 'skip') continue; - if (!seenNodeIds.current.has(evt.node_id)) { - // New node — create it - seenNodeIds.current.add(evt.node_id); - nodeCount.current += 1; + const scopedId = identifier ? `${nodeId}:${identifier}` : nodeId; + const isResult = evt.type === 'result'; - addedNodes.push({ - id: evt.node_id, - type: 'agentNode', - position: { x: 250, y: nodeCount.current * 150 + 50 }, - data: { - agent: evt.agent, - status: isCompleted ? 'completed' : 'running', - metrics: evt.metrics, - }, + // ── Update path (node already exists) ────────────────────────────────── + if (ls.current.seenNodeIds.has(scopedId)) { + if (isResult && kind === 'ticker') { + const done = (ls.current.identifierDoneCount.get(identifier) ?? 0) + 1; + const total = ls.current.identifierAgentCount.get(identifier) ?? 0; + ls.current.identifierDoneCount.set(identifier, done); + patchMap.set(`header:${identifier}`, { + completedCount: done, + agentCount: total, + status: done >= total && total > 0 ? 'completed' : 'running', + }); + } + const prev = patchMap.get(scopedId); + const wasDone = prev?.status === 'completed'; + patchMap.set(scopedId, { + ...prev, + status: wasDone || isResult ? 'completed' : 'running', + metrics: evt.metrics ?? 
prev?.metrics, }); + continue; + } - // Add edge from parent (if applicable) - if (evt.parent_node_id && evt.parent_node_id !== 'start') { - const edgeId = `e-${evt.parent_node_id}-${evt.node_id}`; - if (!seenEdgeIds.current.has(edgeId)) { - seenEdgeIds.current.add(edgeId); - addedEdges.push({ - id: edgeId, - source: evt.parent_node_id, - target: evt.node_id, - animated: true, - style: { stroke: '#4fd1c5' }, - }); + // ── Create path (new node) ───────────────────────────────────────────── + ls.current.seenNodeIds.add(scopedId); + let x = 0, y = 0; + let parentScopedId: string | null = null; + + if (kind === 'scan') { + if (SCAN_PHASE1.includes(nodeId)) { + x = ls.current.scanPhase1Count * COL_WIDTH; + y = SCAN_TOP_Y; + ls.current.scanPhase1Count++; + } else { + const phaseRow = SCAN_PHASE2.has(nodeId) ? 1 : 2; + x = SCAN_CENTER_X; + y = SCAN_TOP_Y + phaseRow * SCAN_ROW_H; + if (evt.parent_node_id && evt.parent_node_id !== 'start') { + const pid = evt.parent_node_id; + parentScopedId = identifier ? `${pid}:${identifier}` : pid; } } - } else { - // Existing node — queue a status/metrics update - // Never revert a completed node back to running - const prev = updatedNodeData.get(evt.node_id); - const currentlyCompleted = prev?.status === 'completed'; - updatedNodeData.set(evt.node_id, { - status: currentlyCompleted || isCompleted ? 
'completed' : 'running', - metrics: evt.metrics, + ls.current.hasScan = true; + ls.current.scanLastY = Math.max(ls.current.scanLastY, y); + ls.current.lastScanNodeId = scopedId; + + } else if (kind === 'ticker') { + // Create ticker header column if this is the first agent for this identifier + if (!ls.current.identifierToCol.has(identifier)) { + const col = ls.current.colCount++; + ls.current.identifierToCol.set(identifier, col); + ls.current.identifierAgentRow.set(identifier, 0); + ls.current.identifierAgentCount.set(identifier, 0); + ls.current.identifierDoneCount.set(identifier, 0); + + if (!ls.current.tickerStartY) { + ls.current.tickerStartY = ls.current.hasScan + ? ls.current.scanLastY + SCAN_ROW_H + TICKER_GAP + : SCAN_TOP_Y; + } + + const hx = col * COL_WIDTH; + const hy = ls.current.tickerStartY; + const hid = `header:${identifier}`; + + addedNodes.push({ + id: hid, type: 'tickerHeader', + position: { x: hx, y: hy }, + data: { ticker: identifier, status: 'running', agentCount: 0, completedCount: 0, + node_id: 'header', identifier }, + }); + ls.current.seenNodeIds.add(hid); + + // Fan-out edge: last scan node → this ticker header + const lastScan = ls.current.lastScanNodeId; + if (lastScan) { + const eid = `e-${lastScan}-${hid}`; + if (!ls.current.seenEdgeIds.has(eid)) { + ls.current.seenEdgeIds.add(eid); + addedEdges.push({ + id: eid, source: lastScan, target: hid, animated: true, + style: { stroke: '#4fd1c5', strokeDasharray: '5 5' }, + }); + } + } + } + + const col = ls.current.identifierToCol.get(identifier)!; + const agentRow = ls.current.identifierAgentRow.get(identifier)!; + ls.current.identifierAgentRow.set(identifier, agentRow + 1); + const newCount = (ls.current.identifierAgentCount.get(identifier) ?? 
0) + 1; + ls.current.identifierAgentCount.set(identifier, newCount); + + x = col * COL_WIDTH; + y = ls.current.tickerStartY + TICKER_HDR_H + agentRow * ROW_HEIGHT; + ls.current.maxTickerY = Math.max(ls.current.maxTickerY, y); + + // Parent: previous node in same column (header → agent0 → agent1 → …) + parentScopedId = ls.current.identifierLastNode.get(identifier) ?? `header:${identifier}`; + ls.current.identifierLastNode.set(identifier, scopedId); + + // Update header agent count + patchMap.set(`header:${identifier}`, { + ...(patchMap.get(`header:${identifier}`) ?? {}), + agentCount: newCount, }); + + } else { + // portfolio + const totalW = ls.current.colCount * COL_WIDTH; + x = totalW > 0 ? totalW / 2 - 100 : SCAN_CENTER_X; + y = ls.current.maxTickerY + ROW_HEIGHT + TICKER_GAP; + } + + addedNodes.push({ + id: scopedId, type: 'agentNode', + position: { x, y }, + data: { + agent: evt.agent, + label: toLabel(nodeId), + identifier, + node_id: nodeId, + status: isResult ? 'completed' : 'running', + metrics: evt.metrics, + onRerun: onNodeRerun ? () => onNodeRerun(identifier, nodeId) : undefined, + }, + }); + + // Edge to parent + if (parentScopedId) { + const eid = `e-${parentScopedId}-${scopedId}`; + if (!ls.current.seenEdgeIds.has(eid)) { + ls.current.seenEdgeIds.add(eid); + addedEdges.push({ + id: eid, source: parentScopedId, target: scopedId, animated: true, + style: { stroke: '#4fd1c5' }, + }); + } } } - // Batch state updates - if (addedNodes.length > 0) { - setNodes((prev) => [...prev, ...addedNodes]); + if (addedNodes.length > 0) setNodes(prev => [...prev, ...addedNodes]); + if (addedEdges.length > 0) setEdges(prev => [...prev, ...addedEdges]); + if (patchMap.size > 0) { + setNodes(prev => prev.map(n => { + const patch = patchMap.get(n.id); + if (!patch) return n; + const finalStatus = n.data.status === 'completed' ? 'completed' : (patch.status ?? 
n.data.status); + return { ...n, data: { ...n.data, ...patch, status: finalStatus } }; + })); } - if (addedEdges.length > 0) { - setEdges((prev) => [...prev, ...addedEdges]); - } - if (updatedNodeData.size > 0) { - setNodes((prev) => - prev.map((n) => { - const patch = updatedNodeData.get(n.id); - if (!patch) return n; - // Never revert a completed node back to running - const finalStatus = n.data.status === 'completed' ? 'completed' : patch.status; - return { - ...n, - data: { ...n.data, ...patch, status: finalStatus, metrics: patch.metrics ?? n.data.metrics }, - }; - }), - ); - } - }, [events, setNodes, setEdges]); + }, [events, setNodes, setEdges, onNodeRerun]); - // Reset tracked state when the events array is cleared (new run) + // Reset on new run (events cleared) useEffect(() => { if (events.length === 0) { - seenNodeIds.current.clear(); - seenEdgeIds.current.clear(); - nodeCount.current = 0; - processedCount.current = 0; + ls.current = freshLayout(); setNodes([]); setEdges([]); } }, [events.length, setNodes, setEdges]); - const handleNodeClick = useCallback((_event: React.MouseEvent, node: Node) => { - onNodeClick?.(node.id); + const handleNodeClick = useCallback((_: React.MouseEvent, node: Node) => { + onNodeClick?.(node.data.node_id as string, node.data.identifier as string); }, [onNodeClick]); return ( - + = ({ events, onNodeClick }) = onNodeClick={handleNodeClick} nodeTypes={nodeTypes} fitView + fitViewOptions={{ padding: 0.15 }} > - + diff --git a/agent_os/frontend/src/hooks/useAgentStream.ts b/agent_os/frontend/src/hooks/useAgentStream.ts index 6ce3216c..bbf7d3f7 100644 --- a/agent_os/frontend/src/hooks/useAgentStream.ts +++ b/agent_os/frontend/src/hooks/useAgentStream.ts @@ -11,6 +11,8 @@ export interface AgentEvent { prompt?: string; /** Full response text (available on result & tool_result events). */ response?: string; + /** Ticker symbol (e.g. "AAPL"), "MARKET" for scans, or portfolio id. 
*/ + identifier?: string; node_id?: string; parent_node_id?: string; metrics?: {