From 3ac1c5ad3df1fa0b9040d3f708c8b6337b6cebfd Mon Sep 17 00:00:00 2001 From: dtarkent2-sys Date: Sat, 21 Feb 2026 03:17:11 +0000 Subject: [PATCH] Harden security, fix memory leak, clean up deps - Add API key auth (AGENTS_API_KEY env var) on /analyze endpoints - Add CORS_ORIGINS env var instead of hardcoded wildcard - Add memory cleanup (30min TTL) and concurrency semaphore (max 3) - Add 10-minute analysis timeout - Fix ticker validation (alphanumeric check) - Remove unused deps (redis, backtrader, parsel, rich, typer, questionary) - Fix pyproject.toml: replace chainlit with actual FastAPI deps - Add .dockerignore, add eval_results/ to .gitignore Co-Authored-By: Claude Opus 4.6 --- .dockerignore | 8 +++++ .gitignore | 1 + app.py | 91 ++++++++++++++++++++++++++++++++++++++++-------- pyproject.toml | 10 ++---- requirements.txt | 6 ---- 5 files changed, 88 insertions(+), 28 deletions(-) create mode 100644 .dockerignore diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..30b62892 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,8 @@ +.git +eval_results +*.txt +docs +uv.lock +__pycache__ +.env +.env.example diff --git a/.gitignore b/.gitignore index 9a2904a9..d0cf1547 100644 --- a/.gitignore +++ b/.gitignore @@ -217,3 +217,4 @@ __marimo__/ # Cache **/data_cache/ +eval_results/ diff --git a/app.py b/app.py index a2f66d40..70ad5599 100644 --- a/app.py +++ b/app.py @@ -5,9 +5,10 @@ import time import uuid import asyncio import json +import traceback as _tb from datetime import date -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request, Depends from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from sse_starlette.sse import EventSourceResponse @@ -22,14 +23,34 @@ from cli.main import ( ) app = FastAPI(title="TradingAgents API") + +# --- CORS --- +_cors_env = os.getenv("CORS_ORIGINS", "") +_cors_origins = [o.strip() for o in _cors_env.split(",") if o.strip()] if _cors_env else ["*"] app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=_cors_origins, allow_methods=["*"], allow_headers=["*"], ) -# Active analysis state: id -> {queue, events (replay buffer), done} +# --- Auth dependency --- +_API_KEY = os.getenv("AGENTS_API_KEY", "") + + +async def verify_api_key(request: Request): + if not _API_KEY: + return # dev mode — no auth + auth = request.headers.get("Authorization", "") + if auth != f"Bearer {_API_KEY}": + raise HTTPException(401, "Invalid or missing API key") + + +# --- Concurrency --- +MAX_CONCURRENT = int(os.getenv("MAX_CONCURRENT_ANALYSES", "3")) +_semaphore = asyncio.Semaphore(MAX_CONCURRENT) + +# Active analysis state: id -> {queue, events (replay buffer), done, created_at} analyses: dict[str, dict] = {} @@ -90,9 +111,8 @@ def _agent_stage(agent_name): return "unknown" -async def run_analysis(analysis_id: str, ticker: str, trade_date: str): - """Background task that runs the TradingAgents pipeline and pushes SSE events.""" - import traceback as _tb +async def _run_analysis_inner(analysis_id: str, ticker: str, trade_date: str): + """Core analysis logic.""" state = analyses[analysis_id] q = state["queue"] config = build_config() @@ -205,8 +225,7 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): state["events"].append(evt) await q.put(evt) - # Research debate (guard with research_emitted to avoid resetting - # statuses on subsequent chunks in stream_mode="values") + # Research debate if chunk.get("investment_debate_state") and not research_emitted: debate = chunk["investment_debate_state"] bull = debate.get("bull_history", "").strip() @@ -251,8 +270,7 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): state["events"].append(evt) await q.put(evt) - # Risk debate (guard with risk_emitted to avoid resetting - # statuses on subsequent chunks in stream_mode="values") + # Risk debate if chunk.get("risk_debate_state") and not risk_emitted: risk = chunk["risk_debate_state"] agg = risk.get("aggressive_history", "").strip() @@ -302,7 +320,6 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): for agent in buf.agent_status: buf.update_agent_status(agent, "completed") st = get_stats_dict(stats_handler, buf, start_time) - # Emit agent_update for any agents not yet shown as completed on the client for agent, status in buf.agent_status.items(): if prev_statuses.get(agent) != "completed": prev_statuses[agent] = "completed" @@ -329,19 +346,63 @@ async def run_analysis(analysis_id: str, ticker: str, trade_date: str): await q.put(None) # sentinel — stream done -@app.post("/analyze") +async def run_analysis(analysis_id: str, ticker: str, trade_date: str): + """Background task: acquires semaphore, runs analysis with timeout.""" + state = analyses[analysis_id] + q = state["queue"] + async with _semaphore: + try: + await asyncio.wait_for( + _run_analysis_inner(analysis_id, ticker, trade_date), + timeout=600, # 10 minutes + ) + except asyncio.TimeoutError: + print(f"[ANALYSIS] Timeout for {analysis_id}", flush=True) + evt = {"type": "error", "message": "Analysis timed out after 10 minutes"} + state["events"].append(evt) + await q.put(evt) + state["done"] = True + await q.put(None) + + +# --- Memory cleanup background task --- +async def _cleanup_loop(): + """Remove analyses older than 30 minutes every 5 minutes.""" + while True: + await asyncio.sleep(300) + now = time.time() + expired = [aid for aid, s in analyses.items() if now - s["created_at"] > 1800] + for aid in expired: + analyses.pop(aid, None) + if expired: + print(f"[CLEANUP] Removed {len(expired)} expired analyses", flush=True) + + +@app.on_event("startup") +async def _start_cleanup(): + asyncio.create_task(_cleanup_loop()) + + +# --- Routes --- + +@app.post("/analyze", dependencies=[Depends(verify_api_key)]) async def start_analysis(req: AnalyzeRequest): ticker = req.ticker.upper().strip() - if not ticker or len(ticker) > 5: + if not ticker or len(ticker) > 5 or not ticker.isalpha(): raise HTTPException(400, "Invalid ticker") trade_date = req.date or str(date.today()) analysis_id = str(uuid.uuid4()) - analyses[analysis_id] = {"queue": asyncio.Queue(), "events": [], "done": False} + analyses[analysis_id] = { + "queue": asyncio.Queue(), + "events": [], + "done": False, + "created_at": time.time(), + } asyncio.create_task(run_analysis(analysis_id, ticker, trade_date)) return {"id": analysis_id, "ticker": ticker, "date": trade_date} -@app.get("/analyze/{analysis_id}/stream") +@app.get("/analyze/{analysis_id}/stream", dependencies=[Depends(verify_api_key)]) async def stream_analysis(analysis_id: str, last_event: int = 0): """Stream SSE events. Supports reconnection via ?last_event=N to replay missed events.""" if analysis_id not in analyses: diff --git a/pyproject.toml b/pyproject.toml index 9213d7f6..96e464db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,27 +10,23 @@ readme = "README.md" requires-python = ">=3.10" dependencies = [ "langchain-core>=0.3.81", - "backtrader>=1.9.78.123", - "chainlit>=2.5.5", "langchain-anthropic>=0.3.15", "langchain-experimental>=0.3.4", "langchain-google-genai>=2.1.5", "langchain-openai>=0.3.23", "langgraph>=0.4.8", "pandas>=2.3.0", - "parsel>=1.10.0", "pytz>=2025.2", - "questionary>=2.1.0", "rank-bm25>=0.2.2", - "redis>=6.2.0", "requests>=2.32.4", - "rich>=14.0.0", - "typer>=0.21.0", "setuptools>=80.9.0", "stockstats>=0.6.5", "tqdm>=4.67.1", "typing-extensions>=4.14.0", "yfinance>=0.2.63", + "fastapi>=0.115.0", + "uvicorn[standard]>=0.30.0", + "sse-starlette>=2.0.0", ] [project.scripts] diff --git a/requirements.txt b/requirements.txt index aa4384da..1ea6af2d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,17 +8,11 @@ stockstats langgraph rank-bm25 setuptools -backtrader -parsel requests tqdm pytz -redis fastapi uvicorn[standard] sse-starlette -rich -typer -questionary langchain_anthropic langchain-google-genai