diff --git a/tradingagents/agents/utils/memory.py b/tradingagents/agents/utils/memory.py
index 27bdc36e..2791f8d3 100644
--- a/tradingagents/agents/utils/memory.py
+++ b/tradingagents/agents/utils/memory.py
@@ -49,7 +49,12 @@ class FinancialSituationMemory:
embeddings=embeddings,
ids=ids,
)
-
+
+ def get_latest_situation(self):
+ """Retrieve the latest entry from a situation_collection."""
+ latest_entry = self.situation_collection.query(sort_by="id:descending", limit=1)
+ return latest_entry
+
def get_memories(self, current_situation, n_matches=1):
"""Find matching recommendations using OpenAI embeddings"""
query_embedding = self.get_embedding(current_situation)
diff --git a/web_app/backend/main.py b/web_app/backend/main.py
index c16d39e9..17e14257 100644
--- a/web_app/backend/main.py
+++ b/web_app/backend/main.py
@@ -4,6 +4,7 @@ from pydantic import BaseModel
from typing import Dict, Any, Optional
import json
import os
+import time
from datetime import datetime
import glob
import uuid
@@ -59,10 +60,12 @@ class AnalysisResponse(BaseModel):
class JobStatus(BaseModel):
job_id: str
+ start_time: Optional[float] = None
status: str
progress: Optional[str] = None
result: Optional[Dict[str, Any]] = None
error: Optional[str] = None
+ trading_agent: Any = None # Store the trading agent instance here
# In-memory job storage (in production, use Redis or database)
jobs: Dict[str, JobStatus] = {}
@@ -77,6 +80,15 @@ async def health_check():
async def run_analysis_task(job_id: str, symbol: str, analysis_date: str, config_overrides: Dict[str, Any] = None):
"""Background task to run the trading analysis without blocking the event loop"""
+
+ def execution_time() -> Optional[str]:
+ if jobs[job_id].status == "completed" or jobs[job_id].status == "failed":
+ return f"{time.time() - jobs[job_id].start_time:.2f} seconds"
+ else:
+ return None
+
+ jobs[job_id].start_time = time.time() # Save the start time in the job
+
try:
jobs[job_id].status = "running"
jobs[job_id].progress = "Initializing TradingAgents..."
@@ -87,16 +99,13 @@ async def run_analysis_task(job_id: str, symbol: str, analysis_date: str, config
config.update(config_overrides)
jobs[job_id].progress = "Setting up trading graph..."
-
- # Define blocking work as sync function
- def _do_work():
- ta = TradingAgentsGraph(debug=True, config=config)
- jobs[job_id].progress = f"Analyzing {symbol} for {analysis_date}..."
- _, decision = ta.propagate(symbol, analysis_date)
- return decision
-
- # Run blocking work in threadpool so the event loop stays responsive
- decision = await run_in_threadpool(_do_work)
+ # Create and store the trading agent instance
+ ta = TradingAgentsGraph(debug=True, config=config)
+ jobs[job_id].trading_agent = ta # Store the instance
+ jobs[job_id].progress = f"Analyzing {symbol} for {analysis_date}..."
+
+ # Run the propagate method in a threadpool
+ _, decision = await run_in_threadpool(ta.propagate, symbol, analysis_date)
jobs[job_id].status = "completed"
jobs[job_id].result = {
@@ -104,6 +113,20 @@ async def run_analysis_task(job_id: str, symbol: str, analysis_date: str, config
"date": analysis_date,
"decision": decision,
"completed_at": datetime.now().isoformat(),
+ "execution_time": execution_time()
+ }
+ jobs[job_id].progress = "Analysis completed successfully"
+
+ except Exception as e:
+ jobs[job_id].status = "failed"
+ jobs[job_id].error = str(e)
+ jobs[job_id].progress = f"Error: {str(e)}"
+ jobs[job_id].result = {
+ "symbol": symbol,
+ "date": analysis_date,
+ "decision": decision,
+ "completed_at": datetime.now().isoformat(),
+ "execution_time": execution_time()
}
jobs[job_id].progress = "Analysis completed successfully"
@@ -342,15 +365,81 @@ async def get_jobs():
"""Get all jobs"""
job_lst = []
for job_id, job in jobs.items():
- job_lst.append({
+ job_dict = {
"job_id": job_id,
"status": job.status,
"progress": job.progress,
"result": job.result,
- "error": job.error
- })
+ "error": job.error,
+ }
+
+ # Add execution_time if available
+ if hasattr(job, 'start_time') and job.start_time is not None:
+ if job.status in ["completed", "failed"]:
+ job_dict["execution_time"] = f"{time.time() - job.start_time:.2f} seconds"
+
+ job_lst.append(job_dict)
+
return {"jobs": job_lst}
+@app.post("/reflect-on-analysis/{symbol}/{date}")
+async def reflect_on_analysis(symbol: str, date: str, request: dict):
+ """Get latest financial situation memory for a specific analysis"""
+ returns_losses = request.get("returns_losses")
+ if returns_losses is None:
+ raise HTTPException(status_code=400, detail="returns_losses is required in request body")
+
+ # Find the job that matches the symbol and date
+ matching_job = None
+ for job_id, job in jobs.items():
+ if (job.result.get("symbol") == symbol.upper() and
+ job.result.get("date") == date and
+ hasattr(job, 'trading_agent') and
+ job.trading_agent):
+ matching_job = job
+ break
+
+ if not matching_job:
+ raise HTTPException(status_code=404, detail=f"No active job found for {symbol} on {date}")
+
+ if not hasattr(matching_job.trading_agent, 'memory') or not matching_job.trading_agent.memory:
+ raise HTTPException(status_code=404, detail="No memory found for this analysis")
+
+ matching_job.trading_agent.reflect_and_remember(returns_losses)
+
+ try:
+ bull_memory = matching_job.trading_agent.bull_memory
+ bear_memory = matching_job.trading_agent.bear_memory
+ trader_memory = matching_job.trading_agent.trader_memory
+ invest_judge_memory = matching_job.trading_agent.invest_judge_memory
+ risk_manager_memory = matching_job.trading_agent.risk_manager_memory
+
+ reflections = {}
+
+ latest_entry = bull_memory.get_latest_situation()
+ reflections["bull_memory"] = latest_entry
+
+ latest_entry = bear_memory.get_latest_situation()
+ reflections["bear_memory"] = latest_entry
+
+ latest_entry = trader_memory.get_latest_situation()
+ reflections["trader_memory"] = latest_entry
+
+ latest_entry = invest_judge_memory.get_latest_situation()
+ reflections["invest_judge_memory"] = latest_entry
+
+ latest_entry = risk_manager_memory.get_latest_situation()
+ reflections["risk_manager_memory"] = latest_entry
+
+ return {
+ "symbol": symbol.upper(),
+ "date": date,
+ "job_id": matching_job.job_id,
+ "reflections": reflections
+ }
+ except Exception as e:
+ raise HTTPException(status_code=500, detail=f"Error retrieving latest situation: {str(e)}")
+
@app.get("/config")
async def get_default_config():
"""Get the default configuration"""
@@ -358,4 +447,4 @@ async def get_default_config():
if __name__ == "__main__":
import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
+ uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
diff --git a/web_app/frontend/src/App.js b/web_app/frontend/src/App.js
index 26277b18..0f25f0da 100644
--- a/web_app/frontend/src/App.js
+++ b/web_app/frontend/src/App.js
@@ -687,7 +687,13 @@ function App() {
{selectedTransformedData ? (
{typeof reflection === 'string' ? reflection : JSON.stringify(reflection)}
++ Please enter your loss results to reflect on the analysis. +
+ setLossResults(e.target.value)} + className="w-full px-3 py-2 border border-gray-300 rounded-md focus:outline-none focus:ring-2 focus:ring-blue-500" + /> + {reflectionError && ( +{reflectionError}
+ )} +