"""
Session management models for advanced security
"""

from datetime import datetime, timezone, timedelta
from typing import Optional
from enum import Enum
from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey, Text, func
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID
import uuid

from app.models.base import BaseModel


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class SessionStatus(str, Enum):
    """Session status enumeration"""

    ACTIVE = "active"
    EXPIRED = "expired"
    REVOKED = "revoked"
    LOCKED = "locked"


class UserSession(BaseModel):
    """
    Enhanced user session tracking for security monitoring
    """

    __tablename__ = "user_sessions"

    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    session_id = Column(String(128), nullable=False, unique=True, index=True)  # Secure session identifier
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)

    # Session metadata
    ip_address = Column(String(45), nullable=True, index=True)
    user_agent = Column(Text, nullable=True)
    device_fingerprint = Column(String(255), nullable=True)  # For device tracking

    # Geographic and security info
    country = Column(String(5), nullable=True)  # ISO country code
    city = Column(String(100), nullable=True)
    is_suspicious = Column(Boolean, default=False, nullable=False, index=True)
    risk_score = Column(Integer, default=0, nullable=False)  # 0-100 risk assessment

    # Session lifecycle
    status = Column(String(20), default=SessionStatus.ACTIVE, nullable=False, index=True)
    created_at = Column(DateTime(timezone=True), default=_utcnow, nullable=False)
    last_activity = Column(DateTime(timezone=True), default=_utcnow, nullable=False, index=True)
    expires_at = Column(DateTime(timezone=True), nullable=False, index=True)

    # Security tracking
    login_method = Column(String(50), nullable=True)  # password, 2fa, sso, etc.
    locked_at = Column(DateTime(timezone=True), nullable=True)
    revoked_at = Column(DateTime(timezone=True), nullable=True)
    revocation_reason = Column(String(100), nullable=True)

    # Relationships
    user = relationship("User", back_populates="sessions")
    activities = relationship("SessionActivity", back_populates="session", cascade="all, delete-orphan")

    def is_expired(self) -> bool:
        """Check if session is expired.

        Returns:
            True when the current UTC time is at or past ``expires_at``, or
            when ``expires_at`` is not set at all.
        """
        expires_at = self.expires_at
        if expires_at is None:
            # Fail closed: a session without a known expiry must never be
            # treated as still valid (and None cannot be compared anyway).
            return True
        if expires_at.tzinfo is None:
            # Some backends hand back naive datetimes even for
            # DateTime(timezone=True). This model only ever writes UTC
            # timestamps, so interpret a naive value as UTC instead of
            # raising TypeError on the aware/naive comparison.
            expires_at = expires_at.replace(tzinfo=timezone.utc)
        return _utcnow() >= expires_at

    def is_active(self) -> bool:
        """Check if session is currently active.

        Returns:
            True only when ``status`` equals ``SessionStatus.ACTIVE`` and the
            session has not expired.
        """
        return self.status == SessionStatus.ACTIVE and not self.is_expired()

    def extend_session(self, duration: timedelta = timedelta(hours=8)) -> None:
        """Extend session expiration time.

        Sets ``last_activity`` to the current UTC time and ``expires_at`` to
        that same instant plus ``duration``.

        Args:
            duration: How long from now the session should remain valid.
        """
        # Read the clock once so both fields derive from the same instant.
        now = _utcnow()
        self.expires_at = now + duration
        self.last_activity = now

    def revoke_session(self, reason: str = "user_logout") -> None:
        """Revoke the session.

        Args:
            reason: Stored in ``revocation_reason``.
        """
        self.status = SessionStatus.REVOKED
        self.revoked_at = _utcnow()
        self.revocation_reason = reason

    def lock_session(self, reason: str = "suspicious_activity") -> None:
        """Lock the session for security reasons.

        Args:
            reason: Stored in ``revocation_reason`` (the model has no
                separate column for the lock reason).
        """
        self.status = SessionStatus.LOCKED
        self.locked_at = _utcnow()
        self.revocation_reason = reason


class SessionActivity(BaseModel):
    """
    Track user activities within sessions for security analysis
    """

    __tablename__ = "session_activities"

    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    session_id = Column(Integer, ForeignKey("user_sessions.id"), nullable=False, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)

    # Activity details
    activity_type = Column(String(50), nullable=False, index=True)  # login, logout, api_call, admin_action, etc.
    endpoint = Column(String(255), nullable=True)  # API endpoint accessed
    method = Column(String(10), nullable=True)  # HTTP method
    status_code = Column(Integer, nullable=True)  # Response status

    # Request details
    ip_address = Column(String(45), nullable=True, index=True)
    user_agent = Column(Text, nullable=True)
    referer = Column(String(255), nullable=True)

    # Security analysis
    is_suspicious = Column(Boolean, default=False, nullable=False, index=True)
    risk_factors = Column(Text, nullable=True)  # JSON string of detected risks

    # Timing
    timestamp = Column(DateTime(timezone=True), default=_utcnow, nullable=False, index=True)
    duration_ms = Column(Integer, nullable=True)  # Request processing time

    # Additional metadata
    resource_accessed = Column(String(255), nullable=True)  # File, customer, etc.
    data_volume = Column(Integer, nullable=True)  # Bytes transferred

    # Relationships
    session = relationship("UserSession", back_populates="activities")
    user = relationship("User")


class SessionConfiguration(BaseModel):
    """
    Configurable session policies and limits
    """

    __tablename__ = "session_configurations"

    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=True, index=True)  # Null for global config

    # Session limits
    max_concurrent_sessions = Column(Integer, default=3, nullable=False)
    session_timeout_minutes = Column(Integer, default=480, nullable=False)  # 8 hours default
    idle_timeout_minutes = Column(Integer, default=60, nullable=False)  # 1 hour idle

    # Security policies
    require_session_renewal = Column(Boolean, default=True, nullable=False)
    renewal_interval_hours = Column(Integer, default=24, nullable=False)
    force_logout_on_ip_change = Column(Boolean, default=False, nullable=False)
    suspicious_activity_threshold = Column(Integer, default=5, nullable=False)

    # Geographic restrictions
    allowed_countries = Column(Text, nullable=True)  # JSON array of ISO codes
    blocked_countries = Column(Text, nullable=True)  # JSON array of ISO codes

    # Timestamps
    created_at = Column(DateTime(timezone=True), default=_utcnow, nullable=False)
    # NOTE(review): the insert default is a client-side UTC timestamp while
    # onupdate uses the database's now() -- confirm this mix is intended.
    updated_at = Column(
        DateTime(timezone=True),
        default=_utcnow,
        onupdate=func.now(),
        nullable=False,
    )

    # Relationships
    user = relationship("User")  # If user-specific config


class SessionSecurityEvent(BaseModel):
    """
    Track security events related to sessions
    """

    __tablename__ = "session_security_events"

    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    session_id = Column(Integer, ForeignKey("user_sessions.id"), nullable=True, index=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)

    # Event details
    event_type = Column(String(50), nullable=False, index=True)  # session_fixation, concurrent_limit, suspicious_login, etc.
    severity = Column(String(20), nullable=False, index=True)  # low, medium, high, critical
    description = Column(Text, nullable=False)

    # Context
    ip_address = Column(String(45), nullable=True, index=True)
    user_agent = Column(Text, nullable=True)
    country = Column(String(5), nullable=True)

    # Response actions
    action_taken = Column(String(100), nullable=True)  # session_locked, user_notified, admin_alerted, etc.
    resolved = Column(Boolean, default=False, nullable=False, index=True)
    resolved_at = Column(DateTime(timezone=True), nullable=True)
    resolved_by = Column(Integer, ForeignKey("users.id"), nullable=True)

    # Timestamps
    timestamp = Column(DateTime(timezone=True), default=_utcnow, nullable=False, index=True)

    # Relationships
    session = relationship("UserSession")
    # Two foreign keys point at users.id, so each relationship names its own.
    user = relationship("User", foreign_keys=[user_id])
    resolver = relationship("User", foreign_keys=[resolved_by])