From bfe345c4ab3fc307c96b76f1f196225b49ef5042 Mon Sep 17 00:00:00 2001 From: mhmmdjafarg Date: Thu, 1 Jan 2026 13:26:39 +0700 Subject: [PATCH] fix --- service.py | 75 +++++++++++++++------------- tradingagents/dataflows/interface.py | 4 +- tradingagents/default_config.py | 2 +- 3 files changed, 42 insertions(+), 39 deletions(-) diff --git a/service.py b/service.py index 70501ab6..ac910cfd 100644 --- a/service.py +++ b/service.py @@ -2,50 +2,53 @@ from tradingagents.external.redis.repo import redis_queue, redis_repo from tradingagents.domain.model import AnalysisMeta, AnalysisStatus from tradingagents.domain.response import EnqueueAnalysisResponse from rq import get_current_job -# from tradingagents.external.redis.repo import RQ_RETRIES +from tradingagents.graph.trading_graph import TradingAgentsGraph from tradingagents.dataflows.config import get_config DEFAULT_USER = "global_user" -# Initialize trading agent once at startup -# def create_trading_agent(): -# """Create trading agent with fixed configuration""" -# return TradingAgentsGraph(debug=True, config=get_config()) +trading_agent = None -# # Create the trading agent instance once -# trading_agent = create_trading_agent() +def get_trading_agent(): + global trading_agent + if trading_agent is None: + print("INFO: Initializing TradingAgent (once per worker)") + trading_agent = TradingAgentsGraph( + debug=False, + config=get_config() + ) + return trading_agent def process_job(user_id: str, symbol: str, date: str): print(f"INFO: Starting job for symbol {symbol} and date {date} by user {user_id}") - print(f"DEBUG: Job function called - this should only happen with a worker!") - # try: - # job = get_current_job() + try: + job = get_current_job() + attempt = job.meta.get("attempt", 1) + job.meta["attempt"] = attempt + job.save_meta() + + print(f"INFO: Processing job-id {job.id} for symbol {symbol} and date {date} by user {user_id}") + + # Update status to RUNNING + redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.RUNNING) + + final_state, decision = get_trading_agent().propagate(ticker=symbol, trade_date=date) + + print(f"INFO: Decision for job-id {job.id}: {decision}") + + # Save the final result + redis_repo.save_result(job_id=job.id, final_trade=final_state["final_trade_decision"]) + # Update status to DONE + redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.DONE) - # attempt = job.meta.get("attempt", 1) - # job.meta["attempt"] = attempt - # job.save_meta() - - # print(f"INFO: Processing job-id {job.id} for symbol {symbol} and date {date} by user {user_id}") - - # # Update status to RUNNING - # redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.RUNNING) - - # final_state, decision = trading_agent.propagate(ticker=symbol, trade_date=date) - - # print(f"INFO: Decision for job-id {job.id}: {decision}") - - # # Save the final result - # redis_repo.save_result(job_id=job.id, final_trade=final_state["final_trade_decision"]) - # # Update status to DONE - # redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.DONE) - - # print(f"INFO: Completed job-id {job.id} for symbol {symbol}") - # except Exception as e: - # job.meta["attempt"] = attempt + 1 - # job.save_meta() - # print(f"ERROR: Failed to process job-id {job.id}: {e} (Attempt {attempt} of {RQ_RETRIES})") - # # Update status to FAILED - # redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.FAILED) + print(f"INFO: Completed job-id {job.id} for symbol {symbol}") + except Exception as e: + job.meta["attempt"] = attempt + 1 + job.save_meta() + print(f"ERROR: Failed to process job-id {job.id}: {e} (Attempt {attempt})") + # Update status to FAILED + redis_repo.update_status_analysis_meta(job_id=job.id, status=AnalysisStatus.FAILED) + raise e def enqueue_analysis(symbol: str, date: str) -> EnqueueAnalysisResponse: @@ -69,7 +72,7 @@ def enqueue_analysis(symbol: str, date: str) -> EnqueueAnalysisResponse: ) # If not on cooldown, enqueue the task, insert cooldown key with TTL 6 hours, insert with status pending redis key for analysis analysis:job:{job_id} - task = redis_queue.enqueue(process_job, DEFAULT_USER, symbol, date) + task = redis_queue.enqueue(process_job, DEFAULT_USER, symbol, date, job_timeout=7200) redis_repo.save_cooldown(DEFAULT_USER, symbol, task.id) redis_repo.create_analysis_meta(AnalysisMeta.new(job_id=task.id, user_id=DEFAULT_USER, symbol=symbol, trade_date=date)) diff --git a/tradingagents/dataflows/interface.py b/tradingagents/dataflows/interface.py index 859c4812..998fc549 100644 --- a/tradingagents/dataflows/interface.py +++ b/tradingagents/dataflows/interface.py @@ -117,8 +117,8 @@ VENDOR_METHODS = { # "local": get_stock_stats_indicators_window }, "get_indicators_bulk": { - # "taapi": get_crypto_stats_indicators, - "bybit": get_bybit_crypto_indicators_bulk + "bybit": get_bybit_crypto_indicators_bulk, + "taapi": get_crypto_stats_indicators }, # fundamental_data "get_fundamentals": { diff --git a/tradingagents/default_config.py b/tradingagents/default_config.py index b592d7c6..3c098444 100644 --- a/tradingagents/default_config.py +++ b/tradingagents/default_config.py @@ -42,7 +42,7 @@ DEFAULT_CONFIG = { "data_vendors": { "core_crypto_apis": "bybit", # Options: binance, bybit "core_stock_apis": "yfinance", # Options: yfinance, alpha_vantage, local - "technical_indicators": "bybit", # Options: bybit + "technical_indicators": "bybit", # Options: bybit, taapi "fundamental_data": "alpha_vantage", # Options: openai, alpha_vantage, local "news_data": "openai", # Options: openai, alpha_vantage, google, local "profile_data": "bybit", # Options: bybit, local