"""
Enhanced Authentication Utilities

Provides advanced authentication features including:
- Password complexity validation
- Account lockout protection
- Session management
- Login attempt tracking
- Suspicious activity detection
"""

import re
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

from fastapi import HTTPException, status, Request
from passlib.context import CryptContext
from sqlalchemy import func, and_
from sqlalchemy.orm import Session

from app.models.user import User

try:
    # Optional: enhanced features may rely on this model
    from app.models.auth import LoginAttempt  # type: ignore
except Exception:  # pragma: no cover - older schemas may not include this model
    LoginAttempt = None  # type: ignore

from app.config import settings
from app.utils.logging import app_logger

# Module-level logger; every log call in this file goes through it with
# keyword arguments as structured fields.
logger = app_logger.bind(name="enhanced_auth")

# Password complexity configuration, read by
# PasswordValidator.validate_password_strength.
PASSWORD_CONFIG = {
    "min_length": 8,
    "max_length": 128,
    "require_uppercase": True,
    "require_lowercase": True,
    "require_digits": True,
    "require_special_chars": True,
    # Characters that satisfy the "special character" requirement.
    "special_chars": "!@#$%^&*()_+-=[]{}|;:,.<>?",
    # Longest run of one identical character that is still accepted.
    "max_consecutive_chars": 3,
    "prevent_common_passwords": True,
}

# Account lockout configuration, read by AccountLockoutManager.
# Durations are in seconds.
LOCKOUT_CONFIG = {
    # Failed attempts inside the window that trigger a lockout.
    "max_attempts": 5,
    "lockout_duration": 900,  # 15 minutes
    "window_duration": 900,  # 15 minutes
    "progressive_delay": True,
    "notify_on_lockout": True,
}

# Common weak passwords to prevent. Entries are lowercase because callers
# compare against ``password.lower()``.
COMMON_PASSWORDS = {
    "password", "123456", "password123", "admin", "qwerty", "letmein",
    "welcome", "monkey", "1234567890", "password1", "123456789",
    "welcome123", "admin123", "root", "toor", "pass", "test", "guest",
    "user", "login", "default", "changeme", "secret", "administrator",
}

# Password hashing context (passlib, bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class PasswordValidator:
    """Advanced password validation with security requirements.

    All methods are static; the class is only a namespace. Policy values come
    from the module-level ``PASSWORD_CONFIG`` and ``COMMON_PASSWORDS``.
    """

    # Same character set as PASSWORD_CONFIG["special_chars"], pre-escaped for
    # use inside a regex character class by the strength scorer.
    _SPECIAL_CHAR_PATTERN = r'[!@#$%^&*()_+\-=\[\]{}|;:,.<>?]'

    @staticmethod
    def validate_password_strength(password: str) -> Tuple[bool, List[str]]:
        """Validate password strength and return detailed feedback.

        Args:
            password: Candidate password in plain text.

        Returns:
            ``(is_valid, errors)``. ``errors`` holds one human-readable
            message per violated rule, and ``is_valid`` is True only when it
            is empty.
        """
        errors: List[str] = []

        # Length check
        if len(password) < PASSWORD_CONFIG["min_length"]:
            errors.append(f"Password must be at least {PASSWORD_CONFIG['min_length']} characters long")

        if len(password) > PASSWORD_CONFIG["max_length"]:
            errors.append(f"Password must not exceed {PASSWORD_CONFIG['max_length']} characters")

        # Character requirements (letter classes are ASCII-only)
        if PASSWORD_CONFIG["require_uppercase"] and not re.search(r'[A-Z]', password):
            errors.append("Password must contain at least one uppercase letter")

        if PASSWORD_CONFIG["require_lowercase"] and not re.search(r'[a-z]', password):
            errors.append("Password must contain at least one lowercase letter")

        if PASSWORD_CONFIG["require_digits"] and not re.search(r'\d', password):
            errors.append("Password must contain at least one digit")

        if PASSWORD_CONFIG["require_special_chars"]:
            special_chars = PASSWORD_CONFIG["special_chars"]
            # re.escape keeps "-", "[", "]" and "^" literal inside the class.
            if not re.search(f'[{re.escape(special_chars)}]', password):
                errors.append(f"Password must contain at least one special character ({special_chars[:10]}...)")

        # Consecutive character check: reject any window of
        # max_consecutive + 1 characters that are all identical.
        max_consecutive = PASSWORD_CONFIG["max_consecutive_chars"]
        window = max_consecutive + 1
        has_long_run = any(
            len(set(password[start:start + window])) == 1
            for start in range(len(password) - max_consecutive)
        )
        if has_long_run:
            errors.append(f"Password cannot contain more than {max_consecutive} consecutive identical characters")

        # Common password check (exact, case-insensitive match only)
        if PASSWORD_CONFIG["prevent_common_passwords"]:
            if password.lower() in COMMON_PASSWORDS:
                errors.append("Password is too common and easily guessable")

        # Sequential character check
        if PasswordValidator._contains_sequence(password):
            errors.append("Password cannot contain common keyboard sequences")

        # Dictionary word check (basic)
        if PasswordValidator._is_dictionary_word(password):
            errors.append("Password should not be a common dictionary word")

        return len(errors) == 0, errors

    @staticmethod
    def _contains_sequence(password: str) -> bool:
        """Return True if *password* contains a known keyboard/alphabet run.

        Matching is case-insensitive. Each listed run must appear in full,
        either forwards or reversed.
        """
        sequences = [
            "123456789", "987654321", "abcdefgh", "zyxwvuts",
            "qwertyui", "asdfghjk", "zxcvbnm", "uioplkjh",
            "qazwsxed", "plmoknij"
        ]

        password_lower = password.lower()
        return any(
            seq in password_lower or seq[::-1] in password_lower
            for seq in sequences
        )

    @staticmethod
    def _is_dictionary_word(password: str) -> bool:
        """Return True if *password*, lowercased, is exactly a listed common word."""
        # Simple check for common English words
        common_words = {
            "password", "computer", "internet", "database", "security",
            "welcome", "hello", "world", "admin", "user", "login",
            "system", "server", "network", "access", "control"
        }

        return password.lower() in common_words

    @staticmethod
    def generate_password_strength_score(password: str) -> int:
        """Generate a password strength score, clamped to 0-100.

        With the current weights the score tops out at 77
        (25 length + 25 classes + 12 variety bonus + 15 length bonus).

        Args:
            password: Candidate password in plain text.

        Returns:
            Integer score; higher means stronger.
        """
        score = 0

        # Length score (up to 25 points)
        score += min(25, len(password) * 2)

        # Evaluate each character class once; both the per-class points and
        # the variety bonus below reuse these flags.
        has_lower = bool(re.search(r'[a-z]', password))
        has_upper = bool(re.search(r'[A-Z]', password))
        has_digit = bool(re.search(r'\d', password))
        has_special = bool(re.search(PasswordValidator._SPECIAL_CHAR_PATTERN, password))

        # Character diversity (up to 25 points)
        if has_lower:
            score += 5
        if has_upper:
            score += 5
        if has_digit:
            score += 5
        if has_special:
            score += 10

        # Bonus for multiple character types (3 points each, up to 12)
        char_types = sum([has_lower, has_upper, has_digit, has_special])
        score += char_types * 3

        # Length bonus
        if len(password) >= 12:
            score += 10
        if len(password) >= 16:
            score += 5

        # Penalties
        if password.lower() in COMMON_PASSWORDS:
            score -= 25

        # Check for patterns
        if re.search(r'(.)\1{2,}', password):  # Repeated characters
            score -= 10

        return max(0, min(100, score))


class AccountLockoutManager:
    """Manages account lockout and login attempt tracking.

    All methods are static. When the optional ``LoginAttempt`` model is
    unavailable (``LoginAttempt is None``) the methods degrade to log-only /
    "never locked" behavior instead of failing.
    """

    @staticmethod
    def record_login_attempt(
        db: Session,
        username: str,
        success: bool,
        ip_address: str,
        user_agent: str,
        failure_reason: Optional[str] = None
    ) -> None:
        """Record a login attempt in the database.

        Best-effort: any error is logged and the session is rolled back; the
        exception is not propagated to the caller.

        Args:
            db: Session used to insert and commit the attempt row.
            username: Username supplied with the login request.
            success: Whether the login succeeded (stored as 1/0).
            ip_address: Client IP associated with the attempt.
            user_agent: Client user-agent string.
            failure_reason: Short reason for a failed attempt, if any.
        """
        try:
            if LoginAttempt is None:
                # Schema not available; log-only fallback
                logger.info(
                    "Login attempt (no model)",
                    username=username,
                    success=success,
                    ip=ip_address,
                    reason=failure_reason
                )
                return

            attempt = LoginAttempt(  # type: ignore[call-arg]
                username=username,
                ip_address=ip_address,
                user_agent=user_agent,
                success=1 if success else 0,
                failure_reason=failure_reason,
                timestamp=datetime.now(timezone.utc)
            )
            db.add(attempt)
            db.commit()

            logger.info(
                "Login attempt recorded",
                username=username,
                success=success,
                ip=ip_address,
                reason=failure_reason
            )
        except Exception as e:
            logger.error("Failed to record login attempt", error=str(e))
            db.rollback()

    @staticmethod
    def is_account_locked(db: Session, username: str) -> Tuple[bool, Optional[datetime]]:
        """Check if an account is locked due to failed attempts.

        The account is locked when at least ``max_attempts`` failures fall
        inside the trailing ``window_duration`` and fewer than
        ``lockout_duration`` seconds have passed since the latest failure.

        Args:
            db: Session used for the lookups.
            username: Account to check.

        Returns:
            ``(True, unlock_time)`` while locked, otherwise ``(False, None)``.
            On any error the failure is logged and ``(False, None)`` is
            returned (fails open).
        """
        try:
            if LoginAttempt is None:
                return False, None
            now = datetime.now(timezone.utc)
            window_start = now - timedelta(seconds=LOCKOUT_CONFIG["window_duration"])

            # Count failed attempts within the window
            failed_attempts = db.query(func.count(LoginAttempt.id)).filter(  # type: ignore[attr-defined]
                and_(
                    LoginAttempt.username == username,
                    LoginAttempt.success == 0,
                    LoginAttempt.timestamp >= window_start
                )
            ).scalar() or 0

            if failed_attempts < LOCKOUT_CONFIG["max_attempts"]:
                return False, None

            # Get the time of the last failed attempt
            last_attempt = db.query(LoginAttempt.timestamp).filter(  # type: ignore[attr-defined]
                and_(
                    LoginAttempt.username == username,
                    LoginAttempt.success == 0
                )
            ).order_by(LoginAttempt.timestamp.desc()).first()

            if not last_attempt:
                return False, None

            last_failed_at = last_attempt[0]
            if last_failed_at.tzinfo is None:
                # Some database backends return naive datetimes. Attempts are
                # written with datetime.now(timezone.utc), so a naive value is
                # UTC. Without this, comparing it against the aware ``now``
                # raises TypeError, which the handler below would swallow and
                # report as "not locked".
                last_failed_at = last_failed_at.replace(tzinfo=timezone.utc)

            unlock_time = last_failed_at + timedelta(seconds=LOCKOUT_CONFIG["lockout_duration"])
            if now < unlock_time:
                return True, unlock_time

            return False, None
        except Exception as e:
            logger.error("Failed to check account lockout", error=str(e))
            return False, None

    @staticmethod
    def get_lockout_info(db: Session, username: str) -> Dict[str, Any]:
        """Get detailed lockout information for an account.

        Args:
            db: Session used for the lookups.
            username: Account to describe.

        Returns:
            Dict with keys ``is_locked``, ``failed_attempts``,
            ``max_attempts``, ``attempts_remaining``, ``unlock_time``
            (ISO string or None), ``window_start`` (ISO string) and
            ``lockout_duration``. If the model is unavailable or a query
            fails, the "not locked, zero failures" defaults are returned.
        """
        # Computed before the try block so the fallback result can always
        # report the window start.
        now = datetime.now(timezone.utc)
        window_start = now - timedelta(seconds=LOCKOUT_CONFIG["window_duration"])
        max_attempts = LOCKOUT_CONFIG["max_attempts"]

        def build_info(is_locked: bool, failed_count: int, unlock_time: Optional[datetime]) -> Dict[str, Any]:
            # Single place that defines the shape of the returned payload.
            return {
                "is_locked": is_locked,
                "failed_attempts": failed_count,
                "max_attempts": max_attempts,
                "attempts_remaining": max(0, max_attempts - failed_count),
                "unlock_time": unlock_time.isoformat() if unlock_time else None,
                "window_start": window_start.isoformat(),
                "lockout_duration": LOCKOUT_CONFIG["lockout_duration"],
            }

        if LoginAttempt is None:
            return build_info(False, 0, None)

        try:
            # Count recent failed attempts in the database rather than
            # loading every row just to take its length.
            failed_count = db.query(func.count(LoginAttempt.id)).filter(  # type: ignore[attr-defined]
                and_(
                    LoginAttempt.username == username,
                    LoginAttempt.success == 0,
                    LoginAttempt.timestamp >= window_start
                )
            ).scalar() or 0

            is_locked, unlock_time = AccountLockoutManager.is_account_locked(db, username)
            return build_info(is_locked, failed_count, unlock_time)
        except Exception as e:
            logger.error("Failed to get lockout info", error=str(e))
            return build_info(False, 0, None)

    @staticmethod
    def reset_failed_attempts(db: Session, username: str) -> None:
        """Log that failed attempts reset naturally after a successful login.

        This does not modify or delete any rows; it only writes a log line.
        Failed attempts stop counting once they fall out of the time window
        used by ``is_account_locked``.
        """
        try:
            # We don't delete the records, just mark successful login
            # The lockout check will naturally reset due to time window
            logger.info("Failed attempts naturally reset for successful login", username=username)
        except Exception as e:
            logger.error("Failed to reset attempts", error=str(e))


class SuspiciousActivityDetector:
    """Detects and reports suspicious authentication activity.

    All methods are static and return "nothing suspicious" when the optional
    ``LoginAttempt`` model is unavailable or a query fails.
    """

    @staticmethod
    def detect_suspicious_patterns(db: Session, timeframe_hours: int = 24) -> List[Dict[str, Any]]:
        """Detect suspicious login patterns.

        Two alert types are produced:

        * ``suspicious_ip`` (high): 10 or more failed attempts from one IP.
        * ``account_targeted`` (medium): 5 or more failed attempts against
          one username coming from more than 2 distinct IPs.

        Args:
            db: Session used to load the attempts.
            timeframe_hours: How far back to look, in hours.

        Returns:
            List of alert dicts; empty on error or when nothing matches.
        """
        alerts: List[Dict[str, Any]] = []
        try:
            if LoginAttempt is None:
                return []
            cutoff_time = datetime.now(timezone.utc) - timedelta(hours=timeframe_hours)

            # Get all login attempts in timeframe
            attempts = db.query(LoginAttempt).filter(  # type: ignore[arg-type]
                LoginAttempt.timestamp >= cutoff_time
            ).all()

            # Group every attempt by source IP and by targeted username.
            ip_attempts: Dict[Any, list] = {}
            username_attempts: Dict[Any, list] = {}
            for attempt in attempts:
                ip_attempts.setdefault(attempt.ip_address, []).append(attempt)
                username_attempts.setdefault(attempt.username, []).append(attempt)

            # Check for suspicious IP activity
            for ip, attempts_list in ip_attempts.items():
                failed_attempts = [a for a in attempts_list if not a.success]
                if len(failed_attempts) >= 10:  # Many failed attempts from one IP
                    alerts.append({
                        "type": "suspicious_ip",
                        "severity": "high",
                        "ip_address": ip,
                        "failed_attempts": len(failed_attempts),
                        "usernames_targeted": list({a.username for a in failed_attempts}),
                        "timeframe": f"{timeframe_hours} hours"
                    })

            # Check for account targeting
            for username, attempts_list in username_attempts.items():
                failed_attempts = [a for a in attempts_list if not a.success]
                unique_ips = {a.ip_address for a in failed_attempts}

                if len(failed_attempts) >= 5 and len(unique_ips) > 2:
                    alerts.append({
                        "type": "account_targeted",
                        "severity": "medium",
                        "username": username,
                        "failed_attempts": len(failed_attempts),
                        "source_ips": list(unique_ips),
                        "timeframe": f"{timeframe_hours} hours"
                    })

            return alerts
        except Exception as e:
            logger.error("Failed to detect suspicious patterns", error=str(e))
            return []

    @staticmethod
    def is_login_suspicious(
        db: Session,
        username: str,
        ip_address: str,
        user_agent: str
    ) -> Tuple[bool, List[str]]:
        """Check if a login attempt is suspicious.

        Args:
            db: Session used for the lookups.
            username: Account being logged into.
            ip_address: Client IP of this attempt.
            user_agent: Client user-agent string (currently not inspected).

        Returns:
            ``(is_suspicious, warnings)`` where ``warnings`` lists every
            heuristic that fired. On error returns ``(False, [])``.
        """
        warnings: List[str] = []
        try:
            if LoginAttempt is None:
                return False, []

            # One reference time for every check in this call.
            now = datetime.now(timezone.utc)

            # Check for unusual IP: compare against IPs with a successful
            # login for this user in the last 30 days.
            recent_ips = db.query(LoginAttempt.ip_address).filter(  # type: ignore[attr-defined]
                and_(
                    LoginAttempt.username == username,
                    LoginAttempt.success == 1,
                    LoginAttempt.timestamp >= now - timedelta(days=30)
                )
            ).distinct().all()

            known_ips = {ip[0] for ip in recent_ips}
            # With no login history there is no baseline, so no warning.
            if ip_address not in known_ips and len(known_ips) > 0:
                warnings.append("Login from new IP address")

            # Check for unusual time. This uses the UTC hour, not the user's
            # local time; hours 23 and 0-5 are flagged.
            if now.hour < 6 or now.hour > 22:  # Outside business hours
                warnings.append("Login outside normal business hours")

            # Check for rapid attempts from same IP
            recent_attempts = db.query(func.count(LoginAttempt.id)).filter(  # type: ignore[attr-defined]
                and_(
                    LoginAttempt.ip_address == ip_address,
                    LoginAttempt.timestamp >= now - timedelta(minutes=5)
                )
            ).scalar()

            if recent_attempts > 3:
                warnings.append("Multiple rapid login attempts from same IP")

            return len(warnings) > 0, warnings
        except Exception as e:
            logger.error("Failed to check suspicious login", error=str(e))
            return False, []


def validate_and_authenticate_user(
    db: Session,
    username: str,
    password: str,
    request: Request
) -> Tuple[Optional[User], List[str]]:
    """Enhanced user authentication with security checks.

    Steps, in order: lockout check, user lookup, active check, password
    verification, suspicious-activity check (logged only), then recording the
    successful attempt and updating ``last_login``. Every rejected attempt is
    recorded through ``AccountLockoutManager.record_login_attempt``.

    Args:
        db: Session used for all lookups and writes.
        username: Username supplied by the client.
        password: Plain-text password supplied by the client.
        request: Incoming request; used for the client IP and user agent.

    Returns:
        ``(user, [])`` on success, or ``(None, errors)`` with user-facing
        messages on failure. Never raises: unexpected errors are logged and
        reported as a generic "temporarily unavailable" message.
    """
    errors: List[str] = []

    try:
        # Extract request information
        ip_address = get_client_ip(request)
        user_agent = request.headers.get("user-agent", "")

        # Check account lockout
        is_locked, unlock_time = AccountLockoutManager.is_account_locked(db, username)
        if is_locked:
            # NOTE(review): this is stored as a failed attempt, so retries
            # while locked appear to push the unlock time further out.
            # Confirm that is intended.
            AccountLockoutManager.record_login_attempt(
                db, username, False, ip_address, user_agent, "Account locked"
            )
            unlock_str = unlock_time.strftime("%Y-%m-%d %H:%M:%S UTC") if unlock_time else "unknown"
            errors.append(f"Account is locked due to too many failed attempts. Try again after {unlock_str}")
            return None, errors

        # Find user
        user = db.query(User).filter(User.username == username).first()
        if not user:
            AccountLockoutManager.record_login_attempt(
                db, username, False, ip_address, user_agent, "User not found"
            )
            # Same message as a wrong password, so the response does not
            # reveal whether the username exists.
            errors.append("Invalid username or password")
            return None, errors

        # Check if user is active
        # NOTE(review): this runs before password verification, so the
        # distinct message reveals that a disabled account exists.
        if not user.is_active:
            AccountLockoutManager.record_login_attempt(
                db, username, False, ip_address, user_agent, "User account disabled"
            )
            errors.append("User account is disabled")
            return None, errors

        # Verify password (imported here rather than at module level)
        from app.auth.security import verify_password
        if not verify_password(password, user.hashed_password):
            AccountLockoutManager.record_login_attempt(
                db, username, False, ip_address, user_agent, "Invalid password"
            )
            errors.append("Invalid username or password")
            return None, errors

        # Check for suspicious activity
        is_suspicious, warnings = SuspiciousActivityDetector.is_login_suspicious(
            db, username, ip_address, user_agent
        )

        if is_suspicious:
            logger.warning(
                "Suspicious login detected",
                username=username,
                ip=ip_address,
                warnings=warnings
            )
            # You could require additional verification here

        # Successful login
        AccountLockoutManager.record_login_attempt(
            db, username, True, ip_address, user_agent, None
        )

        # Update last login time
        user.last_login = datetime.now(timezone.utc)
        db.commit()

        return user, []

    except Exception as e:
        logger.error("Authentication error", error=str(e))
        # Roll back so a failed flush/commit does not leave the session
        # unusable for the caller; mirrors record_login_attempt.
        try:
            db.rollback()
        except Exception as rollback_error:
            logger.error(
                "Failed to roll back session after authentication error",
                error=str(rollback_error)
            )
        errors.append("Authentication service temporarily unavailable")
        return None, errors


def get_client_ip(request: Request) -> str:
    """Extract client IP from request headers.

    Lookup order: first non-empty entry of ``X-Forwarded-For``, then
    ``X-Real-IP``, then the connection's peer address.

    NOTE(review): both headers are client-controlled unless a trusted proxy
    overwrites them; confirm the deployment strips them at the edge.

    Args:
        request: Incoming request exposing ``headers`` and ``client``.

    Returns:
        The best-guess client IP, or ``"unknown"`` if none is available.
    """
    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for:
        # Skip blank entries (e.g. " , 1.2.3.4" or a whitespace-only header)
        # so an empty string is never returned as the address.
        for candidate in forwarded_for.split(","):
            candidate = candidate.strip()
            if candidate:
                return candidate

    real_ip = request.headers.get("x-real-ip")
    if real_ip:
        real_ip = real_ip.strip()
        if real_ip:
            return real_ip

    if request.client:
        return request.client.host

    return "unknown"