""" Authentication utilities for Delphi Database application. This module provides password hashing, user authentication, and session management functions for secure user login/logout functionality. """ import logging from passlib.context import CryptContext from sqlalchemy.orm import Session from .models import User from .database import SessionLocal # Configure password hashing context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") logger = logging.getLogger(__name__) def hash_password(password: str) -> str: """ Hash a password using bcrypt. Args: password (str): Plain text password to hash Returns: str: Hashed password """ return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a password against its hash. Args: plain_password (str): Plain text password to verify hashed_password (str): Hashed password to check against Returns: bool: True if password matches hash, False otherwise """ return pwd_context.verify(plain_password, hashed_password) def authenticate_user(username: str, password: str) -> User | None: """ Authenticate a user with username and password. Args: username (str): Username to authenticate password (str): Password to verify Returns: User | None: User object if authentication successful, None otherwise """ db = SessionLocal() try: user = db.query(User).filter(User.username == username).first() if not user: logger.warning(f"Authentication failed: User '{username}' not found") return None if not verify_password(password, user.password_hash): logger.warning(f"Authentication failed: Invalid password for user '{username}'") return None if not user.is_active: logger.warning(f"Authentication failed: User '{username}' is inactive") return None logger.info(f"User '{username}' authenticated successfully") return user except Exception as e: logger.error(f"Authentication error for user '{username}': {e}") return None finally: db.close() def create_user(username: str, password: str, is_active: bool = True) -> User | None: """ Create a new user with hashed password. Args: username (str): Username for the new user password (str): Plain text password (will be hashed) is_active (bool): Whether the user should be active Returns: User | None: Created user object if successful, None if user already exists """ db = SessionLocal() try: # Check if user already exists existing_user = db.query(User).filter(User.username == username).first() if existing_user: logger.warning(f"User creation failed: User '{username}' already exists") return None # Hash the password password_hash = hash_password(password) # Create new user new_user = User( username=username, password_hash=password_hash, is_active=is_active ) db.add(new_user) db.commit() db.refresh(new_user) logger.info(f"User '{username}' created successfully") return new_user except Exception as e: db.rollback() logger.error(f"Error creating user '{username}': {e}") return None finally: db.close() def seed_admin_user() -> None: """ Create a default admin user if one doesn't exist. This function should be called during application startup to ensure there's at least one admin user for initial access. """ admin_username = "admin" admin_password = "admin123" # In production, use a more secure default db = SessionLocal() try: # Check if admin user already exists existing_admin = db.query(User).filter(User.username == admin_username).first() if existing_admin: logger.info(f"Admin user '{admin_username}' already exists") return # Create admin user admin_user = create_user(admin_username, admin_password) if admin_user: logger.info(f"Default admin user '{admin_username}' created successfully") else: logger.error("Failed to create default admin user") except Exception as e: logger.error(f"Error seeding admin user: {e}") finally: db.close() def get_current_user_from_session(session_data: dict) -> User | None: """ Get current user from session data. Args: session_data (dict): Session data dictionary Returns: User | None: Current user if session is valid, None otherwise """ user_id = session_data.get("user_id") if not user_id: return None db = SessionLocal() try: user = db.query(User).filter(User.id == user_id, User.is_active == True).first() if not user: logger.warning(f"No active user found for session user_id: {user_id}") return None return user except Exception as e: logger.error(f"Error retrieving current user from session: {e}") return None finally: db.close()