""" Enhanced Authentication Utilities Provides advanced authentication features including: - Password complexity validation - Account lockout protection - Session management - Login attempt tracking - Suspicious activity detection """ import re import time from datetime import datetime, timedelta, timezone from typing import Optional, Dict, List, Tuple from sqlalchemy.orm import Session from sqlalchemy import func, and_ from fastapi import HTTPException, status, Request from passlib.context import CryptContext from app.models.user import User try: # Optional: enhanced features may rely on this model from app.models.auth import LoginAttempt # type: ignore except Exception: # pragma: no cover - older schemas may not include this model LoginAttempt = None # type: ignore from app.config import settings from app.utils.logging import app_logger logger = app_logger.bind(name="enhanced_auth") # Password complexity configuration PASSWORD_CONFIG = { "min_length": 8, "max_length": 128, "require_uppercase": True, "require_lowercase": True, "require_digits": True, "require_special_chars": True, "special_chars": "!@#$%^&*()_+-=[]{}|;:,.<>?", "max_consecutive_chars": 3, "prevent_common_passwords": True, } # Account lockout configuration LOCKOUT_CONFIG = { "max_attempts": 5, "lockout_duration": 900, # 15 minutes "window_duration": 900, # 15 minutes "progressive_delay": True, "notify_on_lockout": True, } # Common weak passwords to prevent COMMON_PASSWORDS = { "password", "123456", "password123", "admin", "qwerty", "letmein", "welcome", "monkey", "1234567890", "password1", "123456789", "welcome123", "admin123", "root", "toor", "pass", "test", "guest", "user", "login", "default", "changeme", "secret", "administrator" } # Password validation context pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") class PasswordValidator: """Advanced password validation with security requirements""" @staticmethod def validate_password_strength(password: str) -> Tuple[bool, List[str]]: """Validate password strength and return detailed feedback""" errors = [] # Length check if len(password) < PASSWORD_CONFIG["min_length"]: errors.append(f"Password must be at least {PASSWORD_CONFIG['min_length']} characters long") if len(password) > PASSWORD_CONFIG["max_length"]: errors.append(f"Password must not exceed {PASSWORD_CONFIG['max_length']} characters") # Character requirements if PASSWORD_CONFIG["require_uppercase"] and not re.search(r'[A-Z]', password): errors.append("Password must contain at least one uppercase letter") if PASSWORD_CONFIG["require_lowercase"] and not re.search(r'[a-z]', password): errors.append("Password must contain at least one lowercase letter") if PASSWORD_CONFIG["require_digits"] and not re.search(r'\d', password): errors.append("Password must contain at least one digit") if PASSWORD_CONFIG["require_special_chars"]: special_chars = PASSWORD_CONFIG["special_chars"] if not re.search(f'[{re.escape(special_chars)}]', password): errors.append(f"Password must contain at least one special character ({special_chars[:10]}...)") # Consecutive character check max_consecutive = PASSWORD_CONFIG["max_consecutive_chars"] for i in range(len(password) - max_consecutive): substr = password[i:i + max_consecutive + 1] if len(set(substr)) == 1: # All same character errors.append(f"Password cannot contain more than {max_consecutive} consecutive identical characters") break # Common password check if PASSWORD_CONFIG["prevent_common_passwords"]: if password.lower() in COMMON_PASSWORDS: errors.append("Password is too common and easily guessable") # Sequential character check if PasswordValidator._contains_sequence(password): errors.append("Password cannot contain common keyboard sequences") # Dictionary word check (basic) if PasswordValidator._is_dictionary_word(password): errors.append("Password should not be a common dictionary word") return len(errors) == 0, errors @staticmethod def _contains_sequence(password: str) -> bool: """Check for common keyboard sequences""" sequences = [ "123456789", "987654321", "abcdefgh", "zyxwvuts", "qwertyui", "asdfghjk", "zxcvbnm", "uioplkjh", "qazwsxed", "plmoknij" ] password_lower = password.lower() for seq in sequences: if seq in password_lower or seq[::-1] in password_lower: return True return False @staticmethod def _is_dictionary_word(password: str) -> bool: """Basic check for common dictionary words""" # Simple check for common English words common_words = { "password", "computer", "internet", "database", "security", "welcome", "hello", "world", "admin", "user", "login", "system", "server", "network", "access", "control" } return password.lower() in common_words @staticmethod def generate_password_strength_score(password: str) -> int: """Generate a password strength score from 0-100""" score = 0 # Length score (up to 25 points) score += min(25, len(password) * 2) # Character diversity (up to 40 points) if re.search(r'[a-z]', password): score += 5 if re.search(r'[A-Z]', password): score += 5 if re.search(r'\d', password): score += 5 if re.search(r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', password): score += 10 # Bonus for multiple character types char_types = sum([ bool(re.search(r'[a-z]', password)), bool(re.search(r'[A-Z]', password)), bool(re.search(r'\d', password)), bool(re.search(r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]', password)) ]) score += char_types * 3 # Length bonus if len(password) >= 12: score += 10 if len(password) >= 16: score += 5 # Penalties if password.lower() in COMMON_PASSWORDS: score -= 25 # Check for patterns if re.search(r'(.)\1{2,}', password): # Repeated characters score -= 10 return max(0, min(100, score)) class AccountLockoutManager: """Manages account lockout and login attempt tracking""" @staticmethod def record_login_attempt( db: Session, username: str, success: bool, ip_address: str, user_agent: str, failure_reason: Optional[str] = None ) -> None: """Record a login attempt in the database""" try: if LoginAttempt is None: # Schema not available; log-only fallback logger.info( "Login attempt (no model)", username=username, success=success, ip=ip_address, reason=failure_reason ) return attempt = LoginAttempt( # type: ignore[call-arg] username=username, ip_address=ip_address, user_agent=user_agent, success=1 if success else 0, failure_reason=failure_reason, timestamp=datetime.now(timezone.utc) ) db.add(attempt) db.commit() logger.info( "Login attempt recorded", username=username, success=success, ip=ip_address, reason=failure_reason ) except Exception as e: logger.error("Failed to record login attempt", error=str(e)) db.rollback() @staticmethod def is_account_locked(db: Session, username: str) -> Tuple[bool, Optional[datetime]]: """Check if an account is locked due to failed attempts""" try: if LoginAttempt is None: return False, None now = datetime.now(timezone.utc) window_start = now - timedelta(seconds=LOCKOUT_CONFIG["window_duration"]) # Count failed attempts within the window failed_attempts = db.query(func.count(LoginAttempt.id)).filter( # type: ignore[attr-defined] and_( LoginAttempt.username == username, LoginAttempt.success == 0, LoginAttempt.timestamp >= window_start ) ).scalar() if failed_attempts >= LOCKOUT_CONFIG["max_attempts"]: # Get the time of the last failed attempt last_attempt = db.query(LoginAttempt.timestamp).filter( # type: ignore[attr-defined] and_( LoginAttempt.username == username, LoginAttempt.success == 0 ) ).order_by(LoginAttempt.timestamp.desc()).first() if last_attempt: unlock_time = last_attempt[0] + timedelta(seconds=LOCKOUT_CONFIG["lockout_duration"]) if now < unlock_time: return True, unlock_time return False, None except Exception as e: logger.error("Failed to check account lockout", error=str(e)) return False, None @staticmethod def get_lockout_info(db: Session, username: str) -> Dict[str, any]: """Get detailed lockout information for an account""" try: now = datetime.now(timezone.utc) window_start = now - timedelta(seconds=LOCKOUT_CONFIG["window_duration"]) if LoginAttempt is None: return { "is_locked": False, "failed_attempts": 0, "max_attempts": LOCKOUT_CONFIG["max_attempts"], "attempts_remaining": LOCKOUT_CONFIG["max_attempts"], "unlock_time": None, "window_start": window_start.isoformat(), "lockout_duration": LOCKOUT_CONFIG["lockout_duration"], } # Get recent failed attempts failed_attempts = db.query(LoginAttempt).filter( # type: ignore[arg-type] and_( LoginAttempt.username == username, LoginAttempt.success == 0, LoginAttempt.timestamp >= window_start ) ).order_by(LoginAttempt.timestamp.desc()).all() failed_count = len(failed_attempts) is_locked, unlock_time = AccountLockoutManager.is_account_locked(db, username) return { "is_locked": is_locked, "failed_attempts": failed_count, "max_attempts": LOCKOUT_CONFIG["max_attempts"], "attempts_remaining": max(0, LOCKOUT_CONFIG["max_attempts"] - failed_count), "unlock_time": unlock_time.isoformat() if unlock_time else None, "window_start": window_start.isoformat(), "lockout_duration": LOCKOUT_CONFIG["lockout_duration"], } except Exception as e: logger.error("Failed to get lockout info", error=str(e)) return { "is_locked": False, "failed_attempts": 0, "max_attempts": LOCKOUT_CONFIG["max_attempts"], "attempts_remaining": LOCKOUT_CONFIG["max_attempts"], "unlock_time": None, "window_start": window_start.isoformat() if 'window_start' in locals() else None, "lockout_duration": LOCKOUT_CONFIG["lockout_duration"], } @staticmethod def reset_failed_attempts(db: Session, username: str) -> None: """Reset failed login attempts for successful login""" try: # We don't delete the records, just mark successful login # The lockout check will naturally reset due to time window logger.info("Failed attempts naturally reset for successful login", username=username) except Exception as e: logger.error("Failed to reset attempts", error=str(e)) class SuspiciousActivityDetector: """Detects and reports suspicious authentication activity""" @staticmethod def detect_suspicious_patterns(db: Session, timeframe_hours: int = 24) -> List[Dict[str, any]]: """Detect suspicious login patterns""" alerts = [] try: if LoginAttempt is None: return [] cutoff_time = datetime.now(timezone.utc) - timedelta(hours=timeframe_hours) # Get all login attempts in timeframe attempts = db.query(LoginAttempt).filter( # type: ignore[arg-type] LoginAttempt.timestamp >= cutoff_time ).all() # Analyze patterns ip_attempts = {} username_attempts = {} for attempt in attempts: # Group by IP if attempt.ip_address not in ip_attempts: ip_attempts[attempt.ip_address] = [] ip_attempts[attempt.ip_address].append(attempt) # Group by username if attempt.username not in username_attempts: username_attempts[attempt.username] = [] username_attempts[attempt.username].append(attempt) # Check for suspicious IP activity for ip, attempts_list in ip_attempts.items(): failed_attempts = [a for a in attempts_list if not a.success] if len(failed_attempts) >= 10: # Many failed attempts from one IP alerts.append({ "type": "suspicious_ip", "severity": "high", "ip_address": ip, "failed_attempts": len(failed_attempts), "usernames_targeted": list(set(a.username for a in failed_attempts)), "timeframe": f"{timeframe_hours} hours" }) # Check for account targeting for username, attempts_list in username_attempts.items(): failed_attempts = [a for a in attempts_list if not a.success] unique_ips = set(a.ip_address for a in failed_attempts) if len(failed_attempts) >= 5 and len(unique_ips) > 2: alerts.append({ "type": "account_targeted", "severity": "medium", "username": username, "failed_attempts": len(failed_attempts), "source_ips": list(unique_ips), "timeframe": f"{timeframe_hours} hours" }) return alerts except Exception as e: logger.error("Failed to detect suspicious patterns", error=str(e)) return [] @staticmethod def is_login_suspicious( db: Session, username: str, ip_address: str, user_agent: str ) -> Tuple[bool, List[str]]: """Check if a login attempt is suspicious""" warnings = [] try: if LoginAttempt is None: return False, [] # Check for unusual IP recent_ips = db.query(LoginAttempt.ip_address).filter( # type: ignore[attr-defined] and_( LoginAttempt.username == username, LoginAttempt.success == 1, LoginAttempt.timestamp >= datetime.now(timezone.utc) - timedelta(days=30) ) ).distinct().all() known_ips = {ip[0] for ip in recent_ips} if ip_address not in known_ips and len(known_ips) > 0: warnings.append("Login from new IP address") # Check for unusual time now = datetime.now(timezone.utc) if now.hour < 6 or now.hour > 22: # Outside business hours warnings.append("Login outside normal business hours") # Check for rapid attempts from same IP recent_attempts = db.query(func.count(LoginAttempt.id)).filter( # type: ignore[attr-defined] and_( LoginAttempt.ip_address == ip_address, LoginAttempt.timestamp >= datetime.now(timezone.utc) - timedelta(minutes=5) ) ).scalar() if recent_attempts > 3: warnings.append("Multiple rapid login attempts from same IP") return len(warnings) > 0, warnings except Exception as e: logger.error("Failed to check suspicious login", error=str(e)) return False, [] def validate_and_authenticate_user( db: Session, username: str, password: str, request: Request ) -> Tuple[Optional[User], List[str]]: """Enhanced user authentication with security checks""" errors = [] try: # Extract request information ip_address = get_client_ip(request) user_agent = request.headers.get("user-agent", "") # Check account lockout is_locked, unlock_time = AccountLockoutManager.is_account_locked(db, username) if is_locked: AccountLockoutManager.record_login_attempt( db, username, False, ip_address, user_agent, "Account locked" ) unlock_str = unlock_time.strftime("%Y-%m-%d %H:%M:%S UTC") if unlock_time else "unknown" errors.append(f"Account is locked due to too many failed attempts. Try again after {unlock_str}") return None, errors # Find user user = db.query(User).filter(User.username == username).first() if not user: AccountLockoutManager.record_login_attempt( db, username, False, ip_address, user_agent, "User not found" ) errors.append("Invalid username or password") return None, errors # Check if user is active if not user.is_active: AccountLockoutManager.record_login_attempt( db, username, False, ip_address, user_agent, "User account disabled" ) errors.append("User account is disabled") return None, errors # Verify password from app.auth.security import verify_password if not verify_password(password, user.hashed_password): AccountLockoutManager.record_login_attempt( db, username, False, ip_address, user_agent, "Invalid password" ) errors.append("Invalid username or password") return None, errors # Check for suspicious activity is_suspicious, warnings = SuspiciousActivityDetector.is_login_suspicious( db, username, ip_address, user_agent ) if is_suspicious: logger.warning( "Suspicious login detected", username=username, ip=ip_address, warnings=warnings ) # You could require additional verification here # Successful login AccountLockoutManager.record_login_attempt( db, username, True, ip_address, user_agent, None ) # Update last login time user.last_login = datetime.now(timezone.utc) db.commit() return user, [] except Exception as e: logger.error("Authentication error", error=str(e)) errors.append("Authentication service temporarily unavailable") return None, errors def get_client_ip(request: Request) -> str: """Extract client IP from request headers""" forwarded_for = request.headers.get("x-forwarded-for") if forwarded_for: return forwarded_for.split(",")[0].strip() real_ip = request.headers.get("x-real-ip") if real_ip: return real_ip if request.client: return request.client.host return "unknown"