From fbd96e9c186844643911975da9b2d34918fe8a74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=9D=AC=EC=88=98?= Date: Sun, 6 Jul 2025 20:11:29 +0900 Subject: [PATCH] [add] claude --- .../analysis/application/analysis_service.py | 227 +++++++++++++++++- backend/analysis/domain/analysis.py | 30 ++- .../domain/repository/analysis_repo.py | 9 +- .../infra/repository/analysis_repo.py | 59 ++++- .../controller/analysis_controller.py | 91 ++++++- backend/analysis/interface/dto.py | 46 +++- 6 files changed, 437 insertions(+), 25 deletions(-) diff --git a/backend/analysis/application/analysis_service.py b/backend/analysis/application/analysis_service.py index a08f7f3f..1192d632 100644 --- a/backend/analysis/application/analysis_service.py +++ b/backend/analysis/application/analysis_service.py @@ -2,27 +2,246 @@ from sqlmodel import Session from analysis.domain.repository.analysis_repo import IAnalysisRepository from ulid import ULID from analysis.domain.analysis import Analysis as AnalysisVO +from analysis.interface.dto import TradingAnalysisRequest, AnalysisProgressUpdate from fastapi import HTTPException, status, BackgroundTasks +import asyncio +from datetime import datetime + +from tradingagents.graph.trading_graph import TradingAgentsGraph +from tradingagents.default_config import DEFAULT_CONFIG + class AnalysisService: def __init__( self, analysis_repo: IAnalysisRepository, - db_session: Session, + session: Session, ulid: ULID ): self.analysis_repo = analysis_repo - self.db_session = db_session + self.session = session self.ulid = ulid def get_analysis_list( self, member_id: str - )->list[AnalysisVO]: + ) -> list[AnalysisVO]: analyses = self.analysis_repo.find_by_member_id(member_id) if not analyses: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Analysis not found") return analyses - \ No newline at end of file + def get_analysis_by_id( + self, + analysis_id: str, + member_id: str + ) -> AnalysisVO: + analysis = self.analysis_repo.find_by_id(analysis_id) + if not analysis: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Analysis not found") + + if analysis.member_id != member_id: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied") + + return analysis + + def create_analysis( + self, + member_id: str, + request: TradingAnalysisRequest, + background_tasks: BackgroundTasks + ) -> AnalysisVO: + # 분석 요청 생성 + analysis_id = self.ulid.generate() + now = datetime.now() + + analysis_vo = AnalysisVO( + id=analysis_id, + member_id=member_id, + ticker=request.ticker, + analysis_date=request.analysis_date, + analysts_selected=[analyst.value for analyst in request.analysts], + research_depth=request.research_depth, + llm_provider=request.llm_provider, + backend_url=request.backend_url, + shallow_thinker=request.shallow_thinker, + deep_thinker=request.deep_thinker, + status="pending", + created_at=now, + updated_at=now + ) + + saved_analysis = self.analysis_repo.save(analysis_vo) + self.session.commit() + + # 백그라운드에서 분석 실행 + background_tasks.add_task(self._run_analysis, saved_analysis.id) + + return saved_analysis + + async def _run_analysis(self, analysis_id: str): + """백그라운드에서 실제 분석을 실행하는 메서드""" + try: + # 분석 상태를 RUNNING으로 변경 + analysis = self.analysis_repo.find_by_id(analysis_id) + if analysis: + analysis.status = "running" + analysis.updated_at = datetime.now() + self.analysis_repo.update(analysis) + self.session.commit() + + # 분석 정보 조회 + analysis = self.analysis_repo.find_by_id(analysis_id) + if not analysis: + return + + # TradingAgentsGraph 설정 및 실행 + config = self._create_config(analysis) + + # 분석 실행 (실제 구현) + await self._execute_trading_analysis(analysis_id, analysis, config) + + # 분석 완료 상태로 변경 + analysis = self.analysis_repo.find_by_id(analysis_id) + if analysis: + analysis.status = "completed" + analysis.completed_at = datetime.now() + analysis.updated_at = datetime.now() + self.analysis_repo.update(analysis) + self.session.commit() + + except Exception as e: + # 에러 발생 시 실패 상태로 변경 + analysis = self.analysis_repo.find_by_id(analysis_id) + if analysis: + analysis.status = "failed" + analysis.error_message = str(e) + analysis.completed_at = datetime.now() + analysis.updated_at = datetime.now() + self.analysis_repo.update(analysis) + self.session.commit() + + def _create_config(self, analysis: AnalysisVO) -> dict: + """분석 설정을 생성하는 메서드""" + config = DEFAULT_CONFIG.copy() if DEFAULT_CONFIG else {} + config.update({ + "max_debate_rounds": analysis.research_depth, + "max_risk_discuss_rounds": analysis.research_depth, + "quick_think_llm": analysis.shallow_thinker, + "deep_think_llm": analysis.deep_thinker, + "backend_url": analysis.backend_url, + "llm_provider": analysis.llm_provider.lower(), + }) + return config + + async def _execute_trading_analysis(self, analysis_id: str, analysis: AnalysisVO, config: dict): + """실제 TradingAgentsGraph를 실행하는 메서드""" + try: + # TradingAgentsGraph 초기화 + graph = TradingAgentsGraph( + analysis.analysts_selected, + config=config, + debug=True + ) + + # 초기 상태 생성 + init_agent_state = graph.propagator.create_initial_state( + analysis.ticker, + analysis.analysis_date + ) + args = graph.propagator.get_graph_args() + + # 분석 실행 및 결과 처리 + trace = [] + async for chunk in graph.graph.astream(init_agent_state, **args): + trace.append(chunk) + + # 실시간으로 분석 결과 업데이트 + await self._process_analysis_chunk(analysis_id, chunk) + + # 최종 결과 처리 + if trace: + final_state = trace[-1] + final_decision = graph.process_signal(final_state.get("final_trade_decision", "")) + + # 최종 보고서 생성 + final_report = self._generate_final_report(final_state) + + # 최종 결과 저장 + self.analysis_repo.update(analysis_id, { + "final_trade_decision": final_decision, + "final_report": final_report + }) + self.session.commit() + + except Exception as e: + raise Exception(f"Analysis execution failed: {str(e)}") + + async def _process_analysis_chunk(self, analysis_id: str, chunk: dict): + """분석 중간 결과를 처리하고 저장하는 메서드""" + updates = {} + + # 개별 분석가 보고서 업데이트 + if "market_report" in chunk and chunk["market_report"]: + updates["market_report"] = chunk["market_report"] + + if "sentiment_report" in chunk and chunk["sentiment_report"]: + updates["sentiment_report"] = chunk["sentiment_report"] + + if "news_report" in chunk and chunk["news_report"]: + updates["news_report"] = chunk["news_report"] + + if "fundamentals_report" in chunk and chunk["fundamentals_report"]: + updates["fundamentals_report"] = chunk["fundamentals_report"] + + # 팀별 의사결정 과정 업데이트 + if "investment_debate_state" in chunk and chunk["investment_debate_state"]: + updates["investment_debate_state"] = chunk["investment_debate_state"] + + if "trader_investment_plan" in chunk and chunk["trader_investment_plan"]: + updates["trader_investment_plan"] = chunk["trader_investment_plan"] + + if "risk_debate_state" in chunk and chunk["risk_debate_state"]: + updates["risk_debate_state"] = chunk["risk_debate_state"] + + # 업데이트가 있는 경우 저장 + if updates: + self.analysis_repo.update(analysis_id, updates) + self.session.commit() + + def _generate_final_report(self, final_state: dict) -> str: + """최종 통합 보고서를 생성하는 메서드""" + report_parts = [] + + # Analyst Team Reports + if any(final_state.get(section) for section in ["market_report", "sentiment_report", "news_report", "fundamentals_report"]): + report_parts.append("## Analyst Team Reports") + + if final_state.get("market_report"): + report_parts.append(f"### Market Analysis\n{final_state['market_report']}") + if final_state.get("sentiment_report"): + report_parts.append(f"### Social Sentiment\n{final_state['sentiment_report']}") + if final_state.get("news_report"): + report_parts.append(f"### News Analysis\n{final_state['news_report']}") + if final_state.get("fundamentals_report"): + report_parts.append(f"### Fundamentals Analysis\n{final_state['fundamentals_report']}") + + # Research Team Reports + if final_state.get("investment_debate_state"): + report_parts.append("## Research Team Decision") + debate_state = final_state["investment_debate_state"] + if debate_state.get("judge_decision"): + report_parts.append(f"{debate_state['judge_decision']}") + + # Trading Team Reports + if final_state.get("trader_investment_plan"): + report_parts.append("## Trading Team Plan") + report_parts.append(f"{final_state['trader_investment_plan']}") + + # Portfolio Management Decision + if final_state.get("risk_debate_state") and final_state["risk_debate_state"].get("judge_decision"): + report_parts.append("## Portfolio Management Decision") + report_parts.append(f"{final_state['risk_debate_state']['judge_decision']}") + + return "\n\n".join(report_parts) if report_parts else "No analysis results available." \ No newline at end of file diff --git a/backend/analysis/domain/analysis.py b/backend/analysis/domain/analysis.py index 59259df5..0d8e19d4 100644 --- a/backend/analysis/domain/analysis.py +++ b/backend/analysis/domain/analysis.py @@ -1,11 +1,37 @@ from pydantic import BaseModel from datetime import datetime +from typing import List, Dict class Analysis(BaseModel): id: str | None = None member_id: str ticker: str + analysis_date: str + analysts_selected: List[str] = [] + research_depth: int = 3 + llm_provider: str = "openai" + backend_url: str = "https://api.openai.com/v1" + shallow_thinker: str = "gpt-4o-mini" + deep_thinker: str = "gpt-4o" status: str + + # 개별 분석가 리포트들 + market_report: str | None = None + sentiment_report: str | None = None + news_report: str | None = None + fundamentals_report: str | None = None + + # 팀별 의사결정 과정 + investment_debate_state: Dict | None = None + trader_investment_plan: str | None = None + risk_debate_state: Dict | None = None + + # 최종 결과물 + final_trade_decision: str | None = None + final_report: str | None = None + + # 실행 결과 정보 + error_message: str | None = None + completed_at: datetime | None = None created_at: datetime - updated_at: datetime - # 여기에 더 많은 필드들이 추가될 수 있습니다. \ No newline at end of file + updated_at: datetime \ No newline at end of file diff --git a/backend/analysis/domain/repository/analysis_repo.py b/backend/analysis/domain/repository/analysis_repo.py index cf6904b7..f5e5f7f8 100644 --- a/backend/analysis/domain/repository/analysis_repo.py +++ b/backend/analysis/domain/repository/analysis_repo.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from analysis.domain.analysis import Analysis as AnalysisVO +from analysis.interface.dto import TradingAnalysisRequest class IAnalysisRepository(ABC): @abstractmethod @@ -7,9 +8,13 @@ class IAnalysisRepository(ABC): raise NotImplementedError() @abstractmethod - def update(self, analysis_id: str, updates: dict) -> AnalysisVO | None: + def find_by_id(self, analysis_id: str) -> AnalysisVO | None: raise NotImplementedError() @abstractmethod - def create(self, member_id: str) -> AnalysisVO: + def update(self, analysis: AnalysisVO) -> AnalysisVO | None: + raise NotImplementedError() + + @abstractmethod + def save(self, analysis: AnalysisVO) -> AnalysisVO: raise NotImplementedError() \ No newline at end of file diff --git a/backend/analysis/infra/repository/analysis_repo.py b/backend/analysis/infra/repository/analysis_repo.py index f4653e27..dfe77c18 100644 --- a/backend/analysis/infra/repository/analysis_repo.py +++ b/backend/analysis/infra/repository/analysis_repo.py @@ -1,11 +1,11 @@ from analysis.domain.repository.analysis_repo import IAnalysisRepository from sqlmodel import Session, select from analysis.domain.analysis import Analysis as AnalysisVO -from analysis.infra.db_models.analysis import Analysis +from analysis.infra.db_models.analysis import Analysis, AnalysisStatus +from analysis.interface.dto import TradingAnalysisRequest from utils.db_utils import row_to_dict from sqlalchemy.orm import selectinload -from datetime import datetime -import uuid +from datetime import datetime, date class AnalysisRepository(IAnalysisRepository): def __init__(self, session: Session): @@ -20,14 +20,59 @@ class AnalysisRepository(IAnalysisRepository): return [AnalysisVO(**row_to_dict(analysis)) for analysis in analyses] + def find_by_id(self, analysis_id: str) -> AnalysisVO | None: + analysis = self.session.get(Analysis, analysis_id) + if not analysis: + return None + return AnalysisVO(**row_to_dict(analysis)) - def update(self, analysis_id: str, updates: AnalysisVO) -> AnalysisVO | None: - analysis : Analysis | None = self.session.get(Analysis, analysis_id) + def save(self, analysis: AnalysisVO) -> AnalysisVO: + new_analysis = Analysis( + id=analysis.id, + member_id=analysis.member_id, + ticker=analysis.ticker, + analysis_date=date.fromisoformat(analysis.analysis_date), + analysts_selected=analysis.analysts_selected, + research_depth=analysis.research_depth, + llm_provider=analysis.llm_provider, + backend_url=analysis.backend_url, + shallow_thinker=analysis.shallow_thinker, + deep_thinker=analysis.deep_thinker, + status=analysis.status, + market_report=analysis.market_report, + sentiment_report=analysis.sentiment_report, + news_report=analysis.news_report, + fundamentals_report=analysis.fundamentals_report, + investment_debate_state=analysis.investment_debate_state, + trader_investment_plan=analysis.trader_investment_plan, + risk_debate_state=analysis.risk_debate_state, + final_trade_decision=analysis.final_trade_decision, + final_report=analysis.final_report, + error_message=analysis.error_message, + completed_at=analysis.completed_at, + created_at=analysis.created_at, + updated_at=analysis.updated_at + ) + + self.session.add(new_analysis) + self.session.flush() + self.session.refresh(new_analysis) + + analysis.id = new_analysis.id + return analysis + + def update(self, analysis_vo: AnalysisVO) -> AnalysisVO | None: + analysis = self.session.get(Analysis, analysis_vo.id) if not analysis: return None - analysis_data = updates.model_dump(exclude_unset=True) - analysis.sqlmodel_dump(analysis_data) + # AnalysisVO의 데이터를 SQLModel 객체에 업데이트 + vo_data = analysis_vo.sqlmodel_dump(exclude_unset=True) + for key, value in vo_data.items(): + if hasattr(analysis, key) and key != 'id': # id는 변경하지 않음 + setattr(analysis, key, value) + + analysis.updated_at = datetime.now() self.session.add(analysis) self.session.flush() self.session.refresh(analysis) diff --git a/backend/analysis/interface/controller/analysis_controller.py b/backend/analysis/interface/controller/analysis_controller.py index 12497d97..72b27dc5 100644 --- a/backend/analysis/interface/controller/analysis_controller.py +++ b/backend/analysis/interface/controller/analysis_controller.py @@ -1,6 +1,10 @@ from typing import Annotated -from fastapi import APIRouter, Depends, BackgroundTasks -from analysis.interface.dto import AnalysisSessionResponse +from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException, status +from analysis.interface.dto import ( + AnalysisSessionResponse, + TradingAnalysisRequest, + AnalysisResultResponse +) from utils.auth import get_current_member, CurrentMember from dependency_injector.wiring import inject, Provide from analysis.application.analysis_service import AnalysisService @@ -8,28 +12,97 @@ from utils.containers import Container router = APIRouter(prefix="/analysis", tags=["analysis"]) -@router.get("/") +@router.get("/", response_model=list[AnalysisSessionResponse]) @inject def get_analysis_list_for_member( - current_member : Annotated[CurrentMember, Depends(get_current_member)], + current_member: Annotated[CurrentMember, Depends(get_current_member)], analysis_service: Annotated[AnalysisService, Depends(Provide[Container.analysis_service])] ): """ 현재 로그인한 사용자의 모든 분석 세션 목록을 조회합니다. """ - return analysis_service.get_analysis_list(current_member.id) + analyses = analysis_service.get_analysis_list(current_member.id) + return [ + AnalysisSessionResponse( + id=analysis.id, + ticker=analysis.ticker, + status=analysis.status + ) for analysis in analyses + ] -@router.post("/start", status_code=201) +@router.post("/start", status_code=201, response_model=AnalysisSessionResponse) @inject def start_analysis_session( - current_member : Annotated[CurrentMember, Depends(get_current_member)], + request: TradingAnalysisRequest, + current_member: Annotated[CurrentMember, Depends(get_current_member)], analysis_service: Annotated[AnalysisService, Depends(Provide[Container.analysis_service])], background_tasks: BackgroundTasks ): """ 새로운 분석 세션을 시작합니다. """ - new_analysis = analysis_service.create_analysis(current_member.id, background_tasks) - return new_analysis + try: + new_analysis = analysis_service.create_analysis(current_member.id, request, background_tasks) + return AnalysisSessionResponse( + id=new_analysis.id, + ticker=new_analysis.ticker, + status=new_analysis.status + ) + except Exception as e: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to start analysis: {str(e)}" + ) +@router.get("/{analysis_id}", response_model=AnalysisResultResponse) +@inject +def get_analysis_result( + analysis_id: str, + current_member: Annotated[CurrentMember, Depends(get_current_member)], + analysis_service: Annotated[AnalysisService, Depends(Provide[Container.analysis_service])] +): + """ + 특정 분석 세션의 결과를 조회합니다. + """ + analysis = analysis_service.get_analysis_by_id(analysis_id, current_member.id) + + return AnalysisResultResponse( + id=analysis.id, + ticker=analysis.ticker, + analysis_date=analysis.analysis_date, + status=analysis.status, + market_report=analysis.market_report, + sentiment_report=analysis.sentiment_report, + news_report=analysis.news_report, + fundamentals_report=analysis.fundamentals_report, + investment_debate_state=analysis.investment_debate_state, + trader_investment_plan=analysis.trader_investment_plan, + risk_debate_state=analysis.risk_debate_state, + final_trade_decision=analysis.final_trade_decision, + final_report=analysis.final_report, + created_at=analysis.created_at.isoformat(), + completed_at=analysis.completed_at.isoformat() if analysis.completed_at else None, + error_message=analysis.error_message + ) +@router.get("/{analysis_id}/status") +@inject +def get_analysis_status( + analysis_id: str, + current_member: Annotated[CurrentMember, Depends(get_current_member)], + analysis_service: Annotated[AnalysisService, Depends(Provide[Container.analysis_service])] +): + """ + 분석 진행 상황을 조회합니다. + """ + analysis = analysis_service.get_analysis_by_id(analysis_id, current_member.id) + + return { + "analysis_id": analysis.id, + "status": analysis.status, + "ticker": analysis.ticker, + "analysis_date": analysis.analysis_date, + "created_at": analysis.created_at.isoformat(), + "updated_at": analysis.updated_at.isoformat(), + "error_message": analysis.error_message + } diff --git a/backend/analysis/interface/dto.py b/backend/analysis/interface/dto.py index c32328ff..fc7edd2d 100644 --- a/backend/analysis/interface/dto.py +++ b/backend/analysis/interface/dto.py @@ -1,8 +1,52 @@ from pydantic import BaseModel from datetime import date +from typing import List from analysis.infra.db_models.analysis import AnalysisStatus +from enum import Enum + +class AnalystType(str, Enum): + MARKET = "market" + SOCIAL = "social" + NEWS = "news" + FUNDAMENTALS = "fundamentals" + +class TradingAnalysisRequest(BaseModel): + ticker: str + analysis_date: str + analysts: List[AnalystType] + research_depth: int = 3 + llm_provider: str = "openai" + backend_url: str = "https://api.openai.com/v1" + shallow_thinker: str = "gpt-4o-mini" + deep_thinker: str = "gpt-4o" class AnalysisSessionResponse(BaseModel): id : str ticker : str - status : AnalysisStatus \ No newline at end of file + status : AnalysisStatus + +class AnalysisProgressUpdate(BaseModel): + analysis_id: str + current_agent: str + status: str + progress_percentage: float + current_report_section: str | None = None + message: str | None = None + +class AnalysisResultResponse(BaseModel): + id: str + ticker: str + analysis_date: str + status: AnalysisStatus + market_report: str | None = None + sentiment_report: str | None = None + news_report: str | None = None + fundamentals_report: str | None = None + investment_debate_state: dict | None = None + trader_investment_plan: str | None = None + risk_debate_state: dict | None = None + final_trade_decision: str | None = None + final_report: str | None = None + created_at: str + completed_at: str | None = None + error_message: str | None = None \ No newline at end of file