Files
delphi-database/app/models/sessions.py
HotSwapp bac8cc4bd5 changes
2025-08-18 20:20:04 -05:00

190 lines
7.9 KiB
Python

"""
Session management models for advanced security
"""
from datetime import datetime, timezone, timedelta
from typing import Optional
from enum import Enum
from sqlalchemy import Column, Integer, String, DateTime, Boolean, ForeignKey, Text, func
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import UUID
import uuid
from app.models.base import BaseModel
class SessionStatus(str, Enum):
"""Session status enumeration"""
ACTIVE = "active"
EXPIRED = "expired"
REVOKED = "revoked"
LOCKED = "locked"
class UserSession(BaseModel):
"""
Enhanced user session tracking for security monitoring
"""
__tablename__ = "user_sessions"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
session_id = Column(String(128), nullable=False, unique=True, index=True) # Secure session identifier
user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
# Session metadata
ip_address = Column(String(45), nullable=True, index=True)
user_agent = Column(Text, nullable=True)
device_fingerprint = Column(String(255), nullable=True) # For device tracking
# Geographic and security info
country = Column(String(5), nullable=True) # ISO country code
city = Column(String(100), nullable=True)
is_suspicious = Column(Boolean, default=False, nullable=False, index=True)
risk_score = Column(Integer, default=0, nullable=False) # 0-100 risk assessment
# Session lifecycle
status = Column(String(20), default=SessionStatus.ACTIVE, nullable=False, index=True)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False)
last_activity = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False, index=True)
expires_at = Column(DateTime(timezone=True), nullable=False, index=True)
# Security tracking
login_method = Column(String(50), nullable=True) # password, 2fa, sso, etc.
locked_at = Column(DateTime(timezone=True), nullable=True)
revoked_at = Column(DateTime(timezone=True), nullable=True)
revocation_reason = Column(String(100), nullable=True)
# Relationships
user = relationship("User", back_populates="sessions")
activities = relationship("SessionActivity", back_populates="session", cascade="all, delete-orphan")
def is_expired(self) -> bool:
"""Check if session is expired"""
return datetime.now(timezone.utc) >= self.expires_at
def is_active(self) -> bool:
"""Check if session is currently active"""
return self.status == SessionStatus.ACTIVE and not self.is_expired()
def extend_session(self, duration: timedelta = timedelta(hours=8)) -> None:
"""Extend session expiration time"""
self.expires_at = datetime.now(timezone.utc) + duration
self.last_activity = datetime.now(timezone.utc)
def revoke_session(self, reason: str = "user_logout") -> None:
"""Revoke the session"""
self.status = SessionStatus.REVOKED
self.revoked_at = datetime.now(timezone.utc)
self.revocation_reason = reason
def lock_session(self, reason: str = "suspicious_activity") -> None:
"""Lock the session for security reasons"""
self.status = SessionStatus.LOCKED
self.locked_at = datetime.now(timezone.utc)
self.revocation_reason = reason
class SessionActivity(BaseModel):
"""
Track user activities within sessions for security analysis
"""
__tablename__ = "session_activities"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
session_id = Column(Integer, ForeignKey("user_sessions.id"), nullable=False, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
# Activity details
activity_type = Column(String(50), nullable=False, index=True) # login, logout, api_call, admin_action, etc.
endpoint = Column(String(255), nullable=True) # API endpoint accessed
method = Column(String(10), nullable=True) # HTTP method
status_code = Column(Integer, nullable=True) # Response status
# Request details
ip_address = Column(String(45), nullable=True, index=True)
user_agent = Column(Text, nullable=True)
referer = Column(String(255), nullable=True)
# Security analysis
is_suspicious = Column(Boolean, default=False, nullable=False, index=True)
risk_factors = Column(Text, nullable=True) # JSON string of detected risks
# Timing
timestamp = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False, index=True)
duration_ms = Column(Integer, nullable=True) # Request processing time
# Additional metadata
resource_accessed = Column(String(255), nullable=True) # File, customer, etc.
data_volume = Column(Integer, nullable=True) # Bytes transferred
# Relationships
session = relationship("UserSession", back_populates="activities")
user = relationship("User")
class SessionConfiguration(BaseModel):
"""
Configurable session policies and limits
"""
__tablename__ = "session_configurations"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=True, index=True) # Null for global config
# Session limits
max_concurrent_sessions = Column(Integer, default=3, nullable=False)
session_timeout_minutes = Column(Integer, default=480, nullable=False) # 8 hours default
idle_timeout_minutes = Column(Integer, default=60, nullable=False) # 1 hour idle
# Security policies
require_session_renewal = Column(Boolean, default=True, nullable=False)
renewal_interval_hours = Column(Integer, default=24, nullable=False)
force_logout_on_ip_change = Column(Boolean, default=False, nullable=False)
suspicious_activity_threshold = Column(Integer, default=5, nullable=False)
# Geographic restrictions
allowed_countries = Column(Text, nullable=True) # JSON array of ISO codes
blocked_countries = Column(Text, nullable=True) # JSON array of ISO codes
# Timestamps
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False)
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=func.now(), nullable=False)
# Relationships
user = relationship("User") # If user-specific config
class SessionSecurityEvent(BaseModel):
"""
Track security events related to sessions
"""
__tablename__ = "session_security_events"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
session_id = Column(Integer, ForeignKey("user_sessions.id"), nullable=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
# Event details
event_type = Column(String(50), nullable=False, index=True) # session_fixation, concurrent_limit, suspicious_login, etc.
severity = Column(String(20), nullable=False, index=True) # low, medium, high, critical
description = Column(Text, nullable=False)
# Context
ip_address = Column(String(45), nullable=True, index=True)
user_agent = Column(Text, nullable=True)
country = Column(String(5), nullable=True)
# Response actions
action_taken = Column(String(100), nullable=True) # session_locked, user_notified, admin_alerted, etc.
resolved = Column(Boolean, default=False, nullable=False, index=True)
resolved_at = Column(DateTime(timezone=True), nullable=True)
resolved_by = Column(Integer, ForeignKey("users.id"), nullable=True)
# Timestamps
timestamp = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False, index=True)
# Relationships
session = relationship("UserSession")
user = relationship("User", foreign_keys=[user_id])
resolver = relationship("User", foreign_keys=[resolved_by])