Files
delphi-database/app/api/auth.py
HotSwapp c2f3c4411d progress
2025-08-09 16:37:57 -05:00

139 lines
4.1 KiB
Python

"""
Authentication API endpoints
"""
from datetime import datetime, timedelta
from typing import List
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from app.database.base import get_db
from app.models.user import User
from app.auth.security import (
authenticate_user,
create_access_token,
get_password_hash,
get_current_user,
get_admin_user
)
from app.auth.schemas import (
Token,
UserCreate,
UserResponse,
LoginRequest,
ThemePreferenceUpdate
)
from app.config import settings
router = APIRouter()
@router.post("/login", response_model=Token)
async def login(login_data: LoginRequest, db: Session = Depends(get_db)):
"""Login endpoint"""
user = authenticate_user(db, login_data.username, login_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Update last login
user.last_login = datetime.utcnow()
db.commit()
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.post("/register", response_model=UserResponse)
async def register(
user_data: UserCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_admin_user) # Only admins can create users
):
"""Register new user (admin only)"""
# Check if username or email already exists
existing_user = db.query(User).filter(
(User.username == user_data.username) | (User.email == user_data.email)
).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Username or email already registered"
)
# Create new user
hashed_password = get_password_hash(user_data.password)
new_user = User(
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
hashed_password=hashed_password
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
@router.get("/me", response_model=UserResponse)
async def read_users_me(current_user: User = Depends(get_current_user)):
"""Get current user info"""
return current_user
@router.post("/refresh", response_model=Token)
async def refresh_token(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""Refresh access token for current user"""
# Update last login timestamp
current_user.last_login = datetime.utcnow()
db.commit()
# Create new token with full expiration time
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": current_user.username},
expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/users", response_model=List[UserResponse])
async def list_users(
db: Session = Depends(get_db),
current_user: User = Depends(get_admin_user)
):
"""List all users (admin only)"""
users = db.query(User).all()
return users
@router.post("/theme-preference")
async def update_theme_preference(
theme_data: ThemePreferenceUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update user's theme preference"""
if theme_data.theme_preference not in ['light', 'dark']:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Theme preference must be 'light' or 'dark'"
)
current_user.theme_preference = theme_data.theme_preference
db.commit()
return {"message": "Theme preference updated successfully", "theme": theme_data.theme_preference}