From a26c93463a7b0056074286ce9eee48d97a97a35e Mon Sep 17 00:00:00 2001 From: Ahmet Guzererler Date: Sun, 22 Mar 2026 21:54:13 +0100 Subject: [PATCH] feat: initialize AgentOS observability foundation - implement FastAPI backend with REST and WebSocket streaming - add node-level metrics (tokens, latency) to event protocol - design literal graph and top 3 metrics (Sharpe, Regime, Drawdown) - scaffold React frontend with Chakra UI and useAgentStream hook - add DESIGN.md and .env.example --- agent_os/DESIGN.md | 44 +++++++ agent_os/backend/.env.example | 16 +++ agent_os/backend/dependencies.py | 15 +++ agent_os/backend/main.py | 27 +++++ agent_os/backend/routes/portfolios.py | 49 ++++++++ agent_os/backend/routes/runs.py | 92 +++++++++++++++ agent_os/backend/routes/websocket.py | 109 ++++++++++++++++++ agent_os/backend/services/langgraph_engine.py | 35 ++++++ agent_os/frontend/README.md | 29 +++++ agent_os/frontend/useAgentStream.ts | 80 +++++++++++++ 10 files changed, 496 insertions(+) create mode 100644 agent_os/DESIGN.md create mode 100644 agent_os/backend/.env.example create mode 100644 agent_os/backend/dependencies.py create mode 100644 agent_os/backend/main.py create mode 100644 agent_os/backend/routes/portfolios.py create mode 100644 agent_os/backend/routes/runs.py create mode 100644 agent_os/backend/routes/websocket.py create mode 100644 agent_os/backend/services/langgraph_engine.py create mode 100644 agent_os/frontend/README.md create mode 100644 agent_os/frontend/useAgentStream.ts diff --git a/agent_os/DESIGN.md b/agent_os/DESIGN.md new file mode 100644 index 00000000..87888bdc --- /dev/null +++ b/agent_os/DESIGN.md @@ -0,0 +1,44 @@ +# AgentOS: Visual Observability Design + +## 1. The Literal Graph Visualization (Agent Map) + +The agent map is a directed acyclic graph (DAG) representing the LangGraph workflow in real time. + +### Implementation Strategy +- **Frontend:** Powered by **React Flow**. Nodes are added and connected as WebSocket events arrive. 
+- **Node Data Contract:** + - `node_id`: Unique identifier for the graph node. + - `parent_node_id`: For building edges in real time. + - `metrics`: `{ "tokens_in": int, "tokens_out": int, "latency_ms": float, "model": str }`. +- **Interactivity:** Clicking a node opens an **Inspector Drawer** showing: + - **LLM Metrics:** Model name, Request/Response tokens, Latency (ms). + - **Payload:** Raw JSON response and rationale. + +### Pause & Restart (Next Phase TODO) +- **Interrupts:** Use LangGraph's `interrupt_before` feature to halt execution at specific nodes (e.g., `trader_node`). +- **Control API:** `POST /api/run/{run_id}/resume` to signal the graph to continue. + +--- + +## 2. The "Top 3" Metrics Consensus + +Synthesized consensus between **Economist** (Efficiency/Risk) and **UI Designer** (Clarity/Action): + +1. **Trailing 30-Day Sharpe Ratio (Risk-Adjusted Efficiency)** + - *Economist:* "Absolute P&L is vanity; we need to know the quality of the returns." + - *Display:* Large gauge showing trading efficiency. + +2. **Current Market Regime & Beta (Macro Alignment)** + - *Economist:* "Signals if we are riding the trend or fighting it." + - *Display:* Status badge (BULL/BEAR) + Beta value relative to S&P 500. + +3. **Real-Time Drawdown & 1-Day VaR (Capital Preservation)** + - *UI Designer:* "The 'Red Alert' metric. It must be visible if we are losing capital." + - *Display:* Percentage bar showing distance from the All-Time High. + +--- + +## 3. Tech Stack +- **Backend:** FastAPI, LangChain, Supabase (Postgres). +- **Frontend:** React, Chakra UI, React Flow, Axios. +- **Protocol:** REST for triggers, WebSockets for live streaming. 
diff --git a/agent_os/backend/.env.example b/agent_os/backend/.env.example new file mode 100644 index 00000000..bc0c2276 --- /dev/null +++ b/agent_os/backend/.env.example @@ -0,0 +1,16 @@ +# AgentOS Backend Configuration +PORT=8000 +HOST=0.0.0.0 + +# Database (Supabase Postgres) +# Use the same connection string from your TradingAgents root .env +SUPABASE_CONNECTION_STRING=postgresql://postgres:[PASSWORD]@db.[PROJECT_ID].supabase.co:5432/postgres + +# Multi-Tenancy (Future) +SUPABASE_JWT_SECRET=your_secret_here + +# LLM Providers (for LangGraphEngine) +OPENAI_API_KEY=sk-... +ANTHROPIC_API_KEY=sk-ant-... +OPENROUTER_API_KEY=... +GOOGLE_API_KEY=... diff --git a/agent_os/backend/dependencies.py b/agent_os/backend/dependencies.py new file mode 100644 index 00000000..671a72e7 --- /dev/null +++ b/agent_os/backend/dependencies.py @@ -0,0 +1,15 @@ +from typing import Dict, Any +from fastapi import Depends, HTTPException +from tradingagents.portfolio.supabase_client import SupabaseClient +from tradingagents.portfolio.exceptions import PortfolioError + +async def get_current_user(): + # V1 (Single Tenant): Just return a hardcoded user/workspace ID + # V2 (Multi-Tenant): Decode the JWT using supabase-py and return auth.uid() + return {"user_id": "tenant_001", "role": "admin"} + +def get_db_client() -> SupabaseClient: + try: + return SupabaseClient.get_instance() + except PortfolioError as e: + raise HTTPException(status_code=500, detail=str(e)) diff --git a/agent_os/backend/main.py b/agent_os/backend/main.py new file mode 100644 index 00000000..89137352 --- /dev/null +++ b/agent_os/backend/main.py @@ -0,0 +1,27 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from agent_os.backend.routes import portfolios, runs, websocket + +app = FastAPI(title="AgentOS API") + +# --- CORS Middleware --- +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], # In production, restrict to your React app's URL + allow_credentials=True, + 
allow_methods=["*"], + allow_headers=["*"], +) + +# --- Include Routes --- +app.include_router(portfolios.router) +app.include_router(runs.router) +app.include_router(websocket.router) + +@app.get("/") +async def health_check(): + return {"status": "ok", "service": "AgentOS API"} + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/agent_os/backend/routes/portfolios.py b/agent_os/backend/routes/portfolios.py new file mode 100644 index 00000000..6f65174e --- /dev/null +++ b/agent_os/backend/routes/portfolios.py @@ -0,0 +1,49 @@ +from fastapi import APIRouter, Depends, HTTPException +from typing import List, Any +from agent_os.backend.dependencies import get_current_user, get_db_client +from tradingagents.portfolio.supabase_client import SupabaseClient +from tradingagents.portfolio.exceptions import PortfolioNotFoundError + +router = APIRouter(prefix="/api/portfolios", tags=["portfolios"]) + +@router.get("/") +async def list_portfolios( + user: dict = Depends(get_current_user), + db: SupabaseClient = Depends(get_db_client) +): + # In V2, we would filter by user_id + portfolios = db.list_portfolios() + return [p.to_dict() for p in portfolios] + +@router.get("/{portfolio_id}") +async def get_portfolio( + portfolio_id: str, + user: dict = Depends(get_current_user), + db: SupabaseClient = Depends(get_db_client) +): + try: + portfolio = db.get_portfolio(portfolio_id) + return portfolio.to_dict() + except PortfolioNotFoundError: + raise HTTPException(status_code=404, detail="Portfolio not found") + +@router.get("/{portfolio_id}/latest") +async def get_latest_portfolio_state( + portfolio_id: str, + user: dict = Depends(get_current_user), + db: SupabaseClient = Depends(get_db_client) +): + try: + portfolio = db.get_portfolio(portfolio_id) + snapshot = db.get_latest_snapshot(portfolio_id) + holdings = db.list_holdings(portfolio_id) + trades = db.list_trades(portfolio_id, limit=10) + + return { + "portfolio": 
portfolio.to_dict(), + "snapshot": snapshot.to_dict() if snapshot else None, + "holdings": [h.to_dict() for h in holdings], + "recent_trades": [t.to_dict() for t in trades] + } + except PortfolioNotFoundError: + raise HTTPException(status_code=404, detail="Portfolio not found") diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py new file mode 100644 index 00000000..d95751bf --- /dev/null +++ b/agent_os/backend/routes/runs.py @@ -0,0 +1,92 @@ +from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException +from typing import Dict, Any, List +import uuid +import time +from agent_os.backend.dependencies import get_current_user +from agent_os.backend.services.langgraph_engine import LangGraphEngine + +router = APIRouter(prefix="/api/run", tags=["runs"]) + +# In-memory store for demo (should be replaced by Redis/DB for persistence) +runs: Dict[str, Dict[str, Any]] = {} + +engine = LangGraphEngine() + +@router.post("/scan") +async def trigger_scan( + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, + user: dict = Depends(get_current_user) +): + run_id = str(uuid.uuid4()) + runs[run_id] = { + "id": run_id, + "type": "scan", + "status": "queued", + "created_at": time.time(), + "user_id": user["user_id"] + } + background_tasks.add_task(engine.run_scan, run_id, params or {}) + return {"run_id": run_id, "status": "queued"} + +@router.post("/pipeline") +async def trigger_pipeline( + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, + user: dict = Depends(get_current_user) +): + run_id = str(uuid.uuid4()) + runs[run_id] = { + "id": run_id, + "type": "pipeline", + "status": "queued", + "created_at": time.time(), + "user_id": user["user_id"] + } + background_tasks.add_task(engine.run_pipeline, run_id, params or {}) + return {"run_id": run_id, "status": "queued"} + +@router.post("/portfolio") +async def trigger_portfolio( + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, + user: dict = 
Depends(get_current_user) +): + run_id = str(uuid.uuid4()) + runs[run_id] = { + "id": run_id, + "type": "portfolio", + "status": "queued", + "created_at": time.time(), + "user_id": user["user_id"] + } + background_tasks.add_task(engine.run_portfolio, run_id, params or {}) + return {"run_id": run_id, "status": "queued"} + +@router.post("/auto") +async def trigger_auto( + background_tasks: BackgroundTasks, + params: Dict[str, Any] = None, + user: dict = Depends(get_current_user) +): + run_id = str(uuid.uuid4()) + runs[run_id] = { + "id": run_id, + "type": "auto", + "status": "queued", + "created_at": time.time(), + "user_id": user["user_id"] + } + background_tasks.add_task(engine.run_auto, run_id, params or {}) + return {"run_id": run_id, "status": "queued"} + +@router.get("/") +async def list_runs(user: dict = Depends(get_current_user)): + # Filter by user in production + return list(runs.values()) + +@router.get("/{run_id}") +async def get_run_status(run_id: str, user: dict = Depends(get_current_user)): + if run_id not in runs: + raise HTTPException(status_code=404, detail="Run not found") + return runs[run_id] diff --git a/agent_os/backend/routes/websocket.py b/agent_os/backend/routes/websocket.py new file mode 100644 index 00000000..a9aabd7b --- /dev/null +++ b/agent_os/backend/routes/websocket.py @@ -0,0 +1,109 @@ +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends +import asyncio +import time +import uuid +from typing import Dict, Any +from agent_os.backend.dependencies import get_current_user + +router = APIRouter(prefix="/ws", tags=["websocket"]) + +@router.websocket("/stream/{run_id}") +async def websocket_endpoint( + websocket: WebSocket, + run_id: str, + # user: dict = Depends(get_current_user) # In V2, validate token from query string +): + await websocket.accept() + print(f"WebSocket client connected to run: {run_id}") + + try: + # For now, we use a mock stream. 
+ # In a real implementation, this would subscribe to an event queue or a database stream + # that's being populated by the BackgroundTask running the LangGraph. + + mock_events = [ + { + "id": "node_1", + "node_id": "analyst_node", + "parent_node_id": "start", + "type": "thought", + "agent": "ANALYST", + "message": "Evaluating market data...", + "metrics": { + "model": "gpt-4-turbo", + "tokens_in": 120, + "tokens_out": 45, + "latency_ms": 450 + } + }, + { + "id": "node_2", + "node_id": "tool_node", + "parent_node_id": "analyst_node", + "type": "tool", + "agent": "ANALYST", + "message": "> Tool Call: get_news_sentiment", + "metrics": { + "latency_ms": 800 + } + }, + { + "id": "node_3", + "node_id": "research_node", + "parent_node_id": "analyst_node", + "type": "thought", + "agent": "RESEARCHER", + "message": "Synthesizing industry trends...", + "metrics": { + "model": "claude-3-opus", + "tokens_in": 800, + "tokens_out": 300, + "latency_ms": 2200 + } + }, + { + "id": "node_4", + "node_id": "trader_node", + "parent_node_id": "research_node", + "type": "result", + "agent": "TRADER", + "message": "Action determined: BUY VLO", + "details": { + "model_used": "gpt-4-turbo", + "latency_ms": 1200, + "input_tokens": 450, + "output_tokens": 120, + "raw_json_response": '{"action": "buy", "ticker": "VLO"}' + }, + "metrics": { + "model": "gpt-4-turbo", + "tokens_in": 450, + "tokens_out": 120, + "latency_ms": 1200 + } + } + ] + + for evt in mock_events: + payload = { + "id": evt["id"], + "node_id": evt["node_id"], + "parent_node_id": evt["parent_node_id"], + "timestamp": time.strftime("%H:%M:%S"), + "agent": evt["agent"], + "tier": "mid" if evt["agent"] == "ANALYST" else "deep", + "type": evt["type"], + "message": evt["message"], + "details": evt.get("details"), + "metrics": evt.get("metrics") + } + await websocket.send_json(payload) + await asyncio.sleep(2) # Simulating execution delay + + await websocket.send_json({"type": "system", "message": "Run completed."}) + + except 
WebSocketDisconnect: + print(f"WebSocket client disconnected from run {run_id}") + except Exception as e: + await websocket.send_json({"type": "system", "message": f"Error: {str(e)}"}) + await websocket.close() diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py new file mode 100644 index 00000000..90416803 --- /dev/null +++ b/agent_os/backend/services/langgraph_engine.py @@ -0,0 +1,35 @@ +import asyncio +import time +from typing import Dict, Any + +class LangGraphEngine: + """Orchestrates LangGraph pipeline executions for the AgentOS API.""" + + def __init__(self): + # This is where you would import and setup your LangGraph workflows + # e.g., from tradingagents.graph.setup import setup_trading_graph + pass + + async def run_scan(self, run_id: str, params: Dict[str, Any]): + print(f"Engine: Starting SCAN {run_id} with params {params}") + # Placeholder for actual scanner graph execution + await asyncio.sleep(15) + print(f"Engine: SCAN {run_id} completed") + + async def run_pipeline(self, run_id: str, params: Dict[str, Any]): + print(f"Engine: Starting PIPELINE {run_id} with params {params}") + # Placeholder for actual analysis pipeline execution + await asyncio.sleep(20) + print(f"Engine: PIPELINE {run_id} completed") + + async def run_portfolio(self, run_id: str, params: Dict[str, Any]): + print(f"Engine: Starting PORTFOLIO rebalance {run_id} with params {params}") + # Placeholder for actual portfolio manager graph execution + await asyncio.sleep(10) + print(f"Engine: PORTFOLIO {run_id} completed") + + async def run_auto(self, run_id: str, params: Dict[str, Any]): + print(f"Engine: Starting AUTO {run_id} with params {params}") + # Placeholder for full automated trading cycle + await asyncio.sleep(30) + print(f"Engine: AUTO {run_id} completed") diff --git a/agent_os/frontend/README.md b/agent_os/frontend/README.md new file mode 100644 index 00000000..5000490d --- /dev/null +++ b/agent_os/frontend/README.md @@ 
-0,0 +1,29 @@ +# AgentOS Frontend + +This is a React-based observability dashboard for TradingAgents. + +## Tech Stack +- **Framework:** React (Vite) +- **UI Library:** Chakra UI +- **State Management:** React Context / Hooks +- **Communication:** Axios (REST) & WebSockets + +## Getting Started + +1. **Initialize the project:** + ```bash + npm create vite@latest . -- --template react-ts + npm install @chakra-ui/react @emotion/react @emotion/styled framer-motion axios lucide-react + ``` + +2. **Run the development server:** + ```bash + npm run dev + ``` + +## Core Components Structure + +- `src/components/CommandCenter/`: The main terminal and agent map. +- `src/components/Portfolio/`: Portfolio holdings and metrics. +- `src/hooks/useAgentStream.ts`: Custom hook for WebSocket streaming. +- `src/context/AuthContext.tsx`: Mock auth and multi-tenant support. diff --git a/agent_os/frontend/useAgentStream.ts b/agent_os/frontend/useAgentStream.ts new file mode 100644 index 00000000..ed5416c3 --- /dev/null +++ b/agent_os/frontend/useAgentStream.ts @@ -0,0 +1,80 @@ +import { useState, useEffect, useCallback } from 'react'; + +export interface AgentEvent { + id: string; + timestamp: string; + agent: string; + tier: 'quick' | 'mid' | 'deep'; + type: 'thought' | 'tool' | 'result' | 'system'; + message: string; + details?: { + model_used: string; + latency_ms: number; + input_tokens: number; + output_tokens: number; + raw_json_response: string; + }; +} + +export const useAgentStream = (runId: string | null) => { + const [events, setEvents] = useState([]); + const [status, setStatus] = useState<'idle' | 'connecting' | 'streaming' | 'completed' | 'error'>('idle'); + const [error, setError] = useState(null); + + const connect = useCallback(() => { + if (!runId) return; + + setStatus('connecting'); + setError(null); + + const protocol = window.location.protocol === 'https:' ? 
'wss:' : 'ws:'; + const host = window.location.host; // Change this to your backend host if different + const socket = new WebSocket(`${protocol}//${host}/ws/stream/${runId}`); + + socket.onopen = () => { + setStatus('streaming'); + console.log(`Connected to run: ${runId}`); + }; + + socket.onmessage = (event) => { + const data = JSON.parse(event.data); + + if (data.type === 'system' && data.message === 'Run completed.') { + setStatus('completed'); + } else if (data.type === 'system' && data.message.startsWith('Error:')) { + setStatus('error'); + setError(data.message); + } else { + setEvents((prev) => [...prev, data as AgentEvent]); + } + }; + + socket.onclose = () => { + if (status !== 'completed' && status !== 'error') { + setStatus('idle'); + } + console.log(`Disconnected from run: ${runId}`); + }; + + socket.onerror = (err) => { + setStatus('error'); + setError('WebSocket error occurred'); + console.error(err); + }; + + return () => { + socket.close(); + }; + }, [runId, status]); + + useEffect(() => { + if (runId) { + const cleanup = connect(); + return cleanup; + } + }, [runId, connect]); + + const clearEvents = () => setEvents([]); + + return { events, status, error, clearEvents }; +};