343 lines
12 KiB
Python
343 lines
12 KiB
Python
"""
|
|
File Security and Validation Utilities
|
|
|
|
Comprehensive security validation for file uploads to prevent:
|
|
- Path traversal attacks
|
|
- File type spoofing
|
|
- DoS attacks via large files
|
|
- Malicious file uploads
|
|
- Directory traversal
|
|
"""
|
|
import os
|
|
import re
|
|
import hashlib
|
|
from pathlib import Path
|
|
from typing import List, Optional, Tuple, Dict, Any
|
|
from fastapi import HTTPException, UploadFile
|
|
|
|
# python-magic is an optional dependency. Record whether the import worked so
# the rest of the module can fall back to extension-based detection.
try:
    import magic
except ImportError:
    MAGIC_AVAILABLE = False
else:
    MAGIC_AVAILABLE = True
|
|
|
|
# Upload size ceilings in bytes, keyed by upload category.
_MIB = 1024 * 1024

MAX_FILE_SIZES = {
    'document': 10 * _MIB,  # documents
    'csv': 50 * _MIB,       # CSV imports
    'template': 5 * _MIB,   # templates
    'image': 2 * _MIB,      # images
    'default': 10 * _MIB,   # fallback entry
}
|
|
|
|
# MIME types accepted for each upload category.
_PDF_MIME = 'application/pdf'
_DOC_MIME = 'application/msword'
_DOCX_MIME = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'

ALLOWED_MIME_TYPES = {
    'document': {_PDF_MIME, _DOC_MIME, _DOCX_MIME},
    'csv': {'text/csv', 'text/plain', 'application/csv'},
    'template': {_PDF_MIME, _DOCX_MIME},
    'image': {'image/jpeg', 'image/png', 'image/gif', 'image/webp'},
}
|
|
|
|
# Lower-case filename extensions (with leading dot) accepted per category.
FILE_EXTENSIONS = {
    category: set(extensions)
    for category, extensions in (
        ('document', ('.pdf', '.doc', '.docx')),
        ('csv', ('.csv', '.txt')),
        ('template', ('.pdf', '.docx')),
        ('image', ('.jpg', '.jpeg', '.png', '.gif', '.webp')),
    )
}
|
|
|
|
# Extensions that are refused outright, whatever the upload category.
DANGEROUS_EXTENSIONS = {
    # Executables, drivers, shortcuts and system/registry files
    '.exe', '.com', '.scr', '.pif', '.msi', '.dll', '.sys',
    '.drv', '.ocx', '.cpl', '.scf', '.lnk', '.inf', '.reg',
    # Batch and scripting-host files
    '.bat', '.cmd', '.vbs', '.vb', '.vbe', '.js',
    # PowerShell / Monad shell files
    '.ps1', '.ps2', '.psc1', '.psc2', '.msh',
    '.msh1', '.msh2', '.mshxml', '.msh1xml', '.msh2xml',
    # Installers, application bundles and native libraries
    '.jar', '.app', '.deb', '.pkg', '.dmg', '.rpm', '.so', '.dylib',
    # Server-side page formats
    '.asp', '.aspx', '.php', '.jsp', '.jspx',
    # Interpreter and shell scripts
    '.py', '.rb', '.pl', '.sh', '.bash',
}
|
|
|
|
|
|
class FileSecurityValidator:
    """Validate uploaded files before they are stored.

    The checks cover filename sanitisation, extension allow/deny lists, size
    limits, MIME detection (libmagic when available, otherwise magic bytes
    plus extension) and a simple byte-pattern scan. Every failed check raises
    ``HTTPException``.
    """

    # Upper bound, in characters, on the name returned by sanitize_filename.
    _MAX_FILENAME_LENGTH = 255

    # Lower-case byte sequences that make scan_for_malware_patterns reject a
    # file. They are matched against the lower-cased content.
    _MALWARE_PATTERNS = (
        b'<script',
        b'javascript:',
        b'vbscript:',
        b'data:text/html',
        b'<?php',
        b'<% ',
        b'eval(',
        b'exec(',
        b'system(',
        b'shell_exec(',
        b'passthru(',
        b'cmd.exe',
        b'powershell',
    )

    # Last-resort mapping used when neither libmagic nor a known signature
    # identifies the content.
    _EXTENSION_TO_MIME = {
        '.pdf': 'application/pdf',
        '.doc': 'application/msword',
        '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        '.csv': 'text/csv',
        '.txt': 'text/plain',
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
    }

    def __init__(self):
        """Set up the libmagic MIME detector when python-magic is usable."""
        self.magic_mime = None
        if MAGIC_AVAILABLE:
            try:
                self.magic_mime = magic.Magic(mime=True)
            except Exception:
                # Best effort: if the detector cannot be constructed, fall
                # back to the signature/extension checks.
                self.magic_mime = None

    def sanitize_filename(self, filename: str) -> str:
        """Return a filename that is safe to use as a single path component.

        Directory parts are dropped (both ``/`` and ``\\`` count as
        separators), reserved and control characters become ``_``, leading
        and trailing dots/spaces are stripped, and the result is capped at
        255 characters while keeping the extension.

        Raises:
            HTTPException: 400 if the name is empty before or after
                sanitisation.
        """
        if not filename:
            raise HTTPException(status_code=400, detail="Filename cannot be empty")

        # os.path.basename only splits on "\" on Windows. Normalise first so a
        # client-supplied Windows path (or "..\\..\\x") loses its directory
        # part on every platform instead of being flattened into the name.
        filename = os.path.basename(filename.replace('\\', '/'))
        filename = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename)

        # Remove leading/trailing dots and spaces
        filename = filename.strip('. ')

        if not filename:
            raise HTTPException(status_code=400, detail="Invalid filename")

        limit = self._MAX_FILENAME_LENGTH
        if len(filename) > limit:
            name, ext = os.path.splitext(filename)
            # Cap the extension too; otherwise a very long "extension" would
            # push the result past the limit.
            ext = ext[:limit - 1]
            filename = name[:min(250, limit - len(ext))] + ext

        return filename

    def validate_file_extension(self, filename: str, category: str) -> str:
        """Check the extension of *filename* for *category* and return it.

        The extension is compared in lower case and returned with its
        leading dot.

        Raises:
            HTTPException: 400 if the filename is empty, the extension is in
                ``DANGEROUS_EXTENSIONS``, or it is not listed for the
                category in ``FILE_EXTENSIONS``.
        """
        if not filename:
            raise HTTPException(status_code=400, detail="Filename required")

        _, ext = os.path.splitext(filename.lower())

        if ext in DANGEROUS_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                detail=f"File type '{ext}' is not allowed for security reasons"
            )

        # An unknown category has no allowed extensions, so it always fails.
        if ext not in FILE_EXTENSIONS.get(category, set()):
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="Invalid file type")

        return ext

    def _detect_mime_from_content(self, content: bytes, filename: str) -> str:
        """Return the MIME type of *content*.

        libmagic is used when available. Otherwise the leading bytes are
        compared with a few known signatures, and as a last resort the type
        is derived from the extension (``application/octet-stream`` when the
        extension is unknown).
        """
        if self.magic_mime:
            try:
                return self.magic_mime.from_buffer(content)
            except Exception:
                # Deliberate best effort: a libmagic failure must not reject
                # the upload outright, so continue with the built-in checks.
                pass

        _, ext = os.path.splitext(filename.lower())

        if content.startswith(b'%PDF'):
            return 'application/pdf'
        # The ZIP and OLE2 signatures are checked only alongside the matching
        # Word extension; other extensions fall through to the mapping below.
        if content.startswith(b'PK\x03\x04') and ext == '.docx':
            return 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        if content.startswith(b'\xd0\xcf\x11\xe0') and ext == '.doc':
            return 'application/msword'
        if content.startswith(b'\xff\xd8\xff'):
            return 'image/jpeg'
        if content.startswith(b'\x89PNG'):
            return 'image/png'
        if content.startswith(b'GIF8'):
            return 'image/gif'
        if content.startswith(b'RIFF') and b'WEBP' in content[:20]:
            return 'image/webp'

        # NOTE(review): this trusts the extension when no signature matched,
        # so without libmagic arbitrary bytes named "x.pdf" pass as PDF.
        return self._EXTENSION_TO_MIME.get(ext, 'application/octet-stream')

    def validate_mime_type(self, content: bytes, filename: str, category: str) -> str:
        """Detect the MIME type of *content* and check it for *category*.

        Returns:
            The detected MIME type.

        Raises:
            HTTPException: 400 if the content is empty or the detected type
                is not listed for the category in ``ALLOWED_MIME_TYPES``.
        """
        if not content:
            raise HTTPException(status_code=400, detail="File content is empty")

        detected_mime = self._detect_mime_from_content(content, filename)

        if detected_mime not in ALLOWED_MIME_TYPES.get(category, set()):
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="Invalid file type")

        return detected_mime

    def validate_file_size(self, content: bytes, category: str) -> int:
        """Check ``len(content)`` against the limit for *category*.

        Categories without an entry in ``MAX_FILE_SIZES`` use its
        ``'default'`` value.

        Returns:
            The content size in bytes.

        Raises:
            HTTPException: 400 if the content is empty or over the limit.
        """
        size = len(content)
        max_size = MAX_FILE_SIZES.get(category, MAX_FILE_SIZES['default'])

        if size == 0:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="No file uploaded")

        if size > max_size:
            # Standardized message expected by tests
            raise HTTPException(status_code=400, detail="File too large")

        return size

    def scan_for_malware_patterns(self, content: bytes, filename: str) -> None:
        """Reject content containing any of the known suspicious byte patterns.

        The match is a case-insensitive substring search over the whole
        content. *filename* is accepted for interface compatibility and is
        not used.

        Raises:
            HTTPException: 400 if any pattern is present.
        """
        content_lower = content.lower()
        if any(pattern in content_lower for pattern in self._MALWARE_PATTERNS):
            raise HTTPException(
                status_code=400,
                detail="File contains potentially malicious content and cannot be uploaded"
            )

    def generate_secure_path(self, base_dir: str, filename: str, subdir: Optional[str] = None) -> str:
        """Build an absolute path for *filename* under *base_dir*.

        *filename* goes through ``sanitize_filename``. In *subdir*, every
        character outside ``[a-zA-Z0-9_-]`` is replaced with ``_``.

        Returns:
            The resolved path as a string.

        Raises:
            HTTPException: 400 if the filename is invalid or the resolved
                path is not inside the resolved base directory.
        """
        safe_filename = self.sanitize_filename(filename)

        path_parts = [base_dir]
        if subdir:
            path_parts.append(re.sub(r'[^a-zA-Z0-9_-]', '_', subdir))
        path_parts.append(safe_filename)

        full_path = Path(*path_parts).resolve()
        base_path = Path(base_dir).resolve()

        # Compare path components rather than string prefixes: with a plain
        # startswith() check "/srv/uploads_old/x" would pass for base
        # "/srv/uploads".
        try:
            full_path.relative_to(base_path)
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Invalid file path - directory traversal detected"
            ) from None

        return str(full_path)

    async def validate_upload_file(
        self,
        file: UploadFile,
        category: str,
        max_size_override: Optional[int] = None
    ) -> Tuple[bytes, str, str, str]:
        """
        Comprehensive validation of uploaded file

        Runs, in order: size check, filename sanitisation, extension check,
        MIME check and pattern scan. A truthy *max_size_override* replaces
        the category's size limit.

        Returns: (content, sanitized_filename, file_extension, mime_type)

        Raises:
            HTTPException: 400 from the first check that fails.
        """
        if not file.filename:
            raise HTTPException(status_code=400, detail="No file uploaded")

        if max_size_override:
            max_size = max_size_override
        else:
            max_size = MAX_FILE_SIZES.get(category, MAX_FILE_SIZES['default'])

        # Read one byte more than the limit: enough to tell that the upload
        # is too large without loading an arbitrarily big body into memory.
        content = await file.read(max_size + 1)

        if max_size_override:
            if len(content) > max_size:
                raise HTTPException(
                    status_code=400,
                    detail=f"File size exceeds limit ({max_size:,} bytes)"
                )
        else:
            self.validate_file_size(content, category)

        safe_filename = self.sanitize_filename(file.filename)
        file_ext = self.validate_file_extension(safe_filename, category)

        # The MIME check inspects the actual content.
        mime_type = self.validate_mime_type(content, safe_filename, category)

        self.scan_for_malware_patterns(content, safe_filename)

        return content, safe_filename, file_ext, mime_type
|
|
|
|
|
|
# Global instance for use across the application
|
|
# Created once at import time; FileSecurityValidator.__init__ builds the
# libmagic detector only when MAGIC_AVAILABLE is true.
file_validator = FileSecurityValidator()
|
|
|
|
|
|
def validate_csv_content(content: str) -> None:
    """Reject CSV text that contains SQL- or script-injection markers.

    The text is lower-cased and searched with a fixed list of regular
    expressions; the first hit is enough to reject it.

    Raises:
        HTTPException: 400 if any pattern matches.
    """
    suspicious_expressions = (
        r'(union\s+select)',
        r'(drop\s+table)',
        r'(delete\s+from)',
        r'(insert\s+into)',
        r'(update\s+.*set)',
        r'(exec\s*\()',
        r'(<script)',
        r'(javascript:)',
    )

    lowered = content.lower()
    if any(re.search(expr, lowered) for expr in suspicious_expressions):
        raise HTTPException(
            status_code=400,
            detail="CSV content contains potentially malicious data"
        )
|
|
|
|
|
|
def create_upload_directory(path: str) -> None:
    """Create the upload directory *path*, including missing parents.

    An already existing directory is not an error. The directory is
    requested with mode ``0o755``.

    Raises:
        HTTPException: 500 if the directory cannot be created. The
            underlying ``OSError`` is attached as the cause.
    """
    try:
        os.makedirs(path, mode=0o755, exist_ok=True)
    except OSError as exc:
        # Chain the OSError so the original traceback is kept for debugging.
        raise HTTPException(
            status_code=500,
            detail=f"Could not create upload directory: {exc}"
        ) from exc
|