Files
delphi-database/app/api/admin.py
2025-08-14 21:27:34 -05:00

1617 lines
51 KiB
Python

"""
Comprehensive Admin API endpoints - User management, system settings, audit logging
"""
from typing import List, Dict, Any, Optional, Union
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Query, Body, Request
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func, text, desc, asc, and_, or_
import csv
import io
import os
import hashlib
import secrets
import shutil
import time
from datetime import datetime, timedelta, date, timezone
from pathlib import Path
from app.database.base import get_db
from app.api.search_highlight import build_query_tokens
# Track application start time
APPLICATION_START_TIME = time.time()
from app.models import User, Rolodex, File as FileModel, Ledger, QDRO, AuditLog, LoginAttempt
from app.models.lookups import SystemSetup, Employee, FileType, FileStatus, TransactionType, TransactionCode, State, FormIndex, PrinterSetup
from app.auth.security import get_admin_user, get_password_hash, create_access_token
from app.services.audit import audit_service
from app.config import settings
from app.services.query_utils import apply_sorting, tokenized_ilike_filter, paginate_with_total
from app.utils.exceptions import handle_database_errors, safe_execute
from app.utils.logging import app_logger
router = APIRouter()
# Enhanced Admin Schemas
from pydantic import BaseModel, Field, EmailStr
from pydantic.config import ConfigDict
class SystemStats(BaseModel):
    """Enhanced system statistics returned by GET /stats."""
    # Aggregate row counts across the core tables.
    total_customers: int
    total_files: int
    total_transactions: int
    total_qdros: int
    total_users: int
    total_active_users: int  # users who logged in within the last 30 days
    total_admins: int
    database_size: str       # human-readable, e.g. "12.3 MB" (SQLite only, else "Unknown")
    last_backup: str         # newest backup filename, or "Not found"
    system_uptime: str       # formatted timedelta since process start
    recent_activity: List[Dict[str, Any]]  # latest file/customer events


class HealthCheck(BaseModel):
    """Comprehensive system health check returned by GET /health."""
    status: str                 # "healthy" or "unhealthy"
    database_connected: bool
    disk_space_available: bool  # more than 1 GB free
    memory_available: bool      # below 90% usage (True when psutil is unavailable)
    version: str
    uptime: str
    last_backup: Optional[str]  # newest backup filename, if any
    active_sessions: int        # users seen in the last 24 hours
    cpu_usage: float            # percent; 0.0 when psutil is unavailable
    alerts: List[str]           # human-readable problem descriptions
class UserCreate(BaseModel):
    """Payload for creating a new user account."""
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=6)  # stored only as a hash
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    is_admin: bool = False
    is_active: bool = True


class UserUpdate(BaseModel):
    """Partial update payload: only fields supplied by the client are applied."""
    username: Optional[str] = Field(None, min_length=3, max_length=50)
    email: Optional[EmailStr] = None
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    is_admin: Optional[bool] = None
    is_active: Optional[bool] = None


class UserResponse(BaseModel):
    """User record as serialized from the ORM row."""
    id: int
    username: str
    email: str
    first_name: Optional[str]
    last_name: Optional[str]
    is_admin: bool
    is_active: bool
    last_login: Optional[datetime]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    # Allow construction directly from SQLAlchemy User instances.
    model_config = ConfigDict(from_attributes=True)


class PasswordReset(BaseModel):
    """Administrative password reset request."""
    new_password: str = Field(..., min_length=6)
    confirm_password: str = Field(..., min_length=6)  # must equal new_password
class SystemSetting(BaseModel):
    """System setting model (mirrors a SystemSetup row)."""
    setting_key: str                 # unique key
    setting_value: str
    description: Optional[str] = None
    setting_type: str = "STRING"     # value-type tag stored alongside the value


class SettingUpdate(BaseModel):
    """Update payload for an existing system setting."""
    setting_value: str
    description: Optional[str] = None


class AuditLogEntry(BaseModel):
    """Audit log entry as serialized from the AuditLog ORM row."""
    id: int
    user_id: Optional[int]
    username: Optional[str]
    action: str                        # e.g. CREATE, UPDATE, DEACTIVATE, RESET_PASSWORD
    resource_type: str
    resource_id: Optional[str]
    details: Optional[Dict[str, Any]]  # field-level change payload
    ip_address: Optional[str]
    user_agent: Optional[str]
    timestamp: datetime
    # Allow construction directly from SQLAlchemy AuditLog instances.
    model_config = ConfigDict(from_attributes=True)


class BackupInfo(BaseModel):
    """Metadata describing a single database backup file."""
    filename: str
    size: str            # human-readable, e.g. "12.3 MB"
    created_at: datetime
    backup_type: str     # "manual" or "automatic"
    status: str
class PrinterSetupBase(BaseModel):
    """Base schema for printer setup; all fields optional so it can back partial updates."""
    description: Optional[str] = None
    driver: Optional[str] = None
    port: Optional[str] = None
    default_printer: Optional[bool] = None  # endpoints enforce at most one default
    active: Optional[bool] = None
    number: Optional[int] = None
    page_break: Optional[str] = None
    # Raw printer control strings (legacy escape sequences).
    setup_st: Optional[str] = None
    reset_st: Optional[str] = None
    b_underline: Optional[str] = None
    e_underline: Optional[str] = None
    b_bold: Optional[str] = None
    e_bold: Optional[str] = None
    # Per-feature usage flags — NOTE(review): semantics inferred from names; confirm.
    phone_book: Optional[bool] = None
    rolodex_info: Optional[bool] = None
    envelope: Optional[bool] = None
    file_cabinet: Optional[bool] = None
    accounts: Optional[bool] = None
    statements: Optional[bool] = None
    calendar: Optional[bool] = None


class PrinterSetupCreate(PrinterSetupBase):
    """Creation payload; printer_name is the primary key and is required."""
    printer_name: str


class PrinterSetupUpdate(PrinterSetupBase):
    """Partial update payload; only supplied fields are applied."""
    pass


class PrinterSetupResponse(PrinterSetupBase):
    """Printer record as returned by the API."""
    printer_name: str
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    # Allow construction directly from SQLAlchemy PrinterSetup instances.
    model_config = ConfigDict(from_attributes=True)
# Printer Setup Management
@router.get("/printers", response_model=List[PrinterSetupResponse])
async def list_printers(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return every configured printer, ordered alphabetically by name."""
    return (
        db.query(PrinterSetup)
        .order_by(PrinterSetup.printer_name.asc())
        .all()
    )
@router.get("/printers/{printer_name}", response_model=PrinterSetupResponse)
async def get_printer(
    printer_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Look up a single printer by its primary-key name; 404 when absent."""
    record = (
        db.query(PrinterSetup)
        .filter(PrinterSetup.printer_name == printer_name)
        .first()
    )
    if record is None:
        raise HTTPException(status_code=404, detail="Printer not found")
    return record
@router.post("/printers", response_model=PrinterSetupResponse)
async def create_printer(
    payload: PrinterSetupCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Create a new printer definition.

    Raises 400 when a printer with the same name already exists.  When the
    new printer is flagged as the default, the default flag of every other
    printer is cleared so at most one default exists.
    """
    exists = db.query(PrinterSetup).filter(
        PrinterSetup.printer_name == payload.printer_name
    ).first()
    if exists:
        raise HTTPException(status_code=400, detail="Printer already exists")
    data = payload.model_dump(exclude_unset=True)
    instance = PrinterSetup(**data)
    db.add(instance)
    # Enforce a single default printer.  Failures here used to be silently
    # swallowed by a bare except/pass; log them instead so an inconsistent
    # default-flag state is visible to operators.
    if data.get("default_printer"):
        try:
            db.query(PrinterSetup).filter(
                PrinterSetup.printer_name != instance.printer_name
            ).update({PrinterSetup.default_printer: False})
        except Exception as e:
            app_logger.warning(f"Could not clear other default printers: {str(e)}")
    db.commit()
    db.refresh(instance)
    return instance
@router.put("/printers/{printer_name}", response_model=PrinterSetupResponse)
async def update_printer(
    printer_name: str,
    payload: PrinterSetupUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Update an existing printer.

    Applies only the fields supplied in the request body.  When the update
    sets default_printer=True, the default flag of every other printer is
    cleared so at most one default exists.  Raises 404 when the printer
    does not exist.
    """
    instance = db.query(PrinterSetup).filter(PrinterSetup.printer_name == printer_name).first()
    if not instance:
        raise HTTPException(status_code=404, detail="Printer not found")
    updates = payload.model_dump(exclude_unset=True)
    for k, v in updates.items():
        setattr(instance, k, v)
    # Enforce a single default printer when set true.  Failures here used
    # to be silently swallowed by a bare except/pass; log them instead so
    # an inconsistent default-flag state is visible to operators.
    if updates.get("default_printer"):
        try:
            db.query(PrinterSetup).filter(
                PrinterSetup.printer_name != instance.printer_name
            ).update({PrinterSetup.default_printer: False})
        except Exception as e:
            app_logger.warning(f"Could not clear other default printers: {str(e)}")
    db.commit()
    db.refresh(instance)
    return instance
@router.delete("/printers/{printer_name}")
async def delete_printer(
    printer_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Hard-delete a printer definition; 404 when it does not exist."""
    record = (
        db.query(PrinterSetup)
        .filter(PrinterSetup.printer_name == printer_name)
        .first()
    )
    if record is None:
        raise HTTPException(status_code=404, detail="Printer not found")
    db.delete(record)
    db.commit()
    return {"message": "Printer deleted"}
class LookupTableInfo(BaseModel):
    """Metadata describing one lookup table (see /lookups/tables)."""
    table_name: str
    display_name: str
    record_count: int
    last_updated: Optional[datetime]
    description: str


class DatabaseMaintenanceResult(BaseModel):
    """Result of a database maintenance operation (vacuum/analyze)."""
    operation: str    # e.g. "vacuum", "analyze"
    status: str       # e.g. "success"
    message: str
    duration_seconds: float
    records_affected: Optional[int] = None
# Enhanced Health and Statistics Endpoints
@router.get("/health", response_model=HealthCheck)
async def system_health(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Comprehensive system health check.

    Probes database connectivity, free disk space, memory/CPU load (when
    psutil is installed), recent logins, and backup age, aggregating any
    problems into the ``alerts`` list.
    """
    alerts = []
    # Test database connection
    try:
        db.execute(text("SELECT 1"))
        db_connected = True
    except Exception as e:
        db_connected = False
        alerts.append(f"Database connection failed: {str(e)}")
    # Check disk space
    try:
        total, used, free = shutil.disk_usage(".")
        free_gb = free / (1024**3)
        disk_available = free_gb > 1.0  # 1GB minimum
        if not disk_available:
            alerts.append(f"Low disk space: {free_gb:.1f}GB remaining")
    except (OSError, ImportError) as e:
        app_logger.warning(f"Could not check disk space: {str(e)}")
        disk_available = True
    # Memory and CPU checks both need psutil; import it once (it was
    # previously imported twice in two separate try blocks) and degrade
    # gracefully when it is missing.
    try:
        import psutil
    except ImportError:
        psutil = None
    if psutil is not None:
        memory = psutil.virtual_memory()
        memory_available = memory.percent < 90
        if not memory_available:
            alerts.append(f"High memory usage: {memory.percent:.1f}%")
        cpu_usage = psutil.cpu_percent(interval=1)
        if cpu_usage > 80:
            alerts.append(f"High CPU usage: {cpu_usage:.1f}%")
    else:
        memory_available = True
        cpu_usage = 0.0
    # Count active sessions: users who logged in within the last 24 hours.
    try:
        active_sessions = db.query(User).filter(
            User.last_login > datetime.now(timezone.utc) - timedelta(hours=24)
        ).count()
    except Exception as e:
        app_logger.warning(f"Could not query active sessions: {str(e)}")
        active_sessions = 0
    # Check the age of the most recent backup file.
    last_backup = None
    try:
        backup_dir = Path("backups")
        if backup_dir.exists():
            backup_files = list(backup_dir.glob("*.db"))
            if backup_files:
                latest_backup = max(backup_files, key=lambda p: p.stat().st_mtime)
                backup_age = datetime.now(timezone.utc) - datetime.fromtimestamp(latest_backup.stat().st_mtime, tz=timezone.utc)
                last_backup = latest_backup.name
                if backup_age.days > 7:
                    alerts.append(f"Last backup is {backup_age.days} days old")
    except (OSError, FileNotFoundError) as e:
        app_logger.warning(f"Could not check backup status: {str(e)}")
        alerts.append("Unable to check backup status")
    # Application uptime since process start.
    uptime_seconds = int(time.time() - APPLICATION_START_TIME)
    uptime = str(timedelta(seconds=uptime_seconds))
    # Renamed from `status`: the old local shadowed the fastapi `status`
    # module imported at file level.
    overall_status = "healthy" if db_connected and disk_available and memory_available else "unhealthy"
    return HealthCheck(
        status=overall_status,
        database_connected=db_connected,
        disk_space_available=disk_available,
        memory_available=memory_available,
        version=settings.app_version,
        uptime=uptime,
        last_backup=last_backup,
        active_sessions=active_sessions,
        cpu_usage=cpu_usage,
        alerts=alerts
    )
@router.get("/stats", response_model=SystemStats)
async def system_statistics(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Enhanced system statistics with comprehensive metrics"""
    # Aggregate row counts for the core tables.
    total_customers = db.query(func.count(Rolodex.id)).scalar()
    total_files = db.query(func.count(FileModel.file_no)).scalar()
    total_transactions = db.query(func.count(Ledger.id)).scalar()
    total_qdros = db.query(func.count(QDRO.id)).scalar()
    total_users = db.query(func.count(User.id)).scalar()
    # Count active users (logged in within last 30 days)
    total_active_users = db.query(func.count(User.id)).filter(
        User.last_login > datetime.now(timezone.utc) - timedelta(days=30)
    ).scalar()
    # Count admin users
    total_admins = db.query(func.count(User.id)).filter(User.is_admin == True).scalar()
    # Database size (for SQLite only; other backends report "Unknown")
    db_size = "Unknown"
    if "sqlite" in settings.database_url:
        try:
            db_path = settings.database_url.replace("sqlite:///", "")
            if os.path.exists(db_path):
                size_bytes = os.path.getsize(db_path)
                db_size = f"{size_bytes / (1024*1024):.1f} MB"
        except (OSError, ValueError) as e:
            app_logger.warning(f"Could not get database size: {str(e)}")
    # Newest *.db file under backups/, by modification time.
    last_backup = "Not found"
    try:
        backup_dir = Path("backups")
        if backup_dir.exists():
            backup_files = list(backup_dir.glob("*.db"))
            if backup_files:
                latest_backup = max(backup_files, key=lambda p: p.stat().st_mtime)
                last_backup = latest_backup.name
    except (OSError, FileNotFoundError) as e:
        app_logger.warning(f"Could not check for recent backups: {str(e)}")
    # Application uptime since process start.
    uptime_seconds = int(time.time() - APPLICATION_START_TIME)
    system_uptime = str(timedelta(seconds=uptime_seconds))
    # Recent activity (best effort; failures only degrade this list).
    recent_activity = []
    try:
        # Five most recently opened files.
        recent_files = db.query(FileModel).order_by(desc(FileModel.opened)).limit(5).all()
        for file in recent_files:
            recent_activity.append({
                "type": "file_created",
                "description": f"File {file.file_no} created",
                "timestamp": file.opened.isoformat() if file.opened else None
            })
        # Five highest-id customers as a proxy for "recently added".
        # NOTE(review): no creation timestamp is available on Rolodex here,
        # so the current time is reported — these "added" timestamps are
        # NOT real creation times; confirm before consumers rely on them.
        recent_customers = db.query(Rolodex).order_by(desc(Rolodex.id)).limit(5).all()
        for customer in recent_customers:
            recent_activity.append({
                "type": "customer_added",
                "description": f"Customer {customer.first} {customer.last} added",
                "timestamp": datetime.now(timezone.utc).isoformat()
            })
    except Exception as e:
        app_logger.warning(f"Could not get recent activity: {str(e)}")
    return SystemStats(
        total_customers=total_customers,
        total_files=total_files,
        total_transactions=total_transactions,
        total_qdros=total_qdros,
        total_users=total_users,
        total_active_users=total_active_users,
        total_admins=total_admins,
        database_size=db_size,
        last_backup=last_backup,
        system_uptime=system_uptime,
        recent_activity=recent_activity
    )
@router.post("/import/csv")
async def import_csv(
    table_name: str,
    file: UploadFile = File(...),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Import data from a CSV file.

    Supports the "customers"/"rolodex" and "files" tables.  Rows that fail
    to map onto the model are collected as per-row errors rather than
    aborting the whole import; the first 10 errors are returned.
    """
    # file.filename can be None; the old check crashed with an
    # AttributeError (HTTP 500) instead of returning a clean 400.
    if not file.filename or not file.filename.endswith('.csv'):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be a CSV"
        )
    # Read and decode the upload; reject non-UTF-8 content explicitly
    # (previously an unhandled UnicodeDecodeError surfaced as a 500).
    content = await file.read()
    try:
        decoded = content.decode('utf-8')
    except UnicodeDecodeError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="File must be UTF-8 encoded"
        )
    csv_data = csv.DictReader(io.StringIO(decoded))
    imported_count = 0
    errors = []
    try:
        if table_name.lower() == "customers" or table_name.lower() == "rolodex":
            for row_num, row in enumerate(csv_data, start=1):
                try:
                    customer = Rolodex(**row)
                    db.add(customer)
                    imported_count += 1
                except Exception as e:
                    errors.append(f"Row {row_num}: {str(e)}")
        elif table_name.lower() == "files":
            for row_num, row in enumerate(csv_data, start=1):
                try:
                    # CSV values are strings; coerce the date columns.
                    if 'opened' in row and row['opened']:
                        row['opened'] = datetime.strptime(row['opened'], '%Y-%m-%d').date()
                    if 'closed' in row and row['closed']:
                        row['closed'] = datetime.strptime(row['closed'], '%Y-%m-%d').date()
                    file_obj = FileModel(**row)
                    db.add(file_obj)
                    imported_count += 1
                except Exception as e:
                    errors.append(f"Row {row_num}: {str(e)}")
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Import not supported for table: {table_name}"
            )
        db.commit()
    except HTTPException:
        # Bug fix: the generic handler below used to catch this and
        # re-wrap it as "Import failed: ..."; propagate the intended
        # response unchanged.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Import failed: {str(e)}"
        )
    return {
        "message": f"Import completed",
        "imported_count": imported_count,
        "error_count": len(errors),
        "errors": errors[:10]  # Return first 10 errors
    }
@router.get("/export/{table_name}")
async def export_table(
    table_name: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Export table data to CSV.

    Writes a timestamped CSV under exports/ and streams it back.  Supports
    the "customers"/"rolodex" and "files" tables; any other name is a 400.
    """
    # Create exports directory if it doesn't exist
    os.makedirs("exports", exist_ok=True)
    filename = f"exports/{table_name}_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.csv"
    try:
        if table_name.lower() == "customers" or table_name.lower() == "rolodex":
            customers = db.query(Rolodex).all()
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                if customers:
                    fieldnames = ['id', 'last', 'first', 'middle', 'prefix', 'suffix',
                                  'title', 'group', 'a1', 'a2', 'a3', 'city', 'abrev',
                                  'zip', 'email', 'dob', 'ss_number', 'legal_status', 'memo']
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    for customer in customers:
                        writer.writerow({field: getattr(customer, field) for field in fieldnames})
        elif table_name.lower() == "files":
            files = db.query(FileModel).all()
            with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
                if files:
                    fieldnames = ['file_no', 'id', 'regarding', 'empl_num', 'file_type',
                                  'opened', 'closed', 'status', 'footer_code', 'opposing',
                                  'rate_per_hour', 'memo']
                    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                    writer.writeheader()
                    for file_obj in files:
                        writer.writerow({field: getattr(file_obj, field) for field in fieldnames})
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"Export not supported for table: {table_name}"
            )
        return FileResponse(
            filename,
            media_type='text/csv',
            filename=f"{table_name}_export.csv"
        )
    except HTTPException:
        # Bug fix: the unsupported-table 400 above used to be caught by the
        # generic handler below and returned as a 500 "Export failed: ...".
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Export failed: {str(e)}"
        )
@router.get("/backup/download")
async def download_backup(
    current_user: User = Depends(get_admin_user)
):
    """Download database backup"""
    # Only SQLite deployments expose a single file that can be streamed.
    if "sqlite" in settings.database_url:
        db_path = settings.database_url.replace("sqlite:///", "")
        if os.path.exists(db_path):
            stamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            return FileResponse(
                db_path,
                media_type='application/octet-stream',
                filename=f"delphi_backup_{stamp}.db"
            )
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Database backup not available"
    )
# User Management Endpoints
class PaginatedUsersResponse(BaseModel):
    """Envelope returned by GET /users when include_total=true."""
    items: List[UserResponse]
    total: int
@router.get("/users", response_model=Union[List[UserResponse], PaginatedUsersResponse])
async def list_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    search: Optional[str] = Query(None),
    active_only: bool = Query(False),
    sort_by: Optional[str] = Query(None, description="Sort by: username, email, first_name, last_name, created, updated"),
    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """List all users with pagination and filtering"""
    user_query = db.query(User)
    # Case-insensitive token search across the name/email columns.
    if search:
        search_columns = [User.username, User.email, User.first_name, User.last_name]
        condition = tokenized_ilike_filter(build_query_tokens(search), search_columns)
        if condition is not None:
            user_query = user_query.filter(condition)
    if active_only:
        user_query = user_query.filter(User.is_active == True)
    # Sort keys are whitelisted to guard against arbitrary column input.
    sortable = {
        "username": [User.username],
        "email": [User.email],
        "first_name": [User.first_name],
        "last_name": [User.last_name],
        "created": [User.created_at],
        "updated": [User.updated_at],
    }
    user_query = apply_sorting(user_query, sort_by, sort_dir, allowed=sortable)
    users, total = paginate_with_total(user_query, skip, limit, include_total)
    return {"items": users, "total": total or 0} if include_total else users
@router.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
    user_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get user by ID"""
    record = db.query(User).filter(User.id == user_id).first()
    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    return record
@router.post("/users", response_model=UserResponse)
async def create_user(
    user_data: UserCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Create new user"""
    # Reject duplicate usernames before touching the table.
    if db.query(User).filter(User.username == user_data.username).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Username already exists"
        )
    # Reject duplicate e-mail addresses as well.
    if db.query(User).filter(User.email == user_data.email).first():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already exists"
        )
    # Persist the account; only the password hash is stored.
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        first_name=user_data.first_name,
        last_name=user_data.last_name,
        hashed_password=get_password_hash(user_data.password),
        is_admin=user_data.is_admin,
        is_active=user_data.is_active,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc)
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    # Record the creation in the audit trail.
    audit_service.log_user_action(
        db=db,
        action="CREATE",
        target_user=new_user,
        acting_user=current_user,
        changes={
            "username": new_user.username,
            "email": new_user.email,
            "is_admin": new_user.is_admin,
            "is_active": new_user.is_active
        },
        request=request
    )
    return new_user
@router.put("/users/{user_id}", response_model=UserResponse)
async def update_user(
    user_id: int,
    user_data: UserUpdate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Update user information.

    Applies only the fields present in the request body, guards against
    self-deactivation and username/email collisions, and writes a
    field-level change record to the audit trail.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    # Prevent self-deactivation
    if user_id == current_user.id and user_data.is_active is False:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot deactivate your own account"
        )
    # Check for username conflicts
    if user_data.username and user_data.username != user.username:
        existing_user = db.query(User).filter(
            User.username == user_data.username,
            User.id != user_id
        ).first()
        if existing_user:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Username already exists"
            )
    # Check for email conflicts
    if user_data.email and user_data.email != user.email:
        existing_email = db.query(User).filter(
            User.email == user_data.email,
            User.id != user_id
        ).first()
        if existing_email:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Email already exists"
            )
    # Apply the supplied fields, recording before/after values for audit.
    # (The old `original_values` snapshot was never read and has been removed.)
    update_data = user_data.model_dump(exclude_unset=True)
    changes = {}
    for field, value in update_data.items():
        if getattr(user, field) != value:
            changes[field] = {"from": getattr(user, field), "to": value}
            setattr(user, field, value)
    user.updated_at = datetime.now(timezone.utc)
    db.commit()
    db.refresh(user)
    # Log the user update only if something actually changed.
    if changes:
        audit_service.log_user_action(
            db=db,
            action="UPDATE",
            target_user=user,
            acting_user=current_user,
            changes=changes,
            request=request
        )
    return user
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Delete user (soft delete by deactivating)"""
    # Admins may never remove their own account.
    if user_id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot delete your own account"
        )
    target = db.query(User).filter(User.id == user_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    # Deactivate rather than physically deleting the row.
    target.is_active = False
    target.updated_at = datetime.now(timezone.utc)
    db.commit()
    # Record the deactivation in the audit trail.
    audit_service.log_user_action(
        db=db,
        action="DEACTIVATE",
        target_user=target,
        acting_user=current_user,
        changes={"is_active": {"from": True, "to": False}},
        request=request
    )
    return {"message": "User deactivated successfully"}
@router.post("/users/{user_id}/reset-password")
async def reset_user_password(
    user_id: int,
    password_data: PasswordReset,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Reset user password"""
    # Both copies of the new password must agree before anything is touched.
    if password_data.new_password != password_data.confirm_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Passwords do not match"
        )
    target = db.query(User).filter(User.id == user_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )
    # Only the hash is stored, never the plaintext.
    target.hashed_password = get_password_hash(password_data.new_password)
    target.updated_at = datetime.now(timezone.utc)
    db.commit()
    # Record the administrative reset in the audit trail.
    audit_service.log_user_action(
        db=db,
        action="RESET_PASSWORD",
        target_user=target,
        acting_user=current_user,
        changes={"password": "Password reset by administrator"},
        request=request
    )
    return {"message": "Password reset successfully"}
# System Settings Management
@router.get("/settings")
async def get_system_settings(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get all system settings.

    Returns every SystemSetup row as a plain dict under the "settings" key.
    """
    # Renamed from `settings`: the old local variable shadowed the
    # app.config `settings` object imported at module level.
    rows = db.query(SystemSetup).all()
    return {
        "settings": [
            {
                "setting_key": row.setting_key,
                "setting_value": row.setting_value,
                "description": row.description,
                "setting_type": row.setting_type
            }
            for row in rows
        ]
    }
@router.get("/settings/{setting_key}")
async def get_setting(
    setting_key: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get specific system setting"""
    record = db.query(SystemSetup).filter(SystemSetup.setting_key == setting_key).first()
    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Setting not found"
        )
    return {
        "setting_key": record.setting_key,
        "setting_value": record.setting_value,
        "description": record.description,
        "setting_type": record.setting_type
    }
@router.post("/settings")
async def create_setting(
    setting_data: SystemSetting,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Create new system setting"""
    # Keys are unique; refuse to overwrite an existing setting.
    duplicate = db.query(SystemSetup).filter(
        SystemSetup.setting_key == setting_data.setting_key
    ).first()
    if duplicate is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Setting already exists"
        )
    record = SystemSetup(
        setting_key=setting_data.setting_key,
        setting_value=setting_data.setting_value,
        description=setting_data.description,
        setting_type=setting_data.setting_type
    )
    db.add(record)
    db.commit()
    db.refresh(record)
    return {
        "message": "Setting created successfully",
        "setting": {
            "setting_key": record.setting_key,
            "setting_value": record.setting_value,
            "description": record.description,
            "setting_type": record.setting_type
        }
    }
@router.put("/settings/{setting_key}")
async def update_setting(
    setting_key: str,
    setting_data: SettingUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Update system setting.

    Raises 404 when the key does not exist.  The description is applied
    whenever the field is supplied — including an empty string, which the
    previous truthiness check made impossible (descriptions could never be
    cleared).
    """
    setting = db.query(SystemSetup).filter(SystemSetup.setting_key == setting_key).first()
    if not setting:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Setting not found"
        )
    setting.setting_value = setting_data.setting_value
    if setting_data.description is not None:
        setting.description = setting_data.description
    db.commit()
    db.refresh(setting)
    return {
        "message": "Setting updated successfully",
        "setting": {
            "setting_key": setting.setting_key,
            "setting_value": setting.setting_value,
            "description": setting.description,
            "setting_type": setting.setting_type
        }
    }
@router.delete("/settings/{setting_key}")
async def delete_setting(
    setting_key: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Delete system setting"""
    record = db.query(SystemSetup).filter(SystemSetup.setting_key == setting_key).first()
    if record is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Setting not found"
        )
    db.delete(record)
    db.commit()
    return {"message": "Setting deleted successfully"}
# Database Maintenance and Lookup Management
@router.get("/lookups/tables")
async def get_lookup_tables(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get information about all lookup tables"""
    # (table_name, display_name, count column, description) per lookup table.
    catalog = [
        ("employees", "Employees", Employee.empl_num, "Staff/attorney information"),
        ("file_types", "File Types", FileType.type_code, "Case/file type definitions"),
        ("file_statuses", "File Statuses", FileStatus.status_code, "File status codes"),
        ("transaction_types", "Transaction Types", TransactionType.t_type, "Ledger transaction types"),
        ("transaction_codes", "Transaction Codes", TransactionCode.t_code, "Billing/expense codes"),
        ("states", "States", State.abbreviation, "US states and territories"),
    ]
    tables = [
        {
            "table_name": name,
            "display_name": label,
            "record_count": db.query(func.count(key_column)).scalar(),
            "description": blurb
        }
        for name, label, key_column, blurb in catalog
    ]
    return {"tables": tables}
@router.post("/maintenance/vacuum")
async def vacuum_database(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Vacuum/optimize database (SQLite only)"""
    # VACUUM is SQLite-specific; reject for any other backend.
    if "sqlite" not in settings.database_url:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Database vacuum only supported for SQLite"
        )
    started = time.time()
    try:
        db.execute(text("VACUUM"))
        db.commit()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Database vacuum failed: {str(e)}"
        )
    return {
        "operation": "vacuum",
        "status": "success",
        "message": "Database vacuum completed successfully",
        "duration_seconds": time.time() - started
    }
@router.post("/maintenance/analyze")
async def analyze_database(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Analyze database statistics (SQLite only)"""
    # ANALYZE is only wired up for SQLite deployments here.
    if "sqlite" not in settings.database_url:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Database analyze only supported for SQLite"
        )
    started = time.time()
    try:
        db.execute(text("ANALYZE"))
        db.commit()
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Database analysis failed: {str(e)}"
        )
    return {
        "operation": "analyze",
        "status": "success",
        "message": "Database analysis completed successfully",
        "duration_seconds": time.time() - started
    }
@router.post("/backup/create")
async def create_backup(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Create database backup (SQLite only).

    Copies the live database file into backups/ under a UTC-timestamped
    name and returns metadata about the new backup.
    """
    if "sqlite" not in settings.database_url:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Backup creation only supported for SQLite"
        )
    db_path = settings.database_url.replace("sqlite:///", "")
    # Bug fix: this 404 used to be raised inside the try below, where
    # `except Exception` caught it and returned a 500
    # "Backup creation failed: 404: ..." instead of a clean 404.
    if not os.path.exists(db_path):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database file not found"
        )
    try:
        # Create backup directory if it doesn't exist
        backup_dir = Path("backups")
        backup_dir.mkdir(exist_ok=True)
        # Generate a UTC-timestamped backup filename
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
        backup_filename = f"delphi_backup_{timestamp}.db"
        backup_path = backup_dir / backup_filename
        # copy2 preserves metadata (mtime), which the backup listing relies on
        shutil.copy2(db_path, backup_path)
        backup_size = os.path.getsize(backup_path)
        return {
            "message": "Backup created successfully",
            "backup_info": {
                "filename": backup_filename,
                "size": f"{backup_size / (1024*1024):.1f} MB",
                "created_at": datetime.now(timezone.utc).isoformat(),
                "backup_type": "manual",
                "status": "completed"
            }
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Backup creation failed: {str(e)}"
        )
@router.get("/backup/list")
async def list_backups(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """List available backups.

    Returns all *.db files under backups/, newest first, with size and
    creation time.
    """
    backup_dir = Path("backups")
    if not backup_dir.exists():
        return {"backups": []}
    backups = []
    backup_files = list(backup_dir.glob("*.db"))
    for backup_file in sorted(backup_files, key=lambda p: p.stat().st_mtime, reverse=True):
        stat_info = backup_file.stat()
        backups.append({
            "filename": backup_file.name,
            "size": f"{stat_info.st_size / (1024*1024):.1f} MB",
            # Consistency fix: report UTC like the rest of this module; the
            # old naive fromtimestamp used the server's local timezone.
            "created_at": datetime.fromtimestamp(stat_info.st_mtime, tz=timezone.utc).isoformat(),
            "backup_type": "manual" if "backup_" in backup_file.name else "automatic",
            "status": "completed"
        })
    return {"backups": backups}
# Audit Logging and Activity Monitoring
@router.get("/audit/logs")
async def get_audit_logs(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    user_id: Optional[int] = Query(None),
    resource_type: Optional[str] = Query(None),
    action: Optional[str] = Query(None),
    hours_back: int = Query(168, ge=1, le=8760),  # Default 7 days, max 1 year
    sort_by: Optional[str] = Query("timestamp", description="Sort by: timestamp, username, action, resource_type"),
    sort_dir: Optional[str] = Query("desc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return audit log entries with filtering, sorting, and pagination.

    Only entries newer than ``hours_back`` hours are considered.  When
    ``include_total`` is set the response is ``{"items": [...], "total": N}``;
    otherwise a plain list of serialized entries is returned.
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=hours_back)
    stmt = db.query(AuditLog).filter(AuditLog.timestamp >= window_start)

    # Optional filters: exact user id, case-insensitive substring matches.
    if user_id:
        stmt = stmt.filter(AuditLog.user_id == user_id)
    if resource_type:
        stmt = stmt.filter(AuditLog.resource_type.ilike(f"%{resource_type}%"))
    if action:
        stmt = stmt.filter(AuditLog.action.ilike(f"%{action}%"))

    # Sorting is restricted to this whitelist of columns.
    sortable_columns = {
        "timestamp": [AuditLog.timestamp],
        "username": [AuditLog.username],
        "action": [AuditLog.action],
        "resource_type": [AuditLog.resource_type],
    }
    stmt = apply_sorting(stmt, sort_by, sort_dir, allowed=sortable_columns)

    rows, total = paginate_with_total(stmt, skip, limit, include_total)

    def _serialize(entry):
        # Flatten one AuditLog row into a JSON-friendly dict.
        return {
            "id": entry.id,
            "user_id": entry.user_id,
            "username": entry.username,
            "action": entry.action,
            "resource_type": entry.resource_type,
            "resource_id": entry.resource_id,
            "details": entry.details,
            "ip_address": entry.ip_address,
            "user_agent": entry.user_agent,
            "timestamp": entry.timestamp.isoformat(),
        }

    items = [_serialize(row) for row in rows]
    if include_total:
        return {"items": items, "total": total or 0}
    return items
@router.get("/audit/login-attempts")
async def get_login_attempts(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    username: Optional[str] = Query(None),
    failed_only: bool = Query(False),
    hours_back: int = Query(168, ge=1, le=8760),  # Default 7 days
    sort_by: Optional[str] = Query("timestamp", description="Sort by: timestamp, username, ip_address, success"),
    sort_dir: Optional[str] = Query("desc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return login attempts with filtering, sorting, and pagination.

    Only attempts newer than ``hours_back`` hours are considered; set
    ``failed_only`` to restrict the result to unsuccessful attempts.
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=hours_back)
    stmt = db.query(LoginAttempt).filter(LoginAttempt.timestamp >= window_start)

    # Optional filters: substring username match and failure-only toggle.
    if username:
        stmt = stmt.filter(LoginAttempt.username.ilike(f"%{username}%"))
    if failed_only:
        stmt = stmt.filter(LoginAttempt.success == 0)

    # Sorting is restricted to this whitelist of columns.
    sortable_columns = {
        "timestamp": [LoginAttempt.timestamp],
        "username": [LoginAttempt.username],
        "ip_address": [LoginAttempt.ip_address],
        "success": [LoginAttempt.success],
    }
    stmt = apply_sorting(stmt, sort_by, sort_dir, allowed=sortable_columns)

    rows, total = paginate_with_total(stmt, skip, limit, include_total)

    def _serialize(entry):
        # Flatten one LoginAttempt row into a JSON-friendly dict; the
        # integer success flag is surfaced as a boolean.
        return {
            "id": entry.id,
            "username": entry.username,
            "ip_address": entry.ip_address,
            "user_agent": entry.user_agent,
            "success": bool(entry.success),
            "failure_reason": entry.failure_reason,
            "timestamp": entry.timestamp.isoformat(),
        }

    items = [_serialize(row) for row in rows]
    if include_total:
        return {"items": items, "total": total or 0}
    return items
@router.get("/audit/user-activity/{user_id}")
async def get_user_activity(
    user_id: int,
    limit: int = Query(100, ge=1, le=500),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return recent audit activity for one user along with their profile.

    Raises a 404 when no user with ``user_id`` exists.
    """
    # Guard: the target user must exist before we query their activity.
    target = db.query(User).filter(User.id == user_id).first()
    if target is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found"
        )

    activity = []
    for entry in audit_service.get_user_activity(db, user_id, limit):
        activity.append({
            "id": entry.id,
            "action": entry.action,
            "resource_type": entry.resource_type,
            "resource_id": entry.resource_id,
            "details": entry.details,
            "ip_address": entry.ip_address,
            "timestamp": entry.timestamp.isoformat()
        })

    return {
        "user": {
            "id": target.id,
            "username": target.username,
            "email": target.email,
            "first_name": target.first_name,
            "last_name": target.last_name
        },
        "activity": activity
    }
@router.get("/audit/security-alerts")
async def get_security_alerts(
    hours_back: int = Query(24, ge=1, le=168),  # Default 24 hours, max 7 days
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Get security alerts and suspicious activity.

    Scans failed login attempts within the window for suspicious source IPs
    (>= 5 failures) and targeted accounts (>= 3 failures), and surfaces
    recent sensitive admin actions.  Returns a summary, the top 20 alerts,
    and the 10 most recent failed logins.
    """
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=hours_back)

    # Failed login attempts inside the window, newest first.
    failed_logins = db.query(LoginAttempt).filter(
        LoginAttempt.success == 0,
        LoginAttempt.timestamp >= cutoff_time
    ).order_by(LoginAttempt.timestamp.desc()).all()

    # Group failed logins by source IP and by targeted username.
    # setdefault replaces the manual "if key not in dict" bookkeeping.
    failed_by_ip = {}
    failed_by_username = {}
    for attempt in failed_logins:
        failed_by_ip.setdefault(attempt.ip_address, []).append(attempt)
        failed_by_username.setdefault(attempt.username, []).append(attempt)

    alerts = []

    # Check for suspicious IPs (multiple failed attempts from one address).
    for ip, attempts in failed_by_ip.items():
        if len(attempts) >= 5:  # Threshold for suspicious activity
            # attempts preserve the query's descending order, so
            # attempts[-1] is the oldest and attempts[0] the newest.
            alerts.append({
                "type": "SUSPICIOUS_IP",
                "severity": "HIGH" if len(attempts) >= 10 else "MEDIUM",
                "description": f"IP {ip} had {len(attempts)} failed login attempts",
                "details": {
                    "ip_address": ip,
                    "failed_attempts": len(attempts),
                    "usernames_targeted": list(set(a.username for a in attempts)),
                    "time_range": f"{attempts[-1].timestamp.isoformat()} to {attempts[0].timestamp.isoformat()}"
                }
            })

    # Check for targeted usernames (multiple failed attempts on one account).
    for username, attempts in failed_by_username.items():
        if len(attempts) >= 3:  # Threshold for account targeting
            alerts.append({
                "type": "ACCOUNT_TARGETED",
                "severity": "HIGH" if len(attempts) >= 5 else "MEDIUM",
                "description": f"Username '{username}' had {len(attempts)} failed login attempts",
                "details": {
                    "username": username,
                    "failed_attempts": len(attempts),
                    "source_ips": list(set(a.ip_address for a in attempts)),
                    "time_range": f"{attempts[-1].timestamp.isoformat()} to {attempts[0].timestamp.isoformat()}"
                }
            })

    # Recent sensitive admin actions inside the same window.
    admin_actions = db.query(AuditLog).filter(
        AuditLog.timestamp >= cutoff_time,
        AuditLog.action.in_(["DELETE", "DEACTIVATE", "RESET_PASSWORD", "GRANT_ADMIN"])
    ).order_by(AuditLog.timestamp.desc()).limit(10).all()

    # Only DELETE/DEACTIVATE are alert-worthy; the others are fetched for
    # the query but intentionally not alerted on.
    for action in admin_actions:
        if action.action in ["DELETE", "DEACTIVATE"]:
            alerts.append({
                "type": "ADMIN_ACTION",
                "severity": "MEDIUM",
                "description": f"Admin {action.username} performed {action.action} on {action.resource_type}",
                "details": {
                    "admin_user": action.username,
                    "action": action.action,
                    "resource_type": action.resource_type,
                    "resource_id": action.resource_id,
                    "timestamp": action.timestamp.isoformat()
                }
            })

    return {
        "alert_summary": {
            "total_alerts": len(alerts),
            "high_severity": len([a for a in alerts if a["severity"] == "HIGH"]),
            "medium_severity": len([a for a in alerts if a["severity"] == "MEDIUM"]),
            "failed_logins_total": len(failed_logins)
        },
        "alerts": alerts[:20],  # Return top 20 alerts
        "recent_failed_logins": [
            {
                "username": attempt.username,
                "ip_address": attempt.ip_address,
                "failure_reason": attempt.failure_reason,
                "timestamp": attempt.timestamp.isoformat()
            }
            for attempt in failed_logins[:10]
        ]
    }
@router.get("/audit/statistics")
async def get_audit_statistics(
    days_back: int = Query(30, ge=1, le=365),  # Default 30 days
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return audit statistics and metrics for the last ``days_back`` days.

    Includes overall counts, login success rate, breakdowns by action and
    resource type, and the ten most active (non-system) users.
    """
    window_start = datetime.now(timezone.utc) - timedelta(days=days_back)

    def _count_logins(*extra_filters):
        # Count LoginAttempt rows in the window, optionally narrowed further.
        return db.query(func.count(LoginAttempt.id)).filter(
            LoginAttempt.timestamp >= window_start, *extra_filters
        ).scalar()

    total_audit_entries = db.query(func.count(AuditLog.id)).filter(
        AuditLog.timestamp >= window_start
    ).scalar()
    total_login_attempts = _count_logins()
    successful_logins = _count_logins(LoginAttempt.success == 1)
    failed_logins = _count_logins(LoginAttempt.success == 0)

    # Breakdown of audit entries by action type.
    by_action = db.query(
        AuditLog.action,
        func.count(AuditLog.id).label('count')
    ).filter(
        AuditLog.timestamp >= window_start
    ).group_by(AuditLog.action).all()

    # Breakdown of audit entries by resource type.
    by_resource = db.query(
        AuditLog.resource_type,
        func.count(AuditLog.id).label('count')
    ).filter(
        AuditLog.timestamp >= window_start
    ).group_by(AuditLog.resource_type).all()

    # Top ten most active human users ("system" entries excluded).
    top_users = db.query(
        AuditLog.username,
        func.count(AuditLog.id).label('count')
    ).filter(
        AuditLog.timestamp >= window_start,
        AuditLog.username != "system"
    ).group_by(AuditLog.username).order_by(func.count(AuditLog.id).desc()).limit(10).all()

    if total_login_attempts > 0:
        success_rate = round(successful_logins / total_login_attempts * 100, 1)
    else:
        success_rate = round(0, 1)

    return {
        "period": f"Last {days_back} days",
        "summary": {
            "total_audit_entries": total_audit_entries,
            "total_login_attempts": total_login_attempts,
            "successful_logins": successful_logins,
            "failed_logins": failed_logins,
            "success_rate": success_rate
        },
        "activity_by_action": [
            {"action": action_name, "count": n} for action_name, n in by_action
        ],
        "activity_by_resource": [
            {"resource_type": rtype, "count": n} for rtype, n in by_resource
        ],
        "most_active_users": [
            {"username": uname, "activity_count": n} for uname, n in top_users
        ]
    }