# WebSocket Connection Pooling and Management

This document describes the WebSocket connection pooling system implemented in the Delphi Database application.

## Overview

The WebSocket pooling system provides:

- **Connection Pooling**: Efficient management of multiple concurrent WebSocket connections
- **Automatic Cleanup**: Removal of stale and inactive connections
- **Resource Management**: Prevention of memory leaks and resource exhaustion
- **Health Monitoring**: Connection health checks and heartbeat management
- **Topic-Based Broadcasting**: Efficient message distribution to subscriber groups
- **Admin Management**: Administrative tools for monitoring and managing connections

## Architecture

### Core Components

1. **WebSocketPool** (`app/services/websocket_pool.py`)
   - Central connection pool manager
   - Handles connection lifecycle
   - Provides broadcasting and cleanup functionality

2. **WebSocketManager** (`app/middleware/websocket_middleware.py`)
   - High-level interface for WebSocket operations
   - Handles authentication and message processing
   - Provides convenient decorators and utilities

3. **Admin API** (`app/api/admin.py`)
   - Administrative endpoints for monitoring and management
   - Connection statistics and health metrics
   - Manual cleanup and broadcasting tools

### Key Features

#### Connection Management

- **Unique Connection IDs**: Each connection gets a unique identifier
- **User Association**: Connections can be associated with authenticated users
- **Topic Subscriptions**: Connections can subscribe to multiple topics
- **Metadata Storage**: Custom metadata can be attached to connections

#### Automatic Cleanup

- **Stale Connection Detection**: Identifies inactive connections
- **Background Cleanup**: Automatic removal of stale connections
- **Failed Message Cleanup**: Removes connections that fail to receive messages
- **Configurable Timeouts**: Customizable timeout settings

#### Health Monitoring

- **Heartbeat System**: Regular health checks via ping/pong
- **Connection State Tracking**: Monitors connection lifecycle states
- **Error Counting**: Tracks connection errors and failures
- **Activity Monitoring**: Tracks last activity timestamps

#### Broadcasting System

- **Topic-Based**: Efficient message distribution by topic
- **User-Based**: Send messages to all connections for a specific user
- **Selective Exclusion**: Exclude specific connections from broadcasts
- **Message Types**: Structured message format with type classification

## Configuration

### Pool Settings

```python
# Initialize WebSocket pool with custom settings
await initialize_websocket_pool(
    cleanup_interval=60,             # Cleanup check interval (seconds)
    connection_timeout=300,          # Connection timeout (seconds)
    heartbeat_interval=30,           # Heartbeat interval (seconds)
    max_connections_per_topic=1000,  # Max connections per topic
    max_total_connections=10000      # Max total connections
)
```

### Environment Variables

The pool respects the following configuration from `app/config.py`:

- Database connection settings for user authentication
- Logging configuration for structured logging
- Security settings for token verification

## Usage Examples

### Basic WebSocket Endpoint

```python
from app.middleware.websocket_middleware import websocket_endpoint

@router.websocket("/ws/notifications")
@websocket_endpoint(topics={"notifications"}, require_auth=True)
async def notifications_endpoint(websocket: WebSocket, connection_id: str, manager: WebSocketManager):
    # Connection is automatically managed
    # Authentication is handled automatically
    # Cleanup is handled automatically
    pass
```

### Manual Connection Management

```python
from app.middleware.websocket_middleware import get_websocket_manager

@router.websocket("/ws/custom")
async def custom_endpoint(websocket: WebSocket):
    manager = get_websocket_manager()

    async def handle_message(connection_id: str, message: WebSocketMessage):
        if message.type == "chat":
            await manager.broadcast_to_topic(
                topic="chat_room",
                message_type="chat_message",
                data=message.data
            )

    await manager.handle_connection(
        websocket=websocket,
        topics={"chat_room"},
        require_auth=True,
        message_handler=handle_message
    )
```

### Broadcasting Messages

```python
from app.middleware.websocket_middleware import get_websocket_manager

async def send_notification(user_id: int, message: str):
    manager = get_websocket_manager()

    # Send to specific user
    await manager.send_to_user(
        user_id=user_id,
        message_type="notification",
        data={"message": message}
    )

async def broadcast_announcement(message: str):
    manager = get_websocket_manager()

    # Broadcast to all subscribers of a topic
    await manager.broadcast_to_topic(
        topic="announcements",
        message_type="system_announcement",
        data={"message": message}
    )
```

## Administrative Features

### WebSocket Statistics

```bash
GET /api/admin/websockets/stats
```

Returns comprehensive statistics about the WebSocket pool:

- Total and active connections
- Message counts (sent/failed)
- Topic distribution
- Connection states
- Cleanup statistics

### Connection Management

```bash
# List all connections
GET /api/admin/websockets/connections

# Filter connections
GET /api/admin/websockets/connections?user_id=123&topic=notifications

# Get specific connection details
GET /api/admin/websockets/connections/{connection_id}

# Disconnect connections
POST /api/admin/websockets/disconnect
{
  "user_id": 123,  // or connection_ids, or topic
  "reason": "maintenance"
}

# Manual cleanup
POST /api/admin/websockets/cleanup

# Broadcast message
POST /api/admin/websockets/broadcast
{
  "topic": "announcements",
  "message_type": "admin_message",
  "data": {"message": "System maintenance in 5 minutes"}
}
```

## Message Format

All WebSocket messages follow a structured format:

```json
{
  "type": "message_type",
  "topic": "optional_topic",
  "data": {
    "key": "value"
  },
  "timestamp": "2023-01-01T12:00:00Z",
  "error": "optional_error_message"
}
```

### Standard Message Types

- `ping`/`pong`: Heartbeat messages
- `welcome`: Initial connection message
- `subscribe`/`unsubscribe`: Topic subscription management
- `data`: General data messages
- `error`: Error notifications
- `heartbeat`: Automated health checks

## Security

### Authentication

- Token-based authentication via query parameters or headers
- User session validation against database
- Automatic connection termination for invalid credentials

### Authorization

- Admin-only access to management endpoints
- User-specific connection filtering
- Topic-based access control (application-level)

### Resource Protection

- Connection limits per topic and total
- Automatic cleanup of stale connections
- Rate limiting integration (via existing middleware)

## Monitoring and Debugging

### Structured Logging

All WebSocket operations are logged with structured data:

- Connection lifecycle events
- Message broadcasting statistics
- Error conditions and cleanup actions
- Performance metrics

### Health Checks

- Connection state monitoring
- Stale connection detection
- Message delivery success rates
- Resource usage tracking

### Metrics

The system provides metrics for:

- Active connection count
- Message throughput
- Error rates
- Cleanup efficiency

## Integration with Existing Features

### Billing API Integration

The existing billing WebSocket endpoint has been migrated to use the pool:

- Topic: `batch_progress_{batch_id}`
- Automatic connection management
- Improved reliability and resource usage

### Future Integration Opportunities

- Real-time search result updates
- Document processing notifications
- User activity broadcasts
- System status updates

## Performance Considerations

### Scalability

- Connection pooling reduces resource overhead
- Topic-based broadcasting is more efficient than individual sends
- Background cleanup prevents resource leaks

### Memory Management

- Automatic cleanup of stale connections
- Efficient data structures for connection storage
- Minimal memory footprint per connection

### Network Efficiency

- Heartbeat system prevents connection timeouts
- Failed connection detection and cleanup
- Structured message format reduces parsing overhead

## Troubleshooting

### Common Issues

1. **Connections not cleaning up**
   - Check cleanup interval configuration
   - Verify connection timeout settings
   - Monitor stale connection detection

2. **Messages not broadcasting**
   - Verify topic subscription
   - Check connection state
   - Review authentication status

3. **High memory usage**
   - Monitor connection count limits
   - Check for stale connections
   - Review cleanup efficiency

### Debug Tools

1. **Admin API endpoints** for real-time monitoring
2. **Structured logs** for detailed operation tracking
3. **Connection metrics** for performance analysis
4. **Health check endpoints** for system status

## Testing

Comprehensive test suite covers:

- Connection pool functionality
- Message broadcasting
- Cleanup mechanisms
- Health monitoring
- Admin API operations
- Integration scenarios
- Stress testing

Run tests with:

```bash
pytest tests/test_websocket_pool.py -v
pytest tests/test_websocket_admin_api.py -v
```

## Future Enhancements

Potential improvements:

- Redis-based connection sharing across multiple application instances
- WebSocket cluster support for horizontal scaling
- Advanced message routing and filtering
- Integration with external message brokers
- Enhanced monitoring and alerting

## Examples

See `examples/websocket_pool_example.py` for comprehensive usage examples including:

- Basic WebSocket endpoints
- Custom message handling
- Broadcasting services
- Connection monitoring
- Real-time data streaming
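
This document does not show the pool's internals. As a rough illustration of how the topic-based broadcasting, failed-message cleanup, and stale-connection detection described earlier might fit together, here is a minimal in-memory sketch. `SketchPool`, `Conn`, and `FakeSocket` are hypothetical names invented for this example and are not the real `WebSocketPool` API. Only `connection_timeout` and the `type`/`topic`/`data` message fields are borrowed from the configuration and message format in this document.

```python
import asyncio
import time
from dataclasses import dataclass, field


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: records messages or fails on demand."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.sent: list[dict] = []

    async def send_json(self, message: dict) -> None:
        if self.fail:
            raise ConnectionError("peer went away")
        self.sent.append(message)


@dataclass
class Conn:
    socket: FakeSocket
    topics: set[str]
    last_activity: float = field(default_factory=time.monotonic)


class SketchPool:
    """Illustrative pool only; the real WebSocketPool may work differently."""

    def __init__(self, connection_timeout: float = 300.0) -> None:
        self.connection_timeout = connection_timeout
        self.connections: dict[str, Conn] = {}

    def add(self, connection_id: str, socket: FakeSocket, topics: set[str]) -> None:
        self.connections[connection_id] = Conn(socket, set(topics))

    async def broadcast_to_topic(self, topic, message_type, data, exclude=frozenset()) -> int:
        """Send to every subscriber of `topic`, dropping connections whose send fails."""
        message = {"type": message_type, "topic": topic, "data": data}
        delivered, failed = 0, []
        for cid, conn in list(self.connections.items()):
            if topic not in conn.topics or cid in exclude:
                continue
            try:
                await conn.socket.send_json(message)
                conn.last_activity = time.monotonic()
                delivered += 1
            except ConnectionError:
                failed.append(cid)  # failed message cleanup happens after the loop
        for cid in failed:
            del self.connections[cid]
        return delivered

    def cleanup_stale(self, now=None) -> list[str]:
        """Remove connections idle for longer than connection_timeout seconds."""
        now = time.monotonic() if now is None else now
        stale = [cid for cid, conn in self.connections.items()
                 if now - conn.last_activity > self.connection_timeout]
        for cid in stale:
            del self.connections[cid]
        return stale


async def demo() -> dict:
    pool = SketchPool(connection_timeout=300.0)
    pool.add("a", FakeSocket(), {"announcements"})
    pool.add("b", FakeSocket(fail=True), {"announcements"})  # this send will fail
    pool.add("c", FakeSocket(), {"chat_room"})               # not subscribed
    delivered = await pool.broadcast_to_topic(
        "announcements", "system_announcement", {"message": "hi"})
    # Pretend 301 seconds have passed so the remaining connections look stale.
    stale = pool.cleanup_stale(now=time.monotonic() + 301)
    return {"delivered": delivered, "stale": sorted(stale),
            "remaining": sorted(pool.connections)}
```

Running `asyncio.run(demo())` delivers the announcement to connection `a` only, drops `b` because its send raised, and then removes `a` and `c` as stale once the simulated clock passes the timeout. In a real deployment the stale check would presumably run in a background task every `cleanup_interval` seconds rather than being called by hand.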