""" Authentication and security utilities """ from datetime import datetime, timedelta from typing import Optional, Union from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from fastapi import HTTPException, status, Depends from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from app.config import settings from app.database.base import get_db from app.models.user import User # Password hashing pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT Security security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a plain password against its hash""" return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: """Generate password hash""" return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes) to_encode.update({ "exp": expire, "iat": datetime.utcnow(), }) encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm) return encoded_jwt def verify_token(token: str) -> Optional[str]: """Verify JWT token and return username""" try: payload = jwt.decode( token, settings.secret_key, algorithms=[settings.algorithm], leeway=30 # allow small clock skew ) username: str = payload.get("sub") if username is None: return None return username except JWTError: return None def authenticate_user(db: Session, username: str, password: str) -> Optional[User]: """Authenticate user credentials""" user = db.query(User).filter(User.username == username).first() if not user: return None if not verify_password(password, user.hashed_password): return None return user def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db) ) -> User: """Get current authenticated user""" token = credentials.credentials username = verify_token(token) if username is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) user = db.query(User).filter(User.username == username).first() if user is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found", headers={"WWW-Authenticate": "Bearer"}, ) if not user.is_active: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user" ) return user def get_admin_user(current_user: User = Depends(get_current_user)) -> User: """Require admin privileges""" if not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Not enough permissions" ) return current_user