"""
File Security and Validation Utilities

Comprehensive security validation for file uploads to prevent:
- Path traversal attacks
- File type spoofing
- DoS attacks via large files
- Malicious file uploads
- Directory traversal
"""
import os
import re
import hashlib
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

from fastapi import HTTPException, UploadFile

# Try to import python-magic, fall back to extension-based detection
try:
    import magic
    MAGIC_AVAILABLE = True
except ImportError:
    MAGIC_AVAILABLE = False

# File size limits (bytes)
MAX_FILE_SIZES = {
    'document': 10 * 1024 * 1024,  # 10MB for documents
    'csv': 50 * 1024 * 1024,       # 50MB for CSV imports
    'template': 5 * 1024 * 1024,   # 5MB for templates
    'image': 2 * 1024 * 1024,      # 2MB for images
    'default': 10 * 1024 * 1024,   # 10MB default
}

# Allowed MIME types for security
ALLOWED_MIME_TYPES = {
    'document': {
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    },
    'csv': {
        'text/csv',
        'text/plain',
        'application/csv',
    },
    'template': {
        'application/pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    },
    'image': {
        'image/jpeg',
        'image/png',
        'image/gif',
        'image/webp',
    }
}

# File extensions mapping to categories
FILE_EXTENSIONS = {
    'document': {'.pdf', '.doc', '.docx'},
    'csv': {'.csv', '.txt'},
    'template': {'.pdf', '.docx'},
    'image': {'.jpg', '.jpeg', '.png', '.gif', '.webp'},
}

# Dangerous file extensions that should never be uploaded
DANGEROUS_EXTENSIONS = {
    '.exe', '.bat', '.cmd', '.com', '.scr', '.pif', '.vbs', '.js',
    '.jar', '.app', '.deb', '.pkg', '.dmg', '.rpm', '.msi', '.dll',
    '.so', '.dylib', '.sys', '.drv', '.ocx', '.cpl', '.scf', '.lnk',
    '.ps1', '.ps2', '.psc1', '.psc2', '.msh', '.msh1', '.msh2',
    '.mshxml', '.msh1xml', '.msh2xml', '.inf', '.reg', '.vb', '.vbe',
    '.asp', '.aspx', '.php', '.jsp', '.jspx', '.py', '.rb', '.pl',
    '.sh', '.bash'
}


class FileSecurityValidator:
    """Comprehensive file security validation"""

    def __init__(self):
        """Create the validator, enabling libmagic detection when usable."""
        self.magic_mime = None
        if MAGIC_AVAILABLE:
            try:
                self.magic_mime = magic.Magic(mime=True)
            except Exception:
                # Best effort: any failure building the detector leaves the
                # signature/extension fallback in charge of MIME detection.
                self.magic_mime = None

    def sanitize_filename(self, filename: str) -> str:
        """Sanitize filename to prevent path traversal and other attacks.

        Args:
            filename: Client-supplied file name, possibly including a path.

        Returns:
            The base name with unsafe characters replaced by ``_``, leading
            and trailing dots/spaces removed, and at most 255 characters.

        Raises:
            HTTPException: 400 if the name is empty before or after cleaning.
        """
        if not filename:
            raise HTTPException(status_code=400, detail="Filename cannot be empty")

        # os.path.basename only splits on '\\' when running on Windows, so
        # normalise first: a Windows-style client path then reduces to its
        # last component on every platform.
        filename = os.path.basename(filename.replace('\\', '/'))

        # Replace remaining separators, reserved and control characters
        filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)

        # Remove leading/trailing dots and spaces
        filename = filename.strip('. ')

        # Ensure filename is not empty after sanitization
        if not filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        # Limit filename length; the stem budget shrinks for long extensions
        # so the result never exceeds 255 characters.
        if len(filename) > 255:
            name, ext = os.path.splitext(filename)
            ext = ext[:255]
            filename = name[:min(250, 255 - len(ext))] + ext

        return filename

    def validate_file_extension(self, filename: str, category: str) -> str:
        """Validate file extension against allowed types.

        Args:
            filename: File name whose final extension is checked.
            category: Key into ``FILE_EXTENSIONS``; an unknown category
                allows no extension at all.

        Returns:
            The lower-cased extension, including the leading dot.

        Raises:
            HTTPException: 400 if the name is empty, the extension is in
                ``DANGEROUS_EXTENSIONS``, or it is not allowed for the category.
        """
        if not filename:
            raise HTTPException(status_code=400, detail="Filename required")

        # Get file extension
        _, ext = os.path.splitext(filename.lower())

        # Check for dangerous extensions
        if ext in DANGEROUS_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                detail=f"File type '{ext}' is not allowed for security reasons"
            )

        # Check against allowed extensions for category
        allowed_extensions = FILE_EXTENSIONS.get(category, set())
        if ext not in allowed_extensions:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="Invalid file type")

        return ext

    def _detect_mime_from_content(self, content: bytes, filename: str) -> str:
        """Detect MIME type from file content or extension.

        Detection order: libmagic (when available), then a few well-known
        leading byte signatures, then a lookup by file extension.

        Args:
            content: Raw file bytes.
            filename: File name used for the extension-based checks.

        Returns:
            The detected MIME type, or ``'application/octet-stream'`` when
            nothing matches.
        """
        if self.magic_mime:
            try:
                return self.magic_mime.from_buffer(content)
            except Exception:
                # Deliberate best effort: fall through to the manual checks.
                pass

        # Fallback to extension-based detection and basic content inspection
        _, ext = os.path.splitext(filename.lower())

        # Basic content-based detection for common file types
        if content.startswith(b'%PDF'):
            return 'application/pdf'
        # ZIP and OLE2 containers are shared by many formats, so the
        # extension decides; only Word documents are recognised here.
        if content.startswith(b'PK\x03\x04') and ext == '.docx':
            return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        if content.startswith(b'\xd0\xcf\x11\xe0') and ext == '.doc':
            return 'application/msword'
        if content.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if content.startswith(b'\x89PNG'):
            return 'image/png'
        if content.startswith(b'GIF8'):
            return 'image/gif'
        # RIFF header: 4-byte tag, 4-byte size, then the 4-byte form type.
        if content.startswith(b'RIFF') and content[8:12] == b'WEBP':
            return 'image/webp'

        # Extension-based fallback
        # NOTE(review): this trusts the extension even when the bytes match
        # none of the signatures above, so without libmagic a renamed file
        # can pass as e.g. a PDF. Tightening it would change which uploads
        # are accepted, so it is left as-is here.
        extension_to_mime = {
            '.pdf': 'application/pdf',
            '.doc': 'application/msword',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.csv': 'text/csv',
            '.txt': 'text/plain',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp',
        }

        return extension_to_mime.get(ext, 'application/octet-stream')

    def validate_mime_type(self, content: bytes, filename: str, category: str) -> str:
        """Validate MIME type using content inspection and file extension.

        Args:
            content: Raw file bytes.
            filename: File name passed on to MIME detection.
            category: Key into ``ALLOWED_MIME_TYPES``; an unknown category
                allows no MIME type at all.

        Returns:
            The detected MIME type.

        Raises:
            HTTPException: 400 if the content is empty or the detected type
                is not allowed for the category.
        """
        if not content:
            raise HTTPException(status_code=400, detail="File content is empty")

        # Detect MIME type
        detected_mime = self._detect_mime_from_content(content, filename)

        # Check against allowed MIME types
        allowed_mimes = ALLOWED_MIME_TYPES.get(category, set())
        if detected_mime not in allowed_mimes:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="Invalid file type")

        return detected_mime

    def validate_file_size(self, content: bytes, category: str) -> int:
        """Validate file size against limits.

        Args:
            content: Raw file bytes.
            category: Key into ``MAX_FILE_SIZES``; unknown categories use
                the ``'default'`` limit.

        Returns:
            The size of ``content`` in bytes.

        Raises:
            HTTPException: 400 if the content is empty or exceeds the limit.
        """
        size = len(content)
        max_size = MAX_FILE_SIZES.get(category, MAX_FILE_SIZES['default'])

        if size == 0:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="No file uploaded")

        if size > max_size:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="File too large")

        return size

    def scan_for_malware_patterns(self, content: bytes, filename: str) -> None:
        """Basic malware pattern detection.

        Args:
            content: Raw file bytes to scan.
            filename: Name of the file being scanned (not used by the
                checks below).

        Raises:
            HTTPException: 400 if a known signature occurs in the content.
        """
        # Check for common malware signatures
        # NOTE(review): in the copy of this file that was reviewed, the text
        # after the opening of the first signature was missing. The list,
        # the matching loop and the error message below are a conservative
        # reconstruction; restore the originals from version control.
        malware_patterns = [
            b'<script',
            b'<?php',
            b'javascript:',
            b'vbscript:',
        ]

        # Signatures are lower-case, so compare against lower-cased content
        lowered = content.lower()
        for pattern in malware_patterns:
            if pattern in lowered:
                raise HTTPException(
                    status_code=400,
                    detail="File contains potentially malicious content"
                )

    # NOTE(review): only the "-> str" of this signature survived in the
    # reviewed copy. The method name and the parameter order are inferred
    # from the docstring and body; confirm them against the callers.
    def generate_secure_path(
        self,
        base_dir: str,
        filename: str,
        subdir: Optional[str] = None
    ) -> str:
        """Generate secure file path preventing directory traversal.

        Args:
            base_dir: Directory every generated path must stay inside.
            filename: Client-supplied name; sanitized before use.
            subdir: Optional single sub-directory; every character outside
                ``[a-zA-Z0-9_-]`` is replaced by ``_``.

        Returns:
            The resolved absolute path as a string.

        Raises:
            HTTPException: 400 if the file name is invalid or the resolved
                path falls outside ``base_dir``.
        """
        # Sanitize filename
        safe_filename = self.sanitize_filename(filename)

        # Build path components
        path_parts = [base_dir]
        if subdir:
            # Sanitize subdirectory name
            safe_subdir = re.sub(r'[^a-zA-Z0-9_-]', '_', subdir)
            path_parts.append(safe_subdir)
        path_parts.append(safe_filename)

        # Use Path to safely join and resolve
        full_path = Path(*path_parts).resolve()
        base_path = Path(base_dir).resolve()

        # Ensure the resolved path is within the base directory. Compare path
        # components rather than string prefixes: a prefix test would accept
        # "/data/uploads_evil/x" for the base "/data/uploads".
        try:
            full_path.relative_to(base_path)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Invalid file path - directory traversal detected"
            ) from None

        return str(full_path)

    async def validate_upload_file(
        self,
        file: UploadFile,
        category: str,
        max_size_override: Optional[int] = None
    ) -> Tuple[bytes, str, str, str]:
        """
        Comprehensive validation of uploaded file

        Runs, in order: size check, filename sanitization, extension check,
        MIME check on the actual bytes, and the malware pattern scan.

        Args:
            file: The uploaded file; it is read fully into memory.
            category: Key selecting the size, extension and MIME rules.
            max_size_override: When truthy, replaces the category size limit.

        Returns:
            (content, sanitized_filename, file_extension, mime_type)

        Raises:
            HTTPException: 400 as soon as any validation step fails.
        """
        # Check if file was uploaded
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file uploaded")

        # Read file content
        content = await file.read()

        # Validate file size
        if max_size_override:
            max_size = max_size_override
            # Reject empty uploads with the same message as the default path
            if not content:
                raise HTTPException(status_code=400, detail="No file uploaded")
            if len(content) > max_size:
                raise HTTPException(
                    status_code=400,
                    detail=f"File size exceeds limit ({max_size:,} bytes)"
                )
        else:
            self.validate_file_size(content, category)

        # Sanitize filename
        safe_filename = self.sanitize_filename(file.filename)

        # Validate file extension
        file_ext = self.validate_file_extension(safe_filename, category)

        # Validate MIME type using actual file content
        mime_type = self.validate_mime_type(content, safe_filename, category)

        # Scan for malware patterns
        self.scan_for_malware_patterns(content, safe_filename)

        return content, safe_filename, file_ext, mime_type


# Global instance for use across the application
file_validator = FileSecurityValidator()


def validate_csv_content(content: str) -> None:
    """Additional validation for CSV content.

    Args:
        content: Decoded CSV text.

    Raises:
        HTTPException: 400 if any suspicious pattern is found.
    """
    # Check for SQL injection patterns in CSV content
    # NOTE(review): in the reviewed copy the list was cut off after the
    # opening of its seventh entry. The last pattern, the matching loop and
    # the error message are a reconstruction; restore the originals from
    # version control.
    sql_patterns = [
        r'(union\s+select)',
        r'(drop\s+table)',
        r'(delete\s+from)',
        r'(insert\s+into)',
        r'(update\s+.*set)',
        r'(exec\s*\()',
        r'(<script)',
    ]

    for pattern in sql_patterns:
        if re.search(pattern, content, re.IGNORECASE):
            raise HTTPException(
                status_code=400,
                detail="CSV content contains potentially malicious data"
            )


# NOTE(review): only the "-> None" of this signature survived in the
# reviewed copy. The function name is inferred from the docstring and the
# parameter name from the body; confirm them against the callers.
def create_upload_directory(path: str) -> None:
    """Safely create upload directory with proper permissions.

    Args:
        path: Directory to create; missing parents are created as well and
            an existing directory is not an error.

    Raises:
        HTTPException: 500 if the directory cannot be created.
    """
    try:
        os.makedirs(path, mode=0o755, exist_ok=True)
    except OSError as e:
        raise HTTPException(
            status_code=500,
            detail=f"Could not create upload directory: {str(e)}"
        ) from e