TradingAgents/tradingagents/api/services/auth_service.py

118 lines
2.8 KiB
Python

"""Authentication service for password hashing and JWT tokens."""
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any
import jwt
from pwdlib import PasswordHash
from tradingagents.api.config import settings
# Shared password hasher used by hash_password() and verify_password().
# Built from pwdlib's recommended configuration (Argon2, per the
# "$argon2" prefix shown in the hash_password doctest).
pwd_context = PasswordHash.recommended()
def hash_password(password: str) -> str:
    """
    Produce a storable hash for a plain-text password.

    Delegates to the module-level ``pwd_context`` hasher (Argon2).

    Args:
        password: Plain text password

    Returns:
        Hashed password string

    Example:
        >>> hashed = hash_password("SecurePassword123!")
        >>> hashed.startswith("$argon2")
        True
    """
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Check whether a plain-text password corresponds to a stored hash.

    Delegates to the module-level ``pwd_context`` hasher and returns its
    verdict unchanged.

    Args:
        plain_password: Plain text password
        hashed_password: Hashed password to verify against

    Returns:
        True if password matches, False otherwise

    Example:
        >>> hashed = hash_password("SecurePassword123!")
        >>> verify_password("SecurePassword123!", hashed)
        True
        >>> verify_password("WrongPassword", hashed)
        False
    """
    # NOTE(review): any exception raised by the hasher (e.g. for a value that
    # is not a recognizable hash) propagates to the caller -- confirm callers
    # expect that.
    is_match = pwd_context.verify(plain_password, hashed_password)
    return is_match
def create_access_token(
    data: Dict[str, Any],
    expires_delta: Optional[timedelta] = None
) -> str:
    """
    Create a signed JWT access token.

    The input mapping is shallow-copied, so the caller's dict is not
    modified. An ``exp`` claim (timezone-aware UTC datetime) is then set on
    the copy, replacing any ``exp`` already present in ``data``, and the
    result is signed with ``settings.JWT_SECRET_KEY`` using
    ``settings.JWT_ALGORITHM``.

    Args:
        data: Data to encode in the token (e.g., {"sub": "username"})
        expires_delta: Token lifetime. ``None`` (the default) falls back to
            ``settings.JWT_EXPIRATION_MINUTES``. Any explicit value is
            honored as given, including ``timedelta(0)``.

    Returns:
        Encoded JWT token

    Example:
        >>> token = create_access_token({"sub": "testuser"})
        >>> isinstance(token, str)
        True
    """
    # Compare against None rather than relying on truthiness: timedelta(0)
    # is falsy, so a truthiness test would silently replace an explicitly
    # requested zero lifetime with the configured default.
    if expires_delta is None:
        lifetime = timedelta(minutes=settings.JWT_EXPIRATION_MINUTES)
    else:
        lifetime = expires_delta
    expire = datetime.now(timezone.utc) + lifetime

    to_encode = data.copy()
    to_encode["exp"] = expire

    return jwt.encode(
        to_encode,
        settings.JWT_SECRET_KEY,
        algorithm=settings.JWT_ALGORITHM,
    )
def decode_access_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Decode a JWT access token and return its payload.

    The token is decoded with ``settings.JWT_SECRET_KEY`` and restricted to
    the single algorithm named by ``settings.JWT_ALGORITHM``.

    Args:
        token: JWT token to decode

    Returns:
        Decoded token payload, or None if the token is expired or
        otherwise invalid

    Example:
        >>> token = create_access_token({"sub": "testuser"})
        >>> payload = decode_access_token(token)
        >>> payload["sub"]
        'testuser'
    """
    try:
        return jwt.decode(
            token,
            settings.JWT_SECRET_KEY,
            algorithms=[settings.JWT_ALGORITHM],
        )
    except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
        # Expired and otherwise-invalid tokens are reported identically.
        return None