Files
delphi-database-v2/app/auth.py
HotSwapp 950d261eb4 File Cabinet MVP: case detail with inline Ledger CRUD
- Extend Transaction with ledger fields (item_no, employee_number, t_code, t_type_l, quantity, rate, billed)
- Startup SQLite migration to add missing columns on transactions
- Ledger create/update/delete endpoints with validations and auto-compute Amount = Quantity × Rate
- Uniqueness: ensure (transaction_date, item_no) per case by auto-incrementing
- Compute case totals (billed/unbilled/overall) and display in case view
- Update case.html for master-detail ledger UI; add client-side auto-compute JS
- Enhance import_ledger_data to populate extended fields
- Close/Reopen actions retained; case detail sorting by date/item
- Auth: switch to pbkdf2_sha256 default (bcrypt fallback) and seed admin robustness

Tested in Docker: health OK, login OK, import ROLODEX/FILES OK, ledger create persisted and totals displayed.
2025-10-07 09:26:58 -05:00

205 lines
6.1 KiB
Python

"""
Authentication utilities for Delphi Database application.
This module provides password hashing, user authentication, and session management
functions for secure user login/logout functionality.
"""
import logging
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from .models import User
from .database import SessionLocal
# Configure password hashing context
# Prefer pbkdf2_sha256 for portability; include bcrypt for legacy compatibility
pwd_context = CryptContext(schemes=["pbkdf2_sha256", "bcrypt"], deprecated="auto")
logger = logging.getLogger(__name__)
def hash_password(password: str) -> str:
"""
Hash a password using bcrypt.
Args:
password (str): Plain text password to hash
Returns:
str: Hashed password
"""
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a password against its hash.
Args:
plain_password (str): Plain text password to verify
hashed_password (str): Hashed password to check against
Returns:
bool: True if password matches hash, False otherwise
"""
return pwd_context.verify(plain_password, hashed_password)
def authenticate_user(username: str, password: str) -> User | None:
"""
Authenticate a user with username and password.
Args:
username (str): Username to authenticate
password (str): Password to verify
Returns:
User | None: User object if authentication successful, None otherwise
"""
db = SessionLocal()
try:
user = db.query(User).filter(User.username == username).first()
if not user:
logger.warning(f"Authentication failed: User '{username}' not found")
return None
if not verify_password(password, user.password_hash):
logger.warning(f"Authentication failed: Invalid password for user '{username}'")
return None
if not user.is_active:
logger.warning(f"Authentication failed: User '{username}' is inactive")
return None
logger.info(f"User '{username}' authenticated successfully")
return user
except Exception as e:
logger.error(f"Authentication error for user '{username}': {e}")
return None
finally:
db.close()
def create_user(username: str, password: str, is_active: bool = True) -> User | None:
"""
Create a new user with hashed password.
Args:
username (str): Username for the new user
password (str): Plain text password (will be hashed)
is_active (bool): Whether the user should be active
Returns:
User | None: Created user object if successful, None if user already exists
"""
db = SessionLocal()
try:
# Check if user already exists
existing_user = db.query(User).filter(User.username == username).first()
if existing_user:
logger.warning(f"User creation failed: User '{username}' already exists")
return None
# Hash the password
password_hash = hash_password(password)
# Create new user
new_user = User(
username=username,
password_hash=password_hash,
is_active=is_active
)
db.add(new_user)
db.commit()
db.refresh(new_user)
logger.info(f"User '{username}' created successfully")
return new_user
except Exception as e:
db.rollback()
logger.error(f"Error creating user '{username}': {e}")
return None
finally:
db.close()
def seed_admin_user() -> None:
"""
Create a default admin user if one doesn't exist.
This function should be called during application startup to ensure
there's at least one admin user for initial access.
"""
admin_username = "admin"
admin_password = "admin123" # In production, use a more secure default
db = SessionLocal()
try:
# Check if admin user already exists
existing_admin = db.query(User).filter(User.username == admin_username).first()
if existing_admin:
# Ensure default credentials work in development
needs_reset = False
try:
needs_reset = not verify_password(admin_password, existing_admin.password_hash)
except Exception as e:
logger.warning(f"Password verify failed for admin (will reset): {e}")
needs_reset = True
if needs_reset:
try:
existing_admin.password_hash = hash_password(admin_password)
db.add(existing_admin)
db.commit()
logger.info(f"Admin user '{admin_username}' password reset to default")
except Exception as e:
logger.error(f"Error updating admin password: {e}")
else:
logger.info(f"Admin user '{admin_username}' already exists")
return
# Create admin user
admin_user = create_user(admin_username, admin_password)
if admin_user:
logger.info(f"Default admin user '{admin_username}' created successfully")
else:
logger.error("Failed to create default admin user")
except Exception as e:
logger.error(f"Error seeding admin user: {e}")
finally:
db.close()
def get_current_user_from_session(session_data: dict) -> User | None:
"""
Get current user from session data.
Args:
session_data (dict): Session data dictionary
Returns:
User | None: Current user if session is valid, None otherwise
"""
user_id = session_data.get("user_id")
if not user_id:
return None
db = SessionLocal()
try:
user = db.query(User).filter(User.id == user_id, User.is_active == True).first()
if not user:
logger.warning(f"No active user found for session user_id: {user_id}")
return None
return user
except Exception as e:
logger.error(f"Error retrieving current user from session: {e}")
return None
finally:
db.close()