""" Comprehensive Admin API endpoints - User management, system settings, audit logging """ from typing import List, Dict, Any, Optional from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Query, Body, Request from fastapi.responses import FileResponse from sqlalchemy.orm import Session, joinedload from sqlalchemy import func, text, desc, asc, and_, or_ import csv import io import os import hashlib import secrets import shutil import time from datetime import datetime, timedelta, date from pathlib import Path from app.database.base import get_db from app.models import User, Rolodex, File as FileModel, Ledger, QDRO, AuditLog, LoginAttempt from app.models.lookups import SystemSetup, Employee, FileType, FileStatus, TransactionType, TransactionCode, State, FormIndex from app.auth.security import get_admin_user, get_password_hash, create_access_token from app.services.audit import audit_service from app.config import settings router = APIRouter() # Enhanced Admin Schemas from pydantic import BaseModel, Field, EmailStr class SystemStats(BaseModel): """Enhanced system statistics""" total_customers: int total_files: int total_transactions: int total_qdros: int total_users: int total_active_users: int total_admins: int database_size: str last_backup: str system_uptime: str recent_activity: List[Dict[str, Any]] class HealthCheck(BaseModel): """Comprehensive system health check""" status: str database_connected: bool disk_space_available: bool memory_available: bool version: str uptime: str last_backup: Optional[str] active_sessions: int cpu_usage: float alerts: List[str] class UserCreate(BaseModel): """Create new user""" username: str = Field(..., min_length=3, max_length=50) email: EmailStr password: str = Field(..., min_length=6) first_name: Optional[str] = None last_name: Optional[str] = None is_admin: bool = False is_active: bool = True class UserUpdate(BaseModel): """Update user information""" username: Optional[str] = Field(None, min_length=3, max_length=50) email: Optional[EmailStr] = None first_name: Optional[str] = None last_name: Optional[str] = None is_admin: Optional[bool] = None is_active: Optional[bool] = None class UserResponse(BaseModel): """User response model""" id: int username: str email: str first_name: Optional[str] last_name: Optional[str] is_admin: bool is_active: bool last_login: Optional[datetime] created_at: Optional[datetime] updated_at: Optional[datetime] class Config: from_attributes = True class PasswordReset(BaseModel): """Password reset request""" new_password: str = Field(..., min_length=6) confirm_password: str = Field(..., min_length=6) class SystemSetting(BaseModel): """System setting model""" setting_key: str setting_value: str description: Optional[str] = None setting_type: str = "STRING" class SettingUpdate(BaseModel): """Update system setting""" setting_value: str description: Optional[str] = None class AuditLogEntry(BaseModel): """Audit log entry""" id: int user_id: Optional[int] username: Optional[str] action: str resource_type: str resource_id: Optional[str] details: Optional[Dict[str, Any]] ip_address: Optional[str] user_agent: Optional[str] timestamp: datetime class Config: from_attributes = True class BackupInfo(BaseModel): """Backup information""" filename: str size: str created_at: datetime backup_type: str status: str class LookupTableInfo(BaseModel): """Lookup table information""" table_name: str display_name: str record_count: int last_updated: Optional[datetime] description: str class DatabaseMaintenanceResult(BaseModel): """Database maintenance operation result""" operation: str status: str message: str duration_seconds: float records_affected: Optional[int] = None # Enhanced Health and Statistics Endpoints @router.get("/health", response_model=HealthCheck) async def system_health( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Comprehensive system health check""" alerts = [] # Test database connection try: db.execute(text("SELECT 1")) db_connected = True except Exception as e: db_connected = False alerts.append(f"Database connection failed: {str(e)}") # Check disk space try: total, used, free = shutil.disk_usage(".") free_gb = free / (1024**3) disk_available = free_gb > 1.0 # 1GB minimum if not disk_available: alerts.append(f"Low disk space: {free_gb:.1f}GB remaining") except: disk_available = True # Check memory (simplified) try: import psutil memory = psutil.virtual_memory() memory_available = memory.percent < 90 if not memory_available: alerts.append(f"High memory usage: {memory.percent:.1f}%") except ImportError: memory_available = True # Get CPU usage try: import psutil cpu_usage = psutil.cpu_percent(interval=1) if cpu_usage > 80: alerts.append(f"High CPU usage: {cpu_usage:.1f}%") except ImportError: cpu_usage = 0.0 # Count active sessions (simplified) try: active_sessions = db.query(User).filter( User.last_login > datetime.now() - timedelta(hours=24) ).count() except: active_sessions = 0 # Check last backup last_backup = None try: backup_dir = Path("backups") if backup_dir.exists(): backup_files = list(backup_dir.glob("*.db")) if backup_files: latest_backup = max(backup_files, key=lambda p: p.stat().st_mtime) backup_age = datetime.now() - datetime.fromtimestamp(latest_backup.stat().st_mtime) last_backup = latest_backup.name if backup_age.days > 7: alerts.append(f"Last backup is {backup_age.days} days old") except: alerts.append("Unable to check backup status") # System uptime (simplified) try: import psutil uptime_seconds = time.time() - psutil.boot_time() uptime = str(timedelta(seconds=int(uptime_seconds))) except ImportError: uptime = "Unknown" status = "healthy" if db_connected and disk_available and memory_available else "unhealthy" return HealthCheck( status=status, database_connected=db_connected, disk_space_available=disk_available, memory_available=memory_available, version=settings.app_version, uptime=uptime, last_backup=last_backup, active_sessions=active_sessions, cpu_usage=cpu_usage, alerts=alerts ) @router.get("/stats", response_model=SystemStats) async def system_statistics( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Enhanced system statistics with comprehensive metrics""" total_customers = db.query(func.count(Rolodex.id)).scalar() total_files = db.query(func.count(FileModel.file_no)).scalar() total_transactions = db.query(func.count(Ledger.id)).scalar() total_qdros = db.query(func.count(QDRO.id)).scalar() total_users = db.query(func.count(User.id)).scalar() # Count active users (logged in within last 30 days) total_active_users = db.query(func.count(User.id)).filter( User.last_login > datetime.now() - timedelta(days=30) ).scalar() # Count admin users total_admins = db.query(func.count(User.id)).filter(User.is_admin == True).scalar() # Database size (for SQLite) db_size = "Unknown" if "sqlite" in settings.database_url: try: db_path = settings.database_url.replace("sqlite:///", "") if os.path.exists(db_path): size_bytes = os.path.getsize(db_path) db_size = f"{size_bytes / (1024*1024):.1f} MB" except: pass # Check for recent backups last_backup = "Not found" try: backup_dir = Path("backups") if backup_dir.exists(): backup_files = list(backup_dir.glob("*.db")) if backup_files: latest_backup = max(backup_files, key=lambda p: p.stat().st_mtime) last_backup = latest_backup.name except: pass # System uptime (simplified) system_uptime = "Unknown" try: import psutil uptime_seconds = time.time() - psutil.boot_time() system_uptime = str(timedelta(seconds=int(uptime_seconds))) except ImportError: pass # Recent activity (last 10 actions) recent_activity = [] try: # Get recent files created recent_files = db.query(FileModel).order_by(desc(FileModel.opened)).limit(5).all() for file in recent_files: recent_activity.append({ "type": "file_created", "description": f"File {file.file_no} created", "timestamp": file.opened.isoformat() if file.opened else None }) # Get recent customer additions recent_customers = db.query(Rolodex).order_by(desc(Rolodex.id)).limit(5).all() for customer in recent_customers: recent_activity.append({ "type": "customer_added", "description": f"Customer {customer.first} {customer.last} added", "timestamp": datetime.now().isoformat() }) except: pass return SystemStats( total_customers=total_customers, total_files=total_files, total_transactions=total_transactions, total_qdros=total_qdros, total_users=total_users, total_active_users=total_active_users, total_admins=total_admins, database_size=db_size, last_backup=last_backup, system_uptime=system_uptime, recent_activity=recent_activity ) @router.post("/import/csv") async def import_csv( table_name: str, file: UploadFile = File(...), db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Import data from CSV file""" if not file.filename.endswith('.csv'): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="File must be a CSV" ) # Read CSV content content = await file.read() csv_data = csv.DictReader(io.StringIO(content.decode('utf-8'))) imported_count = 0 errors = [] try: if table_name.lower() == "customers" or table_name.lower() == "rolodex": for row_num, row in enumerate(csv_data, start=1): try: customer = Rolodex(**row) db.add(customer) imported_count += 1 except Exception as e: errors.append(f"Row {row_num}: {str(e)}") elif table_name.lower() == "files": for row_num, row in enumerate(csv_data, start=1): try: # Convert date strings to date objects if needed if 'opened' in row and row['opened']: row['opened'] = datetime.strptime(row['opened'], '%Y-%m-%d').date() if 'closed' in row and row['closed']: row['closed'] = datetime.strptime(row['closed'], '%Y-%m-%d').date() file_obj = FileModel(**row) db.add(file_obj) imported_count += 1 except Exception as e: errors.append(f"Row {row_num}: {str(e)}") else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Import not supported for table: {table_name}" ) db.commit() except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Import failed: {str(e)}" ) return { "message": f"Import completed", "imported_count": imported_count, "error_count": len(errors), "errors": errors[:10] # Return first 10 errors } @router.get("/export/{table_name}") async def export_table( table_name: str, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Export table data to CSV""" # Create exports directory if it doesn't exist os.makedirs("exports", exist_ok=True) filename = f"exports/{table_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" try: if table_name.lower() == "customers" or table_name.lower() == "rolodex": customers = db.query(Rolodex).all() with open(filename, 'w', newline='', encoding='utf-8') as csvfile: if customers: fieldnames = ['id', 'last', 'first', 'middle', 'prefix', 'suffix', 'title', 'group', 'a1', 'a2', 'a3', 'city', 'abrev', 'zip', 'email', 'dob', 'ss_number', 'legal_status', 'memo'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for customer in customers: row = {field: getattr(customer, field) for field in fieldnames} writer.writerow(row) elif table_name.lower() == "files": files = db.query(FileModel).all() with open(filename, 'w', newline='', encoding='utf-8') as csvfile: if files: fieldnames = ['file_no', 'id', 'regarding', 'empl_num', 'file_type', 'opened', 'closed', 'status', 'footer_code', 'opposing', 'rate_per_hour', 'memo'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for file_obj in files: row = {field: getattr(file_obj, field) for field in fieldnames} writer.writerow(row) else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Export not supported for table: {table_name}" ) return FileResponse( filename, media_type='text/csv', filename=f"{table_name}_export.csv" ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Export failed: {str(e)}" ) @router.get("/backup/download") async def download_backup( current_user: User = Depends(get_admin_user) ): """Download database backup""" if "sqlite" in settings.database_url: db_path = settings.database_url.replace("sqlite:///", "") if os.path.exists(db_path): return FileResponse( db_path, media_type='application/octet-stream', filename=f"delphi_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db" ) raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Database backup not available" ) # User Management Endpoints @router.get("/users", response_model=List[UserResponse]) async def list_users( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), search: Optional[str] = Query(None), active_only: bool = Query(False), db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """List all users with pagination and filtering""" query = db.query(User) if search: query = query.filter( or_( User.username.ilike(f"%{search}%"), User.email.ilike(f"%{search}%"), User.first_name.ilike(f"%{search}%"), User.last_name.ilike(f"%{search}%") ) ) if active_only: query = query.filter(User.is_active == True) users = query.offset(skip).limit(limit).all() return users @router.get("/users/{user_id}", response_model=UserResponse) async def get_user( user_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get user by ID""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return user @router.post("/users", response_model=UserResponse) async def create_user( user_data: UserCreate, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Create new user""" # Check if username already exists existing_user = db.query(User).filter(User.username == user_data.username).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) # Check if email already exists existing_email = db.query(User).filter(User.email == user_data.email).first() if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists" ) # Create new user hashed_password = get_password_hash(user_data.password) new_user = User( username=user_data.username, email=user_data.email, first_name=user_data.first_name, last_name=user_data.last_name, hashed_password=hashed_password, is_admin=user_data.is_admin, is_active=user_data.is_active, created_at=datetime.now(), updated_at=datetime.now() ) db.add(new_user) db.commit() db.refresh(new_user) # Log the user creation audit_service.log_user_action( db=db, action="CREATE", target_user=new_user, acting_user=current_user, changes={ "username": new_user.username, "email": new_user.email, "is_admin": new_user.is_admin, "is_active": new_user.is_active }, request=request ) return new_user @router.put("/users/{user_id}", response_model=UserResponse) async def update_user( user_id: int, user_data: UserUpdate, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Update user information""" user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Prevent self-deactivation if user_id == current_user.id and user_data.is_active is False: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot deactivate your own account" ) # Check for username conflicts if user_data.username and user_data.username != user.username: existing_user = db.query(User).filter( User.username == user_data.username, User.id != user_id ).first() if existing_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already exists" ) # Check for email conflicts if user_data.email and user_data.email != user.email: existing_email = db.query(User).filter( User.email == user_data.email, User.id != user_id ).first() if existing_email: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already exists" ) # Track changes for audit log original_values = { "username": user.username, "email": user.email, "first_name": user.first_name, "last_name": user.last_name, "is_admin": user.is_admin, "is_active": user.is_active } # Update user fields update_data = user_data.model_dump(exclude_unset=True) changes = {} for field, value in update_data.items(): if getattr(user, field) != value: changes[field] = {"from": getattr(user, field), "to": value} setattr(user, field, value) user.updated_at = datetime.now() db.commit() db.refresh(user) # Log the user update if there were changes if changes: audit_service.log_user_action( db=db, action="UPDATE", target_user=user, acting_user=current_user, changes=changes, request=request ) return user @router.delete("/users/{user_id}") async def delete_user( user_id: int, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Delete user (soft delete by deactivating)""" if user_id == current_user.id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot delete your own account" ) user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Soft delete by deactivating user.is_active = False user.updated_at = datetime.now() db.commit() # Log the user deactivation audit_service.log_user_action( db=db, action="DEACTIVATE", target_user=user, acting_user=current_user, changes={"is_active": {"from": True, "to": False}}, request=request ) return {"message": "User deactivated successfully"} @router.post("/users/{user_id}/reset-password") async def reset_user_password( user_id: int, password_data: PasswordReset, request: Request, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Reset user password""" if password_data.new_password != password_data.confirm_password: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Passwords do not match" ) user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # Update password user.hashed_password = get_password_hash(password_data.new_password) user.updated_at = datetime.now() db.commit() # Log the password reset audit_service.log_user_action( db=db, action="RESET_PASSWORD", target_user=user, acting_user=current_user, changes={"password": "Password reset by administrator"}, request=request ) return {"message": "Password reset successfully"} # System Settings Management @router.get("/settings") async def get_system_settings( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get all system settings""" settings = db.query(SystemSetup).all() return { "settings": [ { "setting_key": setting.setting_key, "setting_value": setting.setting_value, "description": setting.description, "setting_type": setting.setting_type } for setting in settings ] } @router.get("/settings/{setting_key}") async def get_setting( setting_key: str, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get specific system setting""" setting = db.query(SystemSetup).filter(SystemSetup.setting_key == setting_key).first() if not setting: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Setting not found" ) return { "setting_key": setting.setting_key, "setting_value": setting.setting_value, "description": setting.description, "setting_type": setting.setting_type } @router.post("/settings") async def create_setting( setting_data: SystemSetting, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Create new system setting""" # Check if setting already exists existing_setting = db.query(SystemSetup).filter( SystemSetup.setting_key == setting_data.setting_key ).first() if existing_setting: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Setting already exists" ) new_setting = SystemSetup( setting_key=setting_data.setting_key, setting_value=setting_data.setting_value, description=setting_data.description, setting_type=setting_data.setting_type ) db.add(new_setting) db.commit() db.refresh(new_setting) return { "message": "Setting created successfully", "setting": { "setting_key": new_setting.setting_key, "setting_value": new_setting.setting_value, "description": new_setting.description, "setting_type": new_setting.setting_type } } @router.put("/settings/{setting_key}") async def update_setting( setting_key: str, setting_data: SettingUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Update system setting""" setting = db.query(SystemSetup).filter(SystemSetup.setting_key == setting_key).first() if not setting: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Setting not found" ) # Update setting setting.setting_value = setting_data.setting_value if setting_data.description: setting.description = setting_data.description db.commit() db.refresh(setting) return { "message": "Setting updated successfully", "setting": { "setting_key": setting.setting_key, "setting_value": setting.setting_value, "description": setting.description, "setting_type": setting.setting_type } } @router.delete("/settings/{setting_key}") async def delete_setting( setting_key: str, db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Delete system setting""" setting = db.query(SystemSetup).filter(SystemSetup.setting_key == setting_key).first() if not setting: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Setting not found" ) db.delete(setting) db.commit() return {"message": "Setting deleted successfully"} # Database Maintenance and Lookup Management @router.get("/lookups/tables") async def get_lookup_tables( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get information about all lookup tables""" tables = [ { "table_name": "employees", "display_name": "Employees", "record_count": db.query(func.count(Employee.empl_num)).scalar(), "description": "Staff/attorney information" }, { "table_name": "file_types", "display_name": "File Types", "record_count": db.query(func.count(FileType.type_code)).scalar(), "description": "Case/file type definitions" }, { "table_name": "file_statuses", "display_name": "File Statuses", "record_count": db.query(func.count(FileStatus.status_code)).scalar(), "description": "File status codes" }, { "table_name": "transaction_types", "display_name": "Transaction Types", "record_count": db.query(func.count(TransactionType.t_type)).scalar(), "description": "Ledger transaction types" }, { "table_name": "transaction_codes", "display_name": "Transaction Codes", "record_count": db.query(func.count(TransactionCode.t_code)).scalar(), "description": "Billing/expense codes" }, { "table_name": "states", "display_name": "States", "record_count": db.query(func.count(State.abbreviation)).scalar(), "description": "US states and territories" } ] return {"tables": tables} @router.post("/maintenance/vacuum") async def vacuum_database( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Vacuum/optimize database (SQLite only)""" if "sqlite" not in settings.database_url: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Database vacuum only supported for SQLite" ) start_time = time.time() try: db.execute(text("VACUUM")) db.commit() end_time = time.time() duration = end_time - start_time return { "operation": "vacuum", "status": "success", "message": "Database vacuum completed successfully", "duration_seconds": duration } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database vacuum failed: {str(e)}" ) @router.post("/maintenance/analyze") async def analyze_database( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Analyze database statistics (SQLite only)""" if "sqlite" not in settings.database_url: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Database analyze only supported for SQLite" ) start_time = time.time() try: db.execute(text("ANALYZE")) db.commit() end_time = time.time() duration = end_time - start_time return { "operation": "analyze", "status": "success", "message": "Database analysis completed successfully", "duration_seconds": duration } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database analysis failed: {str(e)}" ) @router.post("/backup/create") async def create_backup( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Create database backup""" if "sqlite" not in settings.database_url: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Backup creation only supported for SQLite" ) try: # Create backup directory if it doesn't exist backup_dir = Path("backups") backup_dir.mkdir(exist_ok=True) # Generate backup filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_filename = f"delphi_backup_{timestamp}.db" backup_path = backup_dir / backup_filename # Copy database file db_path = settings.database_url.replace("sqlite:///", "") if os.path.exists(db_path): shutil.copy2(db_path, backup_path) # Get backup info backup_size = os.path.getsize(backup_path) return { "message": "Backup created successfully", "backup_info": { "filename": backup_filename, "size": f"{backup_size / (1024*1024):.1f} MB", "created_at": datetime.now().isoformat(), "backup_type": "manual", "status": "completed" } } else: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Database file not found" ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Backup creation failed: {str(e)}" ) @router.get("/backup/list") async def list_backups( db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """List available backups""" backup_dir = Path("backups") if not backup_dir.exists(): return {"backups": []} backups = [] backup_files = list(backup_dir.glob("*.db")) for backup_file in sorted(backup_files, key=lambda p: p.stat().st_mtime, reverse=True): stat_info = backup_file.stat() backups.append({ "filename": backup_file.name, "size": f"{stat_info.st_size / (1024*1024):.1f} MB", "created_at": datetime.fromtimestamp(stat_info.st_mtime).isoformat(), "backup_type": "manual" if "backup_" in backup_file.name else "automatic", "status": "completed" }) return {"backups": backups} # Audit Logging and Activity Monitoring @router.get("/audit/logs") async def get_audit_logs( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), user_id: Optional[int] = Query(None), resource_type: Optional[str] = Query(None), action: Optional[str] = Query(None), hours_back: int = Query(168, ge=1, le=8760), # Default 7 days, max 1 year db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get audit log entries with filtering""" cutoff_time = datetime.now() - timedelta(hours=hours_back) query = db.query(AuditLog).filter(AuditLog.timestamp >= cutoff_time) if user_id: query = query.filter(AuditLog.user_id == user_id) if resource_type: query = query.filter(AuditLog.resource_type.ilike(f"%{resource_type}%")) if action: query = query.filter(AuditLog.action.ilike(f"%{action}%")) total_count = query.count() logs = query.order_by(AuditLog.timestamp.desc()).offset(skip).limit(limit).all() return { "total": total_count, "logs": [ { "id": log.id, "user_id": log.user_id, "username": log.username, "action": log.action, "resource_type": log.resource_type, "resource_id": log.resource_id, "details": log.details, "ip_address": log.ip_address, "user_agent": log.user_agent, "timestamp": log.timestamp.isoformat() } for log in logs ] } @router.get("/audit/login-attempts") async def get_login_attempts( skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=1000), username: Optional[str] = Query(None), failed_only: bool = Query(False), hours_back: int = Query(168, ge=1, le=8760), # Default 7 days db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get login attempts with filtering""" cutoff_time = datetime.now() - timedelta(hours=hours_back) query = db.query(LoginAttempt).filter(LoginAttempt.timestamp >= cutoff_time) if username: query = query.filter(LoginAttempt.username.ilike(f"%{username}%")) if failed_only: query = query.filter(LoginAttempt.success == 0) total_count = query.count() attempts = query.order_by(LoginAttempt.timestamp.desc()).offset(skip).limit(limit).all() return { "total": total_count, "attempts": [ { "id": attempt.id, "username": attempt.username, "ip_address": attempt.ip_address, "user_agent": attempt.user_agent, "success": bool(attempt.success), "failure_reason": attempt.failure_reason, "timestamp": attempt.timestamp.isoformat() } for attempt in attempts ] } @router.get("/audit/user-activity/{user_id}") async def get_user_activity( user_id: int, limit: int = Query(100, ge=1, le=500), db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get activity for a specific user""" # Verify user exists user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) logs = audit_service.get_user_activity(db, user_id, limit) return { "user": { "id": user.id, "username": user.username, "email": user.email, "first_name": user.first_name, "last_name": user.last_name }, "activity": [ { "id": log.id, "action": log.action, "resource_type": log.resource_type, "resource_id": log.resource_id, "details": log.details, "ip_address": log.ip_address, "timestamp": log.timestamp.isoformat() } for log in logs ] } @router.get("/audit/security-alerts") async def get_security_alerts( hours_back: int = Query(24, ge=1, le=168), # Default 24 hours, max 7 days db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get security alerts and suspicious activity""" cutoff_time = datetime.now() - timedelta(hours=hours_back) # Get failed login attempts failed_logins = db.query(LoginAttempt).filter( LoginAttempt.success == 0, LoginAttempt.timestamp >= cutoff_time ).order_by(LoginAttempt.timestamp.desc()).all() # Group failed logins by IP and username for analysis failed_by_ip = {} failed_by_username = {} for attempt in failed_logins: # Group by IP if attempt.ip_address not in failed_by_ip: failed_by_ip[attempt.ip_address] = [] failed_by_ip[attempt.ip_address].append(attempt) # Group by username if attempt.username not in failed_by_username: failed_by_username[attempt.username] = [] failed_by_username[attempt.username].append(attempt) alerts = [] # Check for suspicious IPs (multiple failed attempts) for ip, attempts in failed_by_ip.items(): if len(attempts) >= 5: # Threshold for suspicious activity alerts.append({ "type": "SUSPICIOUS_IP", "severity": "HIGH" if len(attempts) >= 10 else "MEDIUM", "description": f"IP {ip} had {len(attempts)} failed login attempts", "details": { "ip_address": ip, "failed_attempts": len(attempts), "usernames_targeted": list(set(a.username for a in attempts)), "time_range": f"{attempts[-1].timestamp.isoformat()} to {attempts[0].timestamp.isoformat()}" } }) # Check for targeted usernames (multiple failed attempts on same account) for username, attempts in failed_by_username.items(): if len(attempts) >= 3: # Threshold for account targeting alerts.append({ "type": "ACCOUNT_TARGETED", "severity": "HIGH" if len(attempts) >= 5 else "MEDIUM", "description": f"Username '{username}' had {len(attempts)} failed login attempts", "details": { "username": username, "failed_attempts": len(attempts), "source_ips": list(set(a.ip_address for a in attempts)), "time_range": f"{attempts[-1].timestamp.isoformat()} to {attempts[0].timestamp.isoformat()}" } }) # Get recent admin actions admin_actions = db.query(AuditLog).filter( AuditLog.timestamp >= cutoff_time, AuditLog.action.in_(["DELETE", "DEACTIVATE", "RESET_PASSWORD", "GRANT_ADMIN"]) ).order_by(AuditLog.timestamp.desc()).limit(10).all() # Add alerts for sensitive admin actions for action in admin_actions: if action.action in ["DELETE", "DEACTIVATE"]: alerts.append({ "type": "ADMIN_ACTION", "severity": "MEDIUM", "description": f"Admin {action.username} performed {action.action} on {action.resource_type}", "details": { "admin_user": action.username, "action": action.action, "resource_type": action.resource_type, "resource_id": action.resource_id, "timestamp": action.timestamp.isoformat() } }) return { "alert_summary": { "total_alerts": len(alerts), "high_severity": len([a for a in alerts if a["severity"] == "HIGH"]), "medium_severity": len([a for a in alerts if a["severity"] == "MEDIUM"]), "failed_logins_total": len(failed_logins) }, "alerts": alerts[:20], # Return top 20 alerts "recent_failed_logins": [ { "username": attempt.username, "ip_address": attempt.ip_address, "failure_reason": attempt.failure_reason, "timestamp": attempt.timestamp.isoformat() } for attempt in failed_logins[:10] ] } @router.get("/audit/statistics") async def get_audit_statistics( days_back: int = Query(30, ge=1, le=365), # Default 30 days db: Session = Depends(get_db), current_user: User = Depends(get_admin_user) ): """Get audit statistics and metrics""" cutoff_time = datetime.now() - timedelta(days=days_back) # Total activity counts total_audit_entries = db.query(func.count(AuditLog.id)).filter( AuditLog.timestamp >= cutoff_time ).scalar() total_login_attempts = db.query(func.count(LoginAttempt.id)).filter( LoginAttempt.timestamp >= cutoff_time ).scalar() successful_logins = db.query(func.count(LoginAttempt.id)).filter( LoginAttempt.timestamp >= cutoff_time, LoginAttempt.success == 1 ).scalar() failed_logins = db.query(func.count(LoginAttempt.id)).filter( LoginAttempt.timestamp >= cutoff_time, LoginAttempt.success == 0 ).scalar() # Activity by action type activity_by_action = db.query( AuditLog.action, func.count(AuditLog.id).label('count') ).filter( AuditLog.timestamp >= cutoff_time ).group_by(AuditLog.action).all() # Activity by resource type activity_by_resource = db.query( AuditLog.resource_type, func.count(AuditLog.id).label('count') ).filter( AuditLog.timestamp >= cutoff_time ).group_by(AuditLog.resource_type).all() # Most active users most_active_users = db.query( AuditLog.username, func.count(AuditLog.id).label('count') ).filter( AuditLog.timestamp >= cutoff_time, AuditLog.username != "system" ).group_by(AuditLog.username).order_by(func.count(AuditLog.id).desc()).limit(10).all() return { "period": f"Last {days_back} days", "summary": { "total_audit_entries": total_audit_entries, "total_login_attempts": total_login_attempts, "successful_logins": successful_logins, "failed_logins": failed_logins, "success_rate": round((successful_logins / total_login_attempts * 100) if total_login_attempts > 0 else 0, 1) }, "activity_by_action": [ {"action": action, "count": count} for action, count in activity_by_action ], "activity_by_resource": [ {"resource_type": resource_type, "count": count} for resource_type, count in activity_by_resource ], "most_active_users": [ {"username": username, "activity_count": count} for username, count in most_active_users ] }