"""
Audit logging service
"""

import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from fastapi import Request

from app.models.audit import AuditLog, LoginAttempt
from app.models.user import User
from app.core.logging import get_logger

logger = get_logger("audit")


class AuditService:
    """Service for handling audit logging.

    All methods are static; the class is used as a namespace. Write methods
    are best-effort: a failure to persist an entry is rolled back and logged,
    never raised to the caller.
    """

    @staticmethod
    def _extract_client_info(
        request: Optional[Request],
    ) -> tuple[Optional[str], Optional[str]]:
        """Return ``(ip_address, user_agent)`` for *request*.

        Both values are ``None`` when no request is given. The first entry of
        ``X-Forwarded-For`` is preferred over the socket peer address.

        NOTE(review): ``X-Forwarded-For`` is client-controlled unless a
        trusted proxy overwrites it — confirm the deployment guarantees that
        before relying on the recorded IP.
        """
        if not request:
            return None, None

        forwarded_for = request.headers.get("X-Forwarded-For")
        if forwarded_for:
            # Header format is "client, proxy1, proxy2"; take the left-most.
            ip_address = forwarded_for.split(",")[0].strip()
        else:
            # request.client may be None; getattr then yields None.
            ip_address = getattr(request.client, 'host', None)

        return ip_address, request.headers.get("User-Agent")

    @staticmethod
    def log_action(
        db: Session,
        action: str,
        resource_type: str,
        user: Optional[User] = None,
        resource_id: Optional[str] = None,
        details: Optional[Dict[str, Any]] = None,
        request: Optional[Request] = None
    ) -> AuditLog:
        """
        Log an action to the audit trail

        Args:
            db: Database session
            action: Action performed (CREATE, UPDATE, DELETE, LOGIN, etc.);
                stored upper-cased
            resource_type: Type of resource affected; stored upper-cased
            user: User who performed the action (None for system actions,
                recorded with username "system")
            resource_id: ID of the affected resource
            details: Additional details as dictionary
            request: FastAPI request object for IP and user agent

        Returns:
            AuditLog: The created audit log entry. If the commit fails the
            entry is returned unpersisted after the session is rolled back.
        """
        ip_address, user_agent = AuditService._extract_client_info(request)

        # Read the user attributes once, up front: the error path below must
        # not touch ORM attributes after rollback(), and it needs user_id.
        user_id = user.id if user else None
        username = user.username if user else "system"

        audit_log = AuditLog(
            user_id=user_id,
            username=username,
            action=action.upper(),
            resource_type=resource_type.upper(),
            resource_id=str(resource_id) if resource_id else None,
            details=details,
            ip_address=ip_address,
            user_agent=user_agent,
            timestamp=datetime.utcnow()
        )

        try:
            db.add(audit_log)
            db.commit()
            db.refresh(audit_log)
            return audit_log
        except Exception as e:
            db.rollback()
            # Log the error but don't fail the main operation
            logger.error(
                "Failed to log audit entry",
                error=str(e),
                action=action,
                user_id=user_id
            )
            return audit_log

    @staticmethod
    def log_login_attempt(
        db: Session,
        username: str,
        success: bool,
        request: Optional[Request] = None,
        failure_reason: Optional[str] = None
    ) -> LoginAttempt:
        """
        Log a login attempt

        Args:
            db: Database session
            username: Username attempted
            success: Whether the login was successful (stored as 1 / 0)
            request: FastAPI request object for IP and user agent
            failure_reason: Reason for failure if applicable; ignored when
                the attempt succeeded

        Returns:
            LoginAttempt: The created login attempt entry. If the commit
            fails the entry is returned unpersisted after rollback.
        """
        ip_address, user_agent = AuditService._extract_client_info(request)

        login_attempt = LoginAttempt(
            username=username,
            ip_address=ip_address or "unknown",
            user_agent=user_agent,
            success=1 if success else 0,
            timestamp=datetime.utcnow(),
            failure_reason=failure_reason if not success else None
        )

        try:
            db.add(login_attempt)
            db.commit()
            db.refresh(login_attempt)
            return login_attempt
        except Exception as e:
            db.rollback()
            # Log the error but don't fail the main operation
            logger.error(
                "Failed to log login attempt",
                error=str(e),
                username=username,
                success=success
            )
            return login_attempt

    @staticmethod
    def log_user_action(
        db: Session,
        action: str,
        target_user: User,
        acting_user: User,
        changes: Optional[Dict[str, Any]] = None,
        request: Optional[Request] = None
    ) -> AuditLog:
        """
        Log an action performed on a user account

        Args:
            db: Database session
            action: Action performed
            target_user: User being acted upon
            acting_user: User performing the action
            changes: Dictionary of changes made; added to the details under
                the "changes" key when non-empty
            request: FastAPI request object

        Returns:
            AuditLog: The created audit log entry
        """
        details = {
            "target_user_id": target_user.id,
            "target_username": target_user.username,
            "target_email": target_user.email
        }

        if changes:
            details["changes"] = changes

        return AuditService.log_action(
            db=db,
            action=action,
            resource_type="USER",
            user=acting_user,
            resource_id=str(target_user.id),
            details=details,
            request=request
        )

    @staticmethod
    def log_system_action(
        db: Session,
        action: str,
        resource_type: str,
        details: Optional[Dict[str, Any]] = None,
        request: Optional[Request] = None
    ) -> AuditLog:
        """
        Log a system-level action (no specific user)

        Args:
            db: Database session
            action: Action performed
            resource_type: Type of resource affected
            details: Additional details
            request: FastAPI request object

        Returns:
            AuditLog: The created audit log entry
        """
        return AuditService.log_action(
            db=db,
            action=action,
            resource_type=resource_type,
            user=None,
            details=details,
            request=request
        )

    @staticmethod
    def get_recent_activity(
        db: Session,
        limit: int = 50,
        user_id: Optional[int] = None,
        resource_type: Optional[str] = None
    ) -> list[AuditLog]:
        """
        Get recent audit activity

        Args:
            db: Database session
            limit: Maximum number of entries to return
            user_id: Filter by specific user (None disables the filter)
            resource_type: Filter by resource type (compared upper-cased)

        Returns:
            List of recent audit log entries, newest first
        """
        query = db.query(AuditLog).order_by(AuditLog.timestamp.desc())

        # Compare against None so that an id of 0 is still a valid filter.
        if user_id is not None:
            query = query.filter(AuditLog.user_id == user_id)

        if resource_type:
            query = query.filter(AuditLog.resource_type == resource_type.upper())

        return query.limit(limit).all()

    @staticmethod
    def get_failed_login_attempts(
        db: Session,
        hours: int = 24,
        username: Optional[str] = None
    ) -> list[LoginAttempt]:
        """
        Get failed login attempts within specified time period

        Args:
            db: Database session
            hours: Number of hours to look back
            username: Filter by specific username

        Returns:
            List of failed login attempts, newest first
        """
        cutoff_time = datetime.utcnow() - timedelta(hours=hours)

        query = db.query(LoginAttempt).filter(
            LoginAttempt.success == 0,
            LoginAttempt.timestamp >= cutoff_time
        ).order_by(LoginAttempt.timestamp.desc())

        if username:
            query = query.filter(LoginAttempt.username == username)

        return query.all()

    @staticmethod
    def get_user_activity(
        db: Session,
        user_id: int,
        limit: int = 100
    ) -> list[AuditLog]:
        """
        Get activity for a specific user

        Args:
            db: Database session
            user_id: User ID to get activity for
            limit: Maximum number of entries to return

        Returns:
            List of audit log entries for the user, newest first
        """
        return db.query(AuditLog).filter(
            AuditLog.user_id == user_id
        ).order_by(AuditLog.timestamp.desc()).limit(limit).all()


# Create global audit service instance
audit_service = AuditService()