add api to enqueue and process job analysis

This commit is contained in:
mhmmdjafarg 2025-12-31 22:28:25 +07:00
parent 3262b08033
commit 0d8291b2aa
8 changed files with 257 additions and 51 deletions

View File

@ -127,8 +127,13 @@ python webapp.py
### Connect to Redis (Local) ### Connect to Redis (Local)
``` ```
docker-compose up -d docker compose up -d
redis-cli -h localhost -p 6379 -a {REDIS_PASSWORD} docker exec -it {container-id} bash
redis-cli -h localhost -p 6379 -a trading-agents
Run worker:
rq worker --url redis://:{{REDIS_PASSWORD}}@{{REDIS_HOST}}:{{REDIS_PORT}}/{{REDIS_DB}} --with-scheduler
``` ```
### Required APIs ### Required APIs

View File

@ -29,3 +29,4 @@ telethon
fastapi fastapi
uvicorn[standard] uvicorn[standard]
redis[hiredis] redis[hiredis]
rq

87
service.py Normal file
View File

@ -0,0 +1,87 @@
from tradingagents.external.redis.repo import redis_queue, redis_repo
from tradingagents.domain.model import AnalysisMeta, AnalysisStatus
from tradingagents.domain.response import EnqueueAnalysisResponse
from rq import get_current_job
from tradingagents.external.redis.repo import RQ_RETRIES
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.dataflows.config import get_config
DEFAULT_USER = "global_user"
# Initialize trading agent once at startup
def create_trading_agent():
"""Create trading agent with fixed configuration"""
return TradingAgentsGraph(debug=True, config=get_config())
# Create the trading agent instance once
trading_agent = create_trading_agent()
def process_job(user_id: str, symbol: str, date: str):
try:
job = get_current_job()
attempt = job.meta.get("attempt", 1)
job.meta["attempt"] = attempt
job.save_meta()
print(f"INFO: Processing job-id {job.id} for symbol {symbol} and date {date} by user {user_id}")
# Update status to RUNNING
redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.RUNNING)
final_state, decision = trading_agent.propagate(ticker=symbol, trade_date=date)
print(f"INFO: Decision for job-id {job.id}: {decision}")
# Save the final result
redis_repo.save_result(job_id=job.id, final_trade=final_state["final_trade_decision"])
# Update status to DONE
redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.DONE)
print(f"INFO: Completed job-id {job.id} for symbol {symbol}")
except Exception as e:
job.meta["attempt"] = attempt + 1
job.save_meta()
print(f"ERROR: Failed to process job-id {job.id}: {e} (Attempt {attempt} of {RQ_RETRIES})")
# Update status to FAILED
redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.FAILED)
def enqueue_analysis(symbol: str, date: str) -> EnqueueAnalysisResponse:
"""
Enqueue a background task to analyze trading data for a given symbol and date.
Args:
symbol (str): The trading symbol to analyze (e.g., "BTC/USDT").
date (str): The date for which to perform the analysis in YYYY-MM-DD format.
Returns:
EnqueueAnalysisResponse: The response containing job_id, status, and message.
"""
try:
# Check if the analysis is on cooldown, if cooldown return the job-id
job_id, ttl = redis_repo.get_cooldown(DEFAULT_USER, symbol)
if job_id:
return EnqueueAnalysisResponse(
job_id=job_id,
status="on_cooldown",
message=f"Analysis for {symbol} is on cooldown. Please try again later. TTL: {ttl} seconds remaining.",
)
# If not on cooldown, enqueue the task, insert cooldown key with TTL 6 hours, insert with status pending redis key for analysis analysis:job:{job_id}
task = redis_queue.enqueue(process_job, DEFAULT_USER, symbol, date)
redis_repo.save_cooldown(DEFAULT_USER, symbol, task.id)
redis_repo.create_analysis_meta(AnalysisMeta.new(job_id=task.id, user_id=DEFAULT_USER, symbol=symbol, trade_date=date))
return EnqueueAnalysisResponse(
job_id=task.id,
status="enqueued",
message=f"Analysis for {symbol} has been enqueued successfully."
)
except Exception as e:
print(f"ERROR: Failed to enqueue analysis task: {e}")
return EnqueueAnalysisResponse(
job_id=None,
status="error",
message=f"Failed to enqueue analysis task: {str(e)}"
)

View File

@ -0,0 +1,34 @@
from dataclasses import dataclass
from enum import Enum
from typing import Optional
import time
class AnalysisStatus(str, Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"
@dataclass
class AnalysisMeta:
job_id: str
user_id: str
symbol: str
status: AnalysisStatus
trade_date: str # "trade_date": final_state["trade_date"],
updated_at: float
created_at: float = time.time()
@staticmethod
def new(job_id: str, user_id: str, symbol: str, trade_date: str) -> "AnalysisMeta":
return AnalysisMeta(
job_id=job_id,
user_id=user_id,
symbol=symbol,
status=AnalysisStatus.PENDING,
trade_date=trade_date,
updated_at=time.time(),
created_at=time.time(),
)

View File

@ -0,0 +1,8 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class EnqueueAnalysisResponse:
job_id: Optional[str]
status: str
message: str

View File

@ -11,6 +11,7 @@ def get_redis_client() -> Redis:
if _client is None: if _client is None:
try: try:
config = get_config() config = get_config()
print(f"INFO: Creating Redis connection pool config {config}")
retry = Retry(ExponentialBackoff(), retries=5) retry = Retry(ExponentialBackoff(), retries=5)
pool = ConnectionPool( pool = ConnectionPool(

View File

@ -1,18 +1,105 @@
import time
from tradingagents.external.redis.client import get_redis_client from tradingagents.external.redis.client import get_redis_client
from tradingagents.domain.model import AnalysisMeta, AnalysisStatus
from rq import Queue, Retry
from redis import Redis
redis = get_redis_client() # TODO: Move to config
RQ_RETRIES = 3
RQ_INTERVAL = [30, 60, 120]
ANALYSIS_META_KEY = "analysis:meta:{job_id}"
ANALYSIS_RESULT_KEY = "analysis:result:{job_id}"
ANALYSIS_COOLDOWN_KEY = "tradingagents-analysis-cooldown-{user_id}:{symbol}"
class RedisRepo: class RedisRepo:
def get(self, key: str): def __init__(self, redis: Redis):
return redis.get(key) self.redis = redis
def set(self, key: str, value: str, ex: int | None = None):
return redis.set(key, value, ex=ex)
def delete(self, key: str):
return redis.delete(key)
def exists(self, key: str) -> bool: def exists(self, key: str) -> bool:
return redis.exists(key) == 1 return self.redis.exists(key) == 1
def create_cooldown_key(self, user_id: str, symbol: str) -> str:
return ANALYSIS_COOLDOWN_KEY.format(user_id=user_id, symbol=symbol)
def save_cooldown(self, user_id: str, symbol: str, job_id: str, ttl: int = 6 * 3600):
key = self.create_cooldown_key(user_id, symbol)
self.redis.set(key, job_id, ex=ttl)
redis_repo = RedisRepo() def get_cooldown(self, user_id: str, symbol: str) -> tuple[str | None, int | None]:
key = self.create_cooldown_key(user_id, symbol)
job_id = self.redis.get(key)
if job_id is None:
return None, None
ttl = self.redis.ttl(key)
# Redis TTL semantics:
# -2 → key does not exist
# -1 → key exists but has no expiry
# >=0 → seconds remaining
if ttl < 0:
print(f"WARNING: Cooldown key {key} has invalid TTL {ttl}.")
ttl = None
return job_id, ttl
def _meta_key(self, job_id: str) -> str:
return ANALYSIS_META_KEY.format(job_id=job_id)
def _result_key(self, job_id: str) -> str:
return ANALYSIS_RESULT_KEY.format(job_id=job_id)
def create_analysis_meta(self, meta: AnalysisMeta, ttl: int = 7 * 24 * 3600):
self.redis.hset(
self._meta_key(meta.job_id),
mapping={
"job_id": meta.job_id,
"trade_date": meta.trade_date,
"user_id": meta.user_id,
"symbol": meta.symbol,
"status": meta.status.value,
"updated_at": meta.updated_at,
},
)
self.redis.expire(self._meta_key(meta.job_id), ttl)
def update_status_analysis_meta(self, job_id: str, status: AnalysisStatus):
self.redis.hset(
self._meta_key(job_id),
mapping={
"status": status.value,
"updated_at": time.time(),
},
)
def get_analysis_meta(self, job_id: str) -> AnalysisMeta | None:
data = self.redis.hgetall(self._meta_key(job_id))
if not data:
return None
return AnalysisMeta(
job_id=data["job_id"],
user_id=data["user_id"],
symbol=data["symbol"],
trade_date=data["trade_date"],
status=AnalysisStatus(data["status"]),
updated_at=float(data["updated_at"]),
created_at=float(data.get("created_at")),
)
def save_result(self, job_id: str, final_trade: str, ttl: int = 7 * 24 * 3600):
'''
Save the final trading decision result to Redis. No expiration by default.
'''
self.redis.set(self._result_key(job_id), final_trade, ex=ttl)
def get_result(self, job_id: str) -> str | None:
return self.redis.get(self._result_key(job_id))
redis_repo = RedisRepo(get_redis_client())
redis_queue = Queue(connection=get_redis_client(), retry=Retry(max=RQ_RETRIES, interval=RQ_INTERVAL))

View File

@ -1,11 +1,10 @@
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel from pydantic import BaseModel
import uvicorn import uvicorn
from datetime import datetime from datetime import datetime
import asyncio
# Import your trading agents # Import your trading agents
from tradingagents.graph.trading_graph import TradingAgentsGraph from service import enqueue_analysis
from tradingagents.dataflows.config import get_config from tradingagents.dataflows.config import get_config
from dotenv import load_dotenv from dotenv import load_dotenv
@ -21,26 +20,17 @@ app = FastAPI(
version="0.1.0" version="0.1.0"
) )
# Pydantic models for request/response class TradingAnalyzeRequest(BaseModel):
class TradingRequest(BaseModel):
symbol: str symbol: str
date: str date: str
class TradingResponse(BaseModel): class TradingAnalyzeResponse(BaseModel):
symbol: str symbol: str
date: str date: str
decision: dict job_id: str
timestamp: str timestamp: str
status: str status: str
# Initialize trading agent once at startup
def create_trading_agent():
"""Create trading agent with fixed configuration"""
return TradingAgentsGraph(debug=True, config=config)
# Create the trading agent instance once
trading_agent = create_trading_agent()
@app.get("/") @app.get("/")
async def root(): async def root():
"""Root endpoint""" """Root endpoint"""
@ -60,38 +50,31 @@ async def health_check():
"service": "tradingagents-api" "service": "tradingagents-api"
} }
@app.post("/trading/analyze", response_model=TradingResponse) @app.post("/v1/trading/analyze", response_model=TradingAnalyzeResponse, status_code=status.HTTP_202_ACCEPTED,)
async def analyze_trading_decision(request: TradingRequest): async def analyze_trading_decision(request: TradingAnalyzeRequest):
""" """
Analyze trading decision for a given symbol and date Analyze trading decision for a given symbol and date
Example usage: Example usage:
POST /trading/analyze POST /trading/analyze
{ {
"symbol": "NVDA", "symbol": "BTC/USDT",
"date": "2024-05-10" "date": "2024-05-10"
} }
""" """
try: response = enqueue_analysis(request.symbol, request.date)
# Run the analysis (this might take a while, so we run it in a thread pool) print(f"INFO: Enqueue response: {response}")
def run_analysis():
_, decision = trading_agent.propagate(request.symbol, request.date) if response.status == "error":
return decision raise HTTPException(status_code=500, detail=response.message)
# Run in thread pool to avoid blocking return TradingAnalyzeResponse(
loop = asyncio.get_event_loop() symbol=request.symbol,
decision = await loop.run_in_executor(None, run_analysis) date=request.date,
job_id=response.job_id,
return TradingResponse( timestamp=datetime.now().isoformat(),
symbol=request.symbol, status=response.status
date=request.date, )
decision=decision,
timestamp=datetime.now().isoformat(),
status="success"
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Trading analysis failed: {str(e)}")
if __name__ == "__main__": if __name__ == "__main__":