This commit is contained in:
MarkLo 2025-12-17 05:41:30 +08:00
parent 79ab46dae8
commit 4bbeaa8e18
6 changed files with 110 additions and 8 deletions

View File

@ -157,6 +157,50 @@ async def get_task_status(task_id: str):
return TaskStatusResponse(**task)
@router.delete("/task/{task_id}/cleanup")
async def cleanup_task(task_id: str):
"""
Manually cleanup a completed/failed task from Redis storage.
This endpoint allows the frontend to proactively delete task data
after the user has saved the results locally or to cloud storage.
This helps keep Redis storage clean and reduces memory usage.
Note: Tasks are also automatically cleaned up 10 minutes after
completion/failure, so calling this endpoint is optional but recommended.
Args:
task_id: Task identifier
Returns:
Success message
Raises:
HTTPException: If task not found
"""
task = task_manager.get_task_status(task_id)
if not task:
raise HTTPException(status_code=404, detail=f"Task {task_id} not found")
# Only allow cleanup of completed or failed tasks
if task.get("status") not in ["completed", "failed"]:
raise HTTPException(
status_code=400,
detail=f"Can only cleanup completed or failed tasks. Current status: {task.get('status')}"
)
# Delete from both Redis and in-memory storage
task_manager.delete_task(task_id)
logger.info(f"🧹 Task {task_id} manually cleaned up from storage")
return {
"success": True,
"message": f"Task {task_id} has been cleaned up from storage",
"task_id": task_id
}
@router.get("/tickers")
async def get_tickers():
"""Get list of popular tickers (example endpoint)"""

View File

@ -39,7 +39,8 @@ class HybridTaskManager:
self._tasks: Dict[str, Dict[str, Any]] = {}
self._lock = threading.RLock()
self._cleanup_interval = 3600 # 1 hour
self._task_expiry = 86400 # 24 hours
self._task_expiry = 86400 # 24 hours for pending/running tasks
self._completed_task_expiry = 600 # 10 minutes for completed/failed tasks (auto cleanup)
# Check Redis availability on startup
if is_redis_available():
@ -79,15 +80,23 @@ class HybridTaskManager:
for key in expired_keys:
del self._tasks[key]
def _save_to_storage(self, task_id: str, task_data: dict):
"""Save task to both Redis (if available) and in-memory"""
def _save_to_storage(self, task_id: str, task_data: dict, use_short_expiry: bool = False):
"""
Save task to both Redis (if available) and in-memory.
Args:
task_id: Task ID
task_data: Task data dictionary
use_short_expiry: If True, use shorter TTL for completed/failed tasks
"""
# Always save to in-memory (fast access)
with self._lock:
self._tasks[task_id] = task_data
# Also save to Redis if available (persistence)
if is_redis_available():
save_task_to_redis(task_id, task_data, self._task_expiry)
expiry = self._completed_task_expiry if use_short_expiry else self._task_expiry
save_task_to_redis(task_id, task_data, expiry)
def _get_from_storage(self, task_id: str) -> Optional[dict]:
"""Get task from in-memory first, then Redis"""
@ -165,7 +174,10 @@ class HybridTaskManager:
def set_task_result(self, task_id: str, result: Any):
"""
Set task result and mark as completed
Set task result and mark as completed.
Note: Completed tasks will be automatically cleaned up from Redis
after a short TTL (10 minutes by default) to free up space.
Args:
task_id: Task ID
@ -177,11 +189,16 @@ class HybridTaskManager:
task_data["result"] = result
task_data["progress"] = "Analysis completed"
task_data["completed_at"] = datetime.now().isoformat()
self._save_to_storage(task_id, task_data)
# Save with shorter TTL for auto cleanup
self._save_to_storage(task_id, task_data, use_short_expiry=True)
logger.info(f"✅ Task {task_id} completed, will be auto-cleaned from Redis in {self._completed_task_expiry} seconds")
def set_task_error(self, task_id: str, error: str):
"""
Set task error and mark as failed
Set task error and mark as failed.
Note: Failed tasks will be automatically cleaned up from Redis
after a short TTL (10 minutes by default) to free up space.
Args:
task_id: Task ID
@ -193,7 +210,9 @@ class HybridTaskManager:
task_data["error"] = error
task_data["progress"] = "Analysis failed"
task_data["failed_at"] = datetime.now().isoformat()
self._save_to_storage(task_id, task_data)
# Save with shorter TTL for auto cleanup
self._save_to_storage(task_id, task_data, use_short_expiry=True)
logger.info(f"❌ Task {task_id} failed, will be auto-cleaned from Redis in {self._completed_task_expiry} seconds")
def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
"""

View File

@ -61,6 +61,8 @@ export default function AnalysisPage() {
console.log("☁️ Auto-saved report to cloud");
}
}
// Note: Redis cleanup is handled immediately when analysis completes
// in useAnalysis hook, so no need to cleanup here
} catch (error) {
console.error("Auto-save failed:", error);
}

View File

@ -164,6 +164,8 @@ export default function AnalysisResultsPage() {
setSavedToCloud(true);
}
}
// Note: Redis cleanup is handled immediately when analysis completes
// in useAnalysis hook, so no need to cleanup here
setSaveSuccess(true);
// Reset success message after 3 seconds

View File

@ -42,6 +42,16 @@ export function useAnalysis() {
const { clearPendingTask } = await import('@/lib/pending-task');
clearPendingTask();
// 🧹 Immediately cleanup Redis cache after receiving result
// The result is already stored in React state, so Redis data is no longer needed
try {
await api.cleanupTask(id);
console.log("🧹 Redis cache cleaned up immediately after analysis completed");
} catch (cleanupErr) {
// Silently fail - cleanup is optional, task will auto-expire anyway
console.warn("Redis cleanup failed (will auto-expire):", cleanupErr);
}
// Stop polling
if (pollingIntervalRef.current) {
clearInterval(pollingIntervalRef.current);
@ -70,6 +80,14 @@ export function useAnalysis() {
const { clearPendingTask } = await import('@/lib/pending-task');
clearPendingTask();
// 🧹 Cleanup Redis cache for failed task
try {
await api.cleanupTask(id);
console.log("🧹 Redis cache cleaned up after analysis failed");
} catch (cleanupErr) {
console.warn("Redis cleanup failed (will auto-expire):", cleanupErr);
}
// Stop polling
if (pollingIntervalRef.current) {
clearInterval(pollingIntervalRef.current);

View File

@ -63,4 +63,21 @@ export const api = {
const response = await apiClient.get<{ tickers: Ticker[] }>("/api/tickers");
return response.data;
},
/**
* Cleanup task from Redis storage after saving results
* This helps keep Redis memory usage low
*/
async cleanupTask(taskId: string): Promise<{ success: boolean; message: string }> {
try {
const response = await apiClient.delete<{ success: boolean; message: string; task_id: string }>(
`/api/task/${taskId}/cleanup`
);
return response.data;
} catch (error) {
// Silently fail - cleanup is optional, task will auto-expire anyway
console.warn("Task cleanup failed (will auto-expire in 10 minutes):", error);
return { success: false, message: "Cleanup failed silently" };
}
},
};