826 lines
28 KiB
Python
826 lines
28 KiB
Python
"""
|
|
Test suite for authentication endpoints and JWT handling.
|
|
|
|
This module tests Issue #48 authentication features:
|
|
1. User login with JWT token generation
|
|
2. Password hashing with Argon2 (via pwdlib)
|
|
3. JWT token validation and expiration
|
|
4. Invalid credentials handling
|
|
5. Token refresh functionality
|
|
6. Security best practices
|
|
|
|
Tests follow TDD - written before implementation.
|
|
"""
|
|
|
|
import pytest
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any
|
|
|
|
pytestmark = pytest.mark.asyncio
|
|
|
|
|
|
# ============================================================================
|
|
# Unit Tests: Password Hashing
|
|
# ============================================================================
|
|
|
|
class TestPasswordHashing:
|
|
"""Test password hashing using Argon2 via pwdlib."""
|
|
|
|
def test_hash_password_generates_hash(self):
|
|
"""Test that hash_password creates a valid hash."""
|
|
# Arrange
|
|
password = "SecurePassword123!"
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import hash_password
|
|
|
|
# Act
|
|
hashed = hash_password(password)
|
|
|
|
# Assert
|
|
assert hashed is not None
|
|
assert hashed != password # Hash should differ from plaintext
|
|
assert len(hashed) > 50 # Argon2 hashes are long
|
|
assert hashed.startswith("$argon2") # Argon2 hash format
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_hash_password_deterministic_with_same_input(self):
|
|
"""Test that same password produces different hashes (salted)."""
|
|
# Arrange
|
|
password = "SecurePassword123!"
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import hash_password
|
|
|
|
# Act
|
|
hash1 = hash_password(password)
|
|
hash2 = hash_password(password)
|
|
|
|
# Assert: Different hashes (due to random salt)
|
|
assert hash1 != hash2
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_verify_password_with_correct_password(self):
|
|
"""Test that verify_password succeeds with correct password."""
|
|
# Arrange
|
|
password = "SecurePassword123!"
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import hash_password, verify_password
|
|
|
|
hashed = hash_password(password)
|
|
|
|
# Act
|
|
result = verify_password(password, hashed)
|
|
|
|
# Assert
|
|
assert result is True
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_verify_password_with_incorrect_password(self):
|
|
"""Test that verify_password fails with incorrect password."""
|
|
# Arrange
|
|
correct_password = "SecurePassword123!"
|
|
wrong_password = "WrongPassword456!"
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import hash_password, verify_password
|
|
|
|
hashed = hash_password(correct_password)
|
|
|
|
# Act
|
|
result = verify_password(wrong_password, hashed)
|
|
|
|
# Assert
|
|
assert result is False
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_hash_password_handles_special_characters(self):
|
|
"""Test password hashing with special characters."""
|
|
# Arrange
|
|
passwords = [
|
|
"P@ssw0rd!",
|
|
"密码123", # Chinese characters
|
|
"пароль", # Cyrillic
|
|
"🔒secure🔑", # Emojis
|
|
]
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import hash_password, verify_password
|
|
|
|
for password in passwords:
|
|
# Act
|
|
hashed = hash_password(password)
|
|
|
|
# Assert
|
|
assert verify_password(password, hashed)
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_hash_password_empty_string(self):
|
|
"""Test hashing empty password."""
|
|
# Arrange
|
|
password = ""
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import hash_password
|
|
|
|
# Act
|
|
hashed = hash_password(password)
|
|
|
|
# Assert: Should still create a hash (validation happens elsewhere)
|
|
assert hashed is not None
|
|
assert len(hashed) > 0
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
|
|
# ============================================================================
|
|
# Unit Tests: JWT Token Generation
|
|
# ============================================================================
|
|
|
|
class TestJWTTokenGeneration:
|
|
"""Test JWT token creation and encoding."""
|
|
|
|
def test_create_access_token_generates_valid_token(self, mock_env_jwt_secret):
|
|
"""Test that create_access_token generates a valid JWT."""
|
|
# Arrange
|
|
token_data = {"sub": "testuser"}
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import create_access_token
|
|
|
|
# Act
|
|
token = create_access_token(token_data)
|
|
|
|
# Assert
|
|
assert token is not None
|
|
assert isinstance(token, str)
|
|
assert len(token) > 50 # JWT tokens are long
|
|
assert token.count(".") == 2 # JWT format: header.payload.signature
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_create_access_token_includes_expiration(self, mock_env_jwt_secret):
|
|
"""Test that token includes expiration claim."""
|
|
# Arrange
|
|
token_data = {"sub": "testuser"}
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import create_access_token
|
|
import jwt
|
|
import os
|
|
|
|
# Act
|
|
token = create_access_token(token_data)
|
|
|
|
# Decode token to inspect claims
|
|
secret_key = os.getenv("JWT_SECRET_KEY", "test-secret-key")
|
|
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
|
|
decoded = jwt.decode(token, secret_key, algorithms=[algorithm])
|
|
|
|
# Assert
|
|
assert "exp" in decoded
|
|
assert "sub" in decoded
|
|
assert decoded["sub"] == "testuser"
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_create_access_token_custom_expiration(self, mock_env_jwt_secret):
|
|
"""Test creating token with custom expiration time."""
|
|
# Arrange
|
|
token_data = {"sub": "testuser"}
|
|
expires_delta = timedelta(hours=1)
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import create_access_token
|
|
import jwt
|
|
import os
|
|
|
|
# Act
|
|
token = create_access_token(token_data, expires_delta=expires_delta)
|
|
|
|
# Decode token
|
|
secret_key = os.getenv("JWT_SECRET_KEY", "test-secret-key")
|
|
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
|
|
decoded = jwt.decode(token, secret_key, algorithms=[algorithm])
|
|
|
|
# Assert: Expiration is approximately 1 hour from now
|
|
exp_time = datetime.fromtimestamp(decoded["exp"])
|
|
expected_exp = datetime.utcnow() + expires_delta
|
|
time_diff = abs((exp_time - expected_exp).total_seconds())
|
|
assert time_diff < 5 # Within 5 seconds
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_create_access_token_includes_custom_claims(self, mock_env_jwt_secret):
|
|
"""Test that custom claims are included in token."""
|
|
# Arrange
|
|
token_data = {
|
|
"sub": "testuser",
|
|
"email": "test@example.com",
|
|
"role": "admin",
|
|
}
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import create_access_token
|
|
import jwt
|
|
import os
|
|
|
|
# Act
|
|
token = create_access_token(token_data)
|
|
|
|
# Decode token
|
|
secret_key = os.getenv("JWT_SECRET_KEY", "test-secret-key")
|
|
algorithm = os.getenv("JWT_ALGORITHM", "HS256")
|
|
decoded = jwt.decode(token, secret_key, algorithms=[algorithm])
|
|
|
|
# Assert
|
|
assert decoded["sub"] == "testuser"
|
|
assert decoded["email"] == "test@example.com"
|
|
assert decoded["role"] == "admin"
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
|
|
# ============================================================================
|
|
# Unit Tests: JWT Token Validation
|
|
# ============================================================================
|
|
|
|
class TestJWTTokenValidation:
|
|
"""Test JWT token decoding and validation."""
|
|
|
|
def test_decode_token_with_valid_token(self, mock_env_jwt_secret):
|
|
"""Test decoding a valid JWT token."""
|
|
# Arrange
|
|
token_data = {"sub": "testuser"}
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import (
|
|
create_access_token,
|
|
decode_access_token,
|
|
)
|
|
|
|
token = create_access_token(token_data)
|
|
|
|
# Act
|
|
decoded = decode_access_token(token)
|
|
|
|
# Assert
|
|
assert decoded is not None
|
|
assert decoded["sub"] == "testuser"
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_decode_token_with_expired_token(self, mock_env_jwt_secret):
|
|
"""Test that expired tokens are rejected."""
|
|
# Arrange
|
|
token_data = {"sub": "testuser"}
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import (
|
|
create_access_token,
|
|
decode_access_token,
|
|
)
|
|
from jwt.exceptions import ExpiredSignatureError
|
|
|
|
# Create already-expired token
|
|
token = create_access_token(token_data, expires_delta=timedelta(seconds=-1))
|
|
|
|
# Act & Assert
|
|
with pytest.raises(ExpiredSignatureError):
|
|
decode_access_token(token)
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_decode_token_with_invalid_signature(self, mock_env_jwt_secret):
|
|
"""Test that tokens with invalid signature are rejected."""
|
|
# Arrange
|
|
token_data = {"sub": "testuser"}
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import (
|
|
create_access_token,
|
|
decode_access_token,
|
|
)
|
|
from jwt.exceptions import InvalidSignatureError
|
|
|
|
token = create_access_token(token_data)
|
|
# Tamper with token
|
|
tampered_token = token[:-10] + "tampered00"
|
|
|
|
# Act & Assert
|
|
with pytest.raises(InvalidSignatureError):
|
|
decode_access_token(tampered_token)
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
def test_decode_token_with_malformed_token(self, mock_env_jwt_secret):
|
|
"""Test that malformed tokens are rejected."""
|
|
# Arrange
|
|
malformed_tokens = [
|
|
"not.a.jwt",
|
|
"invalid",
|
|
"",
|
|
"a.b", # Only 2 parts instead of 3
|
|
]
|
|
|
|
try:
|
|
from tradingagents.api.services.auth_service import decode_access_token
|
|
from jwt.exceptions import DecodeError
|
|
|
|
for token in malformed_tokens:
|
|
# Act & Assert
|
|
with pytest.raises(DecodeError):
|
|
decode_access_token(token)
|
|
except ImportError:
|
|
pytest.skip("auth_service not implemented yet")
|
|
|
|
|
|
# ============================================================================
|
|
# Integration Tests: Login Endpoint
|
|
# ============================================================================
|
|
|
|
class TestLoginEndpoint:
|
|
"""Test POST /api/v1/auth/login endpoint."""
|
|
|
|
async def test_login_with_valid_credentials(self, client, test_user, test_user_data):
|
|
"""Test successful login with correct username and password."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert "access_token" in data
|
|
assert "token_type" in data
|
|
assert data["token_type"] == "bearer"
|
|
assert len(data["access_token"]) > 50
|
|
|
|
async def test_login_with_invalid_username(self, client, test_user):
|
|
"""Test login fails with non-existent username."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": "nonexistent",
|
|
"password": "SomePassword123!",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "detail" in data
|
|
assert "incorrect" in data["detail"].lower() or "invalid" in data["detail"].lower()
|
|
|
|
async def test_login_with_invalid_password(self, client, test_user, test_user_data):
|
|
"""Test login fails with incorrect password."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": "WrongPassword123!",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "detail" in data
|
|
|
|
async def test_login_with_missing_username(self, client):
|
|
"""Test login validation requires username."""
|
|
# Arrange
|
|
login_data = {
|
|
"password": "SomePassword123!",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 422 # Validation error
|
|
|
|
async def test_login_with_missing_password(self, client):
|
|
"""Test login validation requires password."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": "testuser",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 422
|
|
|
|
async def test_login_with_empty_credentials(self, client):
|
|
"""Test login with empty username and password."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": "",
|
|
"password": "",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code in [401, 422]
|
|
|
|
async def test_login_returns_user_info(self, client, test_user, test_user_data):
|
|
"""Test that login response includes user information."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
# May include user info like username, email
|
|
assert "access_token" in data
|
|
|
|
async def test_login_token_is_valid_jwt(self, client, test_user, test_user_data, mock_env_jwt_secret):
|
|
"""Test that login returns a valid, decodable JWT token."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
token = data["access_token"]
|
|
|
|
# Verify token format
|
|
assert token.count(".") == 2
|
|
|
|
# Try to decode
|
|
try:
|
|
from tradingagents.api.services.auth_service import decode_access_token
|
|
decoded = decode_access_token(token)
|
|
assert decoded["sub"] == test_user_data["username"]
|
|
except ImportError:
|
|
# Just verify format if service not implemented
|
|
assert len(token) > 50
|
|
|
|
|
|
# ============================================================================
|
|
# Integration Tests: Protected Endpoints
|
|
# ============================================================================
|
|
|
|
class TestProtectedEndpoints:
|
|
"""Test that endpoints require valid JWT authentication."""
|
|
|
|
async def test_protected_endpoint_without_token(self, client):
|
|
"""Test that protected endpoint rejects requests without token."""
|
|
# Act
|
|
response = await client.get("/api/v1/strategies")
|
|
|
|
# Assert
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "detail" in data
|
|
|
|
async def test_protected_endpoint_with_valid_token(self, client, test_user, auth_headers):
|
|
"""Test that protected endpoint accepts valid token."""
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=auth_headers)
|
|
|
|
# Assert
|
|
assert response.status_code == 200
|
|
|
|
async def test_protected_endpoint_with_expired_token(self, client, expired_jwt_token):
|
|
"""Test that expired token is rejected."""
|
|
# Arrange
|
|
headers = {"Authorization": f"Bearer {expired_jwt_token}"}
|
|
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=headers)
|
|
|
|
# Assert
|
|
assert response.status_code == 401
|
|
data = response.json()
|
|
assert "expired" in data["detail"].lower() or "invalid" in data["detail"].lower()
|
|
|
|
async def test_protected_endpoint_with_invalid_token(self, client, invalid_jwt_token):
|
|
"""Test that invalid token is rejected."""
|
|
# Arrange
|
|
headers = {"Authorization": f"Bearer {invalid_jwt_token}"}
|
|
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=headers)
|
|
|
|
# Assert
|
|
assert response.status_code == 401
|
|
|
|
async def test_protected_endpoint_with_malformed_header(self, client):
|
|
"""Test various malformed Authorization headers."""
|
|
# Arrange
|
|
malformed_headers = [
|
|
{"Authorization": "Bearer"}, # Missing token
|
|
{"Authorization": "token123"}, # Missing 'Bearer'
|
|
{"Authorization": "Basic token123"}, # Wrong scheme
|
|
{"Authorization": ""}, # Empty
|
|
]
|
|
|
|
for headers in malformed_headers:
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=headers)
|
|
|
|
# Assert
|
|
assert response.status_code == 401
|
|
|
|
async def test_protected_endpoint_extracts_user_from_token(self, client, test_user, auth_headers):
|
|
"""Test that endpoint can access user info from token."""
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=auth_headers)
|
|
|
|
# Assert
|
|
assert response.status_code == 200
|
|
# User context should be available to endpoint handler
|
|
|
|
|
|
# ============================================================================
|
|
# Edge Cases: Authentication
|
|
# ============================================================================
|
|
|
|
class TestAuthenticationEdgeCases:
|
|
"""Test edge cases and boundary conditions for authentication."""
|
|
|
|
async def test_login_case_sensitive_username(self, client, test_user, test_user_data):
|
|
"""Test that username is case-sensitive."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"].upper(),
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert: Should fail if username case doesn't match
|
|
# (depends on implementation - could be case-insensitive)
|
|
assert response.status_code in [200, 401]
|
|
|
|
async def test_login_with_sql_injection_attempt(self, client, sample_sql_injection_payloads):
|
|
"""Test that SQL injection in login is prevented."""
|
|
# Arrange
|
|
for payload in sample_sql_injection_payloads:
|
|
login_data = {
|
|
"username": payload,
|
|
"password": "password",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert: Should return 401, not 500 (error) or 200 (bypass)
|
|
assert response.status_code in [401, 422]
|
|
|
|
async def test_login_with_very_long_username(self, client):
|
|
"""Test login with extremely long username."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": "a" * 10000,
|
|
"password": "password",
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert: Should handle gracefully (not crash)
|
|
assert response.status_code in [401, 422]
|
|
|
|
async def test_login_with_very_long_password(self, client):
|
|
"""Test login with extremely long password."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": "testuser",
|
|
"password": "p" * 10000,
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code in [401, 422]
|
|
|
|
async def test_concurrent_logins_same_user(self, client, test_user, test_user_data):
|
|
"""Test multiple concurrent logins for same user."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act: Login multiple times
|
|
response1 = await client.post("/api/v1/auth/login", json=login_data)
|
|
response2 = await client.post("/api/v1/auth/login", json=login_data)
|
|
response3 = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert: All should succeed with different tokens
|
|
assert response1.status_code == 200
|
|
assert response2.status_code == 200
|
|
assert response3.status_code == 200
|
|
|
|
token1 = response1.json()["access_token"]
|
|
token2 = response2.json()["access_token"]
|
|
token3 = response3.json()["access_token"]
|
|
|
|
# Tokens should be different (each has unique exp timestamp)
|
|
assert token1 != token2
|
|
assert token2 != token3
|
|
|
|
async def test_token_with_tampered_payload(self, client, auth_headers, mock_env_jwt_secret):
|
|
"""Test that tampering with token payload is detected."""
|
|
# Arrange
|
|
import base64
|
|
import json
|
|
|
|
token = auth_headers["Authorization"].split(" ")[1]
|
|
parts = token.split(".")
|
|
|
|
# Tamper with payload
|
|
try:
|
|
payload = json.loads(base64.urlsafe_b64decode(parts[1] + "=="))
|
|
payload["sub"] = "admin" # Change username to admin
|
|
tampered_payload = base64.urlsafe_b64encode(
|
|
json.dumps(payload).encode()
|
|
).decode().rstrip("=")
|
|
tampered_token = f"{parts[0]}.{tampered_payload}.{parts[2]}"
|
|
|
|
headers = {"Authorization": f"Bearer {tampered_token}"}
|
|
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=headers)
|
|
|
|
# Assert: Should reject due to invalid signature
|
|
assert response.status_code == 401
|
|
except Exception:
|
|
# If token format is different, skip test
|
|
pytest.skip("Token format not as expected")
|
|
|
|
async def test_multiple_authorization_headers(self, client, auth_headers):
|
|
"""Test behavior with multiple Authorization headers."""
|
|
# Arrange
|
|
# This tests HTTP header handling edge case
|
|
# Most frameworks use the first or last header
|
|
# Act & Assert: Should handle gracefully
|
|
response = await client.get("/api/v1/strategies", headers=auth_headers)
|
|
assert response.status_code in [200, 400, 401]
|
|
|
|
async def test_bearer_token_case_insensitive(self, client, jwt_token):
|
|
"""Test that 'Bearer' scheme is case-insensitive."""
|
|
# Arrange
|
|
headers_variants = [
|
|
{"Authorization": f"Bearer {jwt_token}"},
|
|
{"Authorization": f"bearer {jwt_token}"},
|
|
{"Authorization": f"BEARER {jwt_token}"},
|
|
]
|
|
|
|
for headers in headers_variants:
|
|
# Act
|
|
response = await client.get("/api/v1/strategies", headers=headers)
|
|
|
|
# Assert: Should accept regardless of case
|
|
assert response.status_code in [200, 401]
|
|
|
|
|
|
# ============================================================================
|
|
# Security Tests
|
|
# ============================================================================
|
|
|
|
class TestAuthenticationSecurity:
|
|
"""Test security aspects of authentication."""
|
|
|
|
async def test_login_does_not_leak_user_existence(self, client):
|
|
"""Test that login error doesn't reveal if user exists."""
|
|
# Arrange
|
|
valid_user_wrong_pass = {
|
|
"username": "testuser",
|
|
"password": "wrongpassword",
|
|
}
|
|
invalid_user = {
|
|
"username": "nonexistent",
|
|
"password": "somepassword",
|
|
}
|
|
|
|
# Act
|
|
response1 = await client.post("/api/v1/auth/login", json=valid_user_wrong_pass)
|
|
response2 = await client.post("/api/v1/auth/login", json=invalid_user)
|
|
|
|
# Assert: Both should return same error (don't reveal user existence)
|
|
assert response1.status_code == 401
|
|
assert response2.status_code == 401
|
|
# Error messages should be generic
|
|
assert response1.json()["detail"] == response2.json()["detail"]
|
|
|
|
async def test_password_not_in_response(self, client, test_user, test_user_data):
|
|
"""Test that password is never returned in responses."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
assert response.status_code == 200
|
|
response_text = response.text.lower()
|
|
# Password should not appear in response
|
|
assert test_user_data["password"].lower() not in response_text
|
|
assert "password" not in response.json()
|
|
|
|
async def test_timing_attack_resistance(self, client, test_user, test_user_data):
|
|
"""Test that login timing doesn't reveal user existence."""
|
|
# Arrange
|
|
import time
|
|
|
|
valid_user = {
|
|
"username": test_user_data["username"],
|
|
"password": "wrongpassword",
|
|
}
|
|
invalid_user = {
|
|
"username": "nonexistent_user_xyz",
|
|
"password": "wrongpassword",
|
|
}
|
|
|
|
# Act: Measure login time for both
|
|
start1 = time.time()
|
|
response1 = await client.post("/api/v1/auth/login", json=valid_user)
|
|
time1 = time.time() - start1
|
|
|
|
start2 = time.time()
|
|
response2 = await client.post("/api/v1/auth/login", json=invalid_user)
|
|
time2 = time.time() - start2
|
|
|
|
# Assert: Times should be similar (within 100ms)
|
|
# This tests constant-time password verification
|
|
time_diff = abs(time1 - time2)
|
|
# Note: This is a weak test due to network/process variations
|
|
# Real timing attack prevention needs constant-time comparison in code
|
|
assert response1.status_code == 401
|
|
assert response2.status_code == 401
|
|
|
|
async def test_token_not_logged(self, client, test_user, test_user_data, caplog):
|
|
"""Test that JWT tokens are not logged."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": test_user_data["password"],
|
|
}
|
|
|
|
# Act
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
|
|
# Assert
|
|
if response.status_code == 200:
|
|
token = response.json()["access_token"]
|
|
# Check that token doesn't appear in logs
|
|
for record in caplog.records:
|
|
assert token not in record.message
|
|
|
|
async def test_rate_limiting_on_login(self, client, test_user_data):
|
|
"""Test that excessive login attempts are rate-limited."""
|
|
# Arrange
|
|
login_data = {
|
|
"username": test_user_data["username"],
|
|
"password": "wrongpassword",
|
|
}
|
|
|
|
# Act: Make many rapid login attempts
|
|
responses = []
|
|
for _ in range(20):
|
|
response = await client.post("/api/v1/auth/login", json=login_data)
|
|
responses.append(response)
|
|
|
|
# Assert: After many attempts, should get rate limited
|
|
# (Implementation dependent - may return 429 Too Many Requests)
|
|
status_codes = [r.status_code for r in responses]
|
|
# Either all 401, or some 429 (rate limited)
|
|
assert all(code in [401, 429] for code in status_codes)
|