""" Security audit utility for credential validation and security best practices. """ import re import hashlib import secrets from typing import List, Dict, Any, Optional, Tuple from pathlib import Path from dataclasses import dataclass from enum import Enum import ast from app.utils.logging import security_logger class SecurityLevel(Enum): """Security issue severity levels.""" CRITICAL = "critical" HIGH = "high" MEDIUM = "medium" LOW = "low" INFO = "info" @dataclass class SecurityFinding: """Represents a security finding from code analysis.""" file_path: str line_number: int issue_type: str severity: SecurityLevel description: str recommendation: str code_snippet: str = "" class CredentialValidator: """Utility for detecting hardcoded credentials and security issues.""" # Patterns for detecting potential hardcoded credentials CREDENTIAL_PATTERNS = { 'password': [ r'password\s*=\s*["\'][^"\']+["\']', r'passwd\s*=\s*["\'][^"\']+["\']', r'pwd\s*=\s*["\'][^"\']+["\']', ], 'api_key': [ r'api_key\s*=\s*["\'][^"\']+["\']', r'apikey\s*=\s*["\'][^"\']+["\']', r'key\s*=\s*["\'][A-Za-z0-9]{20,}["\']', ], 'token': [ r'token\s*=\s*["\'][^"\']+["\']', r'access_token\s*=\s*["\'][^"\']+["\']', r'auth_token\s*=\s*["\'][^"\']+["\']', ], 'secret': [ r'secret\s*=\s*["\'][^"\']+["\']', r'secret_key\s*=\s*["\'][^"\']+["\']', r'client_secret\s*=\s*["\'][^"\']+["\']', ], 'database_url': [ r'database_url\s*=\s*["\'][^"\']*://[^"\']+["\']', r'db_url\s*=\s*["\'][^"\']*://[^"\']+["\']', r'connection_string\s*=\s*["\'][^"\']*://[^"\']+["\']', ], 'private_key': [ r'private_key\s*=\s*["\'][^"\']+["\']', r'-----BEGIN\s+(?:RSA\s+)?PRIVATE\s+KEY-----', ] } # Safe patterns that should not be flagged SAFE_PATTERNS = [ r'password\s*=\s*["\']os\.getenv\(', r'password\s*=\s*["\']settings\.', r'password\s*=\s*["\']config\.', r'password\s*=\s*["\']env\.', r'password\s*=\s*["\'].*\{\}.*["\']', # Template strings r'password\s*=\s*["\'].*%s.*["\']', # Format strings ] # Patterns for other security issues SECURITY_PATTERNS = { 'sql_injection': [ r'\.execute\s*\(\s*["\'][^"\']*\+[^"\']*["\']', # String concatenation in SQL r'\.execute\s*\(\s*f["\'][^"\']*\{[^}]+\}[^"\']*["\']', # f-string in SQL r'\.execute\s*\(\s*["\'][^"\']*%[^"\']*["\']', # % formatting in SQL ], 'hardcoded_ip': [ r'["\'](?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)["\']', ], 'debug_mode': [ r'debug\s*=\s*True', r'DEBUG\s*=\s*True', ], 'weak_crypto': [ r'hashlib\.md5\(', r'hashlib\.sha1\(', ] } def __init__(self): self.findings: List[SecurityFinding] = [] def scan_file(self, file_path: Path) -> List[SecurityFinding]: """ Scan a single file for security issues. Args: file_path: Path to the file to scan Returns: List of security findings """ findings = [] try: with open(file_path, 'r', encoding='utf-8') as f: lines = f.readlines() for line_num, line in enumerate(lines, 1): # Check for credential patterns for cred_type, patterns in self.CREDENTIAL_PATTERNS.items(): for pattern in patterns: if re.search(pattern, line, re.IGNORECASE): # Check if it's a safe pattern is_safe = any(re.search(safe_pattern, line, re.IGNORECASE) for safe_pattern in self.SAFE_PATTERNS) if not is_safe: findings.append(SecurityFinding( file_path=str(file_path), line_number=line_num, issue_type=f"hardcoded_{cred_type}", severity=SecurityLevel.CRITICAL, description=f"Potential hardcoded {cred_type} detected", recommendation=f"Move {cred_type} to environment variables or secure configuration", code_snippet=line.strip() )) # Check for other security patterns for issue_type, patterns in self.SECURITY_PATTERNS.items(): for pattern in patterns: if re.search(pattern, line, re.IGNORECASE): severity = self._get_severity_for_issue(issue_type) findings.append(SecurityFinding( file_path=str(file_path), line_number=line_num, issue_type=issue_type, severity=severity, description=self._get_description_for_issue(issue_type), recommendation=self._get_recommendation_for_issue(issue_type), code_snippet=line.strip() )) except Exception as e: security_logger.error(f"Error scanning file {file_path}: {str(e)}") return findings def scan_directory(self, directory_path: Path, file_pattern: str = "*.py") -> List[SecurityFinding]: """ Scan all files in a directory for security issues. Args: directory_path: Path to the directory to scan file_pattern: File pattern to match (default: *.py) Returns: List of all security findings """ all_findings = [] try: for file_path in directory_path.rglob(file_pattern): if file_path.is_file(): findings = self.scan_file(file_path) all_findings.extend(findings) except Exception as e: security_logger.error(f"Error scanning directory {directory_path}: {str(e)}") return all_findings def _get_severity_for_issue(self, issue_type: str) -> SecurityLevel: """Get severity level for an issue type.""" severity_map = { 'sql_injection': SecurityLevel.CRITICAL, 'hardcoded_ip': SecurityLevel.MEDIUM, 'debug_mode': SecurityLevel.HIGH, 'weak_crypto': SecurityLevel.MEDIUM, } return severity_map.get(issue_type, SecurityLevel.LOW) def _get_description_for_issue(self, issue_type: str) -> str: """Get description for an issue type.""" descriptions = { 'sql_injection': "Potential SQL injection vulnerability detected", 'hardcoded_ip': "Hardcoded IP address found", 'debug_mode': "Debug mode enabled in production code", 'weak_crypto': "Weak cryptographic algorithm detected", } return descriptions.get(issue_type, f"Security issue: {issue_type}") def _get_recommendation_for_issue(self, issue_type: str) -> str: """Get recommendation for an issue type.""" recommendations = { 'sql_injection': "Use parameterized queries or ORM methods to prevent SQL injection", 'hardcoded_ip': "Move IP addresses to configuration files or environment variables", 'debug_mode': "Set debug mode via environment variables, default to False in production", 'weak_crypto': "Use stronger cryptographic algorithms (SHA-256 or better)", } return recommendations.get(issue_type, "Review and address this security concern") def generate_report(self, findings: List[SecurityFinding]) -> Dict[str, Any]: """ Generate a security report from findings. Args: findings: List of security findings Returns: Dictionary containing security report """ report = { 'total_issues': len(findings), 'by_severity': {}, 'by_type': {}, 'files_affected': set(), 'critical_issues': [], 'recommendations': [] } # Count by severity for severity in SecurityLevel: count = len([f for f in findings if f.severity == severity]) if count > 0: report['by_severity'][severity.value] = count # Count by type for finding in findings: if finding.issue_type not in report['by_type']: report['by_type'][finding.issue_type] = 0 report['by_type'][finding.issue_type] += 1 report['files_affected'].add(finding.file_path) if finding.severity in [SecurityLevel.CRITICAL, SecurityLevel.HIGH]: report['critical_issues'].append({ 'file': finding.file_path, 'line': finding.line_number, 'type': finding.issue_type, 'severity': finding.severity.value, 'description': finding.description }) report['files_affected'] = list(report['files_affected']) # Generate summary recommendations if report['by_type']: report['recommendations'] = self._generate_recommendations(report['by_type']) return report def _generate_recommendations(self, issues_by_type: Dict[str, int]) -> List[str]: """Generate summary recommendations based on issue types found.""" recommendations = [] if any('hardcoded' in issue_type for issue_type in issues_by_type): recommendations.append( "Implement a secure configuration management system using environment variables or encrypted config files" ) if 'sql_injection' in issues_by_type: recommendations.append( "Review all database queries and ensure parameterized queries are used consistently" ) if 'debug_mode' in issues_by_type: recommendations.append( "Implement environment-based configuration for debug settings" ) if 'weak_crypto' in issues_by_type: recommendations.append( "Upgrade cryptographic implementations to use stronger algorithms" ) return recommendations class PasswordStrengthValidator: """Utility for validating password strength and generating secure passwords.""" def __init__(self): self.min_length = 8 self.require_uppercase = True self.require_lowercase = True self.require_digits = True self.require_special = True def validate_password_strength(self, password: str) -> Tuple[bool, List[str]]: """ Validate password strength. Args: password: Password to validate Returns: Tuple of (is_valid, list_of_issues) """ issues = [] if len(password) < self.min_length: issues.append(f"Password must be at least {self.min_length} characters long") if self.require_uppercase and not re.search(r'[A-Z]', password): issues.append("Password must contain at least one uppercase letter") if self.require_lowercase and not re.search(r'[a-z]', password): issues.append("Password must contain at least one lowercase letter") if self.require_digits and not re.search(r'\d', password): issues.append("Password must contain at least one digit") if self.require_special and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password): issues.append("Password must contain at least one special character") return len(issues) == 0, issues def generate_secure_password(self, length: int = 16) -> str: """ Generate a cryptographically secure password. Args: length: Length of password to generate Returns: Secure password string """ import string # Define character sets lowercase = string.ascii_lowercase uppercase = string.ascii_uppercase digits = string.digits special = "!@#$%^&*(),.?\":{}|<>" # Ensure at least one character from each required set password_chars = [] if self.require_lowercase: password_chars.append(secrets.choice(lowercase)) if self.require_uppercase: password_chars.append(secrets.choice(uppercase)) if self.require_digits: password_chars.append(secrets.choice(digits)) if self.require_special: password_chars.append(secrets.choice(special)) # Fill remaining length with random characters from all sets all_chars = lowercase + uppercase + digits + special for _ in range(length - len(password_chars)): password_chars.append(secrets.choice(all_chars)) # Shuffle the password characters secrets.SystemRandom().shuffle(password_chars) return ''.join(password_chars) def audit_code_security(directory_path: str, file_pattern: str = "*.py") -> Dict[str, Any]: """ Perform a comprehensive security audit of code in a directory. Args: directory_path: Path to the directory to audit file_pattern: File pattern to match (default: *.py) Returns: Security audit report """ validator = CredentialValidator() path = Path(directory_path) security_logger.info(f"Starting security audit of {directory_path}") findings = validator.scan_directory(path, file_pattern) report = validator.generate_report(findings) security_logger.info(f"Security audit completed", **{ "total_issues": report['total_issues'], "files_scanned": len(report['files_affected']), "critical_issues": len(report['critical_issues']) }) return report def hash_password_securely(password: str) -> str: """ Hash a password using a secure algorithm. Args: password: Plain text password Returns: Securely hashed password """ import bcrypt # Generate salt and hash password salt = bcrypt.gensalt() hashed = bcrypt.hashpw(password.encode('utf-8'), salt) return hashed.decode('utf-8') def verify_password(password: str, hashed: str) -> bool: """ Verify a password against its hash. Args: password: Plain text password hashed: Hashed password Returns: True if password matches, False otherwise """ import bcrypt return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))