# TradingAgents/frontend/backend/server.py
"""FastAPI server for Nifty50 AI recommendations."""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional
import database as db
import sys
import os
from pathlib import Path
from datetime import datetime
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
import json
import time
# Add parent directories to path for importing trading agents
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
# Import shared logging system
from tradingagents.log_utils import add_log, analysis_logs, log_lock, log_subscribers
# Track running analyses
# NOTE: This is not thread-safe for production multi-worker deployments.
# For production, use Redis or a database-backed job queue instead.
# Shape: {symbol: {"status": str, "started_at": iso-str, "progress": str,
#                  "cancelled": bool, ...}} — written by run_analysis_task and
# the /analyze/{symbol}/cancel endpoint, read by the status endpoints.
running_analyses = {} # {symbol: {"status": "running", "started_at": datetime, "progress": str}}
# Application instance; endpoint routes are registered below via decorators.
app = FastAPI(
    title="Nifty50 AI API",
    description="API for Nifty 50 stock recommendations",
    version="1.0.0"
)
# Enable CORS for frontend
# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm the frontend does
# not rely on cookies, or pin specific origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # In production, replace with specific origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class StockAnalysis(BaseModel):
    """Per-stock analysis result embedded in a daily recommendation."""
    symbol: str  # stock ticker, e.g. "RELIANCE"
    company_name: str
    decision: Optional[str] = None  # "BUY"/"SELL"/"HOLD" (upper-cased by run_analysis_task)
    confidence: Optional[str] = None  # e.g. "MEDIUM" — default used by run_analysis_task
    risk: Optional[str] = None
    raw_analysis: Optional[str] = None  # free-text output of the trading pipeline
class TopPick(BaseModel):
    """A ranked stock highlighted in a daily recommendation."""
    rank: int  # 1-based position in the top-picks list
    symbol: str
    company_name: str
    decision: str
    reason: str
    risk_level: str
class StockToAvoid(BaseModel):
    """A stock flagged as one to avoid, with a short justification."""
    symbol: str
    company_name: str
    reason: str
class Summary(BaseModel):
    """Decision counts for one day's recommendations."""
    total: int  # total stocks analyzed
    buy: int
    sell: int
    hold: int
class DailyRecommendation(BaseModel):
    """Full recommendation payload for one trading day."""
    date: str  # "YYYY-MM-DD"
    analysis: dict[str, StockAnalysis]  # keyed by stock symbol
    summary: Summary
    top_picks: list[TopPick]
    stocks_to_avoid: list[StockToAvoid]
class SaveRecommendationRequest(BaseModel):
    """Request body for POST /recommendations.

    Uses loose container types (dict/list) rather than the typed models above;
    validation is delegated to the database layer.
    """
    date: str  # "YYYY-MM-DD"
    analysis: dict
    summary: dict
    top_picks: list
    stocks_to_avoid: list
# ============== Pipeline Data Models ==============
class AgentReport(BaseModel):
    """A single analyst agent's report within a pipeline run."""
    agent_type: str  # e.g. which analyst produced the report
    report_content: str
    # Pydantic copies field defaults per instance, so the mutable [] default
    # is safe here (unlike a plain-function default argument).
    data_sources_used: Optional[list] = []
    created_at: Optional[str] = None  # ISO timestamp, if recorded
class DebateHistory(BaseModel):
    """Recorded arguments and verdict for one debate round of the pipeline."""
    debate_type: str  # e.g. investment vs. risk debate — see SavePipelineDataRequest
    bull_arguments: Optional[str] = None
    bear_arguments: Optional[str] = None
    risky_arguments: Optional[str] = None
    safe_arguments: Optional[str] = None
    neutral_arguments: Optional[str] = None
    judge_decision: Optional[str] = None
    full_history: Optional[str] = None  # complete transcript, if kept
class PipelineStep(BaseModel):
    """Status/timing record for one step of the analysis pipeline."""
    step_number: int
    step_name: str
    status: str  # e.g. "running"/"completed" (see get_analysis_status)
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    duration_ms: Optional[int] = None
    output_summary: Optional[str] = None
class DataSourceLog(BaseModel):
    """Audit record of one external data fetch performed during analysis."""
    source_type: str
    source_name: str
    data_fetched: Optional[dict] = None
    fetch_timestamp: Optional[str] = None
    success: bool = True
    error_message: Optional[str] = None  # populated when success is False
class SavePipelineDataRequest(BaseModel):
    """Request body for POST /pipeline: everything produced by one stock's run."""
    date: str  # "YYYY-MM-DD"
    symbol: str  # upper-cased by the endpoint before saving
    agent_reports: Optional[dict] = None
    investment_debate: Optional[dict] = None
    risk_debate: Optional[dict] = None
    pipeline_steps: Optional[list] = None
    data_sources: Optional[list] = None
class AnalysisConfig(BaseModel):
    """LLM/runtime settings for a single analysis run."""
    deep_think_model: Optional[str] = "opus"  # model for deep-reasoning steps
    quick_think_model: Optional[str] = "sonnet"  # model for fast steps
    provider: Optional[str] = "claude_subscription" # claude_subscription or anthropic_api
    api_key: Optional[str] = None  # only used when provider == "anthropic_api"
    max_debate_rounds: Optional[int] = 1
class RunAnalysisRequest(BaseModel):
    """Request body for triggering a single-stock analysis."""
    symbol: str
    date: Optional[str] = None # Defaults to today if not provided
    config: Optional[AnalysisConfig] = None
def _is_cancelled(symbol: str) -> bool:
    """Return True when a cancellation flag has been set for *symbol*."""
    entry = running_analyses.get(symbol, {})
    return entry.get("cancelled", False)
def run_analysis_task(symbol: str, date: str, analysis_config: dict = None):
    """Background task to run trading analysis for a stock.

    Runs on a worker thread (spawned by /analyze/{symbol} or the bulk runner).
    Publishes progress and the terminal status into the module-level
    ``running_analyses`` dict, and checks the cooperative ``cancelled`` flag
    at three checkpoints: before starting, before graph execution, and after
    graph execution (in which case results are discarded).

    Args:
        symbol: Stock ticker to analyze.
        date: Analysis date as "YYYY-MM-DD".
        analysis_config: Optional dict with model/provider overrides
            (see AnalysisConfig); missing keys fall back to defaults.
    """
    global running_analyses
    # Default config values
    if analysis_config is None:
        analysis_config = {}
    deep_think_model = analysis_config.get("deep_think_model", "opus")
    quick_think_model = analysis_config.get("quick_think_model", "sonnet")
    provider = analysis_config.get("provider", "claude_subscription")
    api_key = analysis_config.get("api_key")
    max_debate_rounds = analysis_config.get("max_debate_rounds", 1)
    try:
        running_analyses[symbol] = {
            "status": "initializing",
            "started_at": datetime.now().isoformat(),
            "progress": "Loading trading agents...",
            "cancelled": False,
        }
        add_log("info", "system", f"🚀 Starting analysis for {symbol} on {date}")
        add_log("info", "system", f"Config: deep_think={deep_think_model}, quick_think={quick_think_model}")
        # Import trading agents
        # Deferred import: keeps server startup light and scopes any import
        # failure to this task's error handler.
        add_log("info", "system", "Loading TradingAgentsGraph module...")
        from tradingagents.graph.trading_graph import TradingAgentsGraph
        from tradingagents.default_config import DEFAULT_CONFIG
        running_analyses[symbol]["progress"] = "Initializing analysis pipeline..."
        add_log("info", "system", "Initializing analysis pipeline...")
        # Create config from user settings
        config = DEFAULT_CONFIG.copy()
        config["llm_provider"] = "anthropic" # Use Claude for all LLM
        config["deep_think_llm"] = deep_think_model
        config["quick_think_llm"] = quick_think_model
        config["max_debate_rounds"] = max_debate_rounds
        # If using API provider and key is provided, set it in environment
        # NOTE: mutates process-wide environment; concurrent runs share the key.
        if provider == "anthropic_api" and api_key:
            os.environ["ANTHROPIC_API_KEY"] = api_key
        # Check cancellation before starting
        if _is_cancelled(symbol):
            add_log("info", "system", f"Analysis for {symbol} was cancelled before starting")
            running_analyses[symbol]["status"] = "cancelled"
            running_analyses[symbol]["progress"] = "Analysis cancelled"
            return
        running_analyses[symbol]["status"] = "running"
        running_analyses[symbol]["progress"] = f"Running market analysis (model: {deep_think_model})..."
        add_log("agent", "system", f"Creating TradingAgentsGraph for {symbol}...")
        # Initialize and run
        ta = TradingAgentsGraph(debug=False, config=config)
        # Check cancellation before graph execution
        if _is_cancelled(symbol):
            add_log("info", "system", f"Analysis for {symbol} was cancelled before graph execution")
            running_analyses[symbol]["status"] = "cancelled"
            running_analyses[symbol]["progress"] = "Analysis cancelled"
            return
        running_analyses[symbol]["progress"] = f"Analyzing {symbol}..."
        add_log("agent", "system", f"Starting propagation for {symbol}...")
        add_log("data", "data_fetch", f"Fetching market data for {symbol}...")
        # Long-running call: executes the full multi-agent pipeline.
        final_state, decision, hold_days = ta.propagate(symbol, date)
        # Check cancellation after graph execution (skip saving results)
        if _is_cancelled(symbol):
            add_log("info", "system", f"Analysis for {symbol} was cancelled after completion — results discarded")
            running_analyses[symbol]["status"] = "cancelled"
            running_analyses[symbol]["progress"] = "Analysis cancelled (results discarded)"
            return
        add_log("success", "system", f"✅ Analysis complete for {symbol}: {decision}")
        # Extract raw analysis from final_state if available
        # Prefer the final trade decision; fall back to the risk judge's verdict.
        raw_analysis = ""
        if final_state:
            if "final_trade_decision" in final_state:
                raw_analysis = final_state.get("final_trade_decision", "")
            elif "risk_debate_state" in final_state:
                raw_analysis = final_state.get("risk_debate_state", {}).get("judge_decision", "")
        # Save the analysis result to the database
        # confidence/risk are fixed defaults here — TODO confirm whether the
        # pipeline can supply real values for these fields.
        analysis_data = {
            "company_name": symbol,
            "decision": decision.upper() if decision else "HOLD",
            "confidence": "MEDIUM",
            "risk": "MEDIUM",
            "raw_analysis": raw_analysis,
            "hold_days": hold_days
        }
        db.save_single_stock_analysis(date, symbol, analysis_data)
        add_log("info", "system", f"💾 Saved analysis for {symbol} to database")
        # Auto-update daily recommendation summary (counts, top_picks, stocks_to_avoid)
        db.update_daily_recommendation_summary(date)
        add_log("info", "system", f"📊 Updated daily recommendation summary for {date}")
        # Auto-trigger backtest calculation for this stock
        # Best-effort: a backtest failure must not mark the analysis as failed.
        try:
            import backtest_service as bt
            bt_result = bt.calculate_and_save_backtest(date, symbol, analysis_data["decision"], analysis_data.get("hold_days"))
            if bt_result:
                add_log("info", "system", f"📈 Backtest calculated for {symbol}: correct={bt_result.get('prediction_correct')}")
            else:
                add_log("info", "system", f"📈 Backtest not available yet for {symbol} (future date or no price data)")
        except Exception as bt_err:
            add_log("warning", "system", f"⚠️ Backtest calculation skipped for {symbol}: {bt_err}")
        # Replace (not mutate) the entry so stale keys like "cancelled" are dropped.
        running_analyses[symbol] = {
            "status": "completed",
            "completed_at": datetime.now().isoformat(),
            "progress": f"Analysis complete: {decision}",
            "decision": decision
        }
        # Clear per-symbol step progress after completion
        try:
            from tradingagents.log_utils import symbol_progress
            symbol_progress.clear(symbol)
        except Exception:
            pass
    except Exception as e:
        # Terminal error state: recorded for the status endpoint and logged.
        error_msg = str(e) if str(e) else f"{type(e).__name__}: No details provided"
        add_log("error", "system", f"❌ Error analyzing {symbol}: {error_msg}")
        running_analyses[symbol] = {
            "status": "error",
            "error": error_msg,
            "progress": f"Error: {error_msg[:100]}"
        }
        import traceback
        print(f"Analysis error for {symbol}: {type(e).__name__}: {error_msg}")
        traceback.print_exc()
@app.get("/")
async def root():
    """Describe the API: name, version, and a map of available endpoints."""
    endpoint_map = {
        "GET /recommendations": "Get all recommendations",
        "GET /recommendations/latest": "Get latest recommendation",
        "GET /recommendations/{date}": "Get recommendation by date",
        "GET /recommendations/{date}/{symbol}/pipeline": "Get full pipeline data for a stock",
        "GET /recommendations/{date}/{symbol}/agents": "Get agent reports for a stock",
        "GET /recommendations/{date}/{symbol}/debates": "Get debate history for a stock",
        "GET /recommendations/{date}/{symbol}/data-sources": "Get data source logs for a stock",
        "GET /recommendations/{date}/pipeline-summary": "Get pipeline summary for all stocks on a date",
        "GET /stocks/{symbol}/history": "Get stock history",
        "GET /dates": "Get all available dates",
        "POST /recommendations": "Save a new recommendation",
        "POST /pipeline": "Save pipeline data for a stock",
    }
    return {
        "name": "Nifty50 AI API",
        "version": "2.0.0",
        "endpoints": endpoint_map,
    }
@app.get("/recommendations")
async def get_all_recommendations():
    """Return every stored daily recommendation with a total count."""
    records = db.get_all_recommendations()
    return {"recommendations": records, "count": len(records)}
@app.get("/recommendations/latest")
async def get_latest_recommendation():
    """Return the most recent recommendation, or 404 if none exist."""
    latest = db.get_latest_recommendation()
    if latest:
        return latest
    raise HTTPException(status_code=404, detail="No recommendations found")
@app.get("/recommendations/{date}")
async def get_recommendation_by_date(date: str):
    """Return the recommendation stored for *date* (YYYY-MM-DD), or 404."""
    record = db.get_recommendation_by_date(date)
    if record:
        return record
    raise HTTPException(status_code=404, detail=f"No recommendation found for {date}")
@app.get("/stocks/{symbol}/history")
async def get_stock_history(symbol: str):
    """Return all historical recommendations recorded for one stock."""
    ticker = symbol.upper()
    entries = db.get_stock_history(ticker)
    return {"symbol": ticker, "history": entries, "count": len(entries)}
@app.get("/dates")
async def get_available_dates():
    """List every date that has a stored recommendation."""
    available = db.get_all_dates()
    return {"dates": available, "count": len(available)}
@app.post("/recommendations")
async def save_recommendation(request: SaveRecommendationRequest):
    """Persist a full daily recommendation; 500 on any storage failure."""
    try:
        db.save_recommendation(
            date=request.date,
            analysis_data=request.analysis,
            summary=request.summary,
            top_picks=request.top_picks,
            stocks_to_avoid=request.stocks_to_avoid,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"message": f"Recommendation for {request.date} saved successfully"}
@app.get("/health")
async def health_check():
    """Liveness probe; reports a static healthy payload."""
    payload = {"status": "healthy", "database": "connected"}
    return payload
# ============== Live Log Streaming Endpoint ==============
@app.get("/stream/logs")
async def stream_logs():
    """Server-Sent Events endpoint for streaming analysis logs.

    Registers a bounded per-client queue with the shared log_subscribers list,
    replays the last 50 buffered logs, then streams new entries as they
    arrive, emitting a heartbeat every 5s of silence to keep the connection
    open. The subscriber queue is always removed on disconnect.
    """
    import queue
    # Create a queue for this subscriber
    # maxsize bounds memory if this client stops reading.
    subscriber_queue = queue.Queue(maxsize=100)
    with log_lock:
        log_subscribers.append(subscriber_queue)
    async def event_generator():
        try:
            # Send initial connection message
            yield f"data: {json.dumps({'type': 'info', 'source': 'system', 'message': 'Connected to log stream', 'timestamp': datetime.now().isoformat()})}\n\n"
            # Send any recent logs from buffer
            # Snapshot under the lock, yield outside it (yields may suspend).
            with log_lock:
                recent_logs = list(analysis_logs)[-50:] # Last 50 logs
            for log in recent_logs:
                yield f"data: {json.dumps(log)}\n\n"
            # Stream new logs as they arrive
            while True:
                try:
                    # Check for new logs with timeout
                    # Blocking queue.get runs in the default executor so the
                    # event loop stays free.
                    log_entry = await asyncio.get_event_loop().run_in_executor(
                        None, lambda: subscriber_queue.get(timeout=5)
                    )
                    yield f"data: {json.dumps(log_entry)}\n\n"
                except queue.Empty:
                    # Send heartbeat to keep connection alive
                    yield f"data: {json.dumps({'type': 'heartbeat', 'timestamp': datetime.now().isoformat()})}\n\n"
                except Exception:
                    break
        finally:
            # Remove subscriber on disconnect
            with log_lock:
                if subscriber_queue in log_subscribers:
                    log_subscribers.remove(subscriber_queue)
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "Access-Control-Allow-Origin": "*",
        }
    )
# ============== Pipeline Data Endpoints ==============
@app.get("/recommendations/{date}/{symbol}/pipeline")
async def get_pipeline_data(date: str, symbol: str):
    """Return the complete stored pipeline record for one stock and date.

    When nothing has been stored yet, an empty skeleton with
    status == "no_data" is returned instead of a 404.
    """
    ticker = symbol.upper()
    pipeline_data = db.get_full_pipeline_data(date, ticker)
    sections = ('agent_reports', 'debates', 'pipeline_steps', 'data_sources')
    if any(pipeline_data.get(key) for key in sections):
        return {**pipeline_data, "status": "complete"}
    # No section has content — hand back an empty scaffold.
    return {
        "date": date,
        "symbol": ticker,
        "agent_reports": {},
        "debates": {},
        "pipeline_steps": [],
        "data_sources": [],
        "status": "no_data",
    }
@app.get("/recommendations/{date}/{symbol}/agents")
async def get_agent_reports(date: str, symbol: str):
    """Return all stored agent reports for one stock and date."""
    ticker = symbol.upper()
    reports = db.get_agent_reports(date, ticker)
    return {
        "date": date,
        "symbol": ticker,
        "reports": reports,
        "count": len(reports),
    }
@app.get("/recommendations/{date}/{symbol}/debates")
async def get_debate_history(date: str, symbol: str):
    """Return the recorded debate history for one stock and date."""
    ticker = symbol.upper()
    return {
        "date": date,
        "symbol": ticker,
        "debates": db.get_debate_history(date, ticker),
    }
@app.get("/recommendations/{date}/{symbol}/data-sources")
async def get_data_sources(date: str, symbol: str):
    """Return the data-source fetch logs for one stock and date."""
    ticker = symbol.upper()
    source_logs = db.get_data_source_logs(date, ticker)
    return {
        "date": date,
        "symbol": ticker,
        "data_sources": source_logs,
        "count": len(source_logs),
    }
@app.get("/recommendations/{date}/pipeline-summary")
async def get_pipeline_summary(date: str):
    """Return a per-stock pipeline summary for every stock on *date*."""
    per_stock = db.get_pipeline_summary_for_date(date)
    return {"date": date, "stocks": per_stock, "count": len(per_stock)}
@app.post("/pipeline")
async def save_pipeline_data(request: SavePipelineDataRequest):
    """Persist one stock's pipeline artifacts; 500 on any storage failure."""
    payload = {
        'agent_reports': request.agent_reports,
        'investment_debate': request.investment_debate,
        'risk_debate': request.risk_debate,
        'pipeline_steps': request.pipeline_steps,
        'data_sources': request.data_sources,
    }
    try:
        db.save_full_pipeline_data(
            date=request.date,
            symbol=request.symbol.upper(),
            pipeline_data=payload,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"message": f"Pipeline data for {request.symbol} on {request.date} saved successfully"}
# ============== Analysis Endpoints ==============
# Track bulk analysis state
# In-memory job state for the "analyze all" runner; replaced wholesale when a
# run starts and mutated by worker threads while it progresses.
# NOTE(review): mutated from multiple threads without a lock — confirm this
# server runs as a single process/worker.
bulk_analysis_state = {
    "status": "idle", # idle, running, completed, error, cancelled
    "total": 0,  # stocks scheduled in the current run
    "completed": 0,
    "failed": 0,
    "current_symbol": None,  # legacy field; newer code uses "current_symbols"
    "started_at": None,
    "completed_at": None,
    "results": {},  # {symbol: terminal status or "error: ..." string}
    "cancelled": False # Flag to signal cancellation
}
# List of Nifty 50 stocks
# Tickers as used by the analysis pipeline; the price endpoints append ".NS"
# for yfinance lookups.
NIFTY_50_SYMBOLS = [
    "RELIANCE", "TCS", "HDFCBANK", "INFY", "ICICIBANK", "HINDUNILVR", "ITC", "SBIN",
    "BHARTIARTL", "KOTAKBANK", "LT", "AXISBANK", "ASIANPAINT", "MARUTI", "HCLTECH",
    "SUNPHARMA", "TITAN", "BAJFINANCE", "WIPRO", "ULTRACEMCO", "NESTLEIND", "NTPC",
    "POWERGRID", "M&M", "TATAMOTORS", "ONGC", "JSWSTEEL", "TATASTEEL", "ADANIENT",
    "ADANIPORTS", "COALINDIA", "BAJAJFINSV", "TECHM", "HDFCLIFE", "SBILIFE", "GRASIM",
    "DIVISLAB", "DRREDDY", "CIPLA", "BRITANNIA", "EICHERMOT", "APOLLOHOSP", "INDUSINDBK",
    "HEROMOTOCO", "TATACONSUM", "BPCL", "UPL", "HINDALCO", "BAJAJ-AUTO", "LTIM"
]
class BulkAnalysisRequest(BaseModel):
    """Request body for POST /analyze/all."""
    deep_think_model: Optional[str] = "opus"
    quick_think_model: Optional[str] = "sonnet"
    provider: Optional[str] = "claude_subscription"  # or "anthropic_api"
    api_key: Optional[str] = None
    max_debate_rounds: Optional[int] = 1
    parallel_workers: Optional[int] = 3  # clamped to 1..5 by the endpoint
@app.post("/analyze/all")
async def run_bulk_analysis(request: Optional[BulkAnalysisRequest] = None, date: Optional[str] = None):
    """Trigger analysis for all Nifty 50 stocks.

    Runs in a background thread using a bounded pool of parallel workers.
    Stocks already analyzed for *date* are skipped (resume support). Progress
    is published through the module-level ``bulk_analysis_state`` dict; the
    /analyze/all/cancel endpoint sets its ``cancelled`` flag, which workers
    check cooperatively.

    Args:
        request: Optional model/provider/worker settings.
        date: Analysis date ("YYYY-MM-DD"); defaults to today.

    Returns:
        A summary dict describing what was started (or why nothing was).
    """
    global bulk_analysis_state
    # Refuse to start a second concurrent bulk run.
    if bulk_analysis_state.get("status") == "running":
        return {
            "message": "Bulk analysis already running",
            "status": bulk_analysis_state
        }
    # Use today's date if not provided
    if not date:
        date = datetime.now().strftime("%Y-%m-%d")
    # Build analysis config from request
    analysis_config = {}
    parallel_workers = 3
    if request:
        analysis_config = {
            "deep_think_model": request.deep_think_model,
            "quick_think_model": request.quick_think_model,
            "provider": request.provider,
            "api_key": request.api_key,
            "max_debate_rounds": request.max_debate_rounds
        }
        if request.parallel_workers is not None:
            # Clamp to a sane 1..5 range.
            parallel_workers = max(1, min(5, request.parallel_workers))
    # Resume support: skip stocks already analyzed for this date
    already_analyzed = set(db.get_analyzed_symbols_for_date(date))
    symbols_to_analyze = [s for s in NIFTY_50_SYMBOLS if s not in already_analyzed]
    skipped_count = len(already_analyzed)
    # If all stocks are already analyzed, return immediately
    if not symbols_to_analyze:
        bulk_analysis_state = {
            "status": "completed",
            "total": 0,
            "total_all": len(NIFTY_50_SYMBOLS),
            "skipped": skipped_count,
            "completed": 0,
            "failed": 0,
            "current_symbols": [],
            "started_at": datetime.now().isoformat(),
            "completed_at": datetime.now().isoformat(),
            "results": {},
            "parallel_workers": parallel_workers,
            "cancelled": False
        }
        return {
            "message": f"All {skipped_count} stocks already analyzed for {date}",
            "date": date,
            "total_stocks": 0,
            "skipped": skipped_count,
            "parallel_workers": parallel_workers,
            "status": "completed"
        }
    def analyze_single_stock(symbol: str, analysis_date: str, config: dict) -> tuple:
        """Run one stock's analysis and return (symbol, status, error)."""
        try:
            # Check if cancelled before starting
            if bulk_analysis_state.get("cancelled"):
                return (symbol, "cancelled", "Bulk analysis was cancelled")
            run_analysis_task(symbol, analysis_date, config)
            # Poll running_analyses for the terminal status with a per-stock
            # timeout (`time` is already imported at module level).
            max_wait = 600  # 10 minute timeout per stock
            waited = 0
            while waited < max_wait:
                # Check for cancellation during wait
                if bulk_analysis_state.get("cancelled"):
                    return (symbol, "cancelled", "Bulk analysis was cancelled")
                if symbol not in running_analyses:
                    return (symbol, "unknown", None)
                status = running_analyses[symbol].get("status")
                if status not in ("running", "initializing"):
                    return (symbol, status, None)
                time.sleep(2)
                waited += 2
            return (symbol, "timeout", "Analysis timed out after 10 minutes")
        except Exception as e:
            return (symbol, "error", str(e))
    # Start bulk analysis in background thread
    def run_bulk_parallel():
        """Fan out analyze_single_stock across a thread pool and track progress."""
        global bulk_analysis_state
        bulk_analysis_state = {
            "status": "running",
            "total": len(symbols_to_analyze),
            "total_all": len(NIFTY_50_SYMBOLS),
            "skipped": skipped_count,
            "completed": 0,
            "failed": 0,
            "current_symbols": [],
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "results": {},
            "parallel_workers": parallel_workers,
            "cancelled": False
        }
        with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
            future_to_symbol = {
                executor.submit(analyze_single_stock, symbol, date, analysis_config): symbol
                for symbol in symbols_to_analyze
            }
            bulk_analysis_state["current_symbols"] = list(symbols_to_analyze[:parallel_workers])
            for future in as_completed(future_to_symbol):
                symbol = future_to_symbol[future]
                try:
                    symbol, status, error = future.result()
                    bulk_analysis_state["results"][symbol] = status if not error else f"error: {error}"
                    if status == "completed":
                        bulk_analysis_state["completed"] += 1
                    else:
                        bulk_analysis_state["failed"] += 1
                    # Approximate "currently analyzing" as the next unfinished symbols.
                    remaining = [s for s in symbols_to_analyze
                                 if s not in bulk_analysis_state["results"]]
                    bulk_analysis_state["current_symbols"] = remaining[:parallel_workers]
                except Exception as e:
                    bulk_analysis_state["results"][symbol] = f"error: {str(e)}"
                    bulk_analysis_state["failed"] += 1
        # BUGFIX: do not overwrite a user-requested "cancelled" status with
        # "completed" once the pool drains.
        if not bulk_analysis_state.get("cancelled"):
            bulk_analysis_state["status"] = "completed"
        bulk_analysis_state["current_symbols"] = []
        bulk_analysis_state["completed_at"] = datetime.now().isoformat()
    thread = threading.Thread(target=run_bulk_parallel)
    thread.start()
    skipped_msg = f", {skipped_count} already done" if skipped_count > 0 else ""
    return {
        "message": f"Bulk analysis started for {len(symbols_to_analyze)} stocks ({parallel_workers} parallel workers{skipped_msg})",
        "date": date,
        "total_stocks": len(symbols_to_analyze),
        "skipped": skipped_count,
        "parallel_workers": parallel_workers,
        "status": "started"
    }
@app.get("/analyze/all/status")
async def get_bulk_analysis_status():
    """Return a snapshot of the bulk-analysis job state.

    Adds a legacy "current_symbol" field for old clients and, while running,
    per-stock step progress from the shared symbol_progress tracker.
    """
    snapshot = dict(bulk_analysis_state)
    # Add backward compatibility for current_symbol (old format)
    if "current_symbols" in snapshot:
        active = snapshot["current_symbols"]
        snapshot["current_symbol"] = active[0] if active else None
    # Attach live per-stock progress only while a run is in flight.
    if snapshot.get("status") == "running" and snapshot.get("current_symbols"):
        try:
            from tradingagents.log_utils import symbol_progress
            snapshot["stock_progress"] = {
                sym: symbol_progress.get(sym)
                for sym in snapshot["current_symbols"]
            }
        except Exception:
            pass
    return snapshot
@app.post("/analyze/all/cancel")
async def cancel_bulk_analysis():
    """Request cancellation of the running bulk analysis (cooperative flag)."""
    if bulk_analysis_state.get("status") != "running":
        return {
            "message": "No bulk analysis is running",
            "status": bulk_analysis_state.get("status"),
        }
    # Workers poll this flag and stop picking up new stocks.
    bulk_analysis_state.update(
        cancelled=True,
        status="cancelled",
        completed_at=datetime.now().isoformat(),
    )
    return {
        "message": "Bulk analysis cancellation requested",
        "completed": bulk_analysis_state.get("completed", 0),
        "total": bulk_analysis_state.get("total", 0),
        "status": "cancelled",
    }
@app.get("/analyze/running")
async def get_running_analyses():
    """Return every single-stock analysis currently in the 'running' state."""
    active = {
        sym: info
        for sym, info in running_analyses.items()
        if info.get("status") == "running"
    }
    return {"running": active, "count": len(active)}
class SingleAnalysisRequest(BaseModel):
    """Request body for POST /analyze/{symbol}."""
    deep_think_model: Optional[str] = "opus"
    quick_think_model: Optional[str] = "sonnet"
    provider: Optional[str] = "claude_subscription"  # or "anthropic_api"
    api_key: Optional[str] = None
    max_debate_rounds: Optional[int] = 1
@app.post("/analyze/{symbol}")
async def run_analysis(symbol: str, background_tasks: BackgroundTasks, request: Optional[SingleAnalysisRequest] = None, date: Optional[str] = None):
    """Start a background analysis for one stock; no-op if one is running."""
    symbol = symbol.upper()
    # Guard: don't start a second run for the same symbol.
    existing = running_analyses.get(symbol)
    if existing and existing.get("status") == "running":
        return {
            "message": f"Analysis already running for {symbol}",
            "status": existing,
        }
    date = date or datetime.now().strftime("%Y-%m-%d")
    analysis_config = {}
    if request is not None:
        analysis_config = {
            "deep_think_model": request.deep_think_model,
            "quick_think_model": request.quick_think_model,
            "provider": request.provider,
            "api_key": request.api_key,
            "max_debate_rounds": request.max_debate_rounds,
        }
    # Run the (synchronous) task on its own thread so the request returns now.
    threading.Thread(
        target=run_analysis_task,
        args=(symbol, date, analysis_config),
    ).start()
    return {
        "message": f"Analysis started for {symbol}",
        "symbol": symbol,
        "date": date,
        "status": "started",
    }
@app.get("/analyze/{symbol}/status")
async def get_analysis_status(symbol: str):
    """Report the state of an analysis, with live step progress while running."""
    symbol = symbol.upper()
    state = running_analyses.get(symbol)
    if state is None:
        return {
            "symbol": symbol,
            "status": "not_started",
            "message": "No analysis has been run for this stock",
        }
    result = {"symbol": symbol, **state}
    if state.get("status") == "running":
        # Best-effort enrichment from the shared step timer; any failure here
        # must not break the status endpoint.
        try:
            from tradingagents.log_utils import step_timer
            steps = step_timer.get_steps()
            if steps:
                STEP_NAMES = {
                    "market_analyst": "Market Analysis",
                    "social_media_analyst": "Social Media Analysis",
                    "news_analyst": "News Analysis",
                    "fundamentals_analyst": "Fundamental Analysis",
                    "bull_researcher": "Bull Research",
                    "bear_researcher": "Bear Research",
                    "research_manager": "Research Manager",
                    "trader": "Trader Decision",
                    "aggressive_analyst": "Aggressive Analysis",
                    "conservative_analyst": "Conservative Analysis",
                    "neutral_analyst": "Neutral Analysis",
                    "risk_manager": "Risk Manager",
                }
                done = [key for key, info in steps.items() if info.get("status") == "completed"]
                active = [key for key, info in steps.items() if info.get("status") == "running"]
                total = 12
                if active:
                    result["progress"] = f"Step {len(done)+1}/{total}: {STEP_NAMES.get(active[0], active[0])}..."
                elif done:
                    result["progress"] = f"Step {len(done)}/{total}: {STEP_NAMES.get(done[-1], done[-1])} done"
                result["steps_completed"] = len(done)
                result["steps_running"] = [STEP_NAMES.get(s, s) for s in active]
                result["steps_total"] = total
                result["pipeline_steps"] = {
                    key: {"status": info.get("status"), "duration_ms": info.get("duration_ms")}
                    for key, info in steps.items()
                }
        except Exception:
            pass  # step_timer is optional; fall back to the stored progress string
    return result
@app.post("/analyze/{symbol}/cancel")
async def cancel_analysis(symbol: str):
    """Request cancellation of a running single-stock analysis."""
    symbol = symbol.upper()
    info = running_analyses.get(symbol)
    if info is None:
        return {"message": f"No analysis found for {symbol}", "status": "not_found"}
    current_status = info.get("status")
    if current_status not in ("running", "initializing"):
        return {"message": f"Analysis for {symbol} is not running (status: {current_status})", "status": current_status}
    # The background thread polls this flag at its cancellation checkpoints.
    info["cancelled"] = True
    info["status"] = "cancelled"
    info["progress"] = "Cancellation requested..."
    info["completed_at"] = datetime.now().isoformat()
    add_log("info", "system", f"🛑 Cancellation requested for {symbol}")
    return {
        "message": f"Cancellation requested for {symbol}",
        "symbol": symbol,
        "status": "cancelled",
    }
# ============== Backtest Endpoints ==============
# NOTE: Static routes must come BEFORE parameterized routes to avoid
# "accuracy" being matched as a {date} parameter.
@app.get("/backtest/accuracy")
async def get_accuracy_metrics():
    """Return overall prediction-accuracy metrics computed from backtests."""
    return db.calculate_accuracy_metrics()
@app.get("/backtest/{date}/{symbol}")
async def get_backtest_result(date: str, symbol: str):
    """Return the pre-calculated backtest result for one stock and date.

    Only pre-calculated results are served (no on-demand yfinance fetching)
    so the event loop is never blocked.
    """
    record = db.get_backtest_result(date, symbol.upper())
    if not record:
        return {'available': False, 'reason': 'Backtest not yet calculated'}
    # Prefer the 1-month price; fall back to the 1-week price if absent.
    latest_price = record.get('price_1m_later') or record.get('price_1w_later')
    return {
        'available': True,
        'prediction_correct': record['prediction_correct'],
        'actual_return_1d': record['return_1d'],
        'actual_return_1w': record['return_1w'],
        'actual_return_1m': record['return_1m'],
        'price_at_prediction': record['price_at_prediction'],
        'current_price': latest_price,
        'hold_days': record.get('hold_days'),
    }
@app.get("/backtest/{date}")
async def get_backtest_results_for_date(date: str):
    """Return every stored backtest result for *date*."""
    day_results = db.get_backtest_results_by_date(date)
    return {"date": date, "results": day_results}
@app.post("/backtest/{date}/calculate")
async def calculate_backtest_for_date(date: str):
    """Kick off backtest calculation for *date* on a worker thread."""
    import backtest_service as bt
    def run_backtest():
        # Errors are logged, not surfaced — the HTTP response has already returned.
        try:
            bt.backtest_all_recommendations_for_date(date)
        except Exception as e:
            print(f"Backtest calculation error for {date}: {e}")
    threading.Thread(target=run_backtest).start()
    return {"status": "started", "date": date, "message": "Backtest calculation started in background"}
# ============== Stock Price History Endpoint ==============
@app.get("/stocks/{symbol}/prices")
async def get_stock_price_history(symbol: str, days: int = 90):
    """Return *days* of daily closing prices for a stock via yfinance.

    Symbols without an exchange suffix get ".NS" (NSE) appended.
    Failures are reported in the payload's "error" field rather than as HTTP errors.
    """
    try:
        import yfinance as yf
        from datetime import timedelta
        yf_symbol = f"{symbol}.NS" if '.' not in symbol else symbol
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)
        hist = yf.Ticker(yf_symbol).history(
            start=start_date.strftime('%Y-%m-%d'),
            end=end_date.strftime('%Y-%m-%d'),
        )
        if hist.empty:
            return {"symbol": symbol, "prices": [], "error": "No price data found"}
        closes = []
        for idx, row in hist.iterrows():
            closes.append({"date": idx.strftime('%Y-%m-%d'), "price": round(float(row['Close']), 2)})
        return {"symbol": symbol, "prices": closes}
    except ImportError:
        return {"symbol": symbol, "prices": [], "error": "yfinance not installed"}
    except Exception as e:
        return {"symbol": symbol, "prices": [], "error": str(e)}
# ============== Nifty50 Index Endpoint ==============
@app.get("/nifty50/history")
async def get_nifty50_history():
    """Return Nifty50 (^NSEI) daily closes spanning the stored recommendation dates."""
    try:
        import yfinance as yf
        from datetime import timedelta
        dates = db.get_all_dates()
        if not dates:
            return {"dates": [], "prices": {}}
        # Pad the range by a week on both sides for daily-return calculations.
        # (ISO date strings order lexicographically, so min/max are valid.)
        pad = timedelta(days=7)
        start_date = (datetime.strptime(min(dates), "%Y-%m-%d") - pad).strftime("%Y-%m-%d")
        end_date = (datetime.strptime(max(dates), "%Y-%m-%d") + pad).strftime("%Y-%m-%d")
        hist = yf.Ticker("^NSEI").history(start=start_date, end=end_date, interval="1d")
        prices = {
            idx.strftime("%Y-%m-%d"): round(float(row['Close']), 2)
            for idx, row in hist.iterrows()
        }
        return {"dates": sorted(prices.keys()), "prices": prices}
    except ImportError:
        return {"dates": [], "prices": {}, "error": "yfinance not installed"}
    except Exception as e:
        return {"dates": [], "prices": {}, "error": str(e)}
@app.on_event("startup")
async def startup_event():
    """Rebuild daily recommendations and backfill missing backtests at startup.

    The backtest backfill runs on a daemon thread so server startup is not
    delayed; only dates with fewer stored results than analyzed stocks are
    recalculated.
    """
    db.rebuild_all_daily_recommendations()
    def startup_backtest():
        import backtest_service as bt
        for date in db.get_all_dates():
            existing = db.get_backtest_results_by_date(date)
            rec = db.get_recommendation_by_date(date)
            expected_count = len(rec.get('analysis', {})) if rec else 0
            if len(existing) >= expected_count:
                continue  # this date is already fully backtested
            print(f"[Backtest] Calculating for {date} ({len(existing)}/{expected_count} done)...")
            try:
                bt.backtest_all_recommendations_for_date(date)
            except Exception as e:
                print(f"[Backtest] Error for {date}: {e}")
    threading.Thread(target=startup_backtest, daemon=True).start()
if __name__ == "__main__":
    import uvicorn
    # Direct entry point: serve on all interfaces, port 8001.
    uvicorn.run(app, host="0.0.0.0", port=8001)