""" Enhanced logging configuration for import operations """ import logging import os from datetime import datetime from typing import Optional, Dict, Any class ImportLogger: """Specialized logger for import operations""" def __init__(self, import_id: str, table_name: str): self.import_id = import_id self.table_name = table_name self.logger = logging.getLogger(f"import.{table_name}") # Create logs directory if it doesn't exist log_dir = "logs/imports" os.makedirs(log_dir, exist_ok=True) # Create file handler for this specific import log_file = os.path.join(log_dir, f"{import_id}_{table_name}.log") file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.DEBUG) # Create formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) # Add handler to logger self.logger.addHandler(file_handler) self.logger.setLevel(logging.DEBUG) # Track import session details self.session_start = datetime.utcnow() self.row_count = 0 self.error_count = 0 def info(self, message: str, **kwargs): """Log info message with import context""" self._log_with_context("info", message, **kwargs) def warning(self, message: str, **kwargs): """Log warning message with import context""" self._log_with_context("warning", message, **kwargs) def error(self, message: str, **kwargs): """Log error message with import context""" self.error_count += 1 self._log_with_context("error", message, **kwargs) def debug(self, message: str, **kwargs): """Log debug message with import context""" self._log_with_context("debug", message, **kwargs) def _log_with_context(self, level: str, message: str, **kwargs): """Log message with import context""" context = { "import_id": self.import_id, "table": self.table_name, "row_count": self.row_count, **kwargs } context_str = " | ".join([f"{k}={v}" for k, v in context.items()]) full_message = f"[{context_str}] {message}" getattr(self.logger, level)(full_message) def log_row_processed(self, row_number: int, success: bool = True): """Log that a row has been processed""" self.row_count += 1 if success: self.debug(f"Row {row_number} processed successfully") else: self.error(f"Row {row_number} failed to process") def log_validation_error(self, row_number: int, field: str, value: Any, error: str): """Log validation error for specific field""" self.error( f"Validation error on row {row_number}", field=field, value=str(value)[:100], # Truncate long values error=error ) def log_import_summary(self, total_rows: int, imported_rows: int, error_rows: int): """Log final import summary""" duration = datetime.utcnow() - self.session_start self.info( f"Import completed", total_rows=total_rows, imported_rows=imported_rows, error_rows=error_rows, duration_seconds=duration.total_seconds(), success_rate=f"{(imported_rows/total_rows)*100:.1f}%" if total_rows > 0 else "0%" ) def create_import_logger(import_id: str, table_name: str) -> ImportLogger: """Factory function to create import logger""" return ImportLogger(import_id, table_name) class ImportMetrics: """Track import performance metrics""" def __init__(self): self.start_time = datetime.utcnow() self.end_time = None self.total_rows = 0 self.processed_rows = 0 self.error_rows = 0 self.validation_errors = [] self.database_errors = [] def record_row_processed(self, success: bool = True): """Record that a row was processed""" self.processed_rows += 1 if not success: self.error_rows += 1 def record_validation_error(self, row_number: int, error: str): """Record a validation error""" self.validation_errors.append({ "row": row_number, "error": error, "timestamp": datetime.utcnow() }) def record_database_error(self, error: str): """Record a database error""" self.database_errors.append({ "error": error, "timestamp": datetime.utcnow() }) def finalize(self): """Finalize metrics collection""" self.end_time = datetime.utcnow() def get_summary(self) -> Dict[str, Any]: """Get metrics summary""" duration = (self.end_time or datetime.utcnow()) - self.start_time return { "start_time": self.start_time.isoformat(), "end_time": self.end_time.isoformat() if self.end_time else None, "duration_seconds": duration.total_seconds(), "total_rows": self.total_rows, "processed_rows": self.processed_rows, "error_rows": self.error_rows, "success_rate": (self.processed_rows / self.total_rows * 100) if self.total_rows > 0 else 0, "validation_errors": len(self.validation_errors), "database_errors": len(self.database_errors) }