maybe good

This commit is contained in:
HotSwapp
2025-08-08 15:55:15 -05:00
parent ab6f163c15
commit b257a06787
80 changed files with 19739 additions and 0 deletions

1
app/auth/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Authentication package

52
app/auth/schemas.py Normal file
View File

@@ -0,0 +1,52 @@
"""
Authentication schemas
"""
from typing import Optional
from pydantic import BaseModel, EmailStr
class UserBase(BaseModel):
"""Base user schema"""
username: str
email: EmailStr
full_name: Optional[str] = None
class UserCreate(UserBase):
"""User creation schema"""
password: str
class UserUpdate(BaseModel):
"""User update schema"""
username: Optional[str] = None
email: Optional[EmailStr] = None
full_name: Optional[str] = None
is_active: Optional[bool] = None
class UserResponse(UserBase):
"""User response schema"""
id: int
is_active: bool
is_admin: bool
class Config:
from_attributes = True
class Token(BaseModel):
"""Token response schema"""
access_token: str
token_type: str
class TokenData(BaseModel):
"""Token payload schema"""
username: Optional[str] = None
class LoginRequest(BaseModel):
"""Login request schema"""
username: str
password: str

107
app/auth/security.py Normal file
View File

@@ -0,0 +1,107 @@
"""
Authentication and security utilities
"""
from datetime import datetime, timedelta
from typing import Optional, Union
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.config import settings
from app.database.base import get_db
from app.models.user import User
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# JWT Security
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a plain password against its hash"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""Generate password hash"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.access_token_expire_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)
return encoded_jwt
def verify_token(token: str) -> Optional[str]:
"""Verify JWT token and return username"""
try:
payload = jwt.decode(token, settings.secret_key, algorithms=[settings.algorithm])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
def authenticate_user(db: Session, username: str, password: str) -> Optional[User]:
"""Authenticate user credentials"""
user = db.query(User).filter(User.username == username).first()
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db)
) -> User:
"""Get current authenticated user"""
token = credentials.credentials
username = verify_token(token)
if username is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = db.query(User).filter(User.username == username).first()
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Inactive user"
)
return user
def get_admin_user(current_user: User = Depends(get_current_user)) -> User:
"""Require admin privileges"""
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not enough permissions"
)
return current_user