Files
delphi-database/app/utils/file_security.py
HotSwapp bac8cc4bd5 changes
2025-08-18 20:20:04 -05:00

343 lines
12 KiB
Python

"""
File Security and Validation Utilities
Comprehensive security validation for file uploads to prevent:
- Path traversal attacks
- File type spoofing
- DoS attacks via large files
- Malicious file uploads
- Directory traversal
"""
import os
import re
import hashlib
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any
from fastapi import HTTPException, UploadFile
# python-magic is an optional dependency. When it cannot be imported, MIME
# detection falls back to magic-byte sniffing and the file extension.
try:
    import magic
    MAGIC_AVAILABLE = True
except ImportError:
    MAGIC_AVAILABLE = False
# Upload size ceilings per category, expressed in bytes. The 'default' entry
# is the limit applied to categories that have no entry of their own.
MAX_FILE_SIZES = {
    'document': 10 * 1024 ** 2,  # 10 MiB
    'csv': 50 * 1024 ** 2,       # 50 MiB (bulk CSV imports)
    'template': 5 * 1024 ** 2,   # 5 MiB
    'image': 2 * 1024 ** 2,      # 2 MiB
    'default': 10 * 1024 ** 2,   # 10 MiB
}
# MIME types accepted for each upload category. A detected type that is not
# in the category's set is rejected.
ALLOWED_MIME_TYPES = {
    # PDF plus legacy (.doc) and OOXML (.docx) Word documents.
    'document': {
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    },
    # CSV data is commonly reported as plain text.
    'csv': {
        'text/csv',
        'text/plain',
        'application/csv',
    },
    # Templates: PDF or .docx only.
    'template': {
        'application/pdf',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    },
    # Raster image formats.
    'image': {
        'image/jpeg',
        'image/png',
        'image/gif',
        'image/webp',
    },
}
# Lower-case file extensions (leading dot included) accepted per category.
FILE_EXTENSIONS = {
    'document': {'.pdf', '.doc', '.docx'},
    'csv': {'.csv', '.txt'},
    'template': {'.pdf', '.docx'},
    'image': {
        '.jpg',
        '.jpeg',
        '.png',
        '.gif',
        '.webp',
    },
}
# Extensions that are rejected for every upload category, grouped by kind.
DANGEROUS_EXTENSIONS = {
    # Windows executables, drivers, shortcuts and system/config files
    '.exe', '.bat', '.cmd', '.com', '.scr', '.pif', '.msi', '.dll',
    '.sys', '.drv', '.ocx', '.cpl', '.scf', '.lnk', '.inf', '.reg',
    # Windows script host / JavaScript
    '.vbs', '.vb', '.vbe', '.js',
    # PowerShell and Monad shell
    '.ps1', '.ps2', '.psc1', '.psc2',
    '.msh', '.msh1', '.msh2', '.mshxml', '.msh1xml', '.msh2xml',
    # Packages, installers and native libraries on other platforms
    '.jar', '.app', '.deb', '.pkg', '.dmg', '.rpm', '.so', '.dylib',
    # Server-side and interpreted scripts
    '.asp', '.aspx', '.php', '.jsp', '.jspx',
    '.py', '.rb', '.pl', '.sh', '.bash',
}
class FileSecurityValidator:
    """Validate uploaded files before they are stored.

    Bundles filename sanitization, extension and MIME allow-list checks,
    size limits, a byte-pattern scan and safe path construction. Every check
    signals failure by raising ``HTTPException`` with status code 400.
    """

    # Upper bound on the length of a sanitized filename.
    _MAX_FILENAME_LENGTH = 255

    # Byte sequences treated as suspicious. They are compared against the
    # lower-cased file content, so they must be written in lower case.
    _MALWARE_PATTERNS = (
        b'<script',
        b'javascript:',
        b'vbscript:',
        b'data:text/html',
        b'<?php',
        b'<% ',
        b'eval(',
        b'exec(',
        b'system(',
        b'shell_exec(',
        b'passthru(',
        b'cmd.exe',
        b'powershell',
    )

    def __init__(self):
        """Create the libmagic-backed MIME detector when it is available."""
        self.magic_mime = None
        if MAGIC_AVAILABLE:
            try:
                self.magic_mime = magic.Magic(mime=True)
            except Exception:
                # Best effort: if the detector cannot be constructed, keep
                # None so detection falls back to signatures/extensions.
                self.magic_mime = None

    def sanitize_filename(self, filename: str) -> str:
        """Return a filename that is safe to use as a single path component.

        Directory components are dropped, reserved and control characters
        are replaced with ``_``, leading/trailing dots and spaces are removed
        and the result is capped at 255 characters with the extension kept.

        Args:
            filename: Client-supplied filename (untrusted).

        Returns:
            The sanitized filename.

        Raises:
            HTTPException: 400 if the name is empty before or after
                sanitization.
        """
        if not filename:
            raise HTTPException(status_code=400, detail="Filename cannot be empty")

        # Drop directory components, then neutralize path separators,
        # characters reserved on common filesystems and ASCII control chars.
        filename = os.path.basename(filename)
        filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)

        # Leading dots would create hidden files; trailing dots/spaces are
        # problematic on some filesystems.
        filename = filename.strip('. ')
        if not filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        max_len = self._MAX_FILENAME_LENGTH
        if len(filename) > max_len:
            name, ext = os.path.splitext(filename)
            if len(ext) >= max_len:
                # Pathological extension: trim it so at least one character
                # of the stem survives.
                ext = ext[:max_len - 1]
            # Keep at most 250 stem characters, fewer when the extension is
            # long, so that stem + extension never exceeds the limit.
            name_budget = min(250, max_len - len(ext))
            filename = name[:name_budget] + ext
        return filename

    def validate_file_extension(self, filename: str, category: str) -> str:
        """Check the file extension against the deny-list and the category.

        Args:
            filename: Filename whose (case-insensitive) extension is checked.
            category: Key into ``FILE_EXTENSIONS``; an unknown category
                allows no extension at all.

        Returns:
            The lower-cased extension, including the leading dot.

        Raises:
            HTTPException: 400 if the filename is empty, the extension is in
                ``DANGEROUS_EXTENSIONS``, or it is not allowed for the
                category.
        """
        if not filename:
            raise HTTPException(status_code=400, detail="Filename required")

        _, ext = os.path.splitext(filename.lower())

        if ext in DANGEROUS_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                detail=f"File type '{ext}' is not allowed for security reasons"
            )

        allowed_extensions = FILE_EXTENSIONS.get(category, set())
        if ext not in allowed_extensions:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="Invalid file type")
        return ext

    def _detect_mime_from_content(self, content: bytes, filename: str) -> str:
        """Detect the MIME type from the content, falling back to the name.

        Detection order: the libmagic detector (if one was created), then a
        small table of leading-byte signatures, then the file extension.

        NOTE(review): the extension fallback trusts the filename, so without
        libmagic a file with unrecognized content is reported as whatever
        its extension claims.

        Args:
            content: Raw file bytes.
            filename: Filename used for the extension-based fallback.

        Returns:
            The detected MIME type, or ``'application/octet-stream'`` when
            nothing matches.
        """
        if self.magic_mime:
            try:
                return self.magic_mime.from_buffer(content)
            except Exception:
                # Deliberate best effort: any detector failure falls through
                # to the signature/extension heuristics below.
                pass

        _, ext = os.path.splitext(filename.lower())

        # Leading-byte signatures. ZIP and OLE2 containers are shared by
        # several Office formats, so the extension disambiguates them; only
        # the Word variants are recognized here.
        if content.startswith(b'%PDF'):
            return 'application/pdf'
        if content.startswith(b'PK\x03\x04') and ext == '.docx':
            return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        if content.startswith(b'\xd0\xcf\x11\xe0') and ext == '.doc':
            return 'application/msword'
        if content.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if content.startswith(b'\x89PNG'):
            return 'image/png'
        if content.startswith(b'GIF8'):
            return 'image/gif'
        if content.startswith(b'RIFF') and b'WEBP' in content[:20]:
            return 'image/webp'

        # Extension-based fallback
        extension_to_mime = {
            '.pdf': 'application/pdf',
            '.doc': 'application/msword',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.csv': 'text/csv',
            '.txt': 'text/plain',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.png': 'image/png',
            '.gif': 'image/gif',
            '.webp': 'image/webp',
        }
        return extension_to_mime.get(ext, 'application/octet-stream')

    def validate_mime_type(self, content: bytes, filename: str, category: str) -> str:
        """Check the detected MIME type against the category allow-list.

        Args:
            content: Raw file bytes.
            filename: Filename, used when detection falls back to the
                extension.
            category: Key into ``ALLOWED_MIME_TYPES``; an unknown category
                allows no MIME type at all.

        Returns:
            The detected MIME type.

        Raises:
            HTTPException: 400 if the content is empty or the detected type
                is not allowed for the category.
        """
        if not content:
            raise HTTPException(status_code=400, detail="File content is empty")

        detected_mime = self._detect_mime_from_content(content, filename)

        allowed_mimes = ALLOWED_MIME_TYPES.get(category, set())
        if detected_mime not in allowed_mimes:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="Invalid file type")
        return detected_mime

    def validate_file_size(self, content: bytes, category: str) -> int:
        """Check the content length against the limit for the category.

        Args:
            content: Raw file bytes.
            category: Key into ``MAX_FILE_SIZES``; unknown categories use the
                ``'default'`` limit.

        Returns:
            The size of the content in bytes.

        Raises:
            HTTPException: 400 if the content is empty or exceeds the limit.
        """
        size = len(content)
        max_size = MAX_FILE_SIZES.get(category, MAX_FILE_SIZES['default'])

        if size == 0:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="No file uploaded")
        if size > max_size:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="File too large")
        return size

    def scan_for_malware_patterns(self, content: bytes, filename: str) -> None:
        """Reject content containing any of the known suspicious byte patterns.

        This is a plain, case-insensitive substring scan, not a real malware
        scanner.

        Args:
            content: Raw file bytes.
            filename: Unused; kept for interface compatibility.

        Raises:
            HTTPException: 400 if any pattern occurs in the content.
        """
        content_lower = content.lower()
        if any(pattern in content_lower for pattern in self._MALWARE_PATTERNS):
            raise HTTPException(
                status_code=400,
                detail="File contains potentially malicious content and cannot be uploaded"
            )

    def generate_secure_path(self, base_dir: str, filename: str, subdir: Optional[str] = None) -> str:
        """Build an absolute storage path that is guaranteed to lie in base_dir.

        Args:
            base_dir: Directory that must contain the resulting path.
            filename: Untrusted filename; sanitized before use.
            subdir: Optional single sub-directory name; every character
                outside ``[a-zA-Z0-9_-]`` is replaced with ``_``.

        Returns:
            The resolved absolute path as a string.

        Raises:
            HTTPException: 400 if the filename is invalid or the resolved
                path falls outside ``base_dir``.
        """
        safe_filename = self.sanitize_filename(filename)

        path_parts = [base_dir]
        if subdir:
            safe_subdir = re.sub(r'[^a-zA-Z0-9_-]', '_', subdir)
            path_parts.append(safe_subdir)
        path_parts.append(safe_filename)

        full_path = Path(*path_parts).resolve()
        base_path = Path(base_dir).resolve()

        # Compare path components rather than string prefixes: a prefix test
        # would accept "/data/uploads_evil/x" for base "/data/uploads".
        try:
            full_path.relative_to(base_path)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Invalid file path - directory traversal detected"
            ) from None
        return str(full_path)

    async def validate_upload_file(
        self,
        file: "UploadFile",
        category: str,
        max_size_override: Optional[int] = None
    ) -> Tuple[bytes, str, str, str]:
        """Run every validation step on an uploaded file.

        The whole upload is read into memory, then checked for size,
        filename, extension, MIME type and suspicious byte patterns, in that
        order.

        Args:
            file: The uploaded file object.
            category: Upload category selecting the size, extension and MIME
                rules.
            max_size_override: Size limit in bytes used instead of the
                category limit. A falsy value (``None`` or ``0``) means
                "no override".

        Returns:
            A tuple ``(content, sanitized_filename, file_extension,
            mime_type)``.

        Raises:
            HTTPException: 400 if any validation step fails.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file uploaded")

        content = await file.read()

        if max_size_override:
            if len(content) > max_size_override:
                raise HTTPException(
                    status_code=400,
                    detail=f"File size exceeds limit ({max_size_override:,} bytes)"
                )
        else:
            self.validate_file_size(content, category)

        safe_filename = self.sanitize_filename(file.filename)
        file_ext = self.validate_file_extension(safe_filename, category)
        # MIME validation also rejects empty content on the override path.
        mime_type = self.validate_mime_type(content, safe_filename, category)
        self.scan_for_malware_patterns(content, safe_filename)

        return content, safe_filename, file_ext, mime_type
# Global instance for use across the application. It is created once at
# import time, so the optional libmagic detector set up in
# FileSecurityValidator.__init__ is shared by every caller.
file_validator = FileSecurityValidator()
# Patterns for SQL- and script-injection markers in CSV text. They are
# matched against lower-cased content, so they are written in lower case,
# and compiled once at import time instead of on every call.
_CSV_SUSPICIOUS_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'(union\s+select)',
        r'(drop\s+table)',
        r'(delete\s+from)',
        r'(insert\s+into)',
        r'(update\s+.*set)',
        r'(exec\s*\()',
        r'(<script)',
        r'(javascript:)',
    )
)


def validate_csv_content(content: str) -> None:
    """Reject CSV text that contains SQL- or script-injection markers.

    This is a coarse, case-insensitive pattern scan over the whole text; it
    does not parse the CSV, so ordinary prose containing one of the phrases
    is rejected as well.

    Args:
        content: Decoded CSV text.

    Raises:
        HTTPException: 400 if any suspicious pattern is found.
    """
    content_lower = content.lower()
    if any(pattern.search(content_lower) for pattern in _CSV_SUSPICIOUS_PATTERNS):
        raise HTTPException(
            status_code=400,
            detail="CSV content contains potentially malicious data"
        )
def create_upload_directory(path: str) -> None:
    """Create the upload directory (and any missing parents) if needed.

    An already existing directory is not an error. The requested mode
    ``0o755`` is subject to the process umask and, on Python 3.7+, applies
    only to the final directory; newly created parents get the default mode.

    Args:
        path: Directory to create.

    Raises:
        HTTPException: 500 if the directory cannot be created.
    """
    try:
        os.makedirs(path, mode=0o755, exist_ok=True)
    except OSError as e:
        # Chain the original error so the root cause stays in the traceback.
        raise HTTPException(
            status_code=500,
            detail=f"Could not create upload directory: {str(e)}"
        ) from e