TradingAgents/tests/api/test_auth.py

826 lines
28 KiB
Python

"""
Test suite for authentication endpoints and JWT handling.
This module tests Issue #48 authentication features:
1. User login with JWT token generation
2. Password hashing with Argon2 (via pwdlib)
3. JWT token validation and expiration
4. Invalid credentials handling
5. Token refresh functionality
6. Security best practices
Tests follow TDD - written before implementation.
"""
import pytest
from datetime import datetime, timedelta
from typing import Dict, Any
pytestmark = pytest.mark.asyncio
# ============================================================================
# Unit Tests: Password Hashing
# ============================================================================
class TestPasswordHashing:
"""Test password hashing using Argon2 via pwdlib."""
def test_hash_password_generates_hash(self):
"""Test that hash_password creates a valid hash."""
# Arrange
password = "SecurePassword123!"
try:
from tradingagents.api.services.auth_service import hash_password
# Act
hashed = hash_password(password)
# Assert
assert hashed is not None
assert hashed != password # Hash should differ from plaintext
assert len(hashed) > 50 # Argon2 hashes are long
assert hashed.startswith("$argon2") # Argon2 hash format
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_hash_password_deterministic_with_same_input(self):
"""Test that same password produces different hashes (salted)."""
# Arrange
password = "SecurePassword123!"
try:
from tradingagents.api.services.auth_service import hash_password
# Act
hash1 = hash_password(password)
hash2 = hash_password(password)
# Assert: Different hashes (due to random salt)
assert hash1 != hash2
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_verify_password_with_correct_password(self):
"""Test that verify_password succeeds with correct password."""
# Arrange
password = "SecurePassword123!"
try:
from tradingagents.api.services.auth_service import hash_password, verify_password
hashed = hash_password(password)
# Act
result = verify_password(password, hashed)
# Assert
assert result is True
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_verify_password_with_incorrect_password(self):
"""Test that verify_password fails with incorrect password."""
# Arrange
correct_password = "SecurePassword123!"
wrong_password = "WrongPassword456!"
try:
from tradingagents.api.services.auth_service import hash_password, verify_password
hashed = hash_password(correct_password)
# Act
result = verify_password(wrong_password, hashed)
# Assert
assert result is False
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_hash_password_handles_special_characters(self):
"""Test password hashing with special characters."""
# Arrange
passwords = [
"P@ssw0rd!",
"密码123", # Chinese characters
"пароль", # Cyrillic
"🔒secure🔑", # Emojis
]
try:
from tradingagents.api.services.auth_service import hash_password, verify_password
for password in passwords:
# Act
hashed = hash_password(password)
# Assert
assert verify_password(password, hashed)
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_hash_password_empty_string(self):
"""Test hashing empty password."""
# Arrange
password = ""
try:
from tradingagents.api.services.auth_service import hash_password
# Act
hashed = hash_password(password)
# Assert: Should still create a hash (validation happens elsewhere)
assert hashed is not None
assert len(hashed) > 0
except ImportError:
pytest.skip("auth_service not implemented yet")
# ============================================================================
# Unit Tests: JWT Token Generation
# ============================================================================
class TestJWTTokenGeneration:
"""Test JWT token creation and encoding."""
def test_create_access_token_generates_valid_token(self, mock_env_jwt_secret):
"""Test that create_access_token generates a valid JWT."""
# Arrange
token_data = {"sub": "testuser"}
try:
from tradingagents.api.services.auth_service import create_access_token
# Act
token = create_access_token(token_data)
# Assert
assert token is not None
assert isinstance(token, str)
assert len(token) > 50 # JWT tokens are long
assert token.count(".") == 2 # JWT format: header.payload.signature
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_create_access_token_includes_expiration(self, mock_env_jwt_secret):
"""Test that token includes expiration claim."""
# Arrange
token_data = {"sub": "testuser"}
try:
from tradingagents.api.services.auth_service import create_access_token
import jwt
import os
# Act
token = create_access_token(token_data)
# Decode token to inspect claims
secret_key = os.getenv("JWT_SECRET_KEY", "test-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
decoded = jwt.decode(token, secret_key, algorithms=[algorithm])
# Assert
assert "exp" in decoded
assert "sub" in decoded
assert decoded["sub"] == "testuser"
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_create_access_token_custom_expiration(self, mock_env_jwt_secret):
"""Test creating token with custom expiration time."""
# Arrange
token_data = {"sub": "testuser"}
expires_delta = timedelta(hours=1)
try:
from tradingagents.api.services.auth_service import create_access_token
import jwt
import os
# Act
token = create_access_token(token_data, expires_delta=expires_delta)
# Decode token
secret_key = os.getenv("JWT_SECRET_KEY", "test-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
decoded = jwt.decode(token, secret_key, algorithms=[algorithm])
# Assert: Expiration is approximately 1 hour from now
exp_time = datetime.fromtimestamp(decoded["exp"])
expected_exp = datetime.utcnow() + expires_delta
time_diff = abs((exp_time - expected_exp).total_seconds())
assert time_diff < 5 # Within 5 seconds
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_create_access_token_includes_custom_claims(self, mock_env_jwt_secret):
"""Test that custom claims are included in token."""
# Arrange
token_data = {
"sub": "testuser",
"email": "test@example.com",
"role": "admin",
}
try:
from tradingagents.api.services.auth_service import create_access_token
import jwt
import os
# Act
token = create_access_token(token_data)
# Decode token
secret_key = os.getenv("JWT_SECRET_KEY", "test-secret-key")
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
decoded = jwt.decode(token, secret_key, algorithms=[algorithm])
# Assert
assert decoded["sub"] == "testuser"
assert decoded["email"] == "test@example.com"
assert decoded["role"] == "admin"
except ImportError:
pytest.skip("auth_service not implemented yet")
# ============================================================================
# Unit Tests: JWT Token Validation
# ============================================================================
class TestJWTTokenValidation:
"""Test JWT token decoding and validation."""
def test_decode_token_with_valid_token(self, mock_env_jwt_secret):
"""Test decoding a valid JWT token."""
# Arrange
token_data = {"sub": "testuser"}
try:
from tradingagents.api.services.auth_service import (
create_access_token,
decode_access_token,
)
token = create_access_token(token_data)
# Act
decoded = decode_access_token(token)
# Assert
assert decoded is not None
assert decoded["sub"] == "testuser"
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_decode_token_with_expired_token(self, mock_env_jwt_secret):
"""Test that expired tokens are rejected."""
# Arrange
token_data = {"sub": "testuser"}
try:
from tradingagents.api.services.auth_service import (
create_access_token,
decode_access_token,
)
from jwt.exceptions import ExpiredSignatureError
# Create already-expired token
token = create_access_token(token_data, expires_delta=timedelta(seconds=-1))
# Act & Assert
with pytest.raises(ExpiredSignatureError):
decode_access_token(token)
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_decode_token_with_invalid_signature(self, mock_env_jwt_secret):
"""Test that tokens with invalid signature are rejected."""
# Arrange
token_data = {"sub": "testuser"}
try:
from tradingagents.api.services.auth_service import (
create_access_token,
decode_access_token,
)
from jwt.exceptions import InvalidSignatureError
token = create_access_token(token_data)
# Tamper with token
tampered_token = token[:-10] + "tampered00"
# Act & Assert
with pytest.raises(InvalidSignatureError):
decode_access_token(tampered_token)
except ImportError:
pytest.skip("auth_service not implemented yet")
def test_decode_token_with_malformed_token(self, mock_env_jwt_secret):
"""Test that malformed tokens are rejected."""
# Arrange
malformed_tokens = [
"not.a.jwt",
"invalid",
"",
"a.b", # Only 2 parts instead of 3
]
try:
from tradingagents.api.services.auth_service import decode_access_token
from jwt.exceptions import DecodeError
for token in malformed_tokens:
# Act & Assert
with pytest.raises(DecodeError):
decode_access_token(token)
except ImportError:
pytest.skip("auth_service not implemented yet")
# ============================================================================
# Integration Tests: Login Endpoint
# ============================================================================
class TestLoginEndpoint:
"""Test POST /api/v1/auth/login endpoint."""
async def test_login_with_valid_credentials(self, client, test_user, test_user_data):
"""Test successful login with correct username and password."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": test_user_data["password"],
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "token_type" in data
assert data["token_type"] == "bearer"
assert len(data["access_token"]) > 50
async def test_login_with_invalid_username(self, client, test_user):
"""Test login fails with non-existent username."""
# Arrange
login_data = {
"username": "nonexistent",
"password": "SomePassword123!",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 401
data = response.json()
assert "detail" in data
assert "incorrect" in data["detail"].lower() or "invalid" in data["detail"].lower()
async def test_login_with_invalid_password(self, client, test_user, test_user_data):
"""Test login fails with incorrect password."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": "WrongPassword123!",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 401
data = response.json()
assert "detail" in data
async def test_login_with_missing_username(self, client):
"""Test login validation requires username."""
# Arrange
login_data = {
"password": "SomePassword123!",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 422 # Validation error
async def test_login_with_missing_password(self, client):
"""Test login validation requires password."""
# Arrange
login_data = {
"username": "testuser",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 422
async def test_login_with_empty_credentials(self, client):
"""Test login with empty username and password."""
# Arrange
login_data = {
"username": "",
"password": "",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code in [401, 422]
async def test_login_returns_user_info(self, client, test_user, test_user_data):
"""Test that login response includes user information."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": test_user_data["password"],
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 200
data = response.json()
# May include user info like username, email
assert "access_token" in data
async def test_login_token_is_valid_jwt(self, client, test_user, test_user_data, mock_env_jwt_secret):
"""Test that login returns a valid, decodable JWT token."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": test_user_data["password"],
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 200
data = response.json()
token = data["access_token"]
# Verify token format
assert token.count(".") == 2
# Try to decode
try:
from tradingagents.api.services.auth_service import decode_access_token
decoded = decode_access_token(token)
assert decoded["sub"] == test_user_data["username"]
except ImportError:
# Just verify format if service not implemented
assert len(token) > 50
# ============================================================================
# Integration Tests: Protected Endpoints
# ============================================================================
class TestProtectedEndpoints:
"""Test that endpoints require valid JWT authentication."""
async def test_protected_endpoint_without_token(self, client):
"""Test that protected endpoint rejects requests without token."""
# Act
response = await client.get("/api/v1/strategies")
# Assert
assert response.status_code == 401
data = response.json()
assert "detail" in data
async def test_protected_endpoint_with_valid_token(self, client, test_user, auth_headers):
"""Test that protected endpoint accepts valid token."""
# Act
response = await client.get("/api/v1/strategies", headers=auth_headers)
# Assert
assert response.status_code == 200
async def test_protected_endpoint_with_expired_token(self, client, expired_jwt_token):
"""Test that expired token is rejected."""
# Arrange
headers = {"Authorization": f"Bearer {expired_jwt_token}"}
# Act
response = await client.get("/api/v1/strategies", headers=headers)
# Assert
assert response.status_code == 401
data = response.json()
assert "expired" in data["detail"].lower() or "invalid" in data["detail"].lower()
async def test_protected_endpoint_with_invalid_token(self, client, invalid_jwt_token):
"""Test that invalid token is rejected."""
# Arrange
headers = {"Authorization": f"Bearer {invalid_jwt_token}"}
# Act
response = await client.get("/api/v1/strategies", headers=headers)
# Assert
assert response.status_code == 401
async def test_protected_endpoint_with_malformed_header(self, client):
"""Test various malformed Authorization headers."""
# Arrange
malformed_headers = [
{"Authorization": "Bearer"}, # Missing token
{"Authorization": "token123"}, # Missing 'Bearer'
{"Authorization": "Basic token123"}, # Wrong scheme
{"Authorization": ""}, # Empty
]
for headers in malformed_headers:
# Act
response = await client.get("/api/v1/strategies", headers=headers)
# Assert
assert response.status_code == 401
async def test_protected_endpoint_extracts_user_from_token(self, client, test_user, auth_headers):
"""Test that endpoint can access user info from token."""
# Act
response = await client.get("/api/v1/strategies", headers=auth_headers)
# Assert
assert response.status_code == 200
# User context should be available to endpoint handler
# ============================================================================
# Edge Cases: Authentication
# ============================================================================
class TestAuthenticationEdgeCases:
"""Test edge cases and boundary conditions for authentication."""
async def test_login_case_sensitive_username(self, client, test_user, test_user_data):
"""Test that username is case-sensitive."""
# Arrange
login_data = {
"username": test_user_data["username"].upper(),
"password": test_user_data["password"],
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert: Should fail if username case doesn't match
# (depends on implementation - could be case-insensitive)
assert response.status_code in [200, 401]
async def test_login_with_sql_injection_attempt(self, client, sample_sql_injection_payloads):
"""Test that SQL injection in login is prevented."""
# Arrange
for payload in sample_sql_injection_payloads:
login_data = {
"username": payload,
"password": "password",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert: Should return 401, not 500 (error) or 200 (bypass)
assert response.status_code in [401, 422]
async def test_login_with_very_long_username(self, client):
"""Test login with extremely long username."""
# Arrange
login_data = {
"username": "a" * 10000,
"password": "password",
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert: Should handle gracefully (not crash)
assert response.status_code in [401, 422]
async def test_login_with_very_long_password(self, client):
"""Test login with extremely long password."""
# Arrange
login_data = {
"username": "testuser",
"password": "p" * 10000,
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code in [401, 422]
async def test_concurrent_logins_same_user(self, client, test_user, test_user_data):
"""Test multiple concurrent logins for same user."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": test_user_data["password"],
}
# Act: Login multiple times
response1 = await client.post("/api/v1/auth/login", json=login_data)
response2 = await client.post("/api/v1/auth/login", json=login_data)
response3 = await client.post("/api/v1/auth/login", json=login_data)
# Assert: All should succeed with different tokens
assert response1.status_code == 200
assert response2.status_code == 200
assert response3.status_code == 200
token1 = response1.json()["access_token"]
token2 = response2.json()["access_token"]
token3 = response3.json()["access_token"]
# Tokens should be different (each has unique exp timestamp)
assert token1 != token2
assert token2 != token3
async def test_token_with_tampered_payload(self, client, auth_headers, mock_env_jwt_secret):
"""Test that tampering with token payload is detected."""
# Arrange
import base64
import json
token = auth_headers["Authorization"].split(" ")[1]
parts = token.split(".")
# Tamper with payload
try:
payload = json.loads(base64.urlsafe_b64decode(parts[1] + "=="))
payload["sub"] = "admin" # Change username to admin
tampered_payload = base64.urlsafe_b64encode(
json.dumps(payload).encode()
).decode().rstrip("=")
tampered_token = f"{parts[0]}.{tampered_payload}.{parts[2]}"
headers = {"Authorization": f"Bearer {tampered_token}"}
# Act
response = await client.get("/api/v1/strategies", headers=headers)
# Assert: Should reject due to invalid signature
assert response.status_code == 401
except Exception:
# If token format is different, skip test
pytest.skip("Token format not as expected")
async def test_multiple_authorization_headers(self, client, auth_headers):
"""Test behavior with multiple Authorization headers."""
# Arrange
# This tests HTTP header handling edge case
# Most frameworks use the first or last header
# Act & Assert: Should handle gracefully
response = await client.get("/api/v1/strategies", headers=auth_headers)
assert response.status_code in [200, 400, 401]
async def test_bearer_token_case_insensitive(self, client, jwt_token):
"""Test that 'Bearer' scheme is case-insensitive."""
# Arrange
headers_variants = [
{"Authorization": f"Bearer {jwt_token}"},
{"Authorization": f"bearer {jwt_token}"},
{"Authorization": f"BEARER {jwt_token}"},
]
for headers in headers_variants:
# Act
response = await client.get("/api/v1/strategies", headers=headers)
# Assert: Should accept regardless of case
assert response.status_code in [200, 401]
# ============================================================================
# Security Tests
# ============================================================================
class TestAuthenticationSecurity:
"""Test security aspects of authentication."""
async def test_login_does_not_leak_user_existence(self, client):
"""Test that login error doesn't reveal if user exists."""
# Arrange
valid_user_wrong_pass = {
"username": "testuser",
"password": "wrongpassword",
}
invalid_user = {
"username": "nonexistent",
"password": "somepassword",
}
# Act
response1 = await client.post("/api/v1/auth/login", json=valid_user_wrong_pass)
response2 = await client.post("/api/v1/auth/login", json=invalid_user)
# Assert: Both should return same error (don't reveal user existence)
assert response1.status_code == 401
assert response2.status_code == 401
# Error messages should be generic
assert response1.json()["detail"] == response2.json()["detail"]
async def test_password_not_in_response(self, client, test_user, test_user_data):
"""Test that password is never returned in responses."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": test_user_data["password"],
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
assert response.status_code == 200
response_text = response.text.lower()
# Password should not appear in response
assert test_user_data["password"].lower() not in response_text
assert "password" not in response.json()
async def test_timing_attack_resistance(self, client, test_user, test_user_data):
"""Test that login timing doesn't reveal user existence."""
# Arrange
import time
valid_user = {
"username": test_user_data["username"],
"password": "wrongpassword",
}
invalid_user = {
"username": "nonexistent_user_xyz",
"password": "wrongpassword",
}
# Act: Measure login time for both
start1 = time.time()
response1 = await client.post("/api/v1/auth/login", json=valid_user)
time1 = time.time() - start1
start2 = time.time()
response2 = await client.post("/api/v1/auth/login", json=invalid_user)
time2 = time.time() - start2
# Assert: Times should be similar (within 100ms)
# This tests constant-time password verification
time_diff = abs(time1 - time2)
# Note: This is a weak test due to network/process variations
# Real timing attack prevention needs constant-time comparison in code
assert response1.status_code == 401
assert response2.status_code == 401
async def test_token_not_logged(self, client, test_user, test_user_data, caplog):
"""Test that JWT tokens are not logged."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": test_user_data["password"],
}
# Act
response = await client.post("/api/v1/auth/login", json=login_data)
# Assert
if response.status_code == 200:
token = response.json()["access_token"]
# Check that token doesn't appear in logs
for record in caplog.records:
assert token not in record.message
async def test_rate_limiting_on_login(self, client, test_user_data):
"""Test that excessive login attempts are rate-limited."""
# Arrange
login_data = {
"username": test_user_data["username"],
"password": "wrongpassword",
}
# Act: Make many rapid login attempts
responses = []
for _ in range(20):
response = await client.post("/api/v1/auth/login", json=login_data)
responses.append(response)
# Assert: After many attempts, should get rate limited
# (Implementation dependent - may return 429 Too Many Requests)
status_codes = [r.status_code for r in responses]
# Either all 401, or some 429 (rate limited)
assert all(code in [401, 429] for code in status_codes)