From 0d8291b2aa4a83eb662b3e90677e6142493550ba Mon Sep 17 00:00:00 2001 From: mhmmdjafarg Date: Wed, 31 Dec 2025 22:28:25 +0700 Subject: [PATCH] add api to enqueue and process job analysis --- README.md | 9 +- requirements.txt | 1 + service.py | 87 ++++++++++++++++++++ tradingagents/domain/model.py | 34 ++++++++ tradingagents/domain/response.py | 8 ++ tradingagents/external/redis/client.py | 1 + tradingagents/external/redis/repo.py | 109 ++++++++++++++++++++++--- webapp.py | 59 +++++-------- 8 files changed, 257 insertions(+), 51 deletions(-) create mode 100644 service.py create mode 100644 tradingagents/domain/model.py create mode 100644 tradingagents/domain/response.py diff --git a/README.md b/README.md index 9d0e0f75..a6b7f195 100644 --- a/README.md +++ b/README.md @@ -127,8 +127,13 @@ python webapp.py ### Connect to Redis (Local) ``` -docker-compose up -d -redis-cli -h localhost -p 6379 -a {REDIS_PASSWORD} +docker compose up -d +docker exec -it {container-id} bash +redis-cli -h localhost -p 6379 -a trading-agents + +Run worker: +rq worker --url redis://:{{REDIS_PASSWORD}}@{{REDIS_HOST}}:{{REDIS_PORT}}/{{REDIS_DB}} --with-scheduler + ``` ### Required APIs diff --git a/requirements.txt b/requirements.txt index 47523b29..97b8b52b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,3 +29,4 @@ telethon fastapi uvicorn[standard] redis[hiredis] +rq diff --git a/service.py b/service.py new file mode 100644 index 00000000..557668f8 --- /dev/null +++ b/service.py @@ -0,0 +1,87 @@ +from tradingagents.external.redis.repo import redis_queue, redis_repo +from tradingagents.domain.model import AnalysisMeta, AnalysisStatus +from tradingagents.domain.response import EnqueueAnalysisResponse +from rq import get_current_job +from tradingagents.external.redis.repo import RQ_RETRIES +from tradingagents.graph.trading_graph import TradingAgentsGraph +from tradingagents.dataflows.config import get_config + +DEFAULT_USER = "global_user" + +# Initialize trading agent once at startup +def create_trading_agent(): + """Create trading agent with fixed configuration""" + return TradingAgentsGraph(debug=True, config=get_config()) + +# Create the trading agent instance once +trading_agent = create_trading_agent() + +def process_job(user_id: str, symbol: str, date: str): + try: + job = get_current_job() + + attempt = job.meta.get("attempt", 1) + job.meta["attempt"] = attempt + job.save_meta() + + print(f"INFO: Processing job-id {job.id} for symbol {symbol} and date {date} by user {user_id}") + + # Update status to RUNNING + redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.RUNNING) + + final_state, decision = trading_agent.propagate(ticker=symbol, trade_date=date) + + print(f"INFO: Decision for job-id {job.id}: {decision}") + + # Save the final result + redis_repo.save_result(job_id=job.id, final_trade=final_state["final_trade_decision"]) + # Update status to DONE + redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.DONE) + + print(f"INFO: Completed job-id {job.id} for symbol {symbol}") + except Exception as e: + job.meta["attempt"] = attempt + 1 + job.save_meta() + print(f"ERROR: Failed to process job-id {job.id}: {e} (Attempt {attempt} of {RQ_RETRIES})") + # Update status to FAILED + redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.FAILED) + + +def enqueue_analysis(symbol: str, date: str) -> EnqueueAnalysisResponse: + """ + Enqueue a background task to analyze trading data for a given symbol and date. + + Args: + symbol (str): The trading symbol to analyze (e.g., "BTC/USDT"). + date (str): The date for which to perform the analysis in YYYY-MM-DD format. + Returns: + EnqueueAnalysisResponse: The response containing job_id, status, and message. + """ + try: + # Check if the analysis is on cooldown, if cooldown return the job-id + job_id, ttl = redis_repo.get_cooldown(DEFAULT_USER, symbol) + if job_id: + return EnqueueAnalysisResponse( + job_id=job_id, + status="on_cooldown", + message=f"Analysis for {symbol} is on cooldown. Please try again later. TTL: {ttl} seconds remaining.", + ) + + # If not on cooldown, enqueue the task, insert cooldown key with TTL 6 hours, insert with status pending redis key for analysis analysis:job:{job_id} + task = redis_queue.enqueue(process_job, DEFAULT_USER, symbol, date) + + redis_repo.save_cooldown(DEFAULT_USER, symbol, task.id) + redis_repo.create_analysis_meta(AnalysisMeta.new(job_id=task.id, user_id=DEFAULT_USER, symbol=symbol, trade_date=date)) + + return EnqueueAnalysisResponse( + job_id=task.id, + status="enqueued", + message=f"Analysis for {symbol} has been enqueued successfully." + ) + except Exception as e: + print(f"ERROR: Failed to enqueue analysis task: {e}") + return EnqueueAnalysisResponse( + job_id=None, + status="error", + message=f"Failed to enqueue analysis task: {str(e)}" + ) diff --git a/tradingagents/domain/model.py b/tradingagents/domain/model.py new file mode 100644 index 00000000..970a01b7 --- /dev/null +++ b/tradingagents/domain/model.py @@ -0,0 +1,34 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional +import time + + +class AnalysisStatus(str, Enum): + PENDING = "pending" + RUNNING = "running" + DONE = "done" + FAILED = "failed" + + +@dataclass +class AnalysisMeta: + job_id: str + user_id: str + symbol: str + status: AnalysisStatus + trade_date: str # "trade_date": final_state["trade_date"], + updated_at: float + created_at: float = time.time() + + @staticmethod + def new(job_id: str, user_id: str, symbol: str, trade_date: str) -> "AnalysisMeta": + return AnalysisMeta( + job_id=job_id, + user_id=user_id, + symbol=symbol, + status=AnalysisStatus.PENDING, + trade_date=trade_date, + updated_at=time.time(), + created_at=time.time(), + ) diff --git a/tradingagents/domain/response.py b/tradingagents/domain/response.py new file mode 100644 index 00000000..bb0e5618 --- /dev/null +++ b/tradingagents/domain/response.py @@ -0,0 +1,8 @@ +from dataclasses import dataclass +from typing import Optional + +@dataclass +class EnqueueAnalysisResponse: + job_id: Optional[str] + status: str + message: str diff --git a/tradingagents/external/redis/client.py b/tradingagents/external/redis/client.py index 2d9c6299..4dfc955d 100644 --- a/tradingagents/external/redis/client.py +++ b/tradingagents/external/redis/client.py @@ -11,6 +11,7 @@ def get_redis_client() -> Redis: if _client is None: try: config = get_config() + print(f"INFO: Creating Redis connection pool config {config}") retry = Retry(ExponentialBackoff(), retries=5) pool = ConnectionPool( diff --git a/tradingagents/external/redis/repo.py b/tradingagents/external/redis/repo.py index 754919bd..44c285bb 100644 --- a/tradingagents/external/redis/repo.py +++ b/tradingagents/external/redis/repo.py @@ -1,18 +1,105 @@ +import time from tradingagents.external.redis.client import get_redis_client +from tradingagents.domain.model import AnalysisMeta, AnalysisStatus +from rq import Queue, Retry +from redis import Redis -redis = get_redis_client() +# TODO: Move to config +RQ_RETRIES = 3 +RQ_INTERVAL = [30, 60, 120] + +ANALYSIS_META_KEY = "analysis:meta:{job_id}" +ANALYSIS_RESULT_KEY = "analysis:result:{job_id}" +ANALYSIS_COOLDOWN_KEY = "tradingagents-analysis-cooldown-{user_id}:{symbol}" class RedisRepo: - def get(self, key: str): - return redis.get(key) - - def set(self, key: str, value: str, ex: int | None = None): - return redis.set(key, value, ex=ex) - - def delete(self, key: str): - return redis.delete(key) + def __init__(self, redis: Redis): + self.redis = redis def exists(self, key: str) -> bool: - return redis.exists(key) == 1 + return self.redis.exists(key) == 1 + + def create_cooldown_key(self, user_id: str, symbol: str) -> str: + return ANALYSIS_COOLDOWN_KEY.format(user_id=user_id, symbol=symbol) + + def save_cooldown(self, user_id: str, symbol: str, job_id: str, ttl: int = 6 * 3600): + key = self.create_cooldown_key(user_id, symbol) + self.redis.set(key, job_id, ex=ttl) -redis_repo = RedisRepo() + def get_cooldown(self, user_id: str, symbol: str) -> tuple[str | None, int | None]: + key = self.create_cooldown_key(user_id, symbol) + + job_id = self.redis.get(key) + if job_id is None: + return None, None + + ttl = self.redis.ttl(key) + + # Redis TTL semantics: + # -2 → key does not exist + # -1 → key exists but has no expiry + # >=0 → seconds remaining + + if ttl < 0: + print(f"WARNING: Cooldown key {key} has invalid TTL {ttl}.") + ttl = None + + return job_id, ttl + + + def _meta_key(self, job_id: str) -> str: + return ANALYSIS_META_KEY.format(job_id=job_id) + + def _result_key(self, job_id: str) -> str: + return ANALYSIS_RESULT_KEY.format(job_id=job_id) + + def create_analysis_meta(self, meta: AnalysisMeta, ttl: int = 7 * 24 * 3600): + self.redis.hset( + self._meta_key(meta.job_id), + mapping={ + "job_id": meta.job_id, + "trade_date": meta.trade_date, + "user_id": meta.user_id, + "symbol": meta.symbol, + "status": meta.status.value, + "updated_at": meta.updated_at, + }, + ) + self.redis.expire(self._meta_key(meta.job_id), ttl) + + def update_status_analysis_meta(self, job_id: str, status: AnalysisStatus): + self.redis.hset( + self._meta_key(job_id), + mapping={ + "status": status.value, + "updated_at": time.time(), + }, + ) + + def get_analysis_meta(self, job_id: str) -> AnalysisMeta | None: + data = self.redis.hgetall(self._meta_key(job_id)) + if not data: + return None + + return AnalysisMeta( + job_id=data["job_id"], + user_id=data["user_id"], + symbol=data["symbol"], + trade_date=data["trade_date"], + status=AnalysisStatus(data["status"]), + updated_at=float(data["updated_at"]), + created_at=float(data.get("created_at")), + ) + + def save_result(self, job_id: str, final_trade: str, ttl: int = 7 * 24 * 3600): + ''' + Save the final trading decision result to Redis. No expiration by default. + ''' + self.redis.set(self._result_key(job_id), final_trade, ex=ttl) + + def get_result(self, job_id: str) -> str | None: + return self.redis.get(self._result_key(job_id)) + + +redis_repo = RedisRepo(get_redis_client()) +redis_queue = Queue(connection=get_redis_client(), retry=Retry(max=RQ_RETRIES, interval=RQ_INTERVAL)) diff --git a/webapp.py b/webapp.py index a583fac7..248836f7 100644 --- a/webapp.py +++ b/webapp.py @@ -1,11 +1,10 @@ -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, status from pydantic import BaseModel import uvicorn from datetime import datetime -import asyncio # Import your trading agents -from tradingagents.graph.trading_graph import TradingAgentsGraph +from service import enqueue_analysis from tradingagents.dataflows.config import get_config from dotenv import load_dotenv @@ -21,26 +20,17 @@ app = FastAPI( version="0.1.0" ) -# Pydantic models for request/response -class TradingRequest(BaseModel): +class TradingAnalyzeRequest(BaseModel): symbol: str date: str -class TradingResponse(BaseModel): +class TradingAnalyzeResponse(BaseModel): symbol: str date: str - decision: dict + job_id: str timestamp: str status: str -# Initialize trading agent once at startup -def create_trading_agent(): - """Create trading agent with fixed configuration""" - return TradingAgentsGraph(debug=True, config=config) - -# Create the trading agent instance once -trading_agent = create_trading_agent() - @app.get("/") async def root(): """Root endpoint""" @@ -60,38 +50,31 @@ async def health_check(): "service": "tradingagents-api" } -@app.post("/trading/analyze", response_model=TradingResponse) -async def analyze_trading_decision(request: TradingRequest): +@app.post("/v1/trading/analyze", response_model=TradingAnalyzeResponse, status_code=status.HTTP_202_ACCEPTED,) +async def analyze_trading_decision(request: TradingAnalyzeRequest): """ Analyze trading decision for a given symbol and date Example usage: POST /trading/analyze { - "symbol": "NVDA", + "symbol": "BTC/USDT", "date": "2024-05-10" } """ - try: - # Run the analysis (this might take a while, so we run it in a thread pool) - def run_analysis(): - _, decision = trading_agent.propagate(request.symbol, request.date) - return decision - - # Run in thread pool to avoid blocking - loop = asyncio.get_event_loop() - decision = await loop.run_in_executor(None, run_analysis) - - return TradingResponse( - symbol=request.symbol, - date=request.date, - decision=decision, - timestamp=datetime.now().isoformat(), - status="success" - ) - - except Exception as e: - raise HTTPException(status_code=500, detail=f"Trading analysis failed: {str(e)}") + response = enqueue_analysis(request.symbol, request.date) + print(f"INFO: Enqueue response: {response}") + + if response.status == "error": + raise HTTPException(status_code=500, detail=response.message) + + return TradingAnalyzeResponse( + symbol=request.symbol, + date=request.date, + job_id=response.job_id, + timestamp=datetime.now().isoformat(), + status=response.status + ) if __name__ == "__main__":