Files
delphi-database/app/api/auth.py
HotSwapp bac8cc4bd5 changes
2025-08-18 20:20:04 -05:00

359 lines
12 KiB
Python

"""
Authentication API endpoints
"""
from datetime import datetime, timedelta, timezone
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.database.base import get_db
from app.models.user import User
from app.auth.security import (
authenticate_user,
create_access_token,
create_refresh_token,
decode_refresh_token,
is_refresh_token_revoked,
revoke_refresh_token,
get_password_hash,
get_current_user,
get_admin_user,
)
from app.utils.enhanced_auth import (
validate_and_authenticate_user,
PasswordValidator,
AccountLockoutManager,
)
from app.utils.session_manager import SessionManager, get_session_manager
from app.auth.schemas import (
Token,
UserCreate,
UserResponse,
LoginRequest,
ThemePreferenceUpdate,
RefreshRequest,
)
from app.config import settings
from app.core.logging import get_logger, log_auth_attempt
# Router on which the authentication endpoints defined in this module are registered.
router = APIRouter()
# Module-wide logger obtained from get_logger under the name "auth".
logger = get_logger("auth")
@router.post("/login", response_model=Token)
async def login(
    login_data: LoginRequest,
    request: Request,
    db: Session = Depends(get_db),
    session_manager: SessionManager = Depends(get_session_manager)
):
    """Authenticate a user and issue an access/refresh token pair.

    Credentials are checked with ``validate_and_authenticate_user``. When that
    yields no user or reports errors, a 401 is raised whose headers carry the
    lockout details returned by ``AccountLockoutManager.get_lockout_info``.
    Otherwise a fresh access token and refresh token are created and returned.

    ``session_manager`` is injected as a dependency but is not referenced in
    this function body.

    Raises:
        HTTPException: 401 when authentication fails.
    """
    caller = request.client
    client_ip = caller.host if caller else "unknown"
    user_agent = request.headers.get("user-agent", "")
    username = login_data.username

    logger.info(
        "Login attempt started",
        username=username,
        client_ip=client_ip,
        user_agent=user_agent,
    )

    # Authentication with lockout protection is delegated to the helper.
    user, auth_errors = validate_and_authenticate_user(
        db, username, login_data.password, request
    )

    if not user or auth_errors:
        if auth_errors:
            error_message = auth_errors[0]
        else:
            error_message = "Incorrect username or password"

        logger.warning(
            "Login failed - enhanced auth",
            username=username,
            client_ip=client_ip,
            errors=auth_errors,
        )

        # Surface the lockout state to the client through response headers.
        lockout_info = AccountLockoutManager.get_lockout_info(db, username)
        headers = {"WWW-Authenticate": "Bearer"}
        if lockout_info["is_locked"]:
            headers.update(
                {
                    "X-Account-Locked": "true",
                    "X-Unlock-Time": lockout_info["unlock_time"] or "",
                }
            )
        else:
            headers["X-Attempts-Remaining"] = str(lockout_info["attempts_remaining"])

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=error_message,
            headers=headers,
        )

    # Authentication succeeded: mint the access token first, then the refresh token.
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=settings.access_token_expire_minutes),
    )
    refresh_token = create_refresh_token(
        user=user,
        user_agent=user_agent,
        ip_address=caller.host if caller else None,
        db=db,
    )

    logger.info(
        "Login successful - enhanced auth",
        username=username,
        user_id=user.id,
        client_ip=client_ip,
    )

    return {
        "access_token": access_token,
        "token_type": "bearer",
        "refresh_token": refresh_token,
    }
@router.post("/register", response_model=UserResponse)
async def register(
    user_data: UserCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)  # Only admins can create users
):
    """Register new user with password validation (admin only).

    The password is checked with ``PasswordValidator.validate_password_strength``
    and the username/email are checked for an existing row before the new
    ``User`` is inserted.

    Returns:
        The newly created ``User`` after it has been committed and refreshed.

    Raises:
        HTTPException: 400 when the password is rejected, or when the username
            or email is already registered (detected either by the up-front
            query or by an integrity error at commit time).
    """
    # Function-scope import, in line with the other local imports in this module.
    from sqlalchemy.exc import IntegrityError

    # Validate password strength
    is_valid, password_errors = PasswordValidator.validate_password_strength(user_data.password)
    if not is_valid:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Password validation failed: {'; '.join(password_errors)}"
        )

    # Check if username or email already exists
    existing_user = db.query(User).filter(
        (User.username == user_data.username) | (User.email == user_data.email)
    ).first()
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username or email already registered"
        )

    # Create new user
    hashed_password = get_password_hash(user_data.password)
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        full_name=user_data.full_name,
        hashed_password=hashed_password
    )
    db.add(new_user)
    try:
        db.commit()
    except IntegrityError as exc:
        # The existence check above and this commit are not atomic: a concurrent
        # request can insert the same username/email in between. Roll back so
        # the session stays usable and report the conflict instead of a 500.
        # NOTE(review): assumes the integrity error stems from a uniqueness
        # constraint on username/email - confirm against the User model.
        db.rollback()
        logger.warning(
            "User registration conflict",
            username=user_data.username,
            created_by=current_user.username
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username or email already registered"
        ) from exc
    db.refresh(new_user)

    logger.info(
        "User registered",
        username=new_user.username,
        created_by=current_user.username
    )
    return new_user
@router.get("/me", response_model=UserResponse)
async def read_users_me(current_user: User = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    me = current_user
    logger.debug("User info requested", username=me.username, user_id=me.id)
    return me
@router.post("/refresh", response_model=Token)
async def refresh_token_endpoint(
    request: Request,
    db: Session = Depends(get_db),
    body: RefreshRequest = None,
):
    """Exchange credentials for a new access token.

    Two flows are supported:

    * Body flow - when ``body.refresh_token`` is present, the refresh token is
      decoded, checked for revocation, revoked, and replaced by a new refresh
      token that is returned together with a new access token.
    * Legacy flow - otherwise a ``Bearer`` token from the Authorization header
      is verified with ``verify_token``; the user's ``last_login`` is updated
      and only a new access token is returned.

    Raises:
        HTTPException: 401 for missing, invalid, or revoked credentials and
            for unknown or inactive users.
    """

    def unauthorized(detail):
        # Every rejection in this endpoint is a 401 that differs only in detail.
        return HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)

    def load_active_user(name):
        account = db.query(User).filter(User.username == name).first()
        if not account or not account.is_active:
            raise unauthorized("User not found or inactive")
        return account

    def issue_access_token(account):
        lifetime = timedelta(minutes=settings.access_token_expire_minutes)
        return create_access_token(
            data={"sub": account.username}, expires_delta=lifetime
        )

    # Body flow: refresh token supplied in the request body.
    if body and body.refresh_token:
        claims = decode_refresh_token(body.refresh_token)
        if not claims:
            raise unauthorized("Invalid refresh token")

        jti = claims.get("jti")
        username = claims.get("sub")
        if not (jti and username):
            raise unauthorized("Invalid refresh token payload")

        if is_refresh_token_revoked(jti, db):
            raise unauthorized("Refresh token revoked or expired")

        user = load_active_user(username)

        # Rotate: the presented refresh token is revoked before a new one is made.
        revoke_refresh_token(jti, db)
        caller = request.client
        rotated_refresh_token = create_refresh_token(
            user=user,
            user_agent=request.headers.get("user-agent", ""),
            ip_address=caller.host if caller else None,
            db=db,
        )
        return {
            "access_token": issue_access_token(user),
            "token_type": "bearer",
            "refresh_token": rotated_refresh_token,
        }

    # Legacy flow: bearer token taken from the Authorization header.
    auth_header = request.headers.get("authorization") or request.headers.get("Authorization")
    if not auth_header or not auth_header.lower().startswith("bearer "):
        raise unauthorized("Missing credentials")
    token = auth_header.partition(" ")[2].strip()

    from app.auth.security import verify_token  # local import to avoid circular
    username = verify_token(token)
    if not username:
        raise unauthorized("Invalid token")

    user = load_active_user(username)
    user.last_login = datetime.now(timezone.utc)
    db.commit()

    return {"access_token": issue_access_token(user), "token_type": "bearer"}
@router.get("/users", response_model=List[UserResponse])
async def list_users(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return every ``User`` row; access is gated by the ``get_admin_user`` dependency."""
    return db.query(User).all()
@router.post("/logout")
async def logout(body: RefreshRequest = None, db: Session = Depends(get_db)):
    """Revoke the provided refresh token. Idempotent and safe to call multiple times.

    The client should send a JSON body: { "refresh_token": "..." }.

    Revocation is best-effort: when no token is supplied, the token cannot be
    decoded, or revocation raises, the endpoint still answers with an OK
    status. Failures are logged server-side rather than silently discarded.

    Returns:
        ``{"status": "ok"}`` in every case.
    """
    try:
        if body and body.refresh_token:
            payload = decode_refresh_token(body.refresh_token)
            if payload and payload.get("jti"):
                revoke_refresh_token(payload["jti"], db)
    except Exception as exc:
        # Don't leak details to the client; logout stays best-effort. The
        # failure is still recorded so revocation problems are not invisible.
        logger.warning(
            "Logout refresh token revocation failed",
            error_type=type(exc).__name__,
            error=str(exc)
        )
    return {"status": "ok"}
@router.post("/theme-preference")
async def update_theme_preference(
    theme_data: ThemePreferenceUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Store the requested theme on the current user and commit it.

    Raises:
        HTTPException: 400 when the requested theme is neither 'light' nor 'dark'.
    """
    requested_theme = theme_data.theme_preference
    if requested_theme not in ('light', 'dark'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Theme preference must be 'light' or 'dark'",
        )

    current_user.theme_preference = requested_theme
    db.commit()

    return {
        "message": "Theme preference updated successfully",
        "theme": requested_theme,
    }
@router.post("/validate-password")
async def validate_password(password_data: dict):
    """Validate password strength and return detailed feedback.

    Expects a JSON object with a ``"password"`` key holding a string.

    Returns:
        A dict with ``is_valid`` and ``errors`` from
        ``PasswordValidator.validate_password_strength``, the ``strength_score``
        from ``PasswordValidator.generate_password_strength_score``, and a
        ``strength_level`` label derived from that score.

    Raises:
        HTTPException: 400 when the password is missing/empty or is not a string.
    """
    password = password_data.get("password", "")
    if not password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password is required"
        )
    # The body is an untyped dict, so the value may be any JSON type. Reject
    # non-strings here instead of handing them to the string-based validator.
    if not isinstance(password, str):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password must be a string"
        )

    is_valid, errors = PasswordValidator.validate_password_strength(password)
    strength_score = PasswordValidator.generate_password_strength_score(password)

    # Exclusive upper bounds for each label; scores of 80 and above are "Strong".
    strength_bands = (
        (20, "Very Weak"),
        (40, "Weak"),
        (60, "Fair"),
        (80, "Good"),
    )
    strength_level = next(
        (label for upper_bound, label in strength_bands if strength_score < upper_bound),
        "Strong",
    )

    return {
        "is_valid": is_valid,
        "errors": errors,
        "strength_score": strength_score,
        "strength_level": strength_level
    }
@router.get("/account-status/{username}")
async def get_account_status(
    username: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)  # Admin only endpoint
):
    """Report lockout and suspicious-activity information for ``username`` (admin only).

    Combines ``AccountLockoutManager.get_lockout_info`` with the result of
    ``SuspiciousActivityDetector.is_login_suspicious``, which is called with
    the placeholder values "admin-check" and "admin-request".
    """
    lockout_info = AccountLockoutManager.get_lockout_info(db, username)

    # Imported at call time, as in the rest of this module's local imports.
    from app.utils.enhanced_auth import SuspiciousActivityDetector
    is_suspicious, warnings = SuspiciousActivityDetector.is_login_suspicious(
        db, username, "admin-check", "admin-request"
    )

    suspicious_activity = {"is_suspicious": is_suspicious, "warnings": warnings}
    return {
        "username": username,
        "lockout_info": lockout_info,
        "suspicious_activity": suspicious_activity,
    }
@router.post("/unlock-account/{username}")
async def unlock_account(
    username: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)  # Admin only endpoint
):
    """Reset the failed-attempt state for ``username`` (admin only).

    Calls ``AccountLockoutManager.reset_failed_attempts``, logs which admin
    performed the unlock, and returns a confirmation message.
    """
    AccountLockoutManager.reset_failed_attempts(db, username)

    acting_admin = current_user.username
    logger.info(
        "Account manually unlocked",
        username=username,
        unlocked_by=acting_admin,
    )

    confirmation = f"Account '{username}' has been unlocked"
    return {"message": confirmation}