remove old import

This commit is contained in:
HotSwapp
2025-08-14 21:27:34 -05:00
parent bfc04a6909
commit 679ab4446a
17 changed files with 2016 additions and 557 deletions

432
app/utils/security.py Normal file
View File

@@ -0,0 +1,432 @@
"""
Security audit utility for credential validation and security best practices.
"""
import re
import hashlib
import secrets
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import ast
from app.utils.logging import security_logger
class SecurityLevel(Enum):
"""Security issue severity levels."""
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
@dataclass
class SecurityFinding:
"""Represents a security finding from code analysis."""
file_path: str
line_number: int
issue_type: str
severity: SecurityLevel
description: str
recommendation: str
code_snippet: str = ""
class CredentialValidator:
"""Utility for detecting hardcoded credentials and security issues."""
# Patterns for detecting potential hardcoded credentials
CREDENTIAL_PATTERNS = {
'password': [
r'password\s*=\s*["\'][^"\']+["\']',
r'passwd\s*=\s*["\'][^"\']+["\']',
r'pwd\s*=\s*["\'][^"\']+["\']',
],
'api_key': [
r'api_key\s*=\s*["\'][^"\']+["\']',
r'apikey\s*=\s*["\'][^"\']+["\']',
r'key\s*=\s*["\'][A-Za-z0-9]{20,}["\']',
],
'token': [
r'token\s*=\s*["\'][^"\']+["\']',
r'access_token\s*=\s*["\'][^"\']+["\']',
r'auth_token\s*=\s*["\'][^"\']+["\']',
],
'secret': [
r'secret\s*=\s*["\'][^"\']+["\']',
r'secret_key\s*=\s*["\'][^"\']+["\']',
r'client_secret\s*=\s*["\'][^"\']+["\']',
],
'database_url': [
r'database_url\s*=\s*["\'][^"\']*://[^"\']+["\']',
r'db_url\s*=\s*["\'][^"\']*://[^"\']+["\']',
r'connection_string\s*=\s*["\'][^"\']*://[^"\']+["\']',
],
'private_key': [
r'private_key\s*=\s*["\'][^"\']+["\']',
r'-----BEGIN\s+(?:RSA\s+)?PRIVATE\s+KEY-----',
]
}
# Safe patterns that should not be flagged
SAFE_PATTERNS = [
r'password\s*=\s*["\']os\.getenv\(',
r'password\s*=\s*["\']settings\.',
r'password\s*=\s*["\']config\.',
r'password\s*=\s*["\']env\.',
r'password\s*=\s*["\'].*\{\}.*["\']', # Template strings
r'password\s*=\s*["\'].*%s.*["\']', # Format strings
]
# Patterns for other security issues
SECURITY_PATTERNS = {
'sql_injection': [
r'\.execute\s*\(\s*["\'][^"\']*\+[^"\']*["\']', # String concatenation in SQL
r'\.execute\s*\(\s*f["\'][^"\']*\{[^}]+\}[^"\']*["\']', # f-string in SQL
r'\.execute\s*\(\s*["\'][^"\']*%[^"\']*["\']', # % formatting in SQL
],
'hardcoded_ip': [
r'["\'](?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)["\']',
],
'debug_mode': [
r'debug\s*=\s*True',
r'DEBUG\s*=\s*True',
],
'weak_crypto': [
r'hashlib\.md5\(',
r'hashlib\.sha1\(',
]
}
def __init__(self):
self.findings: List[SecurityFinding] = []
def scan_file(self, file_path: Path) -> List[SecurityFinding]:
"""
Scan a single file for security issues.
Args:
file_path: Path to the file to scan
Returns:
List of security findings
"""
findings = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
for line_num, line in enumerate(lines, 1):
# Check for credential patterns
for cred_type, patterns in self.CREDENTIAL_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, line, re.IGNORECASE):
# Check if it's a safe pattern
is_safe = any(re.search(safe_pattern, line, re.IGNORECASE)
for safe_pattern in self.SAFE_PATTERNS)
if not is_safe:
findings.append(SecurityFinding(
file_path=str(file_path),
line_number=line_num,
issue_type=f"hardcoded_{cred_type}",
severity=SecurityLevel.CRITICAL,
description=f"Potential hardcoded {cred_type} detected",
recommendation=f"Move {cred_type} to environment variables or secure configuration",
code_snippet=line.strip()
))
# Check for other security patterns
for issue_type, patterns in self.SECURITY_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, line, re.IGNORECASE):
severity = self._get_severity_for_issue(issue_type)
findings.append(SecurityFinding(
file_path=str(file_path),
line_number=line_num,
issue_type=issue_type,
severity=severity,
description=self._get_description_for_issue(issue_type),
recommendation=self._get_recommendation_for_issue(issue_type),
code_snippet=line.strip()
))
except Exception as e:
security_logger.error(f"Error scanning file {file_path}: {str(e)}")
return findings
def scan_directory(self, directory_path: Path, file_pattern: str = "*.py") -> List[SecurityFinding]:
"""
Scan all files in a directory for security issues.
Args:
directory_path: Path to the directory to scan
file_pattern: File pattern to match (default: *.py)
Returns:
List of all security findings
"""
all_findings = []
try:
for file_path in directory_path.rglob(file_pattern):
if file_path.is_file():
findings = self.scan_file(file_path)
all_findings.extend(findings)
except Exception as e:
security_logger.error(f"Error scanning directory {directory_path}: {str(e)}")
return all_findings
def _get_severity_for_issue(self, issue_type: str) -> SecurityLevel:
"""Get severity level for an issue type."""
severity_map = {
'sql_injection': SecurityLevel.CRITICAL,
'hardcoded_ip': SecurityLevel.MEDIUM,
'debug_mode': SecurityLevel.HIGH,
'weak_crypto': SecurityLevel.MEDIUM,
}
return severity_map.get(issue_type, SecurityLevel.LOW)
def _get_description_for_issue(self, issue_type: str) -> str:
"""Get description for an issue type."""
descriptions = {
'sql_injection': "Potential SQL injection vulnerability detected",
'hardcoded_ip': "Hardcoded IP address found",
'debug_mode': "Debug mode enabled in production code",
'weak_crypto': "Weak cryptographic algorithm detected",
}
return descriptions.get(issue_type, f"Security issue: {issue_type}")
def _get_recommendation_for_issue(self, issue_type: str) -> str:
"""Get recommendation for an issue type."""
recommendations = {
'sql_injection': "Use parameterized queries or ORM methods to prevent SQL injection",
'hardcoded_ip': "Move IP addresses to configuration files or environment variables",
'debug_mode': "Set debug mode via environment variables, default to False in production",
'weak_crypto': "Use stronger cryptographic algorithms (SHA-256 or better)",
}
return recommendations.get(issue_type, "Review and address this security concern")
def generate_report(self, findings: List[SecurityFinding]) -> Dict[str, Any]:
"""
Generate a security report from findings.
Args:
findings: List of security findings
Returns:
Dictionary containing security report
"""
report = {
'total_issues': len(findings),
'by_severity': {},
'by_type': {},
'files_affected': set(),
'critical_issues': [],
'recommendations': []
}
# Count by severity
for severity in SecurityLevel:
count = len([f for f in findings if f.severity == severity])
if count > 0:
report['by_severity'][severity.value] = count
# Count by type
for finding in findings:
if finding.issue_type not in report['by_type']:
report['by_type'][finding.issue_type] = 0
report['by_type'][finding.issue_type] += 1
report['files_affected'].add(finding.file_path)
if finding.severity in [SecurityLevel.CRITICAL, SecurityLevel.HIGH]:
report['critical_issues'].append({
'file': finding.file_path,
'line': finding.line_number,
'type': finding.issue_type,
'severity': finding.severity.value,
'description': finding.description
})
report['files_affected'] = list(report['files_affected'])
# Generate summary recommendations
if report['by_type']:
report['recommendations'] = self._generate_recommendations(report['by_type'])
return report
def _generate_recommendations(self, issues_by_type: Dict[str, int]) -> List[str]:
"""Generate summary recommendations based on issue types found."""
recommendations = []
if any('hardcoded' in issue_type for issue_type in issues_by_type):
recommendations.append(
"Implement a secure configuration management system using environment variables or encrypted config files"
)
if 'sql_injection' in issues_by_type:
recommendations.append(
"Review all database queries and ensure parameterized queries are used consistently"
)
if 'debug_mode' in issues_by_type:
recommendations.append(
"Implement environment-based configuration for debug settings"
)
if 'weak_crypto' in issues_by_type:
recommendations.append(
"Upgrade cryptographic implementations to use stronger algorithms"
)
return recommendations
class PasswordStrengthValidator:
"""Utility for validating password strength and generating secure passwords."""
def __init__(self):
self.min_length = 8
self.require_uppercase = True
self.require_lowercase = True
self.require_digits = True
self.require_special = True
def validate_password_strength(self, password: str) -> Tuple[bool, List[str]]:
"""
Validate password strength.
Args:
password: Password to validate
Returns:
Tuple of (is_valid, list_of_issues)
"""
issues = []
if len(password) < self.min_length:
issues.append(f"Password must be at least {self.min_length} characters long")
if self.require_uppercase and not re.search(r'[A-Z]', password):
issues.append("Password must contain at least one uppercase letter")
if self.require_lowercase and not re.search(r'[a-z]', password):
issues.append("Password must contain at least one lowercase letter")
if self.require_digits and not re.search(r'\d', password):
issues.append("Password must contain at least one digit")
if self.require_special and not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
issues.append("Password must contain at least one special character")
return len(issues) == 0, issues
def generate_secure_password(self, length: int = 16) -> str:
"""
Generate a cryptographically secure password.
Args:
length: Length of password to generate
Returns:
Secure password string
"""
import string
# Define character sets
lowercase = string.ascii_lowercase
uppercase = string.ascii_uppercase
digits = string.digits
special = "!@#$%^&*(),.?\":{}|<>"
# Ensure at least one character from each required set
password_chars = []
if self.require_lowercase:
password_chars.append(secrets.choice(lowercase))
if self.require_uppercase:
password_chars.append(secrets.choice(uppercase))
if self.require_digits:
password_chars.append(secrets.choice(digits))
if self.require_special:
password_chars.append(secrets.choice(special))
# Fill remaining length with random characters from all sets
all_chars = lowercase + uppercase + digits + special
for _ in range(length - len(password_chars)):
password_chars.append(secrets.choice(all_chars))
# Shuffle the password characters
secrets.SystemRandom().shuffle(password_chars)
return ''.join(password_chars)
def audit_code_security(directory_path: str, file_pattern: str = "*.py") -> Dict[str, Any]:
"""
Perform a comprehensive security audit of code in a directory.
Args:
directory_path: Path to the directory to audit
file_pattern: File pattern to match (default: *.py)
Returns:
Security audit report
"""
validator = CredentialValidator()
path = Path(directory_path)
security_logger.info(f"Starting security audit of {directory_path}")
findings = validator.scan_directory(path, file_pattern)
report = validator.generate_report(findings)
security_logger.info(f"Security audit completed", **{
"total_issues": report['total_issues'],
"files_scanned": len(report['files_affected']),
"critical_issues": len(report['critical_issues'])
})
return report
def hash_password_securely(password: str) -> str:
"""
Hash a password using a secure algorithm.
Args:
password: Plain text password
Returns:
Securely hashed password
"""
import bcrypt
# Generate salt and hash password
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password.encode('utf-8'), salt)
return hashed.decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
"""
Verify a password against its hash.
Args:
password: Plain text password
hashed: Hashed password
Returns:
True if password matches, False otherwise
"""
import bcrypt
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))