diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index 7c763c08..8b114f0c 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -6,12 +6,14 @@ import time from agent_os.backend.store import runs from agent_os.backend.dependencies import get_current_user from agent_os.backend.services.langgraph_engine import LangGraphEngine +from agent_os.backend.services.mock_engine import MockEngine logger = logging.getLogger("agent_os.runs") router = APIRouter(prefix="/api/run", tags=["runs"]) engine = LangGraphEngine() +mock_engine = MockEngine() async def _run_and_store(run_id: str, gen: AsyncGenerator[Dict[str, Any], None]) -> None: @@ -104,6 +106,40 @@ async def trigger_auto( background_tasks.add_task(_run_and_store, run_id, engine.run_auto(run_id, params or {})) return {"run_id": run_id, "status": "queued"} +@router.post("/mock") +async def trigger_mock( + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, + user: dict = Depends(get_current_user), +): + """Start a mock run that streams scripted events — no real LLM calls. + + Accepted params: + mock_type : "pipeline" | "scan" | "auto" (default: "pipeline") + ticker : ticker symbol for pipeline / auto (default: "AAPL") + tickers : list of tickers for auto mock + date : analysis date (default: today) + speed : delay divisor — 1.0 = realistic, 5.0 = fast (default: 1.0) + """ + p = params or {} + run_id = str(uuid.uuid4()) + runs[run_id] = { + "id": run_id, + "type": "mock", + "status": "queued", + "created_at": time.time(), + "user_id": user["user_id"], + "params": p, + } + logger.info( + "Queued MOCK run=%s mock_type=%s user=%s", + run_id, p.get("mock_type", "pipeline"), user["user_id"], + ) + background_tasks.add_task( + _run_and_store, run_id, mock_engine.run_mock(run_id, p) + ) + return {"run_id": run_id, "status": "queued"} + @router.delete("/portfolio-stage") async def reset_portfolio_stage( params: Dict[str, Any], diff --git a/agent_os/backend/routes/websocket.py b/agent_os/backend/routes/websocket.py index f9ee63e7..81291639 100644 --- a/agent_os/backend/routes/websocket.py +++ b/agent_os/backend/routes/websocket.py @@ -7,6 +7,7 @@ from typing import Dict, Any from agent_os.backend.dependencies import get_current_user from agent_os.backend.store import runs from agent_os.backend.services.langgraph_engine import LangGraphEngine +from agent_os.backend.services.mock_engine import MockEngine logger = logging.getLogger("agent_os.websocket") @@ -16,6 +17,7 @@ router = APIRouter(prefix="/ws", tags=["websocket"]) _EVENT_POLL_INTERVAL_SECONDS = 0.05 engine = LangGraphEngine() +_mock_engine = MockEngine() @router.websocket("/stream/{run_id}") async def websocket_endpoint( @@ -66,7 +68,9 @@ async def websocket_endpoint( else: # status == "queued" — WebSocket is the executor (background task didn't start yet) stream_gen = None - if run_type == "scan": + if run_type == "mock": + stream_gen = _mock_engine.run_mock(run_id, params) + elif run_type == "scan": stream_gen = engine.run_scan(run_id, params) elif run_type == "pipeline": stream_gen = engine.run_pipeline(run_id, params) diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index dfb27a89..268329ca 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -72,6 +72,8 @@ class LangGraphEngine: self._node_start_times: Dict[str, Dict[str, float]] = {} # Track the last prompt per node so we can attach it to result events self._node_prompts: Dict[str, Dict[str, str]] = {} + # Track the human-readable identifier (ticker / "MARKET" / portfolio_id) per run + self._run_identifiers: Dict[str, str] = {} # ------------------------------------------------------------------ # Run helpers @@ -100,6 +102,7 @@ class LangGraphEngine: } self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = "MARKET" final_state: Dict[str, Any] = {} async for event in scanner.graph.astream_events(initial_state, version="v2"): @@ -116,6 +119,7 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) # Fallback: if the root on_chain_end event was never captured (can happen # with deeply nested sub-graphs), re-invoke to get the complete final state. @@ -205,6 +209,7 @@ class LangGraphEngine: initial_state = graph_wrapper.propagator.create_initial_state(ticker, date) self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = ticker.upper() final_state: Dict[str, Any] = {} async for event in graph_wrapper.graph.astream_events( @@ -223,6 +228,7 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) # Fallback: if the root on_chain_end event was never captured (can happen # with deeply nested sub-graphs), re-invoke to get the complete final state. @@ -351,6 +357,7 @@ class LangGraphEngine: } self._node_start_times[run_id] = {} + self._run_identifiers[run_id] = portfolio_id final_state: Dict[str, Any] = {} async for event in portfolio_graph.graph.astream_events( @@ -366,6 +373,7 @@ class LangGraphEngine: self._node_start_times.pop(run_id, None) self._node_prompts.pop(run_id, None) + self._run_identifiers.pop(run_id, None) # Fallback: if the root on_chain_end event was never captured, re-invoke. if not final_state: @@ -805,6 +813,7 @@ class LangGraphEngine: starts = self._node_start_times.get(run_id, {}) prompts = self._node_prompts.setdefault(run_id, {}) + identifier = self._run_identifiers.get(run_id, "") # ------ LLM start ------ if kind == "on_chat_model_start": @@ -856,6 +865,7 @@ class LangGraphEngine: "parent_node_id": "start", "type": "thought", "agent": node_name.upper(), + "identifier": identifier, "message": f"Prompting {model}…" + (f" | {prompt_snippet}" if prompt_snippet else ""), "prompt": full_prompt, @@ -868,6 +878,7 @@ class LangGraphEngine: "node_id": node_name, "type": "thought", "agent": node_name.upper(), + "identifier": identifier, "message": f"Prompting LLM… (event parse error)", "prompt": "", "metrics": {}, @@ -891,6 +902,7 @@ class LangGraphEngine: "parent_node_id": node_name, "type": "tool", "agent": node_name.upper(), + "identifier": identifier, "message": f"▶ Tool: {name}" + (f" | {tool_input}" if tool_input else ""), "prompt": full_input, @@ -919,6 +931,7 @@ class LangGraphEngine: "parent_node_id": node_name, "type": "tool_result", "agent": node_name.upper(), + "identifier": identifier, "message": f"✓ Tool result: {name}" + (f" | {tool_output}" if tool_output else ""), "response": full_output, @@ -1002,6 +1015,7 @@ class LangGraphEngine: "node_id": node_name, "type": "result", "agent": node_name.upper(), + "identifier": identifier, "message": response_snippet or "Completed.", "prompt": matched_prompt, "response": full_response, @@ -1020,6 +1034,7 @@ class LangGraphEngine: "node_id": node_name, "type": "result", "agent": node_name.upper(), + "identifier": identifier, "message": "Completed (event parse error).", "prompt": matched_prompt, "response": "", diff --git a/agent_os/backend/services/mock_engine.py b/agent_os/backend/services/mock_engine.py new file mode 100644 index 00000000..8a704f95 --- /dev/null +++ b/agent_os/backend/services/mock_engine.py @@ -0,0 +1,304 @@ +"""MockEngine — streams scripted events for UI testing without real LLM calls. + +Usage (via POST /api/run/mock): + params = { + "mock_type": "pipeline" | "scan" | "auto", + "ticker": "AAPL", # used for pipeline / auto + "tickers": ["AAPL","NVDA"], # used for auto (overrides ticker list) + "date": "2026-03-24", + "speed": 2.0, # delay divisor — higher = faster + } +""" + +import asyncio +import time +from typing import AsyncGenerator, Dict, Any + + +class MockEngine: + """Generates scripted AgentOS events without calling real LLMs.""" + + # ------------------------------------------------------------------ + # Public entry points + # ------------------------------------------------------------------ + + async def run_mock( + self, run_id: str, params: Dict[str, Any] + ) -> AsyncGenerator[Dict[str, Any], None]: + mock_type = params.get("mock_type", "pipeline") + speed = max(float(params.get("speed", 1.0)), 0.1) + + if mock_type == "scan": + async for evt in self._run_scan(run_id, params, speed): + yield evt + elif mock_type == "auto": + async for evt in self._run_auto(run_id, params, speed): + yield evt + else: + async for evt in self._run_pipeline(run_id, params, speed): + yield evt + + # ------------------------------------------------------------------ + # Pipeline mock + # ------------------------------------------------------------------ + + async def _run_pipeline( + self, run_id: str, params: Dict[str, Any], speed: float + ) -> AsyncGenerator[Dict[str, Any], None]: + ticker = params.get("ticker", "AAPL").upper() + date = params.get("date", time.strftime("%Y-%m-%d")) + + yield self._log(f"[MOCK] Starting pipeline for {ticker} on {date}") + await self._sleep(0.3, speed) + + # Analysts (sequential for simplicity in mock) + analysts = [ + ("news_analyst", "gpt-4o-mini", 1.4, 480, 310), + ("market_analyst", "gpt-4o-mini", 1.2, 390, 240), + ("fundamentals_analyst", "gpt-4o", 2.1, 620, 430), + ("social_analyst", "gpt-4o-mini", 0.9, 310, 190), + ] + for node, model, latency, tok_in, tok_out in analysts: + async for evt in self._agent_with_tool( + run_id, node, ticker, model, latency, tok_in, tok_out, speed, + tool_name=f"get_{node.split('_')[0]}_data", + ): + yield evt + + # Research debate + for node, model, latency, tok_in, tok_out in [ + ("bull_researcher", "gpt-4o", 1.8, 540, 360), + ("bear_researcher", "gpt-4o", 1.7, 510, 340), + ("research_manager","gpt-4o", 2.3, 680, 480), + ]: + async for evt in self._agent_no_tool( + run_id, node, ticker, model, latency, tok_in, tok_out, speed + ): + yield evt + + # Trading decision + for node, model, latency, tok_in, tok_out in [ + ("trader", "gpt-4o", 2.0, 600, 420), + ("risk_manager", "gpt-4o", 1.5, 450, 310), + ("risk_judge", "gpt-4o", 1.1, 380, 260), + ]: + async for evt in self._agent_no_tool( + run_id, node, ticker, model, latency, tok_in, tok_out, speed + ): + yield evt + + yield self._log(f"[MOCK] Pipeline for {ticker} completed.") + + # ------------------------------------------------------------------ + # Scan mock + # ------------------------------------------------------------------ + + async def _run_scan( + self, run_id: str, params: Dict[str, Any], speed: float + ) -> AsyncGenerator[Dict[str, Any], None]: + date = params.get("date", time.strftime("%Y-%m-%d")) + identifier = "MARKET" + + yield self._log(f"[MOCK] Starting market scan for {date}") + await self._sleep(0.3, speed) + + # Phase 1 — three scanners in "parallel" (interleaved) + yield self._log("[MOCK] Phase 1: Running geopolitical, market-movers, sector scanners…") + phase1 = [ + ("geopolitical_scanner", "gpt-4o-mini", 1.5, 420, 280), + ("market_movers_scanner", "gpt-4o-mini", 1.3, 380, 250), + ("sector_scanner", "gpt-4o-mini", 1.4, 400, 265), + ] + # Emit thought events for all three before any complete + for node, model, _, _, _ in phase1: + yield self._thought(node, identifier, model, f"[MOCK] Scanning {node.replace('_', ' ')}…") + await self._sleep(0.1, speed) + + # Then complete them in order + for node, model, latency, tok_in, tok_out in phase1: + await self._sleep(latency, speed) + yield self._result(node, identifier, model, tok_in, tok_out, round(latency * 1000), + f"[MOCK] {node.replace('_', ' ').title()} report ready.") + + # Phase 2 — industry deep dive + yield self._log("[MOCK] Phase 2: Industry deep dive…") + async for evt in self._agent_no_tool( + run_id, "industry_deep_dive", identifier, "gpt-4o", 2.2, 680, 460, speed + ): + yield evt + + # Phase 3 — macro synthesis + yield self._log("[MOCK] Phase 3: Macro synthesis + watchlist generation…") + async for evt in self._agent_no_tool( + run_id, "macro_synthesis", identifier, "gpt-4o", 2.8, 820, 590, speed + ): + yield evt + + yield self._log("[MOCK] Scan completed. Top-10 watchlist generated.") + + # ------------------------------------------------------------------ + # Auto mock (scan → pipeline per ticker → portfolio) + # ------------------------------------------------------------------ + + async def _run_auto( + self, run_id: str, params: Dict[str, Any], speed: float + ) -> AsyncGenerator[Dict[str, Any], None]: + tickers = params.get("tickers") or [params.get("ticker", "AAPL").upper()] + + yield self._log(f"[MOCK] Starting auto run — scan + {len(tickers)} pipeline(s) + portfolio") + await self._sleep(0.2, speed) + + # Phase 1: Scan + yield self._log("[MOCK] Phase 1/3: Market scan…") + async for evt in self._run_scan(run_id, params, speed): + yield evt + + # Phase 2: Per-ticker pipeline + for ticker in tickers: + yield self._log(f"[MOCK] Phase 2/3: Pipeline for {ticker}…") + async for evt in self._run_pipeline(run_id, {**params, "ticker": ticker}, speed): + yield evt + + # Phase 3: Portfolio + yield self._log("[MOCK] Phase 3/3: Portfolio manager…") + async for evt in self._agent_no_tool( + run_id, "portfolio_manager", "PORTFOLIO", "gpt-4o", 2.5, 740, 520, speed + ): + yield evt + + yield self._log("[MOCK] Auto run completed.") + + # ------------------------------------------------------------------ + # Building blocks + # ------------------------------------------------------------------ + + async def _agent_with_tool( + self, + run_id: str, + node: str, + identifier: str, + model: str, + latency: float, + tok_in: int, + tok_out: int, + speed: float, + tool_name: str, + ) -> AsyncGenerator[Dict[str, Any], None]: + yield self._thought(node, identifier, model, f"[MOCK] {node} analysing {identifier}…") + await self._sleep(0.4, speed) + + yield self._tool_call(node, identifier, tool_name, f'{{"ticker": "{identifier}"}}') + await self._sleep(0.6, speed) + + yield self._tool_result(node, identifier, tool_name, + f"[MOCK] Retrieved {tool_name} data for {identifier}.") + await self._sleep(latency - 1.0, speed) + + yield self._result(node, identifier, model, tok_in, tok_out, round(latency * 1000), + f"[MOCK] {node.replace('_', ' ').title()} analysis complete for {identifier}.") + + async def _agent_no_tool( + self, + run_id: str, + node: str, + identifier: str, + model: str, + latency: float, + tok_in: int, + tok_out: int, + speed: float, + ) -> AsyncGenerator[Dict[str, Any], None]: + yield self._thought(node, identifier, model, f"[MOCK] {node} processing {identifier}…") + await self._sleep(latency, speed) + + yield self._result(node, identifier, model, tok_in, tok_out, round(latency * 1000), + f"[MOCK] {node.replace('_', ' ').title()} decision for {identifier}.") + + # ------------------------------------------------------------------ + # Event constructors + # ------------------------------------------------------------------ + + @staticmethod + def _ns() -> str: + return str(time.time_ns()) + + def _log(self, message: str) -> Dict[str, Any]: + return { + "id": f"log_{self._ns()}", + "node_id": "__system__", + "type": "log", + "agent": "SYSTEM", + "identifier": "", + "message": message, + "metrics": {}, + } + + def _thought(self, node: str, identifier: str, model: str, message: str) -> Dict[str, Any]: + return { + "id": f"thought_{self._ns()}", + "node_id": node, + "parent_node_id": "start", + "type": "thought", + "agent": node.upper(), + "identifier": identifier, + "message": message, + "prompt": f"[MOCK PROMPT] Analyse {identifier} using available data.", + "metrics": {"model": model}, + } + + def _tool_call(self, node: str, identifier: str, tool: str, inp: str) -> Dict[str, Any]: + return { + "id": f"tool_{self._ns()}", + "node_id": f"tool_{tool}", + "parent_node_id": node, + "type": "tool", + "agent": node.upper(), + "identifier": identifier, + "message": f"▶ Tool: {tool} | {inp}", + "prompt": inp, + "metrics": {}, + } + + def _tool_result(self, node: str, identifier: str, tool: str, output: str) -> Dict[str, Any]: + return { + "id": f"tool_res_{self._ns()}", + "node_id": f"tool_{tool}", + "parent_node_id": node, + "type": "tool_result", + "agent": node.upper(), + "identifier": identifier, + "message": f"✓ Tool result: {tool} | {output}", + "response": output, + "metrics": {}, + } + + def _result( + self, + node: str, + identifier: str, + model: str, + tok_in: int, + tok_out: int, + latency_ms: int, + message: str, + ) -> Dict[str, Any]: + return { + "id": f"result_{self._ns()}", + "node_id": node, + "type": "result", + "agent": node.upper(), + "identifier": identifier, + "message": message, + "response": f"[MOCK RESPONSE] {message}", + "metrics": { + "model": model, + "tokens_in": tok_in, + "tokens_out": tok_out, + "latency_ms": latency_ms, + }, + } + + @staticmethod + async def _sleep(seconds: float, speed: float) -> None: + delay = max(seconds / speed, 0.01) + await asyncio.sleep(delay) diff --git a/agent_os/frontend/src/Dashboard.tsx b/agent_os/frontend/src/Dashboard.tsx index 80317f14..408fe38b 100644 --- a/agent_os/frontend/src/Dashboard.tsx +++ b/agent_os/frontend/src/Dashboard.tsx @@ -35,7 +35,7 @@ import { Collapse, useToast, } from '@chakra-ui/react'; -import { LayoutDashboard, Wallet, Settings, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot, ChevronDown, ChevronUp, Trash2 } from 'lucide-react'; +import { LayoutDashboard, Wallet, Settings, Terminal as TerminalIcon, ChevronRight, Eye, Search, BarChart3, Bot, ChevronDown, ChevronUp, FlaskConical, Trash2 } from 'lucide-react'; import { MetricHeader } from './components/MetricHeader'; import { AgentGraph } from './components/AgentGraph'; import { PortfolioViewer } from './components/PortfolioViewer'; @@ -45,12 +45,17 @@ import axios from 'axios'; const API_BASE = 'http://127.0.0.1:8088/api'; // ─── Run type definitions with required parameters ──────────────────── -type RunType = 'scan' | 'pipeline' | 'portfolio' | 'auto'; +type RunType = 'scan' | 'pipeline' | 'portfolio' | 'auto' | 'mock'; + +/** Mock-specific sub-type. */ +type MockType = 'pipeline' | 'scan' | 'auto'; interface RunParams { date: string; ticker: string; portfolio_id: string; + mock_type: MockType; + speed: string; force: boolean; } @@ -59,6 +64,7 @@ const RUN_TYPE_LABELS: Record = { pipeline: 'Pipeline', portfolio: 'Portfolio', auto: 'Auto', + mock: 'Mock', }; /** Which params each run type needs. */ @@ -67,6 +73,7 @@ const REQUIRED_PARAMS: Record = { pipeline: ['ticker', 'date'], portfolio: ['date', 'portfolio_id'], auto: ['date', 'portfolio_id'], + mock: [], }; /** Return the colour token for a given event type. */ @@ -266,10 +273,13 @@ const EventDetail: React.FC<{ event: AgentEvent; onOpenModal?: (evt: AgentEvent) ); // ─── Detail drawer showing all events for a given graph node ────────── -const NodeEventsDetail: React.FC<{ nodeId: string; events: AgentEvent[]; onOpenModal: (evt: AgentEvent) => void }> = ({ nodeId, events, onOpenModal }) => { +const NodeEventsDetail: React.FC<{ nodeId: string; identifier?: string | null; events: AgentEvent[]; onOpenModal: (evt: AgentEvent) => void }> = ({ nodeId, identifier, events, onOpenModal }) => { const nodeEvents = useMemo( - () => events.filter((e) => e.node_id === nodeId), - [events, nodeId], + () => events.filter((e) => + e.node_id === nodeId && + (!identifier || e.identifier === identifier) + ), + [events, nodeId, identifier], ); if (nodeEvents.length === 0) { @@ -307,6 +317,7 @@ export const Dashboard: React.FC = () => { const [drawerMode, setDrawerMode] = useState<'event' | 'node'>('event'); const [selectedEvent, setSelectedEvent] = useState(null); const [selectedNodeId, setSelectedNodeId] = useState(null); + const [selectedNodeIdentifier, setSelectedNodeIdentifier] = useState(null); // Parameter inputs const [showParams, setShowParams] = useState(false); @@ -314,6 +325,8 @@ export const Dashboard: React.FC = () => { date: new Date().toISOString().split('T')[0], ticker: 'AAPL', portfolio_id: 'main_portfolio', + mock_type: 'pipeline', + speed: '3', force: false, }); @@ -333,12 +346,14 @@ export const Dashboard: React.FC = () => { const isRunning = isTriggering || status === 'streaming' || status === 'connecting'; - const startRun = async (type: RunType) => { + const startRun = async (type: RunType, overrideParams?: Partial) => { if (isRunning) return; + const effectiveParams = { ...params, ...overrideParams }; + // Validate required params const required = REQUIRED_PARAMS[type]; - const missing = required.filter((k) => { const v = params[k]; return typeof v === 'string' ? !v.trim() : !v; }); + const missing = required.filter((k) => { const v = effectiveParams[k]; return typeof v === 'string' ? !v.trim() : !v; }); if (missing.length > 0) { toast({ title: `Missing required fields for ${RUN_TYPE_LABELS[type]}`, @@ -351,17 +366,30 @@ export const Dashboard: React.FC = () => { setShowParams(true); return; } - + setIsTriggering(true); setActiveRunType(type); try { clearEvents(); - const res = await axios.post(`${API_BASE}/run/${type}`, { - portfolio_id: params.portfolio_id, - date: params.date, - ticker: params.ticker, - force: params.force, - }); + // For mock auto runs, parse comma-separated tickers into an array + const mockTickers = effectiveParams.mock_type === 'auto' + ? effectiveParams.ticker.split(',').map((t) => t.trim().toUpperCase()).filter(Boolean) + : undefined; + const body = type === 'mock' + ? { + mock_type: effectiveParams.mock_type, + ticker: effectiveParams.ticker.split(',')[0].trim().toUpperCase(), + ...(mockTickers && mockTickers.length > 1 ? { tickers: mockTickers } : {}), + date: effectiveParams.date, + speed: parseFloat(effectiveParams.speed) || 3, + } + : { + portfolio_id: effectiveParams.portfolio_id, + date: effectiveParams.date, + ticker: effectiveParams.ticker, + force: effectiveParams.force, + }; + const res = await axios.post(`${API_BASE}/run/${type}`, body); setActiveRunId(res.data.run_id); } catch (err) { console.error("Failed to start run:", err); @@ -371,6 +399,16 @@ export const Dashboard: React.FC = () => { } }; + /** Re-run triggered from a graph node's Re-run button. */ + const handleNodeRerun = useCallback((identifier: string, _nodeId: string) => { + if (identifier === 'MARKET' || identifier === '') { + startRun('scan'); + } else { + startRun('pipeline', { ticker: identifier }); + } + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [isRunning, params]); + const resetPortfolioStage = async () => { if (!params.date || !params.portfolio_id) { toast({ title: 'Date and Portfolio ID are required', status: 'warning', duration: 3000, isClosable: true, position: 'top' }); @@ -407,9 +445,10 @@ export const Dashboard: React.FC = () => { }, [onOpen]); /** Open the drawer showing all events for a graph node (node click). */ - const openNodeDetail = useCallback((nodeId: string) => { + const openNodeDetail = useCallback((nodeId: string, identifier?: string) => { setDrawerMode('node'); setSelectedNodeId(nodeId); + setSelectedNodeIdentifier(identifier || null); setSelectedEvent(null); onOpen(); }, [onOpen]); @@ -417,7 +456,7 @@ export const Dashboard: React.FC = () => { // Derive a readable drawer title const drawerTitle = drawerMode === 'event' ? `Event: ${selectedEvent?.agent ?? ''} — ${selectedEvent?.type ?? ''}` - : `Node: ${selectedNodeId ?? ''}`; + : `Node: ${selectedNodeId ?? ''}${selectedNodeIdentifier ? ` · ${selectedNodeIdentifier}` : ''}`; return ( @@ -466,7 +505,7 @@ export const Dashboard: React.FC = () => { {/* Left Side: Graph Area */} - + {/* Floating Control Panel */} @@ -475,13 +514,13 @@ export const Dashboard: React.FC = () => { {(['scan', 'pipeline', 'portfolio', 'auto'] as RunType[]).map((type) => { const isThisRunning = isRunning && activeRunType === type; const isOtherRunning = isRunning && activeRunType !== type; - const icons: Record = { + const icons: Record = { scan: , pipeline: , portfolio: , auto: , }; - const colors: Record = { + const colors: Record = { scan: 'cyan', pipeline: 'blue', portfolio: 'purple', @@ -504,6 +543,21 @@ export const Dashboard: React.FC = () => { ); })} + {/* Mock run button — no LLM calls */} + + + + ))} + + + + Speed: + + {[['1×', '1'], ['3×', '3'], ['5×', '5'], ['10×', '10']].map(([label, val]) => ( + + ))} + + + { - Required: Scan → date · Pipeline → ticker, date · Portfolio → date, portfolio · Auto → date, portfolio + Required: Scan → date · Pipeline → ticker, date · Portfolio → date, portfolio · Auto → date, portfolio · Mock → no API calls @@ -651,7 +741,7 @@ export const Dashboard: React.FC = () => { )} {drawerMode === 'node' && selectedNodeId && ( - + )} diff --git a/agent_os/frontend/src/components/AgentGraph.tsx b/agent_os/frontend/src/components/AgentGraph.tsx index 1205eaa0..c51a5d80 100644 --- a/agent_os/frontend/src/components/AgentGraph.tsx +++ b/agent_os/frontend/src/components/AgentGraph.tsx @@ -1,8 +1,8 @@ import React, { useEffect, useRef, useCallback } from 'react'; -import ReactFlow, { - Background, - Controls, - Node, +import ReactFlow, { + Background, + Controls, + Node, Edge, Handle, Position, @@ -11,208 +11,462 @@ import ReactFlow, { useEdgesState, } from 'reactflow'; import 'reactflow/dist/style.css'; -import { Box, Text, Flex, Icon, Badge } from '@chakra-ui/react'; -import { Cpu, Settings, Database, TrendingUp, Clock } from 'lucide-react'; +import { Box, Text, Flex, Icon, Badge, IconButton, Tooltip } from '@chakra-ui/react'; +import { Cpu, Settings, Database, TrendingUp, Clock, RefreshCw } from 'lucide-react'; import { AgentEvent } from '../hooks/useAgentStream'; -// --- Custom Agent Node Component --- +// ─── Layout constants ───────────────────────────────────────────────────────── +const COL_WIDTH = 230; // horizontal space per ticker column / scan slot +const ROW_HEIGHT = 148; // vertical space per agent row within a column +const SCAN_TOP_Y = 40; // y offset for the first scan row +const SCAN_ROW_H = 148; // vertical gap between scan rows +// Phase-1 scanners align with the first 3 ticker columns (x = 0, COL_WIDTH, COL_WIDTH*2) +const SCAN_CENTER_X = COL_WIDTH; // x for phase-2/3 scan nodes (centre of 3) +const TICKER_GAP = 80; // vertical gap between scan phase and ticker headers +const TICKER_HDR_H = 108; // height reserved for the ticker header card + +// ─── Node classification ────────────────────────────────────────────────────── +const SCAN_PHASE1 = ['geopolitical_scanner', 'market_movers_scanner', 'sector_scanner']; +const SCAN_PHASE2 = new Set(['industry_deep_dive']); +const SCAN_PHASE3 = new Set(['macro_synthesis']); +const SCAN_ALL = new Set([...SCAN_PHASE1, ...SCAN_PHASE2, ...SCAN_PHASE3]); + +type NodeKind = 'scan' | 'ticker' | 'portfolio' | 'skip'; + +function classifyNode(nodeId: string, identifier: string): NodeKind { + if (nodeId.startsWith('tool_')) return 'skip'; + if (identifier === 'MARKET' || SCAN_ALL.has(nodeId)) return 'scan'; + if (identifier === 'PORTFOLIO' || nodeId === 'portfolio_manager' || + nodeId === 'make_pm_decision') return 'portfolio'; + if (identifier) return 'ticker'; + return 'skip'; +} + +// ─── Colour helpers ─────────────────────────────────────────────────────────── +const STATUS_COLORS: Record = { + running: '#4fd1c5', + completed: '#68d391', + error: '#fc8181', +}; +const DEFAULT_COLOR = 'rgba(255,255,255,0.25)'; + +function statusColor(status: string): string { + return STATUS_COLORS[status] ?? DEFAULT_COLOR; +} + +const ID_PALETTE = [ + '#63b3ed', '#9f7aea', '#f6ad55', '#4fd1c5', + '#f687b3', '#f6e05e', '#68d391', '#fc8181', +]; +function identifierColor(id: string): string { + if (!id || id === 'MARKET') return '#4fd1c5'; + if (id === 'PORTFOLIO') return '#9f7aea'; + let h = 0; + for (let i = 0; i < id.length; i++) h = (h * 31 + id.charCodeAt(i)) & 0xffff; + return ID_PALETTE[h % ID_PALETTE.length]; +} + +function toLabel(nodeId: string): string { + return nodeId.replace(/_/g, ' ').replace(/\b\w/g, c => c.toUpperCase()); +} + +// ─── Agent Node ─────────────────────────────────────────────────────────────── const AgentNode = ({ data }: NodeProps) => { - const getIcon = (agent: string) => { - switch (agent.toUpperCase()) { - case 'ANALYST': return Cpu; - case 'RESEARCHER': return Database; - case 'TRADER': return TrendingUp; - default: return Settings; - } + const getIcon = (agent = '') => { + const a = agent.toUpperCase(); + if (a.includes('ANALYST') || a.includes('SCANNER')) return Cpu; + if (a.includes('RESEARCHER') || a.includes('MANAGER') || a.includes('SYNTHESIS') || a.includes('DIVE')) return Database; + if (a.includes('TRADER') || a.includes('RISK') || a.includes('JUDGE')) return TrendingUp; + return Settings; }; - const getStatusColor = (status: string) => { - switch (status) { - case 'running': return 'cyan.400'; - case 'completed': return 'green.400'; - case 'error': return 'red.400'; - default: return 'whiteAlpha.500'; - } - }; + const sc = statusColor(data.status); + const canRerun = data.status === 'completed' || data.status === 'error'; + const totalTok = (data.metrics?.tokens_in ?? 0) + (data.metrics?.tokens_out ?? 0); return ( - - - - - - - {data.agent} - {data.status === 'completed' && ( - Done - )} + + + + {/* Header row */} + + + + {data.label} + + {data.status === 'completed' && } + {data.status === 'error' && } - - - + + + + {/* Metrics row */} - - {data.metrics?.latency_ms || 0}ms + + + {data.metrics?.latency_ms ? `${data.metrics.latency_ms}ms` : '—'} + - {data.metrics?.model && data.metrics.model !== 'unknown' && ( - {data.metrics.model} + {totalTok > 0 && ( + + {totalTok.toLocaleString()} tok + )} - + + {data.metrics?.model && data.metrics.model !== 'unknown' && ( + + {data.metrics.model} + + )} + + {/* Running shimmer */} {data.status === 'running' && ( - - - + + + + )} + + {/* Re-run */} + {canRerun && data.onRerun && ( + + } + size="xs" variant="ghost" colorScheme="cyan" alignSelf="flex-end" + onClick={(e) => { e.stopPropagation(); data.onRerun(); }} + /> + )} - + ); }; -const nodeTypes = { - agentNode: AgentNode, +// ─── Ticker Header Node ─────────────────────────────────────────────────────── +const TickerHeaderNode = ({ data }: NodeProps) => { + const color = identifierColor(data.ticker); + const sc = statusColor(data.status ?? 'running'); + const done = data.completedCount ?? 0; + const total = data.agentCount ?? 0; + + return ( + + + + + + + {data.ticker} + + {/* Status pulse dot */} + + + + + + + Pipeline + {total > 0 && ( + + {done}/{total} done + + )} + + + + + + ); }; -interface AgentGraphProps { - events: AgentEvent[]; - onNodeClick?: (nodeId: string) => void; +const nodeTypes = { agentNode: AgentNode, tickerHeader: TickerHeaderNode }; + +// ─── Layout state ───────────────────────────────────────────────────────────── +interface LayoutState { + // Scan phase + scanPhase1Count: number; + scanLastY: number; + hasScan: boolean; + lastScanNodeId: string | null; + // Ticker columns + identifierToCol: Map; + identifierLastNode: Map; + identifierAgentRow: Map; + identifierAgentCount: Map; + identifierDoneCount: Map; + colCount: number; + tickerStartY: number; + maxTickerY: number; + // Tracking + seenNodeIds: Set; + seenEdgeIds: Set; + processedCount: number; } -export const AgentGraph: React.FC = ({ events, onNodeClick }) => { +function freshLayout(): LayoutState { + return { + scanPhase1Count: 0, + scanLastY: 0, + hasScan: false, + lastScanNodeId: null, + identifierToCol: new Map(), + identifierLastNode: new Map(), + identifierAgentRow: new Map(), + identifierAgentCount: new Map(), + identifierDoneCount: new Map(), + colCount: 0, + tickerStartY: 0, + maxTickerY: 0, + seenNodeIds: new Set(), + seenEdgeIds: new Set(), + processedCount: 0, + }; +} + +// ─── Props ──────────────────────────────────────────────────────────────────── +interface AgentGraphProps { + events: AgentEvent[]; + onNodeClick?: (nodeId: string, identifier?: string) => void; + onNodeRerun?: (identifier: string, nodeId: string) => void; +} + +// ─── Component ──────────────────────────────────────────────────────────────── +export const AgentGraph: React.FC = ({ events, onNodeClick, onNodeRerun }) => { const [nodes, setNodes, onNodesChange] = useNodesState([]); const [edges, setEdges, onEdgesChange] = useEdgesState([]); - // Track which node_ids we have already added so we never duplicate - const seenNodeIds = useRef(new Set()); - const seenEdgeIds = useRef(new Set()); - // Track how many unique nodes exist for vertical layout - const nodeCount = useRef(0); - // Track the last processed event index to only process new events - const processedCount = useRef(0); + const ls = useRef(freshLayout()); useEffect(() => { - // Only process newly arrived events - const newEvents = events.slice(processedCount.current); + const newEvents = events.slice(ls.current.processedCount); if (newEvents.length === 0) return; - processedCount.current = events.length; + ls.current.processedCount = events.length; - const addedNodes: Node[] = []; - const addedEdges: Edge[] = []; - const updatedNodeData: Map> = new Map(); + const addedNodes: Node[] = []; + const addedEdges: Edge[] = []; + const patchMap: Map> = new Map(); for (const evt of newEvents) { - if (!evt.node_id || evt.node_id === '__system__') continue; + const nodeId = evt.node_id; + if (!nodeId || nodeId === '__system__') continue; - // Determine if this event means the node is completed - const isCompleted = evt.type === 'result' || evt.type === 'tool_result'; + const identifier = evt.identifier ?? ''; + const kind = classifyNode(nodeId, identifier); + if (kind === 'skip') continue; - if (!seenNodeIds.current.has(evt.node_id)) { - // New node — create it - seenNodeIds.current.add(evt.node_id); - nodeCount.current += 1; + const scopedId = identifier ? `${nodeId}:${identifier}` : nodeId; + const isResult = evt.type === 'result'; - addedNodes.push({ - id: evt.node_id, - type: 'agentNode', - position: { x: 250, y: nodeCount.current * 150 + 50 }, - data: { - agent: evt.agent, - status: isCompleted ? 'completed' : 'running', - metrics: evt.metrics, - }, + // ── Update path (node already exists) ────────────────────────────────── + if (ls.current.seenNodeIds.has(scopedId)) { + if (isResult && kind === 'ticker') { + const done = (ls.current.identifierDoneCount.get(identifier) ?? 0) + 1; + const total = ls.current.identifierAgentCount.get(identifier) ?? 0; + ls.current.identifierDoneCount.set(identifier, done); + patchMap.set(`header:${identifier}`, { + completedCount: done, + agentCount: total, + status: done >= total && total > 0 ? 'completed' : 'running', + }); + } + const prev = patchMap.get(scopedId); + const wasDone = prev?.status === 'completed'; + patchMap.set(scopedId, { + ...prev, + status: wasDone || isResult ? 'completed' : 'running', + metrics: evt.metrics ?? prev?.metrics, }); + continue; + } - // Add edge from parent (if applicable) - if (evt.parent_node_id && evt.parent_node_id !== 'start') { - const edgeId = `e-${evt.parent_node_id}-${evt.node_id}`; - if (!seenEdgeIds.current.has(edgeId)) { - seenEdgeIds.current.add(edgeId); - addedEdges.push({ - id: edgeId, - source: evt.parent_node_id, - target: evt.node_id, - animated: true, - style: { stroke: '#4fd1c5' }, - }); + // ── Create path (new node) ───────────────────────────────────────────── + ls.current.seenNodeIds.add(scopedId); + let x = 0, y = 0; + let parentScopedId: string | null = null; + + if (kind === 'scan') { + if (SCAN_PHASE1.includes(nodeId)) { + x = ls.current.scanPhase1Count * COL_WIDTH; + y = SCAN_TOP_Y; + ls.current.scanPhase1Count++; + } else { + const phaseRow = SCAN_PHASE2.has(nodeId) ? 1 : 2; + x = SCAN_CENTER_X; + y = SCAN_TOP_Y + phaseRow * SCAN_ROW_H; + if (evt.parent_node_id && evt.parent_node_id !== 'start') { + const pid = evt.parent_node_id; + parentScopedId = identifier ? `${pid}:${identifier}` : pid; } } - } else { - // Existing node — queue a status/metrics update - // Never revert a completed node back to running - const prev = updatedNodeData.get(evt.node_id); - const currentlyCompleted = prev?.status === 'completed'; - updatedNodeData.set(evt.node_id, { - status: currentlyCompleted || isCompleted ? 'completed' : 'running', - metrics: evt.metrics, + ls.current.hasScan = true; + ls.current.scanLastY = Math.max(ls.current.scanLastY, y); + ls.current.lastScanNodeId = scopedId; + + } else if (kind === 'ticker') { + // Create ticker header column if this is the first agent for this identifier + if (!ls.current.identifierToCol.has(identifier)) { + const col = ls.current.colCount++; + ls.current.identifierToCol.set(identifier, col); + ls.current.identifierAgentRow.set(identifier, 0); + ls.current.identifierAgentCount.set(identifier, 0); + ls.current.identifierDoneCount.set(identifier, 0); + + if (!ls.current.tickerStartY) { + ls.current.tickerStartY = ls.current.hasScan + ? ls.current.scanLastY + SCAN_ROW_H + TICKER_GAP + : SCAN_TOP_Y; + } + + const hx = col * COL_WIDTH; + const hy = ls.current.tickerStartY; + const hid = `header:${identifier}`; + + addedNodes.push({ + id: hid, type: 'tickerHeader', + position: { x: hx, y: hy }, + data: { ticker: identifier, status: 'running', agentCount: 0, completedCount: 0, + node_id: 'header', identifier }, + }); + ls.current.seenNodeIds.add(hid); + + // Fan-out edge: last scan node → this ticker header + const lastScan = ls.current.lastScanNodeId; + if (lastScan) { + const eid = `e-${lastScan}-${hid}`; + if (!ls.current.seenEdgeIds.has(eid)) { + ls.current.seenEdgeIds.add(eid); + addedEdges.push({ + id: eid, source: lastScan, target: hid, animated: true, + style: { stroke: '#4fd1c5', strokeDasharray: '5 5' }, + }); + } + } + } + + const col = ls.current.identifierToCol.get(identifier)!; + const agentRow = ls.current.identifierAgentRow.get(identifier)!; + ls.current.identifierAgentRow.set(identifier, agentRow + 1); + const newCount = (ls.current.identifierAgentCount.get(identifier) ?? 0) + 1; + ls.current.identifierAgentCount.set(identifier, newCount); + + x = col * COL_WIDTH; + y = ls.current.tickerStartY + TICKER_HDR_H + agentRow * ROW_HEIGHT; + ls.current.maxTickerY = Math.max(ls.current.maxTickerY, y); + + // Parent: previous node in same column (header → agent0 → agent1 → …) + parentScopedId = ls.current.identifierLastNode.get(identifier) ?? `header:${identifier}`; + ls.current.identifierLastNode.set(identifier, scopedId); + + // Update header agent count + patchMap.set(`header:${identifier}`, { + ...(patchMap.get(`header:${identifier}`) ?? {}), + agentCount: newCount, }); + + } else { + // portfolio + const totalW = ls.current.colCount * COL_WIDTH; + x = totalW > 0 ? totalW / 2 - 100 : SCAN_CENTER_X; + y = ls.current.maxTickerY + ROW_HEIGHT + TICKER_GAP; + } + + addedNodes.push({ + id: scopedId, type: 'agentNode', + position: { x, y }, + data: { + agent: evt.agent, + label: toLabel(nodeId), + identifier, + node_id: nodeId, + status: isResult ? 'completed' : 'running', + metrics: evt.metrics, + onRerun: onNodeRerun ? () => onNodeRerun(identifier, nodeId) : undefined, + }, + }); + + // Edge to parent + if (parentScopedId) { + const eid = `e-${parentScopedId}-${scopedId}`; + if (!ls.current.seenEdgeIds.has(eid)) { + ls.current.seenEdgeIds.add(eid); + addedEdges.push({ + id: eid, source: parentScopedId, target: scopedId, animated: true, + style: { stroke: '#4fd1c5' }, + }); + } } } - // Batch state updates - if (addedNodes.length > 0) { - setNodes((prev) => [...prev, ...addedNodes]); + if (addedNodes.length > 0) setNodes(prev => [...prev, ...addedNodes]); + if (addedEdges.length > 0) setEdges(prev => [...prev, ...addedEdges]); + if (patchMap.size > 0) { + setNodes(prev => prev.map(n => { + const patch = patchMap.get(n.id); + if (!patch) return n; + const finalStatus = n.data.status === 'completed' ? 'completed' : (patch.status ?? n.data.status); + return { ...n, data: { ...n.data, ...patch, status: finalStatus } }; + })); } - if (addedEdges.length > 0) { - setEdges((prev) => [...prev, ...addedEdges]); - } - if (updatedNodeData.size > 0) { - setNodes((prev) => - prev.map((n) => { - const patch = updatedNodeData.get(n.id); - if (!patch) return n; - // Never revert a completed node back to running - const finalStatus = n.data.status === 'completed' ? 'completed' : patch.status; - return { - ...n, - data: { ...n.data, ...patch, status: finalStatus, metrics: patch.metrics ?? n.data.metrics }, - }; - }), - ); - } - }, [events, setNodes, setEdges]); + }, [events, setNodes, setEdges, onNodeRerun]); - // Reset tracked state when the events array is cleared (new run) + // Reset on new run (events cleared) useEffect(() => { if (events.length === 0) { - seenNodeIds.current.clear(); - seenEdgeIds.current.clear(); - nodeCount.current = 0; - processedCount.current = 0; + ls.current = freshLayout(); setNodes([]); setEdges([]); } }, [events.length, setNodes, setEdges]); - const handleNodeClick = useCallback((_event: React.MouseEvent, node: Node) => { - onNodeClick?.(node.id); + const handleNodeClick = useCallback((_: React.MouseEvent, node: Node) => { + onNodeClick?.(node.data.node_id as string, node.data.identifier as string); }, [onNodeClick]); return ( - + = ({ events, onNodeClick }) = onNodeClick={handleNodeClick} nodeTypes={nodeTypes} fitView + fitViewOptions={{ padding: 0.15 }} > - + diff --git a/agent_os/frontend/src/hooks/useAgentStream.ts b/agent_os/frontend/src/hooks/useAgentStream.ts index 6ce3216c..bbf7d3f7 100644 --- a/agent_os/frontend/src/hooks/useAgentStream.ts +++ b/agent_os/frontend/src/hooks/useAgentStream.ts @@ -11,6 +11,8 @@ export interface AgentEvent { prompt?: string; /** Full response text (available on result & tool_result events). */ response?: string; + /** Ticker symbol (e.g. "AAPL"), "MARKET" for scans, or portfolio id. */ + identifier?: string; node_id?: string; parent_node_id?: string; metrics?: {