from tradingagents.external.redis.repo import redis_queue, redis_repo
from tradingagents.domain.model import AnalysisMeta, AnalysisStatus
from tradingagents.domain.response import EnqueueAnalysisResponse
from rq import get_current_job
# from tradingagents.external.redis.repo import RQ_RETRIES
from tradingagents.dataflows.config import get_config

# User id attached to every cooldown lookup, queued job and analysis record
# created by this module.
DEFAULT_USER = "global_user"

# NOTE: disabled code, kept for reference -- builds the trading agent once at
# startup. The disabled body of process_job below relies on `trading_agent`.
# def create_trading_agent():
#     """Create trading agent with fixed configuration"""
#     return TradingAgentsGraph(debug=True, config=get_config())
#
# # Create the trading agent instance once
# trading_agent = create_trading_agent()


def process_job(user_id: str, symbol: str, date: str):
    """Job function that enqueue_analysis hands to the queue.

    In its current state this only prints two diagnostic lines and returns
    None; the actual analysis pipeline is kept below as commented-out code.

    Args:
        user_id (str): User the job is attributed to (only echoed in the log line).
        symbol (str): Trading symbol of the job (only echoed in the log line).
        date (str): Analysis date of the job (only echoed in the log line).
    """
    start_line = f"INFO: Starting job for symbol {symbol} and date {date} by user {user_id}"
    print(start_line)
    print("DEBUG: Job function called - this should only happen with a worker!")

    # NOTE: disabled pipeline, kept for reference. It also needs the
    # commented-out RQ_RETRIES import and trading_agent instance above.
    # try:
    #     job = get_current_job()
    #     attempt = job.meta.get("attempt", 1)
    #     job.meta["attempt"] = attempt
    #     job.save_meta()
    #     print(f"INFO: Processing job-id {job.id} for symbol {symbol} and date {date} by user {user_id}")
    #     # Update status to RUNNING
    #     redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.RUNNING)
    #     final_state, decision = trading_agent.propagate(ticker=symbol, trade_date=date)
    #     print(f"INFO: Decision for job-id {job.id}: {decision}")
    #     # Save the final result
    #     redis_repo.save_result(job_id=job.id, final_trade=final_state["final_trade_decision"])
    #     # Update status to DONE
    #     redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.DONE)
    #     print(f"INFO: Completed job-id {job.id} for symbol {symbol}")
    # except Exception as e:
    #     job.meta["attempt"] = attempt + 1
    #     job.save_meta()
    #     print(f"ERROR: Failed to process job-id {job.id}: {e} (Attempt {attempt} of {RQ_RETRIES})")
    #     # Update status to FAILED
    #     redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.FAILED)


def enqueue_analysis(symbol: str, date: str) -> EnqueueAnalysisResponse:
    """Queue a background analysis job for a symbol unless one is on cooldown.

    All bookkeeping is done on behalf of DEFAULT_USER. Failures are not
    raised to the caller; they are printed and reported through the
    response object instead.

    Args:
        symbol (str): The trading symbol to analyze (e.g., "BTC/USDT").
        date (str): The analysis date, expected as YYYY-MM-DD (not validated here).

    Returns:
        EnqueueAnalysisResponse: job_id, status and message, where status is
        "on_cooldown" (job_id of the job recorded in the cooldown entry),
        "enqueued" (job_id of the newly queued job) or "error" (job_id is None).
    """
    try:
        # A cooldown entry carrying a job id short-circuits the request: the
        # caller gets that job id back instead of a new job.
        active_job_id, seconds_left = redis_repo.get_cooldown(DEFAULT_USER, symbol)
        if active_job_id:
            cooldown_message = (
                f"Analysis for {symbol} is on cooldown. Please try again later. "
                f"TTL: {seconds_left} seconds remaining."
            )
            return EnqueueAnalysisResponse(
                job_id=active_job_id,
                status="on_cooldown",
                message=cooldown_message,
            )

        # No cooldown: queue the job, then record the cooldown entry and the
        # analysis metadata under the id of the queued job. The cooldown
        # expiry is not passed from here (the original note mentions 6 hours
        # -- TODO confirm in redis_repo.save_cooldown).
        # NOTE(review): if either bookkeeping call raises, nothing here removes
        # the already-queued job, yet the caller receives status "error" with
        # job_id=None.
        queued_job = redis_queue.enqueue(process_job, DEFAULT_USER, symbol, date)
        redis_repo.save_cooldown(DEFAULT_USER, symbol, queued_job.id)
        meta = AnalysisMeta.new(
            job_id=queued_job.id,
            user_id=DEFAULT_USER,
            symbol=symbol,
            trade_date=date,
        )
        redis_repo.create_analysis_meta(meta)

        success_message = f"Analysis for {symbol} has been enqueued successfully."
        return EnqueueAnalysisResponse(
            job_id=queued_job.id,
            status="enqueued",
            message=success_message,
        )
    except Exception as exc:
        # Boundary handler: convert any failure into an error response.
        print(f"ERROR: Failed to enqueue analysis task: {exc}")
        failure_message = f"Failed to enqueue analysis task: {exc!s}"
        return EnqueueAnalysisResponse(
            job_id=None,
            status="error",
            message=failure_message,
        )