88 lines
2.7 KiB
Python
88 lines
2.7 KiB
Python
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
|
|
import pytest
|
|
from jose import jwt
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.orm import sessionmaker
|
|
|
|
ROOT = Path(__file__).resolve().parents[1]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
from app.config import Settings, settings
|
|
from app.database.base import Base
|
|
from app.models.user import User
|
|
from app.auth.security import (
|
|
create_access_token,
|
|
create_refresh_token,
|
|
decode_refresh_token,
|
|
is_refresh_token_revoked,
|
|
revoke_refresh_token,
|
|
)
|
|
|
|
|
|
def test_settings_env_precedence(monkeypatch):
|
|
# Ensure env var overrides .env/default
|
|
monkeypatch.setenv("SECRET_KEY", "env_secret_value_12345678901234567890123456789012")
|
|
cfg = Settings()
|
|
assert cfg.secret_key == "env_secret_value_12345678901234567890123456789012"
|
|
|
|
|
|
def test_jwt_rotation_decode(monkeypatch):
|
|
# Simulate key rotation: token signed with previous key should validate
|
|
old_key = "old_secret_value_12345678901234567890123456789012"
|
|
new_key = "new_secret_value_12345678901234567890123456789012"
|
|
|
|
# Patch runtime settings
|
|
settings.previous_secret_key = old_key
|
|
settings.secret_key = new_key
|
|
|
|
# Sign token with old key
|
|
payload = {
|
|
"sub": "tester",
|
|
"exp": datetime.utcnow() + timedelta(minutes=5),
|
|
"iat": datetime.utcnow(),
|
|
"type": "access",
|
|
}
|
|
token = jwt.encode(payload, old_key, algorithm=settings.algorithm)
|
|
|
|
# Verify using public API verify_token via access token creation roundtrip
|
|
# Using internal decode through create_access_token is indirect; ensure no exception
|
|
from app.auth.security import verify_token
|
|
|
|
username = verify_token(token)
|
|
assert username == "tester"
|
|
|
|
|
|
def test_refresh_token_lifecycle(tmp_path):
|
|
# Build isolated in-memory database
|
|
engine = create_engine("sqlite:///:memory:", connect_args={"check_same_thread": False})
|
|
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
Base.metadata.create_all(bind=engine)
|
|
|
|
db = TestingSessionLocal()
|
|
try:
|
|
# Create a user
|
|
user = User(username="alice", email="a@example.com", hashed_password="x", is_active=True)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
|
|
# Issue refresh token
|
|
rtoken = create_refresh_token(user=user, user_agent="pytest", ip_address="127.0.0.1", db=db)
|
|
payload = decode_refresh_token(rtoken)
|
|
assert payload is not None
|
|
jti = payload["jti"]
|
|
assert not is_refresh_token_revoked(jti, db)
|
|
|
|
# Revoke and assert
|
|
revoke_refresh_token(jti, db)
|
|
assert is_refresh_token_revoked(jti, db)
|
|
finally:
|
|
db.close()
|
|
|
|
|