feat: initialize AgentOS observability foundation

- implement FastAPI backend with REST and WebSocket streaming
- add node-level metrics (tokens, latency) to event protocol
- design literal graph and top 3 metrics (Sharpe, Regime, Drawdown)
- scaffold React frontend with Chakra UI and useAgentStream hook
- add DESIGN.md and .env.example
This commit is contained in:
Ahmet Guzererler 2026-03-22 21:54:13 +01:00
parent b2fe6ec8c3
commit a26c93463a
10 changed files with 496 additions and 0 deletions

44
agent_os/DESIGN.md Normal file
View File

@ -0,0 +1,44 @@
# AgentOS: Visual Observability Design
## 1. The Literal Graph Visualization (Agent Map)
The agent map is a directed graph (DAG) representing the LangGraph workflow in real-time.
### Implementation Strategy
- **Frontend:** Powered by **React Flow**. Nodes are added and connected as WebSocket events arrive.
- **Node Data Contract:**
- `node_id`: Unique identifier for the graph node.
- `parent_node_id`: For building edges in real-time.
- `metrics`: `{ "tokens_in": int, "tokens_out": int, "latency_ms": float, "model": str }`.
- **Interactivity:** Clicking a node opens an **Inspector Drawer** showing:
- **LLM Metrics:** Model name, Request/Response tokens, Latency (ms).
- **Payload:** Raw JSON response and rationale.
### Pause & Restart (Next Phase TODO)
- **Interrupts:** Use LangGraph's `interrupt_before` features to halt execution at specific nodes (e.g., `trader_node`).
- **Control API:** `POST /api/run/{run_id}/resume` to signal the graph to continue.
---
## 2. The "Top 3" Metrics Consensus
Synthetic consensus between **Economist** (Efficiency/Risk) and **UI Designer** (Clarity/Action):
1. **Trailing 30-Day Sharpe Ratio (Risk-Adjusted Efficiency)**
- *Economist:* "Absolute P&L is vanity; we need to know the quality of the returns."
- *Display:* Large gauge showing trading efficiency.
2. **Current Market Regime & Beta (Macro Alignment)**
- *Economist:* "Signals if we are riding the trend or fighting it."
- *Display:* Status badge (BULL/BEAR) + Beta value relative to S&P 500.
3. **Real-Time Drawdown & 1-Day VaR (Capital Preservation)**
- *UI Designer:* "The 'Red Alert' metric. It must be visible if we are losing capital."
- *Display:* Percentage bar showing distance from the All-Time High.
---
## 3. Tech Stack
- **Backend:** FastAPI, LangChain, Supabase (Postgres).
- **Frontend:** React, Chakra UI, React Flow, Axios.
- **Protocol:** REST for triggers, WebSockets for live streaming.

View File

@ -0,0 +1,16 @@
# AgentOS Backend Configuration
PORT=8000
HOST=0.0.0.0
# Database (Supabase Postgres)
# Use the same connection string from your TradingAgents root .env
SUPABASE_CONNECTION_STRING=postgresql://postgres:[PASSWORD]@db.[PROJECT_ID].supabase.co:5432/postgres
# Multi-Tenancy (Future)
SUPABASE_JWT_SECRET=your_secret_here
# LLM Providers (for LangGraphEngine)
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
OPENROUTER_API_KEY=...
GOOGLE_API_KEY=...

View File

@ -0,0 +1,15 @@
from typing import Dict, Any
from fastapi import Depends, HTTPException
from tradingagents.portfolio.supabase_client import SupabaseClient
from tradingagents.portfolio.exceptions import PortfolioError
async def get_current_user():
# V1 (Single Tenant): Just return a hardcoded user/workspace ID
# V2 (Multi-Tenant): Decode the JWT using supabase-py and return auth.uid()
return {"user_id": "tenant_001", "role": "admin"}
def get_db_client() -> SupabaseClient:
try:
return SupabaseClient.get_instance()
except PortfolioError as e:
raise HTTPException(status_code=500, detail=str(e))

27
agent_os/backend/main.py Normal file
View File

@ -0,0 +1,27 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from agent_os.backend.routes import portfolios, runs, websocket
app = FastAPI(title="AgentOS API")
# --- CORS Middleware ---
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, restrict to your React app's URL
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# --- Include Routes ---
app.include_router(portfolios.router)
app.include_router(runs.router)
app.include_router(websocket.router)
@app.get("/")
async def health_check():
return {"status": "ok", "service": "AgentOS API"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -0,0 +1,49 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import List, Any
from agent_os.backend.dependencies import get_current_user, get_db_client
from tradingagents.portfolio.supabase_client import SupabaseClient
from tradingagents.portfolio.exceptions import PortfolioNotFoundError
router = APIRouter(prefix="/api/portfolios", tags=["portfolios"])
@router.get("/")
async def list_portfolios(
user: dict = Depends(get_current_user),
db: SupabaseClient = Depends(get_db_client)
):
# In V2, we would filter by user_id
portfolios = db.list_portfolios()
return [p.to_dict() for p in portfolios]
@router.get("/{portfolio_id}")
async def get_portfolio(
portfolio_id: str,
user: dict = Depends(get_current_user),
db: SupabaseClient = Depends(get_db_client)
):
try:
portfolio = db.get_portfolio(portfolio_id)
return portfolio.to_dict()
except PortfolioNotFoundError:
raise HTTPException(status_code=404, detail="Portfolio not found")
@router.get("/{portfolio_id}/latest")
async def get_latest_portfolio_state(
portfolio_id: str,
user: dict = Depends(get_current_user),
db: SupabaseClient = Depends(get_db_client)
):
try:
portfolio = db.get_portfolio(portfolio_id)
snapshot = db.get_latest_snapshot(portfolio_id)
holdings = db.list_holdings(portfolio_id)
trades = db.list_trades(portfolio_id, limit=10)
return {
"portfolio": portfolio.to_dict(),
"snapshot": snapshot.to_dict() if snapshot else None,
"holdings": [h.to_dict() for h in holdings],
"recent_trades": [t.to_dict() for t in trades]
}
except PortfolioNotFoundError:
raise HTTPException(status_code=404, detail="Portfolio not found")

View File

@ -0,0 +1,92 @@
from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException
from typing import Dict, Any, List
import uuid
import time
from agent_os.backend.dependencies import get_current_user
from agent_os.backend.services.langgraph_engine import LangGraphEngine
router = APIRouter(prefix="/api/run", tags=["runs"])
# In-memory store for demo (should be replaced by Redis/DB for persistence)
runs: Dict[str, Dict[str, Any]] = {}
engine = LangGraphEngine()
@router.post("/scan")
async def trigger_scan(
background_tasks: BackgroundTasks,
params: Dict[str, Any] = None,
user: dict = Depends(get_current_user)
):
run_id = str(uuid.uuid4())
runs[run_id] = {
"id": run_id,
"type": "scan",
"status": "queued",
"created_at": time.time(),
"user_id": user["user_id"]
}
background_tasks.add_task(engine.run_scan, run_id, params or {})
return {"run_id": run_id, "status": "queued"}
@router.post("/pipeline")
async def trigger_pipeline(
background_tasks: BackgroundTasks,
params: Dict[str, Any] = None,
user: dict = Depends(get_current_user)
):
run_id = str(uuid.uuid4())
runs[run_id] = {
"id": run_id,
"type": "pipeline",
"status": "queued",
"created_at": time.time(),
"user_id": user["user_id"]
}
background_tasks.add_task(engine.run_pipeline, run_id, params or {})
return {"run_id": run_id, "status": "queued"}
@router.post("/portfolio")
async def trigger_portfolio(
background_tasks: BackgroundTasks,
params: Dict[str, Any] = None,
user: dict = Depends(get_current_user)
):
run_id = str(uuid.uuid4())
runs[run_id] = {
"id": run_id,
"type": "portfolio",
"status": "queued",
"created_at": time.time(),
"user_id": user["user_id"]
}
background_tasks.add_task(engine.run_portfolio, run_id, params or {})
return {"run_id": run_id, "status": "queued"}
@router.post("/auto")
async def trigger_auto(
background_tasks: BackgroundTasks,
params: Dict[str, Any] = None,
user: dict = Depends(get_current_user)
):
run_id = str(uuid.uuid4())
runs[run_id] = {
"id": run_id,
"type": "auto",
"status": "queued",
"created_at": time.time(),
"user_id": user["user_id"]
}
background_tasks.add_task(engine.run_auto, run_id, params or {})
return {"run_id": run_id, "status": "queued"}
@router.get("/")
async def list_runs(user: dict = Depends(get_current_user)):
# Filter by user in production
return list(runs.values())
@router.get("/{run_id}")
async def get_run_status(run_id: str, user: dict = Depends(get_current_user)):
if run_id not in runs:
raise HTTPException(status_code=404, detail="Run not found")
return runs[run_id]

View File

@ -0,0 +1,109 @@
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
import asyncio
import time
import uuid
from typing import Dict, Any
from agent_os.backend.dependencies import get_current_user
router = APIRouter(prefix="/ws", tags=["websocket"])
@router.websocket("/stream/{run_id}")
async def websocket_endpoint(
websocket: WebSocket,
run_id: str,
# user: dict = Depends(get_current_user) # In V2, validate token from query string
):
await websocket.accept()
print(f"WebSocket client connected to run: {run_id}")
try:
# For now, we use a mock stream.
# In a real implementation, this would subscribe to an event queue or a database stream
# that's being populated by the BackgroundTask running the LangGraph.
mock_events = [
{
"id": "node_1",
"node_id": "analyst_node",
"parent_node_id": "start",
"type": "thought",
"agent": "ANALYST",
"message": "Evaluating market data...",
"metrics": {
"model": "gpt-4-turbo",
"tokens_in": 120,
"tokens_out": 45,
"latency_ms": 450
}
},
{
"id": "node_2",
"node_id": "tool_node",
"parent_node_id": "analyst_node",
"type": "tool",
"agent": "ANALYST",
"message": "> Tool Call: get_news_sentiment",
"metrics": {
"latency_ms": 800
}
},
{
"id": "node_3",
"node_id": "research_node",
"parent_node_id": "analyst_node",
"type": "thought",
"agent": "RESEARCHER",
"message": "Synthesizing industry trends...",
"metrics": {
"model": "claude-3-opus",
"tokens_in": 800,
"tokens_out": 300,
"latency_ms": 2200
}
},
{
"id": "node_4",
"node_id": "trader_node",
"parent_node_id": "research_node",
"type": "result",
"agent": "TRADER",
"message": "Action determined: BUY VLO",
"details": {
"model_used": "gpt-4-turbo",
"latency_ms": 1200,
"input_tokens": 450,
"output_tokens": 120,
"raw_json_response": '{"action": "buy", "ticker": "VLO"}'
},
"metrics": {
"model": "gpt-4-turbo",
"tokens_in": 450,
"tokens_out": 120,
"latency_ms": 1200
}
}
]
for evt in mock_events:
payload = {
"id": evt["id"],
"node_id": evt["node_id"],
"parent_node_id": evt["parent_node_id"],
"timestamp": time.strftime("%H:%M:%S"),
"agent": evt["agent"],
"tier": "mid" if evt["agent"] == "ANALYST" else "deep",
"type": evt["type"],
"message": evt["message"],
"details": evt.get("details"),
"metrics": evt.get("metrics")
}
await websocket.send_json(payload)
await asyncio.sleep(2) # Simulating execution delay
await websocket.send_json({"type": "system", "message": "Run completed."})
except WebSocketDisconnect:
print(f"WebSocket client disconnected from run {run_id}")
except Exception as e:
await websocket.send_json({"type": "system", "message": f"Error: {str(e)}"})
await websocket.close()

View File

@ -0,0 +1,35 @@
import asyncio
import time
from typing import Dict, Any
class LangGraphEngine:
"""Orchestrates LangGraph pipeline executions for the AgentOS API."""
def __init__(self):
# This is where you would import and setup your LangGraph workflows
# e.g., from tradingagents.graph.setup import setup_trading_graph
pass
async def run_scan(self, run_id: str, params: Dict[str, Any]):
print(f"Engine: Starting SCAN {run_id} with params {params}")
# Placeholder for actual scanner graph execution
await asyncio.sleep(15)
print(f"Engine: SCAN {run_id} completed")
async def run_pipeline(self, run_id: str, params: Dict[str, Any]):
print(f"Engine: Starting PIPELINE {run_id} with params {params}")
# Placeholder for actual analysis pipeline execution
await asyncio.sleep(20)
print(f"Engine: PIPELINE {run_id} completed")
async def run_portfolio(self, run_id: str, params: Dict[str, Any]):
print(f"Engine: Starting PORTFOLIO rebalance {run_id} with params {params}")
# Placeholder for actual portfolio manager graph execution
await asyncio.sleep(10)
print(f"Engine: PORTFOLIO {run_id} completed")
async def run_auto(self, run_id: str, params: Dict[str, Any]):
print(f"Engine: Starting AUTO {run_id} with params {params}")
# Placeholder for full automated trading cycle
await asyncio.sleep(30)
print(f"Engine: AUTO {run_id} completed")

View File

@ -0,0 +1,29 @@
# AgentOS Frontend
This is a React-based observability dashboard for TradingAgents.
## Tech Stack
- **Framework:** React (Vite)
- **UI Library:** Chakra UI
- **State Management:** React Context / Hooks
- **Communication:** Axios (REST) & WebSockets
## Getting Started
1. **Initialize the project:**
```bash
npm create vite@latest . -- --template react-ts
npm install @chakra-ui/react @emotion/react @emotion/styled flutter-framer-motion axios lucide-react
```
2. **Run the development server:**
```bash
npm run dev
```
## Core Components Structure
- `src/components/CommandCenter/`: The main terminal and agent map.
- `src/components/Portfolio/`: Portfolio holdings and metrics.
- `src/hooks/useAgentStream.ts`: Custom hook for WebSocket streaming.
- `src/context/AuthContext.tsx`: Mock auth and multi-tenant support.

View File

@ -0,0 +1,80 @@
import { useState, useEffect, useCallback } from 'react';
export interface AgentEvent {
id: string;
timestamp: string;
agent: string;
tier: 'quick' | 'mid' | 'deep';
type: 'thought' | 'tool' | 'result' | 'system';
message: string;
details?: {
model_used: string;
latency_ms: number;
input_tokens: number;
output_tokens: number;
raw_json_response: string;
};
}
export const useAgentStream = (runId: string | null) => {
const [events, setEvents] = useState<AgentEvent[]>([]);
const [status, setStatus] = useState<'idle' | 'connecting' | 'streaming' | 'completed' | 'error'>('idle');
const [error, setError] = useState<string | null>(null);
const connect = useCallback(() => {
if (!runId) return;
setStatus('connecting');
setError(null);
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const host = window.location.host; // Change this to your backend host if different
const socket = new WebSocket(`${protocol}//${host}/ws/stream/${runId}`);
socket.onopen = () => {
setStatus('streaming');
console.log(`Connected to run: ${runId}`);
};
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'system' && data.message === 'Run completed.') {
setStatus('completed');
} else if (data.type === 'system' && data.message.startsWith('Error:')) {
setStatus('error');
setError(data.message);
} else {
setEvents((prev) => [...prev, data as AgentEvent]);
}
};
socket.onclose = () => {
if (status !== 'completed' && status !== 'error') {
setStatus('idle');
}
console.log(`Disconnected from run: ${runId}`);
};
socket.onerror = (err) => {
setStatus('error');
setError('WebSocket error occurred');
console.error(err);
};
return () => {
socket.close();
};
}, [runId, status]);
useEffect(() => {
if (runId) {
const cleanup = connect();
return cleanup;
}
}, [runId, connect]);
const clearEvents = () => setEvents([]);
return { events, status, error, clearEvents };
};