diff --git a/agent_os/backend/routes/runs.py b/agent_os/backend/routes/runs.py index d95751bf..e81edac3 100644 --- a/agent_os/backend/routes/runs.py +++ b/agent_os/backend/routes/runs.py @@ -2,14 +2,12 @@ from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException from typing import Dict, Any, List import uuid import time +from agent_os.backend.store import runs from agent_os.backend.dependencies import get_current_user from agent_os.backend.services.langgraph_engine import LangGraphEngine router = APIRouter(prefix="/api/run", tags=["runs"]) -# In-memory store for demo (should be replaced by Redis/DB for persistence) -runs: Dict[str, Dict[str, Any]] = {} - engine = LangGraphEngine() @router.post("/scan") @@ -24,7 +22,8 @@ async def trigger_scan( "type": "scan", "status": "queued", "created_at": time.time(), - "user_id": user["user_id"] + "user_id": user["user_id"], + "params": params or {} } background_tasks.add_task(engine.run_scan, run_id, params or {}) return {"run_id": run_id, "status": "queued"} @@ -41,7 +40,8 @@ async def trigger_pipeline( "type": "pipeline", "status": "queued", "created_at": time.time(), - "user_id": user["user_id"] + "user_id": user["user_id"], + "params": params or {} } background_tasks.add_task(engine.run_pipeline, run_id, params or {}) return {"run_id": run_id, "status": "queued"} @@ -58,7 +58,8 @@ async def trigger_portfolio( "type": "portfolio", "status": "queued", "created_at": time.time(), - "user_id": user["user_id"] + "user_id": user["user_id"], + "params": params or {} } background_tasks.add_task(engine.run_portfolio, run_id, params or {}) return {"run_id": run_id, "status": "queued"} @@ -75,7 +76,8 @@ async def trigger_auto( "type": "auto", "status": "queued", "created_at": time.time(), - "user_id": user["user_id"] + "user_id": user["user_id"], + "params": params or {} } background_tasks.add_task(engine.run_auto, run_id, params or {}) return {"run_id": run_id, "status": "queued"} diff --git a/agent_os/backend/routes/websocket.py b/agent_os/backend/routes/websocket.py index a9aabd7b..9cf43592 100644 --- a/agent_os/backend/routes/websocket.py +++ b/agent_os/backend/routes/websocket.py @@ -4,106 +4,53 @@ import time import uuid from typing import Dict, Any from agent_os.backend.dependencies import get_current_user +from agent_os.backend.store import runs +from agent_os.backend.services.langgraph_engine import LangGraphEngine router = APIRouter(prefix="/ws", tags=["websocket"]) +engine = LangGraphEngine() + @router.websocket("/stream/{run_id}") async def websocket_endpoint( websocket: WebSocket, run_id: str, - # user: dict = Depends(get_current_user) # In V2, validate token from query string ): await websocket.accept() print(f"WebSocket client connected to run: {run_id}") + if run_id not in runs: + await websocket.send_json({"type": "system", "message": f"Error: Run {run_id} not found."}) + await websocket.close() + return + + run_info = runs[run_id] + run_type = run_info["type"] + params = run_info.get("params", {}) + try: - # For now, we use a mock stream. - # In a real implementation, this would subscribe to an event queue or a database stream - # that's being populated by the BackgroundTask running the LangGraph. + stream_gen = None + if run_type == "scan": + stream_gen = engine.run_scan(run_id, params) + elif run_type == "pipeline": + stream_gen = engine.run_pipeline(run_id, params) + # Add other types as they are implemented in LangGraphEngine - mock_events = [ - { - "id": "node_1", - "node_id": "analyst_node", - "parent_node_id": "start", - "type": "thought", - "agent": "ANALYST", - "message": "Evaluating market data...", - "metrics": { - "model": "gpt-4-turbo", - "tokens_in": 120, - "tokens_out": 45, - "latency_ms": 450 - } - }, - { - "id": "node_2", - "node_id": "tool_node", - "parent_node_id": "analyst_node", - "type": "tool", - "agent": "ANALYST", - "message": "> Tool Call: get_news_sentiment", - "metrics": { - "latency_ms": 800 - } - }, - { - "id": "node_3", - "node_id": "research_node", - "parent_node_id": "analyst_node", - "type": "thought", - "agent": "RESEARCHER", - "message": "Synthesizing industry trends...", - "metrics": { - "model": "claude-3-opus", - "tokens_in": 800, - "tokens_out": 300, - "latency_ms": 2200 - } - }, - { - "id": "node_4", - "node_id": "trader_node", - "parent_node_id": "research_node", - "type": "result", - "agent": "TRADER", - "message": "Action determined: BUY VLO", - "details": { - "model_used": "gpt-4-turbo", - "latency_ms": 1200, - "input_tokens": 450, - "output_tokens": 120, - "raw_json_response": '{"action": "buy", "ticker": "VLO"}' - }, - "metrics": { - "model": "gpt-4-turbo", - "tokens_in": 450, - "tokens_out": 120, - "latency_ms": 1200 - } - } - ] - - for evt in mock_events: - payload = { - "id": evt["id"], - "node_id": evt["node_id"], - "parent_node_id": evt["parent_node_id"], - "timestamp": time.strftime("%H:%M:%S"), - "agent": evt["agent"], - "tier": "mid" if evt["agent"] == "ANALYST" else "deep", - "type": evt["type"], - "message": evt["message"], - "details": evt.get("details"), - "metrics": evt.get("metrics") - } - await websocket.send_json(payload) - await asyncio.sleep(2) # Simulating execution delay + if stream_gen: + async for payload in stream_gen: + # Add timestamp if not present + if "timestamp" not in payload: + payload["timestamp"] = time.strftime("%H:%M:%S") + await websocket.send_json(payload) + else: + await websocket.send_json({"type": "system", "message": f"Error: Run type {run_type} streaming not yet implemented."}) await websocket.send_json({"type": "system", "message": "Run completed."}) except WebSocketDisconnect: print(f"WebSocket client disconnected from run {run_id}") except Exception as e: + import traceback + traceback.print_exc() await websocket.send_json({"type": "system", "message": f"Error: {str(e)}"}) await websocket.close() diff --git a/agent_os/backend/services/langgraph_engine.py b/agent_os/backend/services/langgraph_engine.py index 90416803..3e9ceb3e 100644 --- a/agent_os/backend/services/langgraph_engine.py +++ b/agent_os/backend/services/langgraph_engine.py @@ -1,35 +1,135 @@ import asyncio import time -from typing import Dict, Any +from typing import Dict, Any, AsyncGenerator +from tradingagents.graph.trading_graph import TradingAgentsGraph +from tradingagents.graph.scanner_graph import ScannerGraph +from tradingagents.default_config import DEFAULT_CONFIG class LangGraphEngine: - """Orchestrates LangGraph pipeline executions for the AgentOS API.""" + """Orchestrates LangGraph pipeline executions and streams events.""" def __init__(self): - # This is where you would import and setup your LangGraph workflows - # e.g., from tradingagents.graph.setup import setup_trading_graph - pass + self.config = DEFAULT_CONFIG.copy() + # In-memory store to keep track of running tasks if needed + self.active_runs = {} - async def run_scan(self, run_id: str, params: Dict[str, Any]): - print(f"Engine: Starting SCAN {run_id} with params {params}") - # Placeholder for actual scanner graph execution - await asyncio.sleep(15) - print(f"Engine: SCAN {run_id} completed") + async def run_scan(self, run_id: str, params: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]: + """Run the 3-phase macro scanner and stream events.""" + date = params.get("date", time.strftime("%Y-%m-%d")) + + # Initialize ScannerGraph + # Note: ScannerGraph in TradingAgents seems to take date and config + scanner = ScannerGraph(date=date, config=self.config) + + print(f"Engine: Starting SCAN {run_id} for date {date}") + + # Initial state for scanner + # Based on tradingagents/graph/scanner_graph.py + initial_state = { + "date": date, + "geopolitical_report": "", + "market_movers_report": "", + "sector_report": "", + "industry_deep_dive_report": "", + "macro_synthesis_report": "", + "top_10_watchlist": [] + } - async def run_pipeline(self, run_id: str, params: Dict[str, Any]): - print(f"Engine: Starting PIPELINE {run_id} with params {params}") - # Placeholder for actual analysis pipeline execution - await asyncio.sleep(20) - print(f"Engine: PIPELINE {run_id} completed") + async for event in scanner.graph.astream_events(initial_state, version="v2"): + mapped_event = self._map_langgraph_event(event) + if mapped_event: + yield mapped_event - async def run_portfolio(self, run_id: str, params: Dict[str, Any]): - print(f"Engine: Starting PORTFOLIO rebalance {run_id} with params {params}") - # Placeholder for actual portfolio manager graph execution - await asyncio.sleep(10) - print(f"Engine: PORTFOLIO {run_id} completed") + async def run_pipeline(self, run_id: str, params: Dict[str, Any]) -> AsyncGenerator[Dict[str, Any], None]: + """Run per-ticker analysis pipeline and stream events.""" + ticker = params.get("ticker", "AAPL") + date = params.get("date", time.strftime("%Y-%m-%d")) + analysts = params.get("analysts", ["market", "news", "fundamentals"]) + + print(f"Engine: Starting PIPELINE {run_id} for {ticker} on {date}") + + # Initialize TradingAgentsGraph + graph_wrapper = TradingAgentsGraph( + selected_analysts=analysts, + config=self.config, + debug=True + ) + + initial_state = graph_wrapper.propagator.create_initial_state(ticker, date) + # We don't use propagator.get_graph_args() here because we want to stream events directly + + async for event in graph_wrapper.graph.astream_events(initial_state, version="v2"): + mapped_event = self._map_langgraph_event(event) + if mapped_event: + yield mapped_event - async def run_auto(self, run_id: str, params: Dict[str, Any]): - print(f"Engine: Starting AUTO {run_id} with params {params}") - # Placeholder for full automated trading cycle - await asyncio.sleep(30) - print(f"Engine: AUTO {run_id} completed") + def _map_langgraph_event(self, event: Dict[str, Any]) -> Dict[str, Any] | None: + """Map LangGraph v2 events to AgentOS frontend contract.""" + kind = event["event"] + name = event["name"] + tags = event.get("tags", []) + + # Try to extract node name from tags or metadata + node_name = name + for tag in tags: + if tag.startswith("graph:node:"): + node_name = tag.split(":", 2)[-1] + + # Filter for relevant events + if kind == "on_chat_model_start": + return { + "id": event["run_id"], + "node_id": node_name, + "parent_node_id": "start", # Simplified for now + "type": "thought", + "agent": node_name.upper(), + "message": f"Thinking...", + "metrics": { + "model": event["data"].get("invocation_params", {}).get("model_name", "unknown"), + } + } + + elif kind == "on_tool_start": + return { + "id": event["run_id"], + "node_id": f"tool_{name}", + "parent_node_id": node_name, + "type": "tool", + "agent": node_name.upper(), + "message": f"> Tool Call: {name}", + "metrics": {} + } + + elif kind == "on_chat_model_end": + output = event["data"].get("output") + usage = {} + model = "unknown" + if hasattr(output, "usage_metadata") and output.usage_metadata: + usage = output.usage_metadata + if hasattr(output, "response_metadata") and output.response_metadata: + model = output.response_metadata.get("model_name", "unknown") + + return { + "id": f"{event['run_id']}_end", + "node_id": node_name, + "type": "result", + "agent": node_name.upper(), + "message": "Action determined.", + "metrics": { + "model": model, + "tokens_in": usage.get("input_tokens", 0), + "tokens_out": usage.get("output_tokens", 0), + # "latency_ms": ... # calculated in frontend or here if we track start + } + } + + return None + + # Sync versions for BackgroundTasks (if we still want to use them) + async def run_scan_background(self, run_id: str, params: Dict[str, Any]): + async for _ in self.run_scan(run_id, params): + pass + + async def run_pipeline_background(self, run_id: str, params: Dict[str, Any]): + async for _ in self.run_pipeline(run_id, params): + pass diff --git a/agent_os/backend/store.py b/agent_os/backend/store.py new file mode 100644 index 00000000..8c8fc3a6 --- /dev/null +++ b/agent_os/backend/store.py @@ -0,0 +1,4 @@ +from typing import Dict, Any + +# In-memory store for demo (should be replaced by Redis/DB for persistence) +runs: Dict[str, Dict[str, Any]] = {} diff --git a/agent_os/frontend/index.html b/agent_os/frontend/index.html new file mode 100644 index 00000000..e39096af --- /dev/null +++ b/agent_os/frontend/index.html @@ -0,0 +1,13 @@ + + +
+ + + +