[add] backend

This commit is contained in:
kimheesu 2025-07-02 17:11:24 +09:00
parent bea0c528e5
commit 7d7da12f58
57 changed files with 825 additions and 1660 deletions

View File

View File

@ -0,0 +1,67 @@
from typing import Generator, Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from pydantic import BaseModel
from sqlmodel import Session
from app.core.config import settings
from app.infrastructure.database import get_db
from app.domain.models import User
from app.infrastructure.repositories.user import UserRepository
from app.core.services.trading_analysis import TradingAnalysisService
reusable_oauth2 = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/login/access-token")
class TokenData(BaseModel):
username: Optional[str] = None
def get_user_repository(db: Session = Depends(get_db)) -> UserRepository:
return UserRepository(db)
def get_user_from_token(token: str, db: Session) -> Optional[User]:
try:
payload = jwt.decode(
token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]
)
token_data = TokenData(username=payload.get("sub"))
except JWTError:
return None
user_repo = UserRepository(db)
user = user_repo.get_by_email(email=token_data.username)
return user
def get_current_user(
db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> User:
user = get_user_from_token(token=token, db=db)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
if not current_user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
def get_current_active_superuser(
current_user: User = Depends(get_current_active_user),
) -> User:
if not current_user.is_superuser:
raise HTTPException(
status_code=403, detail="The user doesn't have enough privileges"
)
return current_user
def get_analysis_service(
db: Session = Depends(get_db),
user: User = Depends(get_current_active_user)
) -> TradingAnalysisService:
return TradingAnalysisService(user=user, db=db)

View File

@ -0,0 +1,94 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException, WebSocket, WebSocketDisconnect, BackgroundTasks
from app.api import deps
from app.core.schemas.analysis import AnalysisSession, AnalysisSessionCreate
from app.domain.models import User as UserModel
from app.core.services.trading_analysis import TradingAnalysisService
from app.core.websocket_manager import WebSocketManager
from sqlmodel import Session
from cli.utils import SHALLOW_AGENT_OPTIONS, DEEP_AGENT_OPTIONS, BASE_URLS
router = APIRouter()
manager = WebSocketManager()
@router.post("/start", response_model=AnalysisSession)
def start_analysis(
*,
analysis_in: AnalysisSessionCreate,
background_tasks: BackgroundTasks,
service: TradingAnalysisService = Depends(deps.get_analysis_service),
) -> Any:
"""
Start a new analysis session.
"""
session = service.create_session(analysis_in=analysis_in)
background_tasks.add_task(service.run_analysis, session_id=session.id)
return session
@router.get("/history", response_model=List[AnalysisSession])
def get_analysis_history(
service: TradingAnalysisService = Depends(deps.get_analysis_service),
skip: int = 0,
limit: int = 100,
) -> Any:
"""
Get analysis history for the current user.
"""
return service.get_user_sessions(skip=skip, limit=limit)
@router.get("/options")
def get_analysis_options():
"""
Get available options for analysis.
"""
return {
'analysts': [
{'value': 'market', 'label': 'Market Analyst'},
{'value': 'social', 'label': 'Social Analyst'},
{'value': 'news', 'label': 'News Analyst'},
{'value': 'fundamentals', 'label': 'Fundamentals Analyst'},
],
'research_depths': [
{'value': 1, 'label': 'Shallow'},
{'value': 3, 'label': 'Medium'},
{'value': 5, 'label': 'Deep'},
],
'llm_providers': [{'name': p[0], 'url': p[1]} for p in BASE_URLS],
'shallow_thinkers': SHALLOW_AGENT_OPTIONS,
'deep_thinkers': DEEP_AGENT_OPTIONS,
}
@router.get("/{session_id}", response_model=AnalysisSession)
def get_analysis_session(
session_id: int,
service: TradingAnalysisService = Depends(deps.get_analysis_service),
) -> Any:
"""
Get a specific analysis session by ID.
"""
session = service.get_session(session_id=session_id)
if not session:
raise HTTPException(status_code=404, detail="Analysis session not found")
return session
@router.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str,
db: Session = Depends(deps.get_db)
):
"""
WebSocket endpoint for real-time analysis updates.
"""
user = deps.get_user_from_token(token=token, db=db)
if not user or not user.is_active:
await websocket.close(code=1008)
return
await manager.connect(user.id, websocket)
try:
while True:
# Keep the connection alive
await websocket.receive_text()
except WebSocketDisconnect:
manager.disconnect(user.id, websocket)

View File

@ -0,0 +1,35 @@
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import Session
from app.api import deps
from app.core.config import settings
from app.core.schemas.token import Token
from app.core import security
from app.infrastructure.repositories.user import UserRepository
router = APIRouter()
@router.post("/login/access-token", response_model=Token)
def login_access_token(
db: Session = Depends(deps.get_db), form_data: OAuth2PasswordRequestForm = Depends()
):
"""
OAuth2 compatible token login, get an access token for future requests
"""
user_repo = UserRepository(db)
user = user_repo.get_by_email(email=form_data.username)
if not user or not security.verify_password(form_data.password, user.hashed_password):
raise HTTPException(status_code=400, detail="Incorrect email or password")
elif not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
access_token_expires = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
return {
"access_token": security.create_access_token(
user.email, expires_delta=access_token_expires
),
"token_type": "bearer",
}

View File

@ -0,0 +1,89 @@
from typing import Any, List
from fastapi import APIRouter, Depends, HTTPException
from app.api import deps
from app.core.schemas.user import User, UserCreate, UserUpdate
from app.domain.models import User as UserModel
from app.domain.repositories import IUserRepository
router = APIRouter()
@router.get("/", response_model=List[User])
def read_users(
repo: IUserRepository = Depends(deps.get_user_repository),
skip: int = 0,
limit: int = 100,
current_user: UserModel = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Retrieve users.
"""
users = repo.get_multi(skip=skip, limit=limit)
return users
@router.post("/", response_model=User)
def create_user(
*,
repo: IUserRepository = Depends(deps.get_user_repository),
user_in: UserCreate,
current_user: UserModel = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Create new user.
"""
user = repo.get_by_email(email=user_in.email)
if user:
raise HTTPException(
status_code=400,
detail="The user with this username already exists in the system.",
)
user = repo.create(obj_in=user_in)
return user
@router.get("/me", response_model=User)
def read_user_me(
current_user: UserModel = Depends(deps.get_current_active_user),
) -> Any:
"""
Get current user.
"""
return current_user
@router.get("/{user_id}", response_model=User)
def read_user_by_id(
user_id: int,
repo: IUserRepository = Depends(deps.get_user_repository),
current_user: UserModel = Depends(deps.get_current_active_user),
) -> Any:
"""
Get a specific user by id.
"""
user = repo.get(id=user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user == current_user:
return user
if not repo.is_superuser(user=current_user):
raise HTTPException(
status_code=403, detail="The user doesn't have enough privileges"
)
return user
@router.put("/{user_id}", response_model=User)
def update_user(
*,
repo: IUserRepository = Depends(deps.get_user_repository),
user_id: int,
user_in: UserUpdate,
current_user: UserModel = Depends(deps.get_current_active_superuser),
) -> Any:
"""
Update a user.
"""
user = repo.get(id=user_id)
if not user:
raise HTTPException(
status_code=404,
detail="The user with this username does not exist in the system",
)
user = repo.update(db_obj=user, obj_in=user_in)
return user

View File

@ -0,0 +1,7 @@
from fastapi import APIRouter
from app.api.endpoints import login, users, analysis
api_router = APIRouter()
api_router.include_router(login.router, tags=["login"])
api_router.include_router(users.router, prefix="/users", tags=["users"])
api_router.include_router(analysis.router, prefix="/analysis", tags=["analysis"])

View File

View File

@ -0,0 +1,26 @@
import os
from pydantic import BaseSettings
from typing import List, Optional
class Settings(BaseSettings):
PROJECT_NAME: str = "TradingAgents Backend"
API_V1_STR: str = "/api/v1"
# Security
SECRET_KEY: str = os.getenv("SECRET_KEY", "a_very_secret_key")
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 8 # 8 days
ALGORITHM: str = "HS256"
# Database
DATABASE_URL: str = os.getenv("DATABASE_URL", "sqlite:///./tradingagents.db")
# OpenAI
OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
# CORS
CORS_ALLOWED_ORIGINS: List[str] = os.getenv("CORS_ALLOWED_ORIGINS", "http://localhost:3000,http://127.0.0.1:3000").split(',')
class Config:
case_sensitive = True
settings = Settings()

View File

@ -0,0 +1,4 @@
from .user import User, UserCreate, UserUpdate
from .token import Token, TokenPayload
from .profile import Profile, ProfileCreate, ProfileUpdate
from .analysis import AnalysisSession, AnalysisSessionCreate, AnalysisSessionUpdate

View File

@ -0,0 +1,38 @@
from pydantic import BaseModel
from typing import List, Optional
from datetime import date, datetime
from app.domain.models import AnalysisStatus
class AnalysisSessionBase(BaseModel):
ticker: str
analysts_selected: List[str]
research_depth: int
llm_provider: str
backend_url: str
shallow_thinker: str
deep_thinker: str
class AnalysisSessionCreate(AnalysisSessionBase):
pass
class AnalysisSessionUpdate(BaseModel):
status: Optional[AnalysisStatus] = None
final_report: Optional[str] = None
error_message: Optional[str] = None
class AnalysisSessionInDBBase(AnalysisSessionBase):
id: int
user_id: int
analysis_date: date
status: AnalysisStatus
final_report: Optional[str] = None
error_message: Optional[str] = None
created_at: datetime
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
class Config:
orm_mode = True
class AnalysisSession(AnalysisSessionInDBBase):
pass

View File

@ -0,0 +1,20 @@
from pydantic import BaseModel
from typing import Optional
class ProfileBase(BaseModel):
default_ticker: str = "SPY"
preferred_research_depth: int = 3
preferred_shallow_thinker: str = "gpt-4o-mini"
preferred_deep_thinker: str = "gpt-4o"
class ProfileCreate(ProfileBase):
pass
class ProfileUpdate(ProfileBase):
openai_api_key: Optional[str] = None
class Profile(ProfileBase):
has_openai_api_key: bool
class Config:
orm_mode = True

View File

@ -0,0 +1,9 @@
from pydantic import BaseModel
from typing import Optional
class Token(BaseModel):
access_token: str
token_type: str
class TokenPayload(BaseModel):
sub: Optional[int] = None

View File

@ -0,0 +1,28 @@
from pydantic import BaseModel, EmailStr
from typing import Optional
class UserBase(BaseModel):
email: EmailStr
username: str
first_name: Optional[str] = None
last_name: Optional[str] = None
class UserCreate(UserBase):
password: str
class UserUpdate(UserBase):
pass
class UserInDBBase(UserBase):
id: int
is_active: bool
is_superuser: bool
class Config:
orm_mode = True
class User(UserInDBBase):
pass
class UserInDB(UserInDBBase):
hashed_password: str

View File

@ -0,0 +1,23 @@
from datetime import datetime, timedelta
from typing import Any, Union, Optional
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_access_token(subject: Union[str, Any], expires_delta: Optional[timedelta] = None) -> str:
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode = {"exp": expire, "sub": str(subject)}
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)

View File

@ -0,0 +1,128 @@
import asyncio
import datetime
import json
from typing import Dict, List, Optional
from sqlmodel import Session, select
from app.domain.models import User, AnalysisSession, AnalysisStatus
from app.core.schemas.analysis import AnalysisSessionCreate
from app.core.config import settings
from cli.models import AnalystType
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
from app.api.deps import get_db
from app.core.websocket_manager import WebSocketManager
class TradingAnalysisService:
def __init__(self, user: User, db: Session):
self.user = user
self.db = db
self.websocket_manager = WebSocketManager()
async def run_analysis(self, session_id: int):
"""분석 실행"""
session = self.get_session(session_id=session_id)
if not session:
return
try:
session.status = AnalysisStatus.RUNNING
session.started_at = datetime.datetime.utcnow()
self.db.add(session)
self.db.commit()
self.db.refresh(session)
await self.websocket_manager.send_to_user(
self.user.id,
{
'type': 'analysis_started',
'session_id': session.id,
'message': '분석을 시작합니다...'
}
)
# Prepare config for TradingAgentsGraph
config = DEFAULT_CONFIG.copy()
config.update({
'openai_api_key': settings.OPENAI_API_KEY,
'llm_provider': session.llm_provider,
'backend_url': session.backend_url,
'shallow_thinking_model': session.shallow_thinker,
'deep_thinking_model': session.deep_thinker,
})
# Progress callback for websocket
async def progress_callback(message_type: str, content: str, agent: str = None, step: int = 0, total: int = 0):
progress_percent = int((step / total) * 99) if total > 0 else 0
await self.websocket_manager.send_to_user(self.user.id, {
'type': 'analysis_progress',
'session_id': session.id,
'message_type': message_type,
'content': content,
'agent': agent,
'progress': progress_percent,
})
trading_graph = TradingAgentsGraph(
config=config,
selected_analysts=session.analysts_selected,
)
input_data = {
'company_of_interest': session.ticker,
'trade_date': session.analysis_date.strftime('%Y-%m-%d'),
}
final_state, result = await asyncio.to_thread(
trading_graph.propagate,
input_data['company_of_interest'],
input_data['trade_date']
)
session.status = AnalysisStatus.COMPLETED
session.completed_at = datetime.datetime.utcnow()
session.final_report = json.dumps(final_state) # Store full state as JSON
self.db.add(session)
self.db.commit()
await self.websocket_manager.send_to_user(
self.user.id,
{
'type': 'analysis_completed',
'session_id': session.id,
'message': '분석이 완료되었습니다.',
'result': result
}
)
except Exception as e:
session.status = AnalysisStatus.FAILED
session.error_message = str(e)
self.db.add(session)
self.db.commit()
await self.websocket_manager.send_to_user(
self.user.id,
{
'type': 'analysis_failed',
'session_id': session.id,
'message': f'분석 중 오류가 발생했습니다: {str(e)}'
}
)
def create_session(self, *, analysis_in: AnalysisSessionCreate) -> AnalysisSession:
session = AnalysisSession(
**analysis_in.dict(),
user_id=self.user.id,
analysis_date=datetime.date.today()
)
self.db.add(session)
self.db.commit()
self.db.refresh(session)
return session
def get_session(self, *, session_id: int) -> Optional[AnalysisSession]:
statement = select(AnalysisSession).where(AnalysisSession.id == session_id, AnalysisSession.user_id == self.user.id)
return self.db.exec(statement).first()
def get_user_sessions(self, *, skip: int = 0, limit: int = 100) -> List[AnalysisSession]:
statement = select(AnalysisSession).where(AnalysisSession.user_id == self.user.id).order_by(AnalysisSession.created_at.desc()).offset(skip).limit(limit)
return self.db.exec(statement).all()

View File

@ -0,0 +1,23 @@
from typing import Dict, List
from fastapi import WebSocket
class WebSocketManager:
def __init__(self):
self.active_connections: Dict[int, List[WebSocket]] = {}
async def connect(self, user_id: int, websocket: WebSocket):
await websocket.accept()
if user_id not in self.active_connections:
self.active_connections[user_id] = []
self.active_connections[user_id].append(websocket)
def disconnect(self, user_id: int, websocket: WebSocket):
if user_id in self.active_connections:
self.active_connections[user_id].remove(websocket)
if not self.active_connections[user_id]:
del self.active_connections[user_id]
async def send_to_user(self, user_id: int, message: dict):
if user_id in self.active_connections:
for connection in self.active_connections[user_id]:
await connection.send_json(message)

View File

View File

@ -0,0 +1,56 @@
from datetime import date, datetime
from typing import List, Optional
from sqlmodel import Field, SQLModel, JSON, Column
import enum
class User(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
email: str = Field(unique=True, index=True)
username: str = Field(unique=True, index=True)
hashed_password: str
first_name: Optional[str] = None
last_name: Optional[str] = None
is_active: bool = Field(default=True)
is_superuser: bool = Field(default=False)
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow, sa_column_kwargs={"onupdate": datetime.utcnow})
class UserProfile(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id", unique=True)
encrypted_openai_api_key: Optional[str] = None
default_ticker: str = Field(default="SPY")
preferred_research_depth: int = Field(default=3)
preferred_shallow_thinker: str = Field(default="gpt-4o-mini")
preferred_deep_thinker: str = Field(default="gpt-4o")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow, sa_column_kwargs={"onupdate": datetime.utcnow})
class AnalysisStatus(str, enum.Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class AnalysisSession(SQLModel, table=True):
id: Optional[int] = Field(default=None, primary_key=True)
user_id: int = Field(foreign_key="user.id")
ticker: str
analysis_date: date
analysts_selected: List[str] = Field(sa_column=Column(JSON))
research_depth: int
llm_provider: str
backend_url: str
shallow_thinker: str
deep_thinker: str
status: AnalysisStatus = Field(default=AnalysisStatus.PENDING)
final_report: Optional[str] = None
error_message: Optional[str] = None
created_at: datetime = Field(default_factory=datetime.utcnow)
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None

View File

@ -0,0 +1,48 @@
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, Optional, List
from sqlmodel import SQLModel
from app.core.schemas.user import UserCreate, UserUpdate
from app.domain.models import User
ModelType = TypeVar("ModelType", bound=SQLModel)
CreateSchemaType = TypeVar("CreateSchemaType", bound=SQLModel)
UpdateSchemaType = TypeVar("UpdateSchemaType", bound=SQLModel)
class IRepository(Generic[ModelType], ABC):
@abstractmethod
def get(self, id: int) -> Optional[ModelType]:
pass
@abstractmethod
def get_multi(self, *, skip: int = 0, limit: int = 100) -> List[ModelType]:
pass
@abstractmethod
def create(self, *, obj_in: CreateSchemaType) -> ModelType:
pass
@abstractmethod
def update(self, *, db_obj: ModelType, obj_in: UpdateSchemaType) -> ModelType:
pass
@abstractmethod
def remove(self, *, id: int) -> ModelType:
pass
class IUserRepository(IRepository[User], ABC):
@abstractmethod
def get_by_email(self, *, email: str) -> Optional[User]:
pass
@abstractmethod
def create(self, *, obj_in: UserCreate) -> User:
pass
@abstractmethod
def update(self, *, db_obj: User, obj_in: UserUpdate) -> User:
pass
@abstractmethod
def is_superuser(self, *, user: User) -> bool:
pass

View File

@ -0,0 +1,9 @@
from sqlmodel import create_engine, Session
from app.core.config import settings
engine = create_engine(settings.DATABASE_URL, echo=True, connect_args={"check_same_thread": False})
def get_db():
with Session(engine) as session:
yield session

View File

@ -0,0 +1,53 @@
from typing import Optional
from sqlmodel import Session, select
from app.domain.models import User
from app.core.schemas.user import UserCreate, UserUpdate
from app.domain.repositories import IUserRepository
from app.core.security import get_password_hash
class UserRepository(IUserRepository):
def __init__(self, db: Session):
self.db = db
def get(self, id: int) -> Optional[User]:
return self.db.get(User, id)
def get_by_email(self, *, email: str) -> Optional[User]:
statement = select(User).where(User.email == email)
return self.db.exec(statement).first()
def get_multi(self, *, skip: int = 0, limit: int = 100) -> list[User]:
statement = select(User).offset(skip).limit(limit)
return self.db.exec(statement).all()
def create(self, *, obj_in: UserCreate) -> User:
db_obj = User(
email=obj_in.email,
username=obj_in.username,
hashed_password=get_password_hash(obj_in.password),
first_name=obj_in.first_name,
last_name=obj_in.last_name,
)
self.db.add(db_obj)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def update(self, *, db_obj: User, obj_in: UserUpdate) -> User:
update_data = obj_in.dict(exclude_unset=True)
for field, value in update_data.items():
setattr(db_obj, field, value)
self.db.add(db_obj)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def remove(self, *, id: int) -> User:
db_obj = self.db.get(User, id)
self.db.delete(db_obj)
self.db.commit()
return db_obj
def is_superuser(self, *, user: User) -> bool:
return user.is_superuser

36
web/backend/app/main.py Normal file
View File

@ -0,0 +1,36 @@
import sys
import os
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
# Add project root to path to allow importing tradingagents
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..")))
from app.api.router import api_router
from app.core.config import settings
from app.infrastructure.database import engine
from sqlmodel import SQLModel
def create_tables():
SQLModel.metadata.create_all(engine)
app = FastAPI(
title=settings.PROJECT_NAME,
openapi_url=f"{settings.API_V1_STR}/openapi.json"
)
@app.on_event("startup")
def on_startup():
create_tables()
# Set all CORS enabled origins
if settings.CORS_ALLOWED_ORIGINS:
app.add_middleware(
CORSMiddleware,
allow_origins=[str(origin) for origin in settings.CORS_ALLOWED_ORIGINS],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(api_router, prefix=settings.API_V1_STR)

View File

@ -1 +0,0 @@
# Apps package

View File

@ -1 +0,0 @@
# Authentication app

View File

@ -1,86 +0,0 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from .models import User, UserProfile, AnalysisSession
class UserProfileInline(admin.StackedInline):
"""사용자 프로필 인라인"""
model = UserProfile
can_delete = False
verbose_name_plural = '프로필'
fields = ('default_ticker', 'preferred_research_depth', 'preferred_shallow_thinker', 'preferred_deep_thinker', 'has_openai_api_key')
readonly_fields = ('has_openai_api_key',)
def has_openai_api_key(self, obj):
return obj.has_openai_api_key()
has_openai_api_key.boolean = True
has_openai_api_key.short_description = 'OpenAI API 키 보유'
@admin.register(User)
class UserAdmin(BaseUserAdmin):
"""사용자 관리자"""
inlines = (UserProfileInline,)
list_display = ('email', 'username', 'first_name', 'last_name', 'is_staff', 'date_joined')
list_filter = ('is_staff', 'is_superuser', 'is_active', 'date_joined')
search_fields = ('email', 'username', 'first_name', 'last_name')
ordering = ('email',)
fieldsets = (
(None, {'fields': ('email', 'password')}),
('개인정보', {'fields': ('first_name', 'last_name', 'username')}),
('권한', {
'fields': ('is_active', 'is_staff', 'is_superuser', 'groups', 'user_permissions'),
}),
('중요한 날짜', {'fields': ('last_login', 'date_joined')}),
)
add_fieldsets = (
(None, {
'classes': ('wide',),
'fields': ('email', 'username', 'password1', 'password2'),
}),
)
@admin.register(UserProfile)
class UserProfileAdmin(admin.ModelAdmin):
"""사용자 프로필 관리자"""
list_display = ('user', 'default_ticker', 'preferred_research_depth', 'has_openai_api_key', 'created_at')
list_filter = ('preferred_research_depth', 'created_at')
search_fields = ('user__email', 'user__username', 'default_ticker')
readonly_fields = ('created_at', 'updated_at', 'has_openai_api_key')
fields = (
'user', 'default_ticker', 'preferred_research_depth',
'preferred_shallow_thinker', 'preferred_deep_thinker',
'has_openai_api_key', 'created_at', 'updated_at'
)
def has_openai_api_key(self, obj):
return obj.has_openai_api_key()
has_openai_api_key.boolean = True
has_openai_api_key.short_description = 'OpenAI API 키 보유'
@admin.register(AnalysisSession)
class AnalysisSessionAdmin(admin.ModelAdmin):
"""분석 세션 관리자"""
list_display = ('user', 'ticker', 'analysis_date', 'status', 'created_at', 'duration')
list_filter = ('status', 'analysis_date', 'created_at')
search_fields = ('user__email', 'user__username', 'ticker')
readonly_fields = ('created_at', 'duration')
fields = (
'user', 'ticker', 'analysis_date',
'analysts_selected', 'research_depth', 'shallow_thinker', 'deep_thinker',
'status', 'final_report', 'error_message',
'created_at', 'started_at', 'completed_at', 'duration'
)
def duration(self, obj):
if obj.started_at and obj.completed_at:
duration = obj.completed_at - obj.started_at
return f"{int(duration.total_seconds())}"
return "미완료"
duration.short_description = '소요시간'

View File

@ -1,10 +0,0 @@
from django.apps import AppConfig
class AuthenticationConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.authentication'
verbose_name = '사용자 인증'
def ready(self):
import apps.authentication.signals

View File

@ -1,88 +0,0 @@
# Generated by Django 4.2.7 on 2025-06-13 05:07
from django.conf import settings
import django.contrib.auth.models
import django.contrib.auth.validators
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('email', models.EmailField(max_length=254, unique=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
'verbose_name': 'user',
'verbose_name_plural': 'users',
'abstract': False,
},
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
migrations.CreateModel(
name='UserProfile',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('encrypted_openai_api_key', models.TextField(blank=True, null=True)),
('default_ticker', models.CharField(default='SPY', max_length=10)),
('preferred_research_depth', models.IntegerField(choices=[(1, 'Shallow'), (3, 'Medium'), (5, 'Deep')], default=3)),
('preferred_shallow_thinker', models.CharField(default='gpt-4o-mini', max_length=50)),
('preferred_deep_thinker', models.CharField(default='gpt-4o', max_length=50)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, related_name='profile', to=settings.AUTH_USER_MODEL)),
],
options={
'db_table': 'user_profiles',
},
),
migrations.CreateModel(
name='AnalysisSession',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('ticker', models.CharField(max_length=10)),
('analysis_date', models.DateField()),
('analysts_selected', models.JSONField()),
('research_depth', models.IntegerField()),
('shallow_thinker', models.CharField(max_length=50)),
('deep_thinker', models.CharField(max_length=50)),
('status', models.CharField(choices=[('pending', 'Pending'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed'), ('cancelled', 'Cancelled')], default='pending', max_length=20)),
('final_report', models.TextField(blank=True, null=True)),
('error_message', models.TextField(blank=True, null=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('started_at', models.DateTimeField(blank=True, null=True)),
('completed_at', models.DateTimeField(blank=True, null=True)),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='analysis_sessions', to=settings.AUTH_USER_MODEL)),
],
options={
'db_table': 'analysis_sessions',
'ordering': ['-created_at'],
},
),
]

View File

@ -1,126 +0,0 @@
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.core.exceptions import ValidationError
from cryptography.fernet import Fernet
from django.conf import settings
import base64
import os
class User(AbstractUser):
"""확장된 사용자 모델"""
email = models.EmailField(unique=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
class UserProfile(models.Model):
"""사용자 프로필 및 API 키 관리"""
user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile')
# 암호화된 OpenAI API 키 저장
encrypted_openai_api_key = models.TextField(blank=True, null=True)
# 기본 설정
default_ticker = models.CharField(max_length=10, default='SPY')
preferred_research_depth = models.IntegerField(default=3, choices=[
(1, 'Shallow'),
(3, 'Medium'),
(5, 'Deep')
])
preferred_shallow_thinker = models.CharField(max_length=50, default='gpt-4o-mini')
preferred_deep_thinker = models.CharField(max_length=50, default='gpt-4o')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'user_profiles'
def __str__(self):
return f"{self.user.username}'s Profile"
@staticmethod
def _get_cipher_key():
"""암호화/복호화용 키 생성"""
# Django SECRET_KEY를 기반으로 암호화 키 생성
key = base64.urlsafe_b64encode(settings.SECRET_KEY[:32].encode())
return Fernet(key)
def set_openai_api_key(self, api_key):
"""OpenAI API 키를 암호화하여 저장"""
if api_key:
cipher = self._get_cipher_key()
encrypted_key = cipher.encrypt(api_key.encode())
self.encrypted_openai_api_key = base64.urlsafe_b64encode(encrypted_key).decode()
else:
self.encrypted_openai_api_key = None
def get_openai_api_key(self):
"""저장된 OpenAI API 키를 복호화하여 반환"""
if not self.encrypted_openai_api_key:
return None
try:
cipher = self._get_cipher_key()
encrypted_key = base64.urlsafe_b64decode(self.encrypted_openai_api_key.encode())
decrypted_key = cipher.decrypt(encrypted_key)
return decrypted_key.decode()
except Exception:
return None
def has_openai_api_key(self):
"""사용자가 OpenAI API 키를 설정했는지 확인"""
return bool(self.encrypted_openai_api_key)
def get_effective_openai_api_key(self):
"""
사용자 API 키가 있으면 사용자 키를, 없으면 개발자 기본 키를 반환
"""
user_key = self.get_openai_api_key()
if user_key:
return user_key
# 개발자가 등록한 기본 키 사용
return getattr(settings, 'OPENAI_API_KEY', '')
class AnalysisSession(models.Model):
"""분석 세션 관리"""
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='analysis_sessions')
# 분석 파라미터
ticker = models.CharField(max_length=10)
analysis_date = models.DateField()
analysts_selected = models.JSONField() # 선택된 분석가들
research_depth = models.IntegerField()
shallow_thinker = models.CharField(max_length=50)
deep_thinker = models.CharField(max_length=50)
# 세션 상태
status = models.CharField(max_length=20, choices=[
('pending', 'Pending'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
('cancelled', 'Cancelled'),
], default='pending')
# 결과 저장
final_report = models.TextField(blank=True, null=True)
error_message = models.TextField(blank=True, null=True)
# 시간 추적
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(blank=True, null=True)
completed_at = models.DateTimeField(blank=True, null=True)
class Meta:
db_table = 'analysis_sessions'
ordering = ['-created_at']
def __str__(self):
return f"{self.user.username} - {self.ticker} ({self.status})"

View File

@ -1,152 +0,0 @@
from rest_framework import serializers
from django.contrib.auth import authenticate
from django.contrib.auth.password_validation import validate_password
from .models import User, UserProfile, AnalysisSession
from datetime import date
class UserRegistrationSerializer(serializers.ModelSerializer):
"""사용자 회원가입 시리얼라이저"""
password = serializers.CharField(write_only=True, validators=[validate_password])
password_confirm = serializers.CharField(write_only=True)
class Meta:
model = User
fields = ('email', 'username', 'password', 'password_confirm', 'first_name', 'last_name')
def validate(self, attrs):
if attrs['password'] != attrs['password_confirm']:
raise serializers.ValidationError("비밀번호가 일치하지 않습니다.")
return attrs
def create(self, validated_data):
validated_data.pop('password_confirm')
user = User.objects.create_user(**validated_data)
return user
class UserLoginSerializer(serializers.Serializer):
"""사용자 로그인 시리얼라이저"""
email = serializers.EmailField()
password = serializers.CharField(write_only=True)
def validate(self, attrs):
email = attrs.get('email')
password = attrs.get('password')
if email and password:
user = authenticate(username=email, password=password)
if not user:
raise serializers.ValidationError('올바르지 않은 이메일 또는 비밀번호입니다.')
if not user.is_active:
raise serializers.ValidationError('비활성화된 계정입니다.')
attrs['user'] = user
else:
raise serializers.ValidationError('이메일과 비밀번호를 모두 입력해주세요.')
return attrs
class UserProfileSerializer(serializers.ModelSerializer):
"""사용자 프로필 시리얼라이저"""
has_openai_api_key = serializers.SerializerMethodField()
openai_api_key = serializers.CharField(write_only=True, required=False, allow_blank=True)
class Meta:
model = UserProfile
fields = (
'default_ticker',
'preferred_research_depth',
'preferred_shallow_thinker',
'preferred_deep_thinker',
'has_openai_api_key',
'openai_api_key',
'created_at',
'updated_at'
)
read_only_fields = ('created_at', 'updated_at')
def get_has_openai_api_key(self, obj):
return obj.has_openai_api_key()
def update(self, instance, validated_data):
openai_api_key = validated_data.pop('openai_api_key', None)
# OpenAI API 키 업데이트
if openai_api_key is not None:
instance.set_openai_api_key(openai_api_key)
# 다른 필드 업데이트
for attr, value in validated_data.items():
setattr(instance, attr, value)
instance.save()
return instance
class UserSerializer(serializers.ModelSerializer):
"""사용자 정보 시리얼라이저"""
profile = UserProfileSerializer(read_only=True)
class Meta:
model = User
fields = ('id', 'email', 'username', 'first_name', 'last_name', 'profile', 'date_joined')
read_only_fields = ('id', 'email', 'date_joined')
class AnalysisSessionSerializer(serializers.ModelSerializer):
"""분석 세션 시리얼라이저"""
user_email = serializers.CharField(source='user.email', read_only=True)
duration = serializers.SerializerMethodField()
class Meta:
model = AnalysisSession
fields = (
'id', 'user_email', 'ticker', 'analysis_date',
'analysts_selected', 'research_depth', 'shallow_thinker', 'deep_thinker',
'status', 'final_report', 'error_message',
'created_at', 'started_at', 'completed_at', 'duration'
)
read_only_fields = ('id', 'user_email', 'created_at', 'duration')
def get_duration(self, obj):
if obj.started_at and obj.completed_at:
duration = obj.completed_at - obj.started_at
return int(duration.total_seconds())
return None
class CreateAnalysisSessionSerializer(serializers.ModelSerializer):
"""분석 세션 생성 시리얼라이저"""
class Meta:
model = AnalysisSession
fields = (
'ticker',
'analysts_selected', 'research_depth',
'shallow_thinker', 'deep_thinker'
)
# analysis_date는 create 시점에 자동 생성되므로 필드에서 제외
def create(self, validated_data):
"""오늘 날짜를 추가하여 세션 생성"""
validated_data['analysis_date'] = date.today()
return super().create(validated_data)
def validate_analysts_selected(self, value):
"""선택된 분석가들 검증"""
if not isinstance(value, list) or len(value) == 0:
raise serializers.ValidationError("최소 하나의 분석가를 선택해야 합니다.")
valid_analysts = ['market', 'social', 'news', 'fundamentals']
for analyst in value:
if analyst not in valid_analysts:
raise serializers.ValidationError(f"올바르지 않은 분석가: {analyst}")
return value
def validate_research_depth(self, value):
"""연구 깊이 검증"""
if value not in [1, 3, 5]:
raise serializers.ValidationError("연구 깊이는 1, 3, 5 중 하나여야 합니다.")
return value

View File

@ -1,17 +0,0 @@
from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import User, UserProfile
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""사용자 생성 시 자동으로 프로필 생성"""
if created:
UserProfile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
"""사용자 저장 시 프로필도 함께 저장"""
if hasattr(instance, 'profile'):
instance.profile.save()

View File

@ -1,24 +0,0 @@
from django.urls import path
from rest_framework_simplejwt.views import TokenRefreshView
from . import views
app_name = 'authentication'
urlpatterns = [
# 인증 관련
path('register/', views.UserRegistrationView.as_view(), name='register'),
path('login/', views.UserLoginView.as_view(), name='login'),
path('token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
# 사용자 정보
path('user/', views.UserInfoView.as_view(), name='user_info'),
path('profile/', views.UserProfileView.as_view(), name='user_profile'),
# OpenAI API 키 관리
path('check-api-key/', views.check_openai_api_key, name='check_api_key'),
path('remove-api-key/', views.remove_openai_api_key, name='remove_api_key'),
# 분석 세션 관리
path('sessions/', views.AnalysisSessionListView.as_view(), name='analysis_sessions'),
path('sessions/<int:pk>/', views.AnalysisSessionDetailView.as_view(), name='analysis_session_detail'),
]

View File

@ -1,181 +0,0 @@
from rest_framework import status, generics, permissions
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth import authenticate
from django.shortcuts import get_object_or_404
from .models import User, UserProfile, AnalysisSession
from .serializers import (
UserRegistrationSerializer,
UserLoginSerializer,
UserSerializer,
UserProfileSerializer,
AnalysisSessionSerializer,
CreateAnalysisSessionSerializer
)
class UserRegistrationView(APIView):
"""사용자 회원가입"""
permission_classes = [permissions.AllowAny]
def post(self, request):
serializer = UserRegistrationSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
print(f"user: {user}")
# JWT 토큰 생성
refresh = RefreshToken.for_user(user)
return Response({
'message': '회원가입이 완료되었습니다.',
'user': UserSerializer(user).data,
'tokens': {
'refresh': str(refresh),
'access': str(refresh.access_token),
}
}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class UserLoginView(APIView):
"""사용자 로그인"""
permission_classes = [permissions.AllowAny]
def post(self, request):
serializer = UserLoginSerializer(data=request.data)
if serializer.is_valid():
user = serializer.validated_data['user']
# JWT 토큰 생성
refresh = RefreshToken.for_user(user)
return Response({
'message': '로그인이 완료되었습니다.',
'user': UserSerializer(user).data,
'tokens': {
'refresh': str(refresh),
'access': str(refresh.access_token),
}
}, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class UserProfileView(APIView):
"""사용자 프로필 조회 및 수정"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
"""프로필 조회"""
profile, created = UserProfile.objects.get_or_create(user=request.user)
serializer = UserProfileSerializer(profile)
return Response(serializer.data)
def put(self, request):
"""프로필 수정"""
profile, created = UserProfile.objects.get_or_create(user=request.user)
serializer = UserProfileSerializer(profile, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response({
'message': '프로필이 업데이트되었습니다.',
'profile': serializer.data
})
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class UserInfoView(APIView):
"""사용자 정보 조회"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
serializer = UserSerializer(request.user)
return Response(serializer.data)
class AnalysisSessionListView(generics.ListCreateAPIView):
"""분석 세션 목록 조회 및 생성"""
permission_classes = [permissions.IsAuthenticated]
def get_queryset(self):
return AnalysisSession.objects.filter(user=self.request.user)
def get_serializer_class(self):
if self.request.method == 'POST':
return CreateAnalysisSessionSerializer
return AnalysisSessionSerializer
def perform_create(self, serializer):
serializer.save(user=self.request.user)
class AnalysisSessionDetailView(generics.RetrieveUpdateDestroyAPIView):
"""분석 세션 상세 조회, 수정, 삭제"""
permission_classes = [permissions.IsAuthenticated]
serializer_class = AnalysisSessionSerializer
def get_queryset(self):
return AnalysisSession.objects.filter(user=self.request.user)
@api_view(['POST'])
@permission_classes([permissions.IsAuthenticated])
def check_openai_api_key(request):
"""OpenAI API 키 유효성 검사"""
try:
profile = request.user.profile
api_key = profile.get_effective_openai_api_key()
if not api_key:
return Response({
'valid': False,
'message': 'OpenAI API 키가 설정되지 않았습니다.'
}, status=status.HTTP_400_BAD_REQUEST)
# 실제 OpenAI API 호출로 키 검증 (간단한 모델 목록 요청)
import openai
openai.api_key = api_key
try:
# 간단한 API 호출로 키 유효성 확인
response = openai.models.list()
return Response({
'valid': True,
'message': 'OpenAI API 키가 유효합니다.',
'using_user_key': profile.has_openai_api_key()
})
except Exception as e:
return Response({
'valid': False,
'message': f'OpenAI API 키가 유효하지 않습니다: {str(e)}'
}, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({
'error': str(e)
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['DELETE'])
@permission_classes([permissions.IsAuthenticated])
def remove_openai_api_key(request):
"""사용자의 OpenAI API 키 제거"""
try:
profile = request.user.profile
profile.set_openai_api_key(None)
profile.save()
return Response({
'message': 'OpenAI API 키가 제거되었습니다. 이제 기본 키를 사용합니다.'
})
except Exception as e:
return Response({
'error': str(e)
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

View File

@ -1 +0,0 @@

View File

@ -1,7 +0,0 @@
from django.apps import AppConfig
class TradingApiConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.trading_api'
verbose_name = '거래 분석 API'

View File

@ -1,223 +0,0 @@
import asyncio
import datetime
from typing import Dict, List, Optional
from django.conf import settings
from channels.layers import get_channel_layer
from channels.db import database_sync_to_async
from asgiref.sync import async_to_sync
# CLI 모듈 import (경로 조정 필요)
import sys
import os
sys.path.append(os.path.join(settings.BASE_DIR.parent.parent))
from cli.models import AnalystType
from tradingagents.graph.trading_graph import TradingAgentsGraph
from tradingagents.default_config import DEFAULT_CONFIG
from apps.authentication.models import AnalysisSession, UserProfile
class TradingAnalysisService:
"""거래 분석 서비스"""
def __init__(self, user, analysis_session: AnalysisSession):
self.user = user
self.session = analysis_session
self.channel_layer = get_channel_layer()
self.user_channel_group = f"user_{user.id}"
@database_sync_to_async
def _update_session_status(self, status: str, error_message: Optional[str] = None, final_report: Optional[str] = None):
"""세션 상태 업데이트 (비동기 안전)"""
self.session.status = status
now = datetime.datetime.now()
if status == 'running':
self.session.started_at = now
else:
self.session.completed_at = now
if error_message:
self.session.error_message = error_message
if final_report:
self.session.final_report = final_report
self.session.save()
@database_sync_to_async
def _get_user_profile_and_key(self):
"""사용자 프로필 및 API 키 조회 (비동기 안전)"""
profile = self.user.profile
return profile.get_effective_openai_api_key()
async def run_analysis(self):
"""분석 실행"""
try:
await self._update_session_status('running')
await self._send_websocket_message({
'type': 'analysis_started',
'session_id': self.session.id,
'message': '분석을 시작합니다...'
})
api_key = await self._get_user_profile_and_key()
if not api_key:
raise Exception("OpenAI API 키가 설정되지 않았습니다.")
config = self._prepare_analysis_config(api_key)
result = await self._execute_trading_analysis(config)
await self._update_session_status('completed', final_report=result)
await self._send_websocket_message({
'type': 'analysis_completed',
'session_id': self.session.id,
'message': '분석이 완료되었습니다.',
'result': result
})
return result
except Exception as e:
error_msg = str(e)
await self._update_session_status('failed', error_message=error_msg)
await self._send_websocket_message({
'type': 'analysis_failed',
'session_id': self.session.id,
'message': f'분석 중 오류가 발생했습니다: {error_msg}'
})
raise e
def _prepare_analysis_config(self, api_key: str) -> Dict:
"""분석 설정 준비"""
# AnalysisSession의 설정을 CLI 형식으로 변환
analysts = []
for analyst_str in self.session.analysts_selected:
analysts.append(AnalystType(analyst_str))
config = {
'ticker': self.session.ticker,
'analysis_date': self.session.analysis_date.strftime('%Y-%m-%d'),
'analysts': analysts,
'research_depth': self.session.research_depth,
'shallow_thinker': self.session.shallow_thinker,
'deep_thinker': self.session.deep_thinker,
'openai_api_key': api_key
}
return config
async def _execute_trading_analysis(self, config: Dict) -> str:
"""실제 거래 분석 실행"""
try:
# 기본 설정 업데이트
analysis_config = DEFAULT_CONFIG.copy()
analysis_config.update({
'openai_api_key': config['openai_api_key'],
'shallow_thinking_model': config['shallow_thinker'],
'deep_thinking_model': config['deep_thinker'],
})
# 진행 상황 콜백 함수 수정
async def progress_callback(message_type: str, content: str, agent: str = None, step: int = 0, total: int = 0):
# 백엔드에서 진행률 계산
progress_percent = int((step / total) * 99) if total > 0 else 0 # 100%는 완료 시에만
await self._send_websocket_message({
'type': 'analysis_progress',
'session_id': self.session.id,
'message_type': message_type,
'content': content,
'agent': agent,
'progress': progress_percent,
})
# TradingAgentsGraph 초기화 (더 상세한 예외 처리)
try:
trading_graph = TradingAgentsGraph(
config=analysis_config,
progress_callback=progress_callback
)
except Exception as e:
raise Exception(f"TradingAgentsGraph 초기화 실패: {str(e)}")
# 분석 입력 데이터 준비
input_data = {
'ticker': config['ticker'],
'date': config['analysis_date'],
'selected_analysts': [analyst.value for analyst in config['analysts']],
'research_depth': config['research_depth'],
}
# 분석 실행 (실제 CLI 로직 호출)
try:
# 여기서 trading_graph.invoke를 비동기로 실행해야 합니다.
# 현재 trading_graph.invoke가 동기 함수라고 가정하고,
# asyncio.to_thread를 사용해 비동기 컨텍스트에서 실행합니다.
result = await asyncio.to_thread(
trading_graph.invoke,
input_data
)
return result['final_report'] # 결과 형식에 따라 조정 필요
except Exception as e:
raise Exception(f"trading_graph.invoke 실행 실패: {str(e)}")
except Exception as e:
# 에러 메시지를 명확하게 다시 던짐
raise Exception(f"분석 실행 중 오류: {str(e)}")
async def _send_websocket_message(self, message: Dict):
"""WebSocket으로 메시지 전송"""
try:
await self.channel_layer.group_send(
self.user_channel_group,
{
'type': 'trading_analysis_message',
'message': message
}
)
except Exception as e:
print(f"WebSocket 메시지 전송 실패: {e}")
class TradingAnalysisManager:
"""거래 분석 관리자"""
@staticmethod
@database_sync_to_async
def _get_session(user, session_id):
return AnalysisSession.objects.get(id=session_id, user=user)
@staticmethod
async def start_analysis(user, session_id: int):
"""분석 시작"""
try:
session = await TradingAnalysisManager._get_session(user, session_id)
service = TradingAnalysisService(user, session)
result = await service.run_analysis()
return result
except AnalysisSession.DoesNotExist:
raise Exception("분석 세션을 찾을 수 없습니다.")
@staticmethod
@database_sync_to_async
def get_user_analysis_sessions(user) -> List[AnalysisSession]:
"""사용자의 분석 세션 목록 조회"""
return list(AnalysisSession.objects.filter(user=user).order_by('-created_at'))
@staticmethod
@database_sync_to_async
def cancel_analysis(user, session_id: int):
"""분석 취소"""
try:
session = AnalysisSession.objects.get(id=session_id, user=user)
if session.status == 'running':
session.status = 'cancelled'
session.completed_at = datetime.datetime.now()
session.save()
return True
return False
except AnalysisSession.DoesNotExist:
raise Exception("분석 세션을 찾을 수 없습니다.")

View File

@ -1,20 +0,0 @@
from django.urls import path
from . import views
app_name = 'trading_api'
urlpatterns = [
# 분석 설정 및 옵션
path('config/', views.AnalysisConfigView.as_view(), name='analysis_config'),
path('options/', views.get_analysis_options, name='analysis_options'),
# 분석 실행
path('start/', views.StartAnalysisView.as_view(), name='start_analysis'),
path('status/<int:session_id>/', views.AnalysisStatusView.as_view(), name='analysis_status'),
path('cancel/<int:session_id>/', views.CancelAnalysisView.as_view(), name='cancel_analysis'),
# 분석 기록 및 결과
path('history/', views.AnalysisHistoryView.as_view(), name='analysis_history'),
path('report/<int:session_id>/', views.AnalysisReportView.as_view(), name='analysis_report'),
path('running/', views.get_running_analyses, name='running_analyses'),
]

View File

@ -1,220 +0,0 @@
from rest_framework import status, permissions
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework.views import APIView
from django.shortcuts import get_object_or_404
from asgiref.sync import sync_to_async
import asyncio
import threading
from datetime import datetime
from apps.authentication.models import AnalysisSession
from apps.authentication.serializers import AnalysisSessionSerializer, CreateAnalysisSessionSerializer
from .services import TradingAnalysisManager, TradingAnalysisService
class AnalysisConfigView(APIView):
"""분석 설정 정보 조회"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
"""분석 설정 옵션들 반환"""
config = {
'analysts': [
{'value': 'market', 'label': 'Market Analyst', 'description': '시장 데이터 분석'},
{'value': 'social', 'label': 'Social Analyst', 'description': '소셜 센티멘트 분석'},
{'value': 'news', 'label': 'News Analyst', 'description': '뉴스 분석'},
{'value': 'fundamentals', 'label': 'Fundamentals Analyst', 'description': '기본 분석'},
],
'research_depths': [
{'value': 1, 'label': 'Shallow', 'description': '빠른 분석, 적은 토론 라운드'},
{'value': 3, 'label': 'Medium', 'description': '중간 정도 분석, 보통 토론 라운드'},
{'value': 5, 'label': 'Deep', 'description': '깊은 분석, 많은 토론 라운드'},
],
'shallow_thinkers': [
{'value': 'gpt-4o-mini', 'label': 'GPT-4o-mini', 'description': '빠르고 효율적'},
{'value': 'gpt-4.1-nano', 'label': 'GPT-4.1-nano', 'description': '초경량 모델'},
{'value': 'gpt-4.1-mini', 'label': 'GPT-4.1-mini', 'description': '컴팩트 모델'},
{'value': 'gpt-4o', 'label': 'GPT-4o', 'description': '표준 모델'},
],
'deep_thinkers': [
{'value': 'gpt-4.1-nano', 'label': 'GPT-4.1-nano', 'description': '초경량 모델'},
{'value': 'gpt-4.1-mini', 'label': 'GPT-4.1-mini', 'description': '컴팩트 모델'},
{'value': 'gpt-4o', 'label': 'GPT-4o', 'description': '표준 모델'},
{'value': 'o4-mini', 'label': 'o4-mini', 'description': '추론 특화 모델 (컴팩트)'},
{'value': 'o3-mini', 'label': 'o3-mini', 'description': '고급 추론 모델 (경량)'},
{'value': 'o3', 'label': 'o3', 'description': '완전한 고급 추론 모델'},
{'value': 'o1', 'label': 'o1', 'description': '최고급 추론 및 문제 해결 모델'},
]
}
return Response(config)
class StartAnalysisView(APIView):
"""분석 시작"""
permission_classes = [permissions.IsAuthenticated]
def post(self, request):
"""새로운 분석 시작"""
print(f"request.data: {request.data}")
serializer = CreateAnalysisSessionSerializer(data=request.data)
if serializer.is_valid():
session = serializer.save(user=request.user)
# 별도의 스레드에서 비동기 작업 실행
thread = threading.Thread(target=self.run_async_task, args=(self._start_analysis_async(request.user, session.id),))
thread.start()
return Response({
'message': '분석이 시작되었습니다.',
'session_id': session.id,
'session': AnalysisSessionSerializer(session).data
}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def run_async_task(self, coro):
"""새 이벤트 루프에서 비동기 코루틴 실행"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(coro)
loop.close()
async def _start_analysis_async(self, user, session_id):
"""비동기 분석 실행"""
try:
await TradingAnalysisManager.start_analysis(user, session_id)
except Exception as e:
print(f"분석 실행 중 오류: {e}")
class AnalysisStatusView(APIView):
"""분석 상태 조회"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request, session_id):
"""특정 분석 세션의 상태 조회"""
session = get_object_or_404(
AnalysisSession,
id=session_id,
user=request.user
)
serializer = AnalysisSessionSerializer(session)
return Response(serializer.data)
class CancelAnalysisView(APIView):
"""분석 취소"""
permission_classes = [permissions.IsAuthenticated]
def post(self, request, session_id):
"""분석 취소"""
try:
success = TradingAnalysisManager.cancel_analysis(request.user, session_id)
if success:
return Response({
'message': '분석이 취소되었습니다.',
'session_id': session_id
})
else:
return Response({
'message': '취소할 수 없는 상태입니다.',
'session_id': session_id
}, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
return Response({
'error': str(e)
}, status=status.HTTP_404_NOT_FOUND)
class AnalysisHistoryView(APIView):
"""분석 기록 조회"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request):
"""사용자의 분석 기록 조회"""
sessions = TradingAnalysisManager.get_user_analysis_sessions(request.user)
serializer = AnalysisSessionSerializer(sessions, many=True)
return Response({
'count': len(sessions),
'results': serializer.data
})
class AnalysisReportView(APIView):
"""분석 보고서 조회"""
permission_classes = [permissions.IsAuthenticated]
def get(self, request, session_id):
"""특정 분석 세션의 보고서 조회"""
session = get_object_or_404(
AnalysisSession,
id=session_id,
user=request.user
)
if session.status != 'completed':
return Response({
'message': '분석이 완료되지 않았습니다.',
'status': session.status
}, status=status.HTTP_400_BAD_REQUEST)
return Response({
'session_id': session.id,
'ticker': session.ticker,
'analysis_date': session.analysis_date,
'final_report': session.final_report,
'completed_at': session.completed_at,
'duration': (session.completed_at - session.started_at).total_seconds() if session.started_at and session.completed_at else None
})
@api_view(['GET'])
@permission_classes([permissions.IsAuthenticated])
def get_analysis_options(request):
"""분석 옵션 조회 (간단한 버전)"""
options = {
'default_values': {
'ticker': 'SPY',
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'analysts_selected': ['market', 'social', 'news', 'fundamentals'],
'research_depth': 3,
'shallow_thinker': 'gpt-4o-mini',
'deep_thinker': 'gpt-4o'
}
}
# 사용자 프로필의 기본값이 있다면 사용
if hasattr(request.user, 'profile'):
profile = request.user.profile
options['user_preferences'] = {
'default_ticker': profile.default_ticker,
'preferred_research_depth': profile.preferred_research_depth,
'preferred_shallow_thinker': profile.preferred_shallow_thinker,
'preferred_deep_thinker': profile.preferred_deep_thinker,
}
return Response(options)
@api_view(['GET'])
@permission_classes([permissions.IsAuthenticated])
def get_running_analyses(request):
"""실행 중인 분석 조회"""
running_sessions = AnalysisSession.objects.filter(
user=request.user,
status='running'
)
serializer = AnalysisSessionSerializer(running_sessions, many=True)
return Response({
'count': len(running_sessions),
'results': serializer.data
})

View File

@ -1 +0,0 @@
# WebSocket app

View File

@ -1,7 +0,0 @@
from django.apps import AppConfig
class WebsocketConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.websocket'
verbose_name = 'WebSocket 통신'

View File

@ -1,182 +0,0 @@
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.db import database_sync_to_async
from rest_framework_simplejwt.tokens import UntypedToken
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from django.conf import settings
import jwt
class TradingAnalysisConsumer(AsyncWebsocketConsumer):
"""거래 분석 실시간 업데이트 WebSocket Consumer"""
async def connect(self):
"""WebSocket 연결"""
# JWT 토큰 인증
user = await self.get_user_from_token()
if user and user.is_authenticated:
self.user = user
self.user_group_name = f"user_{user.id}"
# 사용자 그룹에 추가
await self.channel_layer.group_add(
self.user_group_name,
self.channel_name
)
await self.accept()
# 연결 성공 메시지 전송
await self.send(text_data=json.dumps({
'type': 'connection_established',
'message': 'WebSocket 연결이 성공적으로 설정되었습니다.',
'user_id': user.id
}))
else:
await self.close()
async def disconnect(self, close_code):
"""WebSocket 연결 해제"""
if hasattr(self, 'user_group_name'):
await self.channel_layer.group_discard(
self.user_group_name,
self.channel_name
)
async def receive(self, text_data):
"""클라이언트로부터 메시지 수신"""
try:
text_data_json = json.loads(text_data)
message_type = text_data_json.get('type', '')
if message_type == 'ping':
# 연결 상태 확인용 ping
await self.send(text_data=json.dumps({
'type': 'pong',
'timestamp': text_data_json.get('timestamp')
}))
elif message_type == 'subscribe_analysis':
# 특정 분석 세션 구독
session_id = text_data_json.get('session_id')
if session_id:
await self.subscribe_to_analysis(session_id)
except json.JSONDecodeError:
await self.send(text_data=json.dumps({
'type': 'error',
'message': '잘못된 JSON 형식입니다.'
}))
async def subscribe_to_analysis(self, session_id):
"""특정 분석 세션 구독"""
# 분석 세션이 사용자의 것인지 확인
session_exists = await self.check_session_ownership(session_id)
if session_exists:
analysis_group = f"analysis_{session_id}"
await self.channel_layer.group_add(
analysis_group,
self.channel_name
)
await self.send(text_data=json.dumps({
'type': 'subscription_confirmed',
'session_id': session_id,
'message': f'분석 세션 {session_id}에 구독되었습니다.'
}))
else:
await self.send(text_data=json.dumps({
'type': 'subscription_failed',
'session_id': session_id,
'message': '해당 분석 세션에 대한 권한이 없습니다.'
}))
# 분석 관련 메시지 핸들러들
async def trading_analysis_message(self, event):
"""분석 관련 메시지 전송"""
message = event['message']
await self.send(text_data=json.dumps(message))
async def analysis_progress(self, event):
"""분석 진행 상황 업데이트"""
await self.send(text_data=json.dumps(event))
async def analysis_started(self, event):
"""분석 시작 알림"""
await self.send(text_data=json.dumps(event))
async def analysis_completed(self, event):
"""분석 완료 알림"""
await self.send(text_data=json.dumps(event))
async def analysis_failed(self, event):
"""분석 실패 알림"""
await self.send(text_data=json.dumps(event))
@database_sync_to_async
def get_user_from_token(self):
"""JWT 토큰에서 사용자 정보 추출"""
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
User = get_user_model()
try:
# URL에서 토큰 추출 (query parameter 또는 header)
token = None
# Query parameter에서 토큰 추출
query_string = self.scope.get('query_string', b'').decode()
if 'token=' in query_string:
token = query_string.split('token=')[1].split('&')[0]
# 헤더에서 토큰 추출
if not token:
headers = dict(self.scope['headers'])
auth_header = headers.get(b'authorization', b'').decode()
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
if not token:
return AnonymousUser()
# JWT 토큰 검증
try:
# simplejwt 설정에서 올바른 서명 키와 알고리즘 가져오기
from rest_framework_simplejwt.settings import api_settings
UntypedToken(token) # 토큰 기본 구조 검증
# 토큰에서 사용자 ID 추출
decoded_token = jwt.decode(
token,
api_settings.SIGNING_KEY, # 올바른 서명 키 사용
algorithms=[api_settings.ALGORITHM] # 올바른 알고리즘 사용
)
user_id = decoded_token.get('user_id')
if user_id:
user = User.objects.get(id=user_id)
return user
except (InvalidToken, TokenError, jwt.ExpiredSignatureError):
return AnonymousUser()
except User.DoesNotExist:
return AnonymousUser()
except Exception as e:
print(f"WebSocket 인증 중 오류: {e}")
return AnonymousUser()
return AnonymousUser()
@database_sync_to_async
def check_session_ownership(self, session_id):
"""분석 세션 소유권 확인"""
try:
# 지연 import
from apps.authentication.models import AnalysisSession
session = AnalysisSession.objects.get(id=session_id, user=self.user)
return True
except AnalysisSession.DoesNotExist:
return False

View File

@ -1,6 +0,0 @@
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/trading-analysis/$', consumers.TradingAnalysisConsumer.as_asgi()),
]

View File

@ -1,8 +0,0 @@
from django.urls import path
app_name = 'websocket'
# WebSocket은 ASGI routing을 통해 처리되므로 HTTP URL은 없음
urlpatterns = [
# WebSocket 관련 HTTP 엔드포인트가 필요한 경우 여기에 추가
]

View File

@ -1,22 +0,0 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tradingagents_web.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,23 @@
[tool.uv]
preview = true
[project]
name = "tradingagents-backend"
version = "0.1.0"
dependencies = [
"fastapi",
"uvicorn[standard]",
"sqlmodel",
"python-jose[cryptography]",
"passlib[bcrypt]",
"python-multipart",
"pydantic[email]",
"tenacity",
"aiomysql"
]
[project.optional-dependencies]
dev = [
"pytest",
"requests",
]

View File

@ -0,0 +1,9 @@
fastapi
uvicorn[standard]
sqlmodel
python-jose[cryptography]
passlib[bcrypt]
python-multipart
pydantic[email]
tenacity
aiomysql

View File

@ -1,33 +0,0 @@
# #!/bin/bash
# echo "🚀 Django 서버 시작 - 데이터베이스 초기화"
# # Django 설정 모듈 환경 변수 설정
# export DJANGO_SETTINGS_MODULE=tradingagents_web.settings
# # 1. 데이터베이스 초기화
# echo "🔄 데이터베이스 초기화 중..."
# docker exec -i tradingagents_mysql mysql -u root -ppassword -e "
# DROP DATABASE IF EXISTS tradingagents_db;
# CREATE DATABASE tradingagents_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
# "
# # 2. 마이그레이션
# echo "🔄 마이그레이션 중..."
# python manage.py makemigrations authentication
# python manage.py makemigrations
# python manage.py migrate
# # 3. 관리자 계정 생성
# echo "🔄 관리자 계정 생성 중..."
# python manage.py shell -c "
# from django.contrib.auth import get_user_model;
# User = get_user_model();
# if not User.objects.filter(email='admin@example.com').exists():
# User.objects.create_superuser('admin@example.com', 'admin', 'admin123!');
# print('✅ 관리자: admin@example.com / admin123!');
# "
# 4. 서버 시작 (환경 변수와 함께)
echo "🎉 서버 시작!"
daphne -b 0.0.0.0 -p 8000 tradingagents_web.asgi:application

View File

@ -1 +0,0 @@
# This file makes Python treat the directory as a package

View File

@ -1,28 +0,0 @@
"""
ASGI config for tradingagents_web project.
It exposes the ASGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
"""
import os
import django
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tradingagents_web.settings')
django.setup()
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import apps.websocket.routing
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(
apps.websocket.routing.websocket_urlpatterns
)
),
})

View File

@ -1,180 +0,0 @@
"""
Django settings for tradingagents_web project.
"""
from pathlib import Path
from decouple import config
import os
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = config('SECRET_KEY', default='django-insecure-your-secret-key-here')
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = config('DEBUG', default=True, cast=bool)
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default='localhost,127.0.0.1').split(',')
# Application definition
DJANGO_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
]
THIRD_PARTY_APPS = [
'rest_framework',
'rest_framework_simplejwt',
'corsheaders',
'channels',
]
LOCAL_APPS = [
'apps.authentication',
'apps.trading_api',
'apps.websocket',
]
INSTALLED_APPS = DJANGO_APPS + THIRD_PARTY_APPS + LOCAL_APPS
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'tradingagents_web.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'tradingagents_web.wsgi.application'
ASGI_APPLICATION = 'tradingagents_web.asgi.application'
# Database
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': config('DB_NAME', default='tradingagents_web'),
'USER': config('DB_USER', default='root'),
'PASSWORD': config('DB_PASSWORD', default='password'),
'HOST': config('DB_HOST', default='localhost'),
'PORT': config('DB_PORT', default='3306'),
'OPTIONS': {
'charset': 'utf8mb4',
'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
},
}
}
# Password validation
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
LANGUAGE_CODE = 'ko-kr'
TIME_ZONE = 'Asia/Seoul'
USE_I18N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
STATIC_URL = '/static/'
STATIC_ROOT = BASE_DIR / 'staticfiles'
# Media files
MEDIA_URL = '/media/'
MEDIA_ROOT = BASE_DIR / 'media'
# Default primary key field type
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# Custom User Model
AUTH_USER_MODEL = 'authentication.User'
# REST Framework configuration
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework_simplejwt.authentication.JWTAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
],
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20,
}
# JWT configuration
from datetime import timedelta
SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': timedelta(minutes=60),
'REFRESH_TOKEN_LIFETIME': timedelta(days=7),
'ROTATE_REFRESH_TOKENS': True,
'BLACKLIST_AFTER_ROTATION': True,
}
# CORS configuration for React frontend
CORS_ALLOWED_ORIGINS = [
"http://localhost:3000", # React development server
"http://127.0.0.1:3000",
]
CORS_ALLOW_CREDENTIALS = True
# Channels configuration for WebSocket
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [(config('REDIS_HOST', default='127.0.0.1'), config('REDIS_PORT', default=6379, cast=int))],
},
},
}
# OpenAI Configuration
OPENAI_API_KEY = config('OPENAI_API_KEY', default='') # 개발자가 등록한 기본 API 키
# Trading Agents Configuration
TRADING_AGENTS_CONFIG = {
'DEFAULT_TICKER': 'SPY',
'MAX_CONCURRENT_ANALYSES': 5,
'ANALYSIS_TIMEOUT': 300, # 5 minutes
}

View File

@ -1,19 +0,0 @@
"""
tradingagents_web URL Configuration
"""
from django.contrib import admin
from django.urls import path, include
from django.conf import settings
from django.conf.urls.static import static
urlpatterns = [
path('admin/', admin.site.urls),
path('api/auth/', include('apps.authentication.urls')),
path('api/trading/', include('apps.trading_api.urls')),
path('ws/', include('apps.websocket.urls')),
]
# Serve media files in development
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)

View File

@ -1,16 +0,0 @@
"""
WSGI config for tradingagents_web project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/4.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tradingagents_web.settings')
application = get_wsgi_application()

0
web/backend/uv.lock Normal file
View File