diff --git a/app/api/import_csv.py b/app/api/import_csv.py
new file mode 100644
index 0000000..4158fc1
--- /dev/null
+++ b/app/api/import_csv.py
@@ -0,0 +1,398 @@
+"""
+CSV Import API Endpoints
+"""
+from fastapi import APIRouter, Depends, File, UploadFile, Form, HTTPException, BackgroundTasks
+from fastapi.responses import JSONResponse
+from sqlalchemy.orm import Session
+from typing import List, Optional, Dict, Any
+import logging
+import uuid
+from datetime import datetime
+
+from app.database.base import get_db
+from app.auth.security import get_admin_user
+from app.models.user import User
+from app.import_export.import_service import ImportService, TableType
+from app.core.logging import get_logger
+
+logger = get_logger("import_api")
+router = APIRouter()
+
+# In-memory storage for import progress (could be moved to Redis in production)
+import_progress = {}
+
+
+class ImportStatus:
+ """Track import operation status"""
+ def __init__(self, import_id: str, table_name: str):
+ self.import_id = import_id
+ self.table_name = table_name
+ self.status = "PROCESSING"
+ self.started_at = datetime.utcnow()
+ self.completed_at = None
+ self.result = None
+ self.error = None
+
+
+@router.get("/tables")
+async def get_supported_tables(
+ current_user: User = Depends(get_admin_user),
+ db: Session = Depends(get_db)
+):
+ """Get list of supported tables for import"""
+ try:
+ service = ImportService(db)
+ tables = service.get_supported_tables()
+
+ return {
+ "success": True,
+ "tables": tables,
+ "total": len(tables)
+ }
+ except Exception as e:
+ logger.error(f"Error getting supported tables: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to get supported tables")
+
+
+@router.get("/tables/{table_name}/schema")
+async def get_table_schema(
+ table_name: str,
+ current_user: User = Depends(get_admin_user),
+ db: Session = Depends(get_db)
+):
+ """Get schema information for a specific table"""
+ try:
+ service = ImportService(db)
+ schema = service.get_table_schema(table_name)
+
+ if not schema:
+ raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found")
+
+ return {
+ "success": True,
+ "schema": schema
+ }
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting table schema for {table_name}: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to get table schema")
+
+
+@router.post("/validate")
+async def validate_csv_headers(
+ table_name: str = Form(...),
+ file: UploadFile = File(...),
+ current_user: User = Depends(get_admin_user),
+ db: Session = Depends(get_db)
+):
+ """Validate CSV headers without importing data"""
+ try:
+ # Read file content
+ content = await file.read()
+ csv_content = content.decode('utf-8')
+
+ service = ImportService(db)
+ result = service.validate_csv_headers(table_name, csv_content)
+
+ return {
+ "success": result.success,
+ "table_name": table_name,
+ "filename": file.filename,
+ "validation_result": result.to_dict()
+ }
+
+ except UnicodeDecodeError:
+ raise HTTPException(status_code=400, detail="Invalid file encoding. Please use UTF-8.")
+ except Exception as e:
+ logger.error(f"Error validating CSV headers: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to validate CSV headers")
+
+
+async def process_import_background(
+ import_id: str,
+ table_name: str,
+ csv_content: str,
+ db: Session
+):
+ """Background task to process CSV import"""
+ try:
+ logger.info(f"Starting background import {import_id} for table {table_name}")
+ print(f"[IMPORT] Starting background import {import_id} for table {table_name}")
+
+ service = ImportService(db)
+ result = service.import_csv(table_name, csv_content, import_id=import_id)
+
+ # Update progress
+ if import_id in import_progress:
+ progress = import_progress[import_id]
+ progress.status = "COMPLETED" if result.success else "FAILED"
+ progress.completed_at = datetime.utcnow()
+ progress.result = result
+
+ logger.info(f"Import {import_id} completed with {result.imported_rows} rows imported")
+ print(f"[IMPORT] Import {import_id} completed: success={result.success}, rows={result.imported_rows}")
+
+ except Exception as e:
+ logger.error(f"Background import {import_id} failed: {str(e)}")
+ print(f"[IMPORT] Background import {import_id} failed: {str(e)}")
+ if import_id in import_progress:
+ progress = import_progress[import_id]
+ progress.status = "FAILED"
+ progress.completed_at = datetime.utcnow()
+ progress.error = str(e)
+
+
+@router.post("/csv")
+async def import_csv_file(
+ background_tasks: BackgroundTasks,
+ table_name: str = Form(...),
+ file: UploadFile = File(...),
+ current_user: User = Depends(get_admin_user),
+ db: Session = Depends(get_db)
+):
+ """Import CSV file to specified table"""
+ try:
+ logger.info(f"Received CSV import request: table={table_name}, file={file.filename}, user={current_user.username}")
+ print(f"[IMPORT API] CSV import request: table={table_name}, file={file.filename}")
+
+ # Validate table name
+ if table_name.lower() not in [t.value for t in TableType]:
+ print(f"[IMPORT API] Invalid table name: {table_name}")
+ raise HTTPException(
+ status_code=400,
+ detail=f"Unsupported table: {table_name}"
+ )
+
+ # Validate file type
+ if not file.filename.lower().endswith('.csv'):
+ raise HTTPException(
+ status_code=400,
+ detail="File must be a CSV file"
+ )
+
+ # Read file content
+ content = await file.read()
+ csv_content = content.decode('utf-8')
+
+ if not csv_content.strip():
+ raise HTTPException(status_code=400, detail="File is empty")
+
+ # Generate import ID
+ import_id = str(uuid.uuid4())
+ print(f"[IMPORT API] Generated import ID: {import_id}")
+
+ # Create progress tracker
+ progress = ImportStatus(import_id, table_name)
+ import_progress[import_id] = progress
+
+ # Start background import
+ background_tasks.add_task(
+ process_import_background,
+ import_id,
+ table_name,
+ csv_content,
+ db
+ )
+
+ logger.info(f"Started CSV import {import_id} for table {table_name}")
+ print(f"[IMPORT API] Background task queued for import {import_id}")
+
+ return {
+ "success": True,
+ "import_id": import_id,
+ "table_name": table_name,
+ "filename": file.filename,
+ "status": "PROCESSING",
+ "message": "Import started successfully"
+ }
+
+ except UnicodeDecodeError:
+ raise HTTPException(status_code=400, detail="Invalid file encoding. Please use UTF-8.")
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error starting CSV import: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to start import")
+
+
+@router.get("/status/{import_id}")
+async def get_import_status(
+ import_id: str,
+ current_user: User = Depends(get_admin_user)
+):
+ """Get status of an import operation"""
+ try:
+ if import_id not in import_progress:
+ raise HTTPException(status_code=404, detail="Import not found")
+
+ progress = import_progress[import_id]
+
+ response = {
+ "import_id": import_id,
+ "table_name": progress.table_name,
+ "status": progress.status,
+ "started_at": progress.started_at.isoformat(),
+ "completed_at": progress.completed_at.isoformat() if progress.completed_at else None
+ }
+
+ if progress.result:
+ response["result"] = progress.result.to_dict()
+ elif progress.error:
+ response["error"] = progress.error
+
+ return response
+
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error getting import status: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to get import status")
+
+
+@router.post("/batch")
+async def batch_import_csv(
+ background_tasks: BackgroundTasks,
+ files: List[UploadFile] = File(...),
+ table_names: List[str] = Form(...),
+ current_user: User = Depends(get_admin_user),
+ db: Session = Depends(get_db)
+):
+ """Import multiple CSV files in batch"""
+ try:
+ if len(files) != len(table_names):
+ raise HTTPException(
+ status_code=400,
+ detail="Number of files must match number of table names"
+ )
+
+ imports = []
+ import_ids = []
+
+ for i, (file, table_name) in enumerate(zip(files, table_names)):
+ # Validate table name
+ if table_name.lower() not in [t.value for t in TableType]:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Unsupported table: {table_name}"
+ )
+
+ # Validate file type
+ if not file.filename.lower().endswith('.csv'):
+ raise HTTPException(
+ status_code=400,
+ detail=f"File {file.filename} must be a CSV file"
+ )
+
+ # Read file content
+ content = await file.read()
+ csv_content = content.decode('utf-8')
+
+ if not csv_content.strip():
+ raise HTTPException(
+ status_code=400,
+ detail=f"File {file.filename} is empty"
+ )
+
+ imports.append({
+ "table_name": table_name,
+ "csv_content": csv_content,
+ "filename": file.filename
+ })
+
+ # Generate import ID for tracking
+ import_id = str(uuid.uuid4())
+ import_ids.append(import_id)
+
+ # Create progress tracker
+ progress = ImportStatus(import_id, table_name)
+ import_progress[import_id] = progress
+
+ # Process batch import in background
+ async def process_batch_background():
+ try:
+ service = ImportService(db)
+ results = service.batch_import(imports)
+
+ # Update progress for each import
+ for i, import_id in enumerate(import_ids):
+ if import_id in import_progress:
+ progress = import_progress[import_id]
+ table_name = progress.table_name
+
+ # Find result for this table
+ result = None
+ for key, res in results.items():
+ if key.startswith(table_name):
+ result = res
+ break
+
+ if result:
+ progress.status = "COMPLETED" if result.success else "FAILED"
+ progress.result = result
+ else:
+ progress.status = "FAILED"
+ progress.error = "No result found"
+
+ progress.completed_at = datetime.utcnow()
+
+ except Exception as e:
+ logger.error(f"Batch import failed: {str(e)}")
+ for import_id in import_ids:
+ if import_id in import_progress:
+ progress = import_progress[import_id]
+ progress.status = "FAILED"
+ progress.error = str(e)
+ progress.completed_at = datetime.utcnow()
+
+ background_tasks.add_task(process_batch_background)
+
+ logger.info(f"Started batch import with {len(files)} files")
+
+ return {
+ "success": True,
+ "import_ids": import_ids,
+ "total_files": len(files),
+ "status": "PROCESSING",
+ "message": "Batch import started successfully"
+ }
+
+ except UnicodeDecodeError:
+ raise HTTPException(status_code=400, detail="Invalid file encoding. Please use UTF-8.")
+ except HTTPException:
+ raise
+ except Exception as e:
+ logger.error(f"Error starting batch import: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to start batch import")
+
+
+@router.delete("/progress")
+async def cleanup_import_progress(
+ current_user: User = Depends(get_admin_user)
+):
+ """Clean up completed import progress records"""
+ try:
+ completed_count = 0
+ to_remove = []
+
+ for import_id, progress in import_progress.items():
+ if progress.status in ["COMPLETED", "FAILED"]:
+ # Remove progress older than 1 hour
+ if progress.completed_at:
+ age = datetime.utcnow() - progress.completed_at
+ if age.total_seconds() > 3600: # 1 hour
+ to_remove.append(import_id)
+ completed_count += 1
+
+ for import_id in to_remove:
+ del import_progress[import_id]
+
+ return {
+ "success": True,
+ "cleaned_up": completed_count,
+ "remaining": len(import_progress)
+ }
+
+ except Exception as e:
+ logger.error(f"Error cleaning up import progress: {str(e)}")
+ raise HTTPException(status_code=500, detail="Failed to cleanup progress")
\ No newline at end of file
diff --git a/app/import_export/base.py b/app/import_export/base.py
new file mode 100644
index 0000000..7d427a0
--- /dev/null
+++ b/app/import_export/base.py
@@ -0,0 +1,306 @@
+"""
+Base classes for CSV import functionality
+"""
+from abc import ABC, abstractmethod
+from typing import Dict, List, Any, Optional, Tuple
+import csv
+import io
+from datetime import datetime, date
+import logging
+import uuid
+from sqlalchemy.orm import Session
+from sqlalchemy.exc import IntegrityError, SQLAlchemyError
+
+from .logging_config import create_import_logger, ImportMetrics
+
+logger = logging.getLogger(__name__)
+
+
+class ImportResult:
+ """Container for import operation results"""
+
+ def __init__(self):
+ self.success = False
+ self.total_rows = 0
+ self.imported_rows = 0
+ self.skipped_rows = 0
+ self.error_rows = 0
+ self.errors: List[str] = []
+ self.warnings: List[str] = []
+ self.import_id = None
+
+ def add_error(self, error: str):
+ """Add an error message"""
+ self.errors.append(error)
+ self.error_rows += 1
+
+ def add_warning(self, warning: str):
+ """Add a warning message"""
+ self.warnings.append(warning)
+
+ def to_dict(self) -> Dict[str, Any]:
+ """Convert result to dictionary for JSON response"""
+ return {
+ "success": self.success,
+ "total_rows": self.total_rows,
+ "imported_rows": self.imported_rows,
+ "skipped_rows": self.skipped_rows,
+ "error_rows": self.error_rows,
+ "errors": self.errors,
+ "warnings": self.warnings,
+ "import_id": self.import_id
+ }
+
+
+class BaseCSVImporter(ABC):
+ """Abstract base class for all CSV importers"""
+
+ def __init__(self, db_session: Session, import_id: Optional[str] = None):
+ self.db_session = db_session
+ self.result = ImportResult()
+ self.import_id = import_id or str(uuid.uuid4())
+ self.result.import_id = self.import_id
+ self.import_logger = create_import_logger(self.import_id, self.table_name)
+ self.metrics = ImportMetrics()
+
+ @property
+ @abstractmethod
+ def table_name(self) -> str:
+ """Name of the database table being imported to"""
+ pass
+
+ @property
+ @abstractmethod
+ def required_fields(self) -> List[str]:
+ """List of required field names"""
+ pass
+
+ @property
+ @abstractmethod
+ def field_mapping(self) -> Dict[str, str]:
+ """Mapping from CSV headers to database field names"""
+ pass
+
+ @abstractmethod
+ def create_model_instance(self, row_data: Dict[str, Any]) -> Any:
+ """Create a model instance from processed row data"""
+ pass
+
+ def parse_date(self, date_str: str) -> Optional[date]:
+ """Parse date string to date object"""
+ if not date_str or date_str.strip() == "":
+ return None
+
+ date_str = date_str.strip()
+
+ # Try common date formats
+ formats = [
+ "%Y-%m-%d", # ISO format
+ "%m/%d/%Y", # US format
+ "%m/%d/%y", # US format 2-digit year
+ "%d/%m/%Y", # European format
+ "%Y%m%d", # Compact format
+ ]
+
+ for fmt in formats:
+ try:
+ return datetime.strptime(date_str, fmt).date()
+ except ValueError:
+ continue
+
+ raise ValueError(f"Unable to parse date: {date_str}")
+
+ def parse_float(self, value_str: str) -> float:
+ """Parse string to float, handling empty values"""
+ if not value_str or value_str.strip() == "":
+ return 0.0
+ value_str = value_str.strip().replace(",", "") # Remove commas
+ try:
+ return float(value_str)
+ except ValueError:
+ raise ValueError(f"Unable to parse float: {value_str}")
+
+ def parse_int(self, value_str: str) -> int:
+ """Parse string to int, handling empty values"""
+ if not value_str or value_str.strip() == "":
+ return 0
+ value_str = value_str.strip().replace(",", "") # Remove commas
+ try:
+ return int(float(value_str)) # Handle "1.0" format
+ except ValueError:
+ raise ValueError(f"Unable to parse integer: {value_str}")
+
+ def normalize_string(self, value: str, max_length: Optional[int] = None) -> str:
+ """Normalize string value"""
+ if not value:
+ return ""
+ value = str(value).strip()
+ if max_length and len(value) > max_length:
+ self.result.add_warning(f"String truncated from {len(value)} to {max_length} characters: {value[:50]}...")
+ value = value[:max_length]
+ return value
+
+ def detect_delimiter(self, csv_content: str) -> str:
+ """Auto-detect CSV delimiter"""
+ sample = csv_content[:1024] # Check first 1KB
+ sniffer = csv.Sniffer()
+ try:
+ dialect = sniffer.sniff(sample, delimiters=",;\t|")
+ return dialect.delimiter
+ except:
+ return "," # Default to comma
+
+ def validate_headers(self, headers: List[str]) -> bool:
+ """Validate that required headers are present"""
+ missing_required = []
+
+ # Create case-insensitive mapping of headers
+ header_map = {h.lower().strip(): h for h in headers}
+
+ for required_field in self.required_fields:
+ # Check direct match first
+ if required_field in headers:
+ continue
+
+ # Check if there's a mapping for this field
+ mapped_name = self.field_mapping.get(required_field, required_field)
+ if mapped_name.lower() in header_map:
+ continue
+
+ missing_required.append(required_field)
+
+ if missing_required:
+ self.result.add_error(f"Missing required columns: {', '.join(missing_required)}")
+ return False
+
+ return True
+
+ def map_row_data(self, row: Dict[str, str], headers: List[str]) -> Dict[str, Any]:
+ """Map CSV row data to database field names"""
+ mapped_data = {}
+
+ # Create case-insensitive lookup
+ row_lookup = {k.lower().strip(): v for k, v in row.items() if k}
+
+ for db_field, csv_field in self.field_mapping.items():
+ csv_field_lower = csv_field.lower().strip()
+
+ # Try exact match first
+ if csv_field in row:
+ mapped_data[db_field] = row[csv_field]
+ # Try case-insensitive match
+ elif csv_field_lower in row_lookup:
+ mapped_data[db_field] = row_lookup[csv_field_lower]
+ else:
+ mapped_data[db_field] = ""
+
+ return mapped_data
+
+ def process_csv_content(self, csv_content: str, encoding: str = "utf-8") -> ImportResult:
+ """Process CSV content and import data"""
+ self.import_logger.info(f"Starting CSV import for {self.table_name}")
+
+ try:
+ # Detect delimiter
+ delimiter = self.detect_delimiter(csv_content)
+ self.import_logger.debug(f"Detected CSV delimiter: '{delimiter}'")
+
+ # Parse CSV
+ csv_reader = csv.DictReader(
+ io.StringIO(csv_content),
+ delimiter=delimiter
+ )
+
+ headers = csv_reader.fieldnames or []
+ if not headers:
+ error_msg = "No headers found in CSV file"
+ self.result.add_error(error_msg)
+ self.import_logger.error(error_msg)
+ return self.result
+
+ self.import_logger.info(f"Found headers: {headers}")
+
+ # Validate headers
+ if not self.validate_headers(headers):
+ self.import_logger.error("Header validation failed")
+ return self.result
+
+ self.import_logger.info("Header validation passed")
+
+ # Process rows
+ imported_count = 0
+ total_count = 0
+
+ for row_num, row in enumerate(csv_reader, 1):
+ total_count += 1
+ self.metrics.total_rows = total_count
+
+ try:
+ # Map CSV data to database fields
+ mapped_data = self.map_row_data(row, headers)
+
+ # Create model instance
+ model_instance = self.create_model_instance(mapped_data)
+
+ # Add to session
+ self.db_session.add(model_instance)
+ imported_count += 1
+
+ self.import_logger.log_row_processed(row_num, success=True)
+ self.metrics.record_row_processed(success=True)
+
+ except ImportValidationError as e:
+ error_msg = f"Row {row_num}: {str(e)}"
+ self.result.add_error(error_msg)
+ self.import_logger.log_row_processed(row_num, success=False)
+ self.import_logger.log_validation_error(row_num, "validation", row, str(e))
+ self.metrics.record_validation_error(row_num, str(e))
+
+ except Exception as e:
+ error_msg = f"Row {row_num}: Unexpected error - {str(e)}"
+ self.result.add_error(error_msg)
+ self.import_logger.log_row_processed(row_num, success=False)
+ self.import_logger.error(error_msg, row_number=row_num, exception_type=type(e).__name__)
+ self.metrics.record_validation_error(row_num, str(e))
+
+ # Commit transaction
+ try:
+ self.db_session.commit()
+ self.result.success = True
+ self.result.imported_rows = imported_count
+
+ self.import_logger.info(f"Successfully committed {imported_count} rows to database")
+ logger.info(f"Successfully imported {imported_count} rows to {self.table_name}")
+
+ except (IntegrityError, SQLAlchemyError) as e:
+ self.db_session.rollback()
+ error_msg = f"Database error during commit: {str(e)}"
+ self.result.add_error(error_msg)
+ self.import_logger.error(error_msg)
+ self.metrics.record_database_error(str(e))
+ logger.error(f"Database error importing to {self.table_name}: {str(e)}")
+
+ self.result.total_rows = total_count
+ self.metrics.finalize()
+
+ # Log final summary
+ self.import_logger.log_import_summary(
+ total_count,
+ imported_count,
+ self.result.error_rows
+ )
+
+ except Exception as e:
+ self.db_session.rollback()
+ error_msg = f"Failed to process CSV: {str(e)}"
+ self.result.add_error(error_msg)
+ self.import_logger.error(error_msg, exception_type=type(e).__name__)
+ self.metrics.record_database_error(str(e))
+ logger.error(f"CSV processing error for {self.table_name}: {str(e)}")
+
+ return self.result
+
+
+class ImportValidationError(Exception):
+ """Exception raised for validation errors during import"""
+ pass
\ No newline at end of file
diff --git a/app/import_export/files_importer.py b/app/import_export/files_importer.py
new file mode 100644
index 0000000..2ebb5c9
--- /dev/null
+++ b/app/import_export/files_importer.py
@@ -0,0 +1,144 @@
+"""
+FILES CSV Importer
+"""
+from typing import Dict, List, Any
+from datetime import date
+from sqlalchemy.orm import Session
+
+from .base import BaseCSVImporter, ImportValidationError
+from app.models.files import File
+from app.models.rolodex import Rolodex
+
+
+class FilesCSVImporter(BaseCSVImporter):
+ """CSV importer for FILES table"""
+
+ @property
+ def table_name(self) -> str:
+ return "files"
+
+ @property
+ def required_fields(self) -> List[str]:
+ return ["file_no", "id", "empl_num", "file_type", "opened", "status", "rate_per_hour"]
+
+ @property
+ def field_mapping(self) -> Dict[str, str]:
+ """Map CSV headers to database field names"""
+ return {
+ "file_no": "file_no",
+ "id": "id",
+ "regarding": "regarding",
+ "empl_num": "empl_num",
+ "file_type": "file_type",
+ "opened": "opened",
+ "closed": "closed",
+ "status": "status",
+ "footer_code": "footer_code",
+ "opposing": "opposing",
+ "rate_per_hour": "rate_per_hour",
+ # Financial balance fields (previously billed)
+ "trust_bal_p": "trust_bal_p",
+ "hours_p": "hours_p",
+ "hourly_fees_p": "hourly_fees_p",
+ "flat_fees_p": "flat_fees_p",
+ "disbursements_p": "disbursements_p",
+ "credit_bal_p": "credit_bal_p",
+ "total_charges_p": "total_charges_p",
+ "amount_owing_p": "amount_owing_p",
+ # Financial balance fields (current totals)
+ "trust_bal": "trust_bal",
+ "hours": "hours",
+ "hourly_fees": "hourly_fees",
+ "flat_fees": "flat_fees",
+ "disbursements": "disbursements",
+ "credit_bal": "credit_bal",
+ "total_charges": "total_charges",
+ "amount_owing": "amount_owing",
+ "transferable": "transferable",
+ "memo": "memo"
+ }
+
+ def create_model_instance(self, row_data: Dict[str, Any]) -> File:
+ """Create a Files instance from processed row data"""
+
+ # Validate required fields
+ required_checks = [
+ ("file_no", "File number"),
+ ("id", "Rolodex ID"),
+ ("empl_num", "Employee number"),
+ ("file_type", "File type"),
+ ("opened", "Opened date"),
+ ("status", "Status"),
+ ("rate_per_hour", "Rate per hour")
+ ]
+
+ for field, display_name in required_checks:
+ if not row_data.get(field):
+ raise ImportValidationError(f"{display_name} is required")
+
+ # Check for duplicate file number
+ existing = self.db_session.query(File).filter_by(file_no=row_data["file_no"]).first()
+ if existing:
+ raise ImportValidationError(f"File number '{row_data['file_no']}' already exists")
+
+ # Validate foreign key exists (rolodex ID)
+ rolodex_exists = self.db_session.query(Rolodex).filter_by(id=row_data["id"]).first()
+ if not rolodex_exists:
+ raise ImportValidationError(f"Rolodex ID '{row_data['id']}' does not exist")
+
+ # Parse dates
+ opened_date = None
+ closed_date = None
+
+ try:
+ opened_date = self.parse_date(row_data["opened"])
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid opened date: {e}")
+
+ if row_data.get("closed"):
+ try:
+ closed_date = self.parse_date(row_data["closed"])
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid closed date: {e}")
+
+ # Parse financial fields
+ try:
+ rate_per_hour = self.parse_float(row_data["rate_per_hour"])
+ if rate_per_hour < 0:
+ raise ImportValidationError("Rate per hour cannot be negative")
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid rate per hour: {e}")
+
+ # Parse all financial balance fields
+ financial_fields = [
+ "trust_bal_p", "hours_p", "hourly_fees_p", "flat_fees_p",
+ "disbursements_p", "credit_bal_p", "total_charges_p", "amount_owing_p",
+ "trust_bal", "hours", "hourly_fees", "flat_fees",
+ "disbursements", "credit_bal", "total_charges", "amount_owing", "transferable"
+ ]
+
+ financial_data = {}
+ for field in financial_fields:
+ try:
+ financial_data[field] = self.parse_float(row_data.get(field, "0"))
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid {field}: {e}")
+
+ # Create instance
+ files = File(
+ file_no=self.normalize_string(row_data["file_no"], 45),
+ id=self.normalize_string(row_data["id"], 80),
+ regarding=row_data.get("regarding", ""), # Text field
+ empl_num=self.normalize_string(row_data["empl_num"], 10),
+ file_type=self.normalize_string(row_data["file_type"], 45),
+ opened=opened_date,
+ closed=closed_date,
+ status=self.normalize_string(row_data["status"], 45),
+ footer_code=self.normalize_string(row_data.get("footer_code", ""), 45),
+ opposing=self.normalize_string(row_data.get("opposing", ""), 80),
+ rate_per_hour=rate_per_hour,
+ memo=row_data.get("memo", ""), # Text field
+ **financial_data # Unpack all financial fields
+ )
+
+ return files
\ No newline at end of file
diff --git a/app/import_export/import_service.py b/app/import_export/import_service.py
new file mode 100644
index 0000000..e7d200a
--- /dev/null
+++ b/app/import_export/import_service.py
@@ -0,0 +1,206 @@
+"""
+Main Import Service - coordinates all CSV importers
+"""
+from typing import Dict, List, Any, Optional, Union
+import logging
+from enum import Enum
+from sqlalchemy.orm import Session
+
+from .base import ImportResult
+from .rolodex_importer import RolodexCSVImporter
+from .phone_importer import PhoneCSVImporter
+from .files_importer import FilesCSVImporter
+from .ledger_importer import LedgerCSVImporter
+from .qdros_importer import QdrosCSVImporter
+
+logger = logging.getLogger(__name__)
+
+
+class TableType(Enum):
+ """Supported table types for import"""
+ ROLODEX = "rolodex"
+ PHONE = "phone"
+ FILES = "files"
+ LEDGER = "ledger"
+ QDROS = "qdros"
+
+
+class ImportService:
+ """Main service for handling CSV imports"""
+
+ def __init__(self, db_session: Session):
+ self.db_session = db_session
+ self._importers = {
+ TableType.ROLODEX: RolodexCSVImporter,
+ TableType.PHONE: PhoneCSVImporter,
+ TableType.FILES: FilesCSVImporter,
+ TableType.LEDGER: LedgerCSVImporter,
+ TableType.QDROS: QdrosCSVImporter
+ }
+
+ def get_supported_tables(self) -> List[str]:
+ """Get list of supported table names"""
+ return [table.value for table in TableType]
+
+ def get_table_schema(self, table_name: str) -> Optional[Dict[str, Any]]:
+ """Get schema information for a table"""
+ try:
+ table_type = TableType(table_name.lower())
+ importer_class = self._importers[table_type]
+ temp_importer = importer_class(self.db_session, "temp_schema_check")
+
+ return {
+ "table_name": temp_importer.table_name,
+ "required_fields": temp_importer.required_fields,
+ "field_mapping": temp_importer.field_mapping,
+ "sample_headers": list(temp_importer.field_mapping.keys())
+ }
+ except (ValueError, KeyError):
+ return None
+
+ def import_csv(
+ self,
+ table_name: str,
+ csv_content: str,
+ encoding: str = "utf-8",
+ import_id: Optional[str] = None
+ ) -> ImportResult:
+ """Import CSV data to specified table"""
+
+ try:
+ # Validate table name
+ table_type = TableType(table_name.lower())
+ except ValueError:
+ result = ImportResult()
+ result.add_error(f"Unsupported table: {table_name}")
+ return result
+
+ # Get appropriate importer
+ importer_class = self._importers[table_type]
+ importer = importer_class(self.db_session, import_id)
+
+ logger.info(f"Starting CSV import for table: {table_name} (import_id: {importer.import_id})")
+
+ try:
+ # Process the CSV
+ result = importer.process_csv_content(csv_content, encoding)
+
+ if result.success:
+ logger.info(
+ f"Successfully imported {result.imported_rows} rows to {table_name}"
+ )
+ else:
+ logger.warning(
+ f"Import failed for {table_name}: {len(result.errors)} errors"
+ )
+
+ return result
+
+ except Exception as e:
+ logger.error(f"Unexpected error during import to {table_name}: {str(e)}")
+ result = ImportResult()
+ result.add_error(f"Unexpected error: {str(e)}")
+ return result
+
+ def batch_import(
+ self,
+ imports: List[Dict[str, Any]]
+ ) -> Dict[str, ImportResult]:
+ """
+ Import multiple CSV files in a batch
+
+ Args:
+ imports: List of dicts with keys: table_name, csv_content, encoding
+
+ Returns:
+ Dict mapping table names to ImportResult objects
+ """
+ results = {}
+
+ # Recommended import order (dependencies first)
+ import_order = [
+ TableType.ROLODEX, # No dependencies
+ TableType.PHONE, # Depends on ROLODEX
+ TableType.FILES, # Depends on ROLODEX
+ TableType.LEDGER, # Depends on FILES
+ TableType.QDROS # Depends on FILES
+ ]
+
+ # Group imports by table type
+ imports_by_table = {}
+ for import_data in imports:
+ table_name = import_data["table_name"].lower()
+ if table_name not in imports_by_table:
+ imports_by_table[table_name] = []
+ imports_by_table[table_name].append(import_data)
+
+ # Process in dependency order
+ for table_type in import_order:
+ table_name = table_type.value
+ if table_name in imports_by_table:
+ table_imports = imports_by_table[table_name]
+
+ for import_data in table_imports:
+ result = self.import_csv(
+ table_name,
+ import_data["csv_content"],
+ import_data.get("encoding", "utf-8")
+ )
+
+ # Use a unique key if multiple imports for same table
+ key = table_name
+ counter = 1
+ while key in results:
+ counter += 1
+ key = f"{table_name}_{counter}"
+
+ results[key] = result
+
+ # Stop processing if critical import fails
+ if not result.success and table_type in [TableType.ROLODEX, TableType.FILES]:
+ logger.error(f"Critical import failed for {table_name}, stopping batch")
+ break
+
+ return results
+
+ def validate_csv_headers(self, table_name: str, csv_content: str) -> ImportResult:
+ """Validate CSV headers without importing data"""
+ try:
+ table_type = TableType(table_name.lower())
+ except ValueError:
+ result = ImportResult()
+ result.add_error(f"Unsupported table: {table_name}")
+ return result
+
+ # Get appropriate importer
+ importer_class = self._importers[table_type]
+ importer = importer_class(self.db_session, "validation_check")
+
+ # Parse headers only
+ import csv
+ import io
+
+ try:
+ delimiter = importer.detect_delimiter(csv_content)
+ csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=delimiter)
+ headers = csv_reader.fieldnames or []
+
+ if not headers:
+ result = ImportResult()
+ result.add_error("No headers found in CSV file")
+ return result
+
+ # Validate headers
+ result = ImportResult()
+ is_valid = importer.validate_headers(headers)
+ result.success = is_valid
+
+ if is_valid:
+ result.add_warning(f"Headers validated successfully for {table_name}")
+
+ return result
+
+ except Exception as e:
+ result = ImportResult()
+ result.add_error(f"Error validating headers: {str(e)}")
+ return result
\ No newline at end of file
diff --git a/app/import_export/ledger_importer.py b/app/import_export/ledger_importer.py
new file mode 100644
index 0000000..5ffcf46
--- /dev/null
+++ b/app/import_export/ledger_importer.py
@@ -0,0 +1,113 @@
+"""
+LEDGER CSV Importer
+"""
+from typing import Dict, List, Any
+from datetime import date
+from sqlalchemy.orm import Session
+
+from .base import BaseCSVImporter, ImportValidationError
+from app.models.ledger import Ledger
+from app.models.files import File
+
+
+class LedgerCSVImporter(BaseCSVImporter):
+ """CSV importer for LEDGER table"""
+
+ @property
+ def table_name(self) -> str:
+ return "ledger"
+
+ @property
+ def required_fields(self) -> List[str]:
+ return ["file_no", "date", "t_code", "t_type", "empl_num", "amount"]
+
+ @property
+ def field_mapping(self) -> Dict[str, str]:
+ """Map CSV headers to database field names"""
+ return {
+ "file_no": "file_no",
+ "item_no": "item_no",
+ "date": "date",
+ "t_code": "t_code",
+ "t_type": "t_type",
+ "t_type_l": "t_type_l",
+ "empl_num": "empl_num",
+ "quantity": "quantity",
+ "rate": "rate",
+ "amount": "amount",
+ "billed": "billed",
+ "note": "note"
+ }
+
+ def create_model_instance(self, row_data: Dict[str, Any]) -> Ledger:
+ """Create a Ledger instance from processed row data"""
+
+ # Validate required fields
+ required_checks = [
+ ("file_no", "File number"),
+ ("date", "Date"),
+ ("t_code", "Transaction code"),
+ ("t_type", "Transaction type"),
+ ("empl_num", "Employee number"),
+ ("amount", "Amount")
+ ]
+
+ for field, display_name in required_checks:
+ if not row_data.get(field):
+ raise ImportValidationError(f"{display_name} is required")
+
+ # Validate foreign key exists (file number)
+ file_exists = self.db_session.query(File).filter_by(file_no=row_data["file_no"]).first()
+ if not file_exists:
+ raise ImportValidationError(f"File number '{row_data['file_no']}' does not exist")
+
+ # Parse date
+ try:
+ transaction_date = self.parse_date(row_data["date"])
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid date: {e}")
+
+ # Parse numeric fields
+ try:
+ item_no = 1 # Default
+ if row_data.get("item_no"):
+ item_no = self.parse_int(row_data["item_no"])
+ if item_no < 1:
+ raise ImportValidationError("Item number must be positive")
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid item number: {e}")
+
+ try:
+ quantity = self.parse_float(row_data.get("quantity", "0"))
+ rate = self.parse_float(row_data.get("rate", "0"))
+ amount = self.parse_float(row_data["amount"])
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid numeric value: {e}")
+
+ # Validate transaction code and type
+ t_code = self.normalize_string(row_data["t_code"], 10)
+ t_type = self.normalize_string(row_data["t_type"], 1)
+ t_type_l = self.normalize_string(row_data.get("t_type_l", ""), 1)
+
+ # Validate billed field (Y/N)
+ billed = row_data.get("billed", "N").strip().upper()
+ if billed not in ["Y", "N", ""]:
+ billed = "N" # Default to N if invalid
+
+ # Create instance
+ ledger = Ledger(
+ file_no=self.normalize_string(row_data["file_no"], 45),
+ item_no=item_no,
+ date=transaction_date,
+ t_code=t_code,
+ t_type=t_type,
+ t_type_l=t_type_l,
+ empl_num=self.normalize_string(row_data["empl_num"], 10),
+ quantity=quantity,
+ rate=rate,
+ amount=amount,
+ billed=billed,
+ note=row_data.get("note", "") # Text field
+ )
+
+ return ledger
\ No newline at end of file
diff --git a/app/import_export/logging_config.py b/app/import_export/logging_config.py
new file mode 100644
index 0000000..d45c268
--- /dev/null
+++ b/app/import_export/logging_config.py
@@ -0,0 +1,160 @@
+"""
+Enhanced logging configuration for import operations
+"""
+import logging
+import os
+from datetime import datetime
+from typing import Optional, Dict, Any
+
+
+class ImportLogger:
+ """Specialized logger for import operations"""
+
+ def __init__(self, import_id: str, table_name: str):
+ self.import_id = import_id
+ self.table_name = table_name
+ self.logger = logging.getLogger(f"import.{table_name}")
+
+ # Create logs directory if it doesn't exist
+ log_dir = "logs/imports"
+ os.makedirs(log_dir, exist_ok=True)
+
+ # Create file handler for this specific import
+ log_file = os.path.join(log_dir, f"{import_id}_{table_name}.log")
+ file_handler = logging.FileHandler(log_file)
+ file_handler.setLevel(logging.DEBUG)
+
+ # Create formatter
+ formatter = logging.Formatter(
+ '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+ )
+ file_handler.setFormatter(formatter)
+
+ # Add handler to logger
+ self.logger.addHandler(file_handler)
+ self.logger.setLevel(logging.DEBUG)
+
+ # Track import session details
+ self.session_start = datetime.utcnow()
+ self.row_count = 0
+ self.error_count = 0
+
+ def info(self, message: str, **kwargs):
+ """Log info message with import context"""
+ self._log_with_context("info", message, **kwargs)
+
+ def warning(self, message: str, **kwargs):
+ """Log warning message with import context"""
+ self._log_with_context("warning", message, **kwargs)
+
+ def error(self, message: str, **kwargs):
+ """Log error message with import context"""
+ self.error_count += 1
+ self._log_with_context("error", message, **kwargs)
+
+ def debug(self, message: str, **kwargs):
+ """Log debug message with import context"""
+ self._log_with_context("debug", message, **kwargs)
+
+ def _log_with_context(self, level: str, message: str, **kwargs):
+ """Log message with import context"""
+ context = {
+ "import_id": self.import_id,
+ "table": self.table_name,
+ "row_count": self.row_count,
+ **kwargs
+ }
+
+ context_str = " | ".join([f"{k}={v}" for k, v in context.items()])
+ full_message = f"[{context_str}] {message}"
+
+ getattr(self.logger, level)(full_message)
+
+ def log_row_processed(self, row_number: int, success: bool = True):
+ """Log that a row has been processed"""
+ self.row_count += 1
+ if success:
+ self.debug(f"Row {row_number} processed successfully")
+ else:
+ self.error(f"Row {row_number} failed to process")
+
+ def log_validation_error(self, row_number: int, field: str, value: Any, error: str):
+ """Log validation error for specific field"""
+ self.error(
+ f"Validation error on row {row_number}",
+ field=field,
+ value=str(value)[:100], # Truncate long values
+ error=error
+ )
+
+ def log_import_summary(self, total_rows: int, imported_rows: int, error_rows: int):
+ """Log final import summary"""
+ duration = datetime.utcnow() - self.session_start
+
+ self.info(
+ f"Import completed",
+ total_rows=total_rows,
+ imported_rows=imported_rows,
+ error_rows=error_rows,
+ duration_seconds=duration.total_seconds(),
+ success_rate=f"{(imported_rows/total_rows)*100:.1f}%" if total_rows > 0 else "0%"
+ )
+
+
+def create_import_logger(import_id: str, table_name: str) -> ImportLogger:
+ """Factory function to create import logger"""
+ return ImportLogger(import_id, table_name)
+
+
+class ImportMetrics:
+ """Track import performance metrics"""
+
+ def __init__(self):
+ self.start_time = datetime.utcnow()
+ self.end_time = None
+ self.total_rows = 0
+ self.processed_rows = 0
+ self.error_rows = 0
+ self.validation_errors = []
+ self.database_errors = []
+
+ def record_row_processed(self, success: bool = True):
+ """Record that a row was processed"""
+ self.processed_rows += 1
+ if not success:
+ self.error_rows += 1
+
+ def record_validation_error(self, row_number: int, error: str):
+ """Record a validation error"""
+ self.validation_errors.append({
+ "row": row_number,
+ "error": error,
+ "timestamp": datetime.utcnow()
+ })
+
+ def record_database_error(self, error: str):
+ """Record a database error"""
+ self.database_errors.append({
+ "error": error,
+ "timestamp": datetime.utcnow()
+ })
+
+ def finalize(self):
+ """Finalize metrics collection"""
+ self.end_time = datetime.utcnow()
+
+ def get_summary(self) -> Dict[str, Any]:
+ """Get metrics summary"""
+ duration = (self.end_time or datetime.utcnow()) - self.start_time
+
+ return {
+ "start_time": self.start_time.isoformat(),
+ "end_time": self.end_time.isoformat() if self.end_time else None,
+ "duration_seconds": duration.total_seconds(),
+ "total_rows": self.total_rows,
+ "processed_rows": self.processed_rows,
+ "error_rows": self.error_rows,
+ "success_rate": (self.processed_rows / self.total_rows * 100) if self.total_rows > 0 else 0,
+ "validation_errors": len(self.validation_errors),
+ "database_errors": len(self.database_errors)
+ }
\ No newline at end of file
diff --git a/app/import_export/phone_importer.py b/app/import_export/phone_importer.py
new file mode 100644
index 0000000..03dadb0
--- /dev/null
+++ b/app/import_export/phone_importer.py
@@ -0,0 +1,54 @@
+"""
+PHONE CSV Importer
+"""
+from typing import Dict, List, Any
+from sqlalchemy.orm import Session
+
+from .base import BaseCSVImporter, ImportValidationError
+from app.models.rolodex import Phone, Rolodex
+
+
+class PhoneCSVImporter(BaseCSVImporter):
+ """CSV importer for PHONE table"""
+
+ @property
+ def table_name(self) -> str:
+ return "phone"
+
+ @property
+ def required_fields(self) -> List[str]:
+ return ["rolodex_id", "phone"] # rolodex_id and phone number are required
+
+ @property
+ def field_mapping(self) -> Dict[str, str]:
+ """Map CSV headers to database field names"""
+ return {
+ "rolodex_id": "rolodex_id",
+ "location": "location",
+ "phone": "phone"
+ }
+
+ def create_model_instance(self, row_data: Dict[str, Any]) -> Phone:
+ """Create a Phone instance from processed row data"""
+
+ # Validate required fields
+ if not row_data.get("rolodex_id"):
+ raise ImportValidationError("Rolodex ID is required")
+ if not row_data.get("phone"):
+ raise ImportValidationError("Phone number is required")
+
+ # Validate foreign key exists
+ rolodex_exists = self.db_session.query(Rolodex).filter_by(
+ id=row_data["rolodex_id"]
+ ).first()
+ if not rolodex_exists:
+ raise ImportValidationError(f"Rolodex ID '{row_data['rolodex_id']}' does not exist")
+
+ # Create instance with field length validation
+ phone = Phone(
+ rolodex_id=self.normalize_string(row_data["rolodex_id"], 80),
+ location=self.normalize_string(row_data.get("location", ""), 45),
+ phone=self.normalize_string(row_data["phone"], 45)
+ )
+
+ return phone
\ No newline at end of file
diff --git a/app/import_export/qdros_importer.py b/app/import_export/qdros_importer.py
new file mode 100644
index 0000000..0d78111
--- /dev/null
+++ b/app/import_export/qdros_importer.py
@@ -0,0 +1,137 @@
+"""
+QDROS CSV Importer
+"""
+from typing import Dict, List, Any
+from datetime import date
+from sqlalchemy.orm import Session
+
+from .base import BaseCSVImporter, ImportValidationError
+from app.models.qdro import QDRO
+from app.models.files import File
+
+
+class QdrosCSVImporter(BaseCSVImporter):
+ """CSV importer for QDROS table"""
+
+ @property
+ def table_name(self) -> str:
+ return "qdros"
+
+ @property
+ def required_fields(self) -> List[str]:
+ return ["file_no"] # Only file_no is strictly required
+
+ @property
+ def field_mapping(self) -> Dict[str, str]:
+ """Map CSV headers to database field names"""
+ return {
+ "file_no": "file_no",
+ "version": "version",
+ "plan_id": "plan_id",
+ # Legacy CSV fields
+ "field1": "field1",
+ "field2": "field2",
+ "part": "part",
+ "altp": "altp",
+ "pet": "pet",
+ "res": "res",
+ # Case information
+ "case_type": "case_type",
+ "case_code": "case_code",
+ "section": "section",
+ "case_number": "case_number",
+ # Dates
+ "judgment_date": "judgment_date",
+ "valuation_date": "valuation_date",
+ "married_on": "married_on",
+ # Award and venue
+ "percent_awarded": "percent_awarded",
+ "ven_city": "ven_city",
+ "ven_cnty": "ven_cnty",
+ "ven_st": "ven_st",
+ # Document status dates
+ "draft_out": "draft_out",
+ "draft_apr": "draft_apr",
+ "final_out": "final_out",
+ # Additional fields
+ "judge": "judge",
+ "form_name": "form_name",
+ "status": "status",
+ "content": "content",
+ "notes": "notes",
+ "approval_status": "approval_status",
+ "approved_date": "approved_date",
+ "filed_date": "filed_date"
+ }
+
+ def create_model_instance(self, row_data: Dict[str, Any]) -> QDRO:
+ """Create a Qdro instance from processed row data"""
+
+ # Validate required fields
+ if not row_data.get("file_no"):
+ raise ImportValidationError("File number is required")
+
+ # Validate foreign key exists (file number)
+ file_exists = self.db_session.query(File).filter_by(file_no=row_data["file_no"]).first()
+ if not file_exists:
+ raise ImportValidationError(f"File number '{row_data['file_no']}' does not exist")
+
+ # Parse date fields
+ date_fields = [
+ "judgment_date", "valuation_date", "married_on",
+ "draft_out", "draft_apr", "final_out", "approved_date", "filed_date"
+ ]
+
+ parsed_dates = {}
+ for field in date_fields:
+ if row_data.get(field):
+ try:
+ parsed_dates[field] = self.parse_date(row_data[field])
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid {field}: {e}")
+ else:
+ parsed_dates[field] = None
+
+ # Validate state abbreviation length
+ ven_st = row_data.get("ven_st", "")
+ if ven_st and len(ven_st) > 2:
+ self.result.add_warning(f"State abbreviation truncated: {ven_st}")
+ ven_st = ven_st[:2]
+
+ # Set default status if not provided
+ status = row_data.get("status", "DRAFT")
+
+ # Create instance
+ qdro = QDRO(
+ file_no=self.normalize_string(row_data["file_no"], 45),
+ version=self.normalize_string(row_data.get("version", "01"), 10),
+ plan_id=self.normalize_string(row_data.get("plan_id", ""), 45),
+ # Legacy CSV fields
+ field1=self.normalize_string(row_data.get("field1", ""), 100),
+ field2=self.normalize_string(row_data.get("field2", ""), 100),
+ part=self.normalize_string(row_data.get("part", ""), 100),
+ altp=self.normalize_string(row_data.get("altp", ""), 100),
+ pet=self.normalize_string(row_data.get("pet", ""), 100),
+ res=self.normalize_string(row_data.get("res", ""), 100),
+ # Case information
+ case_type=self.normalize_string(row_data.get("case_type", ""), 45),
+ case_code=self.normalize_string(row_data.get("case_code", ""), 45),
+ section=self.normalize_string(row_data.get("section", ""), 45),
+ case_number=self.normalize_string(row_data.get("case_number", ""), 100),
+ # Dates
+ **parsed_dates,
+ # Award and venue
+ percent_awarded=self.normalize_string(row_data.get("percent_awarded", ""), 100),
+ ven_city=self.normalize_string(row_data.get("ven_city", ""), 50),
+ ven_cnty=self.normalize_string(row_data.get("ven_cnty", ""), 50),
+ ven_st=ven_st,
+ # Additional fields
+ judge=self.normalize_string(row_data.get("judge", ""), 100),
+ form_name=self.normalize_string(row_data.get("form_name", ""), 200),
+ status=self.normalize_string(status, 45),
+ content=row_data.get("content", ""), # Text field
+ notes=row_data.get("notes", ""), # Text field
+ approval_status=self.normalize_string(row_data.get("approval_status", ""), 45)
+ )
+
+ return qdro
\ No newline at end of file
diff --git a/app/import_export/rolodex_importer.py b/app/import_export/rolodex_importer.py
new file mode 100644
index 0000000..59fd6ab
--- /dev/null
+++ b/app/import_export/rolodex_importer.py
@@ -0,0 +1,93 @@
+"""
+ROLODEX CSV Importer
+"""
+from typing import Dict, List, Any
+from datetime import date
+from sqlalchemy.orm import Session
+
+from .base import BaseCSVImporter, ImportValidationError
+from app.models.rolodex import Rolodex
+
+
+class RolodexCSVImporter(BaseCSVImporter):
+ """CSV importer for ROLODEX table"""
+
+ @property
+ def table_name(self) -> str:
+ return "rolodex"
+
+ @property
+ def required_fields(self) -> List[str]:
+ return ["id", "last"] # Only ID and last name are required
+
+ @property
+ def field_mapping(self) -> Dict[str, str]:
+ """Map CSV headers to database field names"""
+ return {
+ "id": "id",
+ "last": "last",
+ "first": "first",
+ "middle": "middle",
+ "prefix": "prefix",
+ "suffix": "suffix",
+ "title": "title",
+ "group": "group",
+ "a1": "a1",
+ "a2": "a2",
+ "a3": "a3",
+ "city": "city",
+ "abrev": "abrev",
+ "zip": "zip",
+ "email": "email",
+ "dob": "dob",
+ "ss_number": "ss_number",
+ "legal_status": "legal_status",
+ "memo": "memo"
+ }
+
+ def create_model_instance(self, row_data: Dict[str, Any]) -> Rolodex:
+ """Create a Rolodex instance from processed row data"""
+
+ # Validate required fields
+ if not row_data.get("id"):
+ raise ImportValidationError("ID is required")
+ if not row_data.get("last"):
+ raise ImportValidationError("Last name is required")
+
+ # Check for duplicate ID
+ existing = self.db_session.query(Rolodex).filter_by(id=row_data["id"]).first()
+ if existing:
+ raise ImportValidationError(f"Rolodex ID '{row_data['id']}' already exists")
+
+ # Parse date of birth
+ dob = None
+ if row_data.get("dob"):
+ try:
+ dob = self.parse_date(row_data["dob"])
+ except ValueError as e:
+ raise ImportValidationError(f"Invalid date of birth: {e}")
+
+ # Create instance with field length validation
+ rolodex = Rolodex(
+ id=self.normalize_string(row_data["id"], 80),
+ last=self.normalize_string(row_data["last"], 80),
+ first=self.normalize_string(row_data.get("first", ""), 45),
+ middle=self.normalize_string(row_data.get("middle", ""), 45),
+ prefix=self.normalize_string(row_data.get("prefix", ""), 45),
+ suffix=self.normalize_string(row_data.get("suffix", ""), 45),
+ title=self.normalize_string(row_data.get("title", ""), 45),
+ group=self.normalize_string(row_data.get("group", ""), 45),
+ a1=self.normalize_string(row_data.get("a1", ""), 45),
+ a2=self.normalize_string(row_data.get("a2", ""), 45),
+ a3=self.normalize_string(row_data.get("a3", ""), 45),
+ city=self.normalize_string(row_data.get("city", ""), 80),
+ abrev=self.normalize_string(row_data.get("abrev", ""), 45),
+ zip=self.normalize_string(row_data.get("zip", ""), 45),
+ email=self.normalize_string(row_data.get("email", ""), 100),
+ dob=dob,
+ ss_number=self.normalize_string(row_data.get("ss_number", ""), 20),
+ legal_status=self.normalize_string(row_data.get("legal_status", ""), 45),
+ memo=row_data.get("memo", "") # Text field, no length limit
+ )
+
+ return rolodex
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
index 766986e..1099282 100644
--- a/app/main.py
+++ b/app/main.py
@@ -175,6 +175,7 @@ from app.api.document_workflows import router as document_workflows_router
from app.api.session_management import router as session_management_router
from app.api.advanced_templates import router as advanced_templates_router
from app.api.jobs import router as jobs_router
+from app.api.import_csv import router as import_csv_router
logger.info("Including API routers")
app.include_router(advanced_variables_router, prefix="/api/variables", tags=["advanced-variables"])
@@ -201,6 +202,7 @@ app.include_router(deadlines_router, prefix="/api/deadlines", tags=["deadlines"]
app.include_router(document_workflows_router, prefix="/api/workflows", tags=["document-workflows"])
app.include_router(labels_router, prefix="/api/labels", tags=["labels"])
app.include_router(jobs_router, prefix="/api/jobs", tags=["jobs"])
+app.include_router(import_csv_router, prefix="/api/admin/import", tags=["import"])
@app.get("/", response_class=HTMLResponse)
@@ -284,6 +286,15 @@ async def admin_page(request: Request):
)
+@app.get("/admin/import", response_class=HTMLResponse)
+async def admin_import_page(request: Request):
+ """CSV Import page (admin only)"""
+ return templates.TemplateResponse(
+ "admin_import.html",
+ {"request": request, "title": "CSV Import - " + settings.app_name}
+ )
+
+
diff --git a/app/models/__init__.py b/app/models/__init__.py
index cf1030b..4f22e38 100644
--- a/app/models/__init__.py
+++ b/app/models/__init__.py
@@ -10,7 +10,6 @@ from .qdro import QDRO, QDROVersion, QDROCommunication
from .audit import AuditLog, LoginAttempt, ImportAudit, ImportAuditFile
from .auth import RefreshToken
from .additional import Deposit, Payment, FileNote, FormVariable, ReportVariable, Document
-from .flexible import FlexibleImport
from .support import SupportTicket, TicketResponse, TicketStatus, TicketPriority, TicketCategory
from .pensions import (
Pension, PensionSchedule, MarriageHistory, DeathBenefit,
@@ -52,7 +51,7 @@ from .lookups import (
__all__ = [
"BaseModel", "User", "Rolodex", "Phone", "File", "Ledger", "QDRO", "QDROVersion", "QDROCommunication",
"AuditLog", "LoginAttempt", "ImportAudit", "ImportAuditFile", "RefreshToken",
- "Deposit", "Payment", "FileNote", "FormVariable", "ReportVariable", "Document", "FlexibleImport",
+ "Deposit", "Payment", "FileNote", "FormVariable", "ReportVariable", "Document",
"SupportTicket", "TicketResponse", "TicketStatus", "TicketPriority", "TicketCategory",
"Pension", "PensionSchedule", "MarriageHistory", "DeathBenefit",
"SeparationAgreement", "LifeTable", "NumberTable", "PensionResult",
diff --git a/static/js/admin_import.js b/static/js/admin_import.js
new file mode 100644
index 0000000..e0f7b2d
--- /dev/null
+++ b/static/js/admin_import.js
@@ -0,0 +1,477 @@
+/**
+ * Admin Import JavaScript
+ * Handles CSV file import functionality
+ */
+
+class ImportManager {
+ constructor() {
+ this.supportedTables = [];
+ this.batchFileCount = 0;
+ this.currentImportId = null;
+ this.pollInterval = null;
+
+ this.init();
+ }
+
+ async init() {
+ await this.loadSupportedTables();
+ this.setupEventListeners();
+ this.addInitialBatchFile();
+ }
+
+ async loadSupportedTables() {
+ try {
+ console.log('Loading supported tables...');
+ const response = await window.http.wrappedFetch('/api/admin/import/tables');
+ if (response.ok) {
+ const data = await response.json();
+ this.supportedTables = data.tables || [];
+ console.log('Supported tables loaded:', this.supportedTables);
+ } else {
+ console.error('Failed to load supported tables, status:', response.status);
+ }
+ } catch (error) {
+ console.error('Failed to load supported tables:', error);
+ window.alerts.error('Failed to load supported tables');
+ }
+ }
+
+ setupEventListeners() {
+ // Single import form
+ document.getElementById('importForm').addEventListener('submit', (e) => {
+ e.preventDefault();
+ this.handleSingleImport();
+ });
+
+ // Validate button
+ document.getElementById('validateBtn').addEventListener('click', () => {
+ this.validateHeaders();
+ });
+
+ // Table selection change
+ document.getElementById('tableSelect').addEventListener('change', (e) => {
+ this.onTableChange(e.target.value);
+ });
+
+ // Batch import buttons
+ document.getElementById('addBatchFile').addEventListener('click', () => {
+ this.addBatchFile();
+ });
+
+ document.getElementById('batchImportBtn').addEventListener('click', () => {
+ this.handleBatchImport();
+ });
+ }
+
+ async onTableChange(tableName) {
+ const schemaInfo = document.getElementById('schemaInfo');
+ const schemaDetails = document.getElementById('schemaDetails');
+
+ if (!tableName) {
+ schemaInfo.classList.add('hidden');
+ return;
+ }
+
+ try {
+ console.log('Loading schema for table:', tableName);
+ const response = await window.http.wrappedFetch(`/api/admin/import/tables/${tableName}/schema`);
+ if (response.ok) {
+ const data = await response.json();
+ const schema = data.schema;
+ console.log('Schema loaded for', tableName, ':', schema);
+
+ let html = '
';
+ html += '
Required Fields:
';
+ html += '
';
+ schema.required_fields.forEach(field => {
+ html += `${field} `;
+ });
+ html += '
';
+
+ html += '
All Available Fields:
';
+ html += '
';
+ html += '
';
+ Object.keys(schema.field_mapping).forEach(field => {
+ html += `${field}`;
+ });
+ html += '
';
+
+ schemaDetails.innerHTML = html;
+ schemaInfo.classList.remove('hidden');
+ }
+ } catch (error) {
+ console.error('Failed to load schema:', error);
+ }
+ }
+
+ async validateHeaders() {
+ const tableSelect = document.getElementById('tableSelect');
+ const fileInput = document.getElementById('csvFile');
+
+ console.log('Starting header validation...');
+
+ if (!tableSelect.value) {
+ console.warn('No table selected for validation');
+ window.alerts.error('Please select a table type');
+ return;
+ }
+
+ if (!fileInput.files[0]) {
+ console.warn('No file selected for validation');
+ window.alerts.error('Please select a CSV file');
+ return;
+ }
+
+ console.log('Validating headers for table:', tableSelect.value, 'file:', fileInput.files[0].name);
+
+ const formData = new FormData();
+ formData.append('table_name', tableSelect.value);
+ formData.append('file', fileInput.files[0]);
+
+ try {
+ const response = await window.http.wrappedFetch('/api/admin/import/validate', {
+ method: 'POST',
+ body: formData
+ });
+
+ console.log('Validation response status:', response.status);
+
+ if (response.ok) {
+ const result = await response.json();
+ console.log('Validation result:', result);
+ if (result.success) {
+ window.alerts.success('CSV headers validated successfully!');
+ } else {
+ const errors = result.validation_result.errors.join('\\n');
+ console.error('Validation errors:', result.validation_result.errors);
+ window.alerts.error(`Validation failed:\\n${errors}`);
+ }
+ } else {
+ const error = await response.json();
+ console.error('Validation failed with error:', error);
+ window.alerts.error(`Validation failed: ${error.detail}`);
+ }
+ } catch (error) {
+ console.error('Validation error:', error);
+ window.alerts.error('Failed to validate CSV headers');
+ }
+ }
+
+ async handleSingleImport() {
+ const tableSelect = document.getElementById('tableSelect');
+ const fileInput = document.getElementById('csvFile');
+
+ console.log('Starting single import...');
+
+ if (!tableSelect.value) {
+ console.warn('No table selected for import');
+ window.alerts.error('Please select a table type');
+ return;
+ }
+
+ if (!fileInput.files[0]) {
+ console.warn('No file selected for import');
+ window.alerts.error('Please select a CSV file');
+ return;
+ }
+
+ console.log('Importing to table:', tableSelect.value, 'file:', fileInput.files[0].name);
+
+ const formData = new FormData();
+ formData.append('table_name', tableSelect.value);
+ formData.append('file', fileInput.files[0]);
+
+ // Show progress
+ this.showProgress();
+
+ try {
+ const response = await window.http.wrappedFetch('/api/admin/import/csv', {
+ method: 'POST',
+ body: formData
+ });
+
+ console.log('Import response status:', response.status);
+
+ if (response.ok) {
+ const result = await response.json();
+ console.log('Import started successfully:', result);
+ this.currentImportId = result.import_id;
+ this.updateProgress(`Import started for ${result.table_name} (ID: ${result.import_id})`, 'info');
+ this.startPolling();
+ } else {
+ const error = await response.json();
+ console.error('Import failed:', error);
+ this.updateProgress(`Import failed: ${error.detail}`, 'error');
+ }
+ } catch (error) {
+ console.error('Import error:', error);
+ this.updateProgress('Import failed: Network error', 'error');
+ }
+ }
+
+ addInitialBatchFile() {
+ this.addBatchFile();
+ }
+
+ addBatchFile() {
+ this.batchFileCount++;
+ const container = document.getElementById('batchFiles');
+
+ const fileDiv = document.createElement('div');
+ fileDiv.className = 'flex space-x-3 items-center';
+ fileDiv.id = `batchFile${this.batchFileCount}`;
+
+ fileDiv.innerHTML = `
+
+
+
+ `;
+
+ container.appendChild(fileDiv);
+ }
+
+ removeBatchFile(fileId) {
+ const fileDiv = document.getElementById(`batchFile${fileId}`);
+ if (fileDiv) {
+ fileDiv.remove();
+ }
+ }
+
+ async handleBatchImport() {
+ const batchFiles = document.getElementById('batchFiles');
+ const fileDivs = batchFiles.children;
+
+ const formData = new FormData();
+ const tableNames = [];
+ let hasFiles = false;
+
+ for (let i = 0; i < fileDivs.length; i++) {
+ const div = fileDivs[i];
+ const tableSelect = div.querySelector('select');
+ const fileInput = div.querySelector('input[type="file"]');
+
+ if (tableSelect.value && fileInput.files[0]) {
+ tableNames.push(tableSelect.value);
+ formData.append('files', fileInput.files[0]);
+ hasFiles = true;
+ }
+ }
+
+ if (!hasFiles) {
+ window.alerts.error('Please select at least one table and file');
+ return;
+ }
+
+ // Add table names to form data
+ tableNames.forEach(name => {
+ formData.append('table_names', name);
+ });
+
+ // Show progress
+ this.showProgress();
+
+ try {
+ const response = await window.http.wrappedFetch('/api/admin/import/batch', {
+ method: 'POST',
+ body: formData
+ });
+
+ if (response.ok) {
+ const result = await response.json();
+ this.currentImportIds = result.import_ids;
+ this.updateProgress(`Batch import started for ${result.total_files} files`, 'info');
+ this.startBatchPolling();
+ } else {
+ const error = await response.json();
+ this.updateProgress(`Batch import failed: ${error.detail}`, 'error');
+ }
+ } catch (error) {
+ console.error('Batch import error:', error);
+ this.updateProgress('Batch import failed: Network error', 'error');
+ }
+ }
+
+ showProgress() {
+ document.getElementById('importProgress').classList.remove('hidden');
+ document.getElementById('importResults').classList.add('hidden');
+ }
+
+ updateProgress(message, type = 'info') {
+ const progressDetails = document.getElementById('progressDetails');
+ const timestamp = new Date().toLocaleTimeString();
+
+ let colorClass = 'text-blue-600';
+ if (type === 'error') colorClass = 'text-red-600';
+ if (type === 'success') colorClass = 'text-green-600';
+ if (type === 'warning') colorClass = 'text-yellow-600';
+
+ progressDetails.innerHTML += `
+
+ ${timestamp}
+ ${message}
+
+ `;
+
+ // Scroll to bottom
+ progressDetails.scrollTop = progressDetails.scrollHeight;
+ }
+
+ startPolling() {
+ if (this.pollInterval) {
+ clearInterval(this.pollInterval);
+ }
+
+ this.pollInterval = setInterval(async () => {
+ await this.checkImportStatus();
+ }, 2000); // Poll every 2 seconds
+ }
+
+ startBatchPolling() {
+ if (this.pollInterval) {
+ clearInterval(this.pollInterval);
+ }
+
+ this.pollInterval = setInterval(async () => {
+ await this.checkBatchStatus();
+ }, 2000); // Poll every 2 seconds
+ }
+
+ async checkImportStatus() {
+ if (!this.currentImportId) return;
+
+ try {
+ const response = await window.http.wrappedFetch(`/api/admin/import/status/${this.currentImportId}`);
+ if (response.ok) {
+ const status = await response.json();
+
+ if (status.status === 'COMPLETED') {
+ clearInterval(this.pollInterval);
+ this.updateProgress('Import completed successfully!', 'success');
+ this.showResults(status.result);
+ } else if (status.status === 'FAILED') {
+ clearInterval(this.pollInterval);
+ this.updateProgress(`Import failed: ${status.error || 'Unknown error'}`, 'error');
+ if (status.result) {
+ this.showResults(status.result);
+ }
+ } else {
+ this.updateProgress(`Import status: ${status.status}`, 'info');
+ }
+ }
+ } catch (error) {
+ console.error('Status check error:', error);
+ }
+ }
+
+ async checkBatchStatus() {
+ if (!this.currentImportIds || !Array.isArray(this.currentImportIds)) return;
+
+ let allCompleted = true;
+ let anyFailed = false;
+
+ for (const importId of this.currentImportIds) {
+ try {
+ const response = await window.http.wrappedFetch(`/api/admin/import/status/${importId}`);
+ if (response.ok) {
+ const status = await response.json();
+
+ if (status.status === 'PROCESSING') {
+ allCompleted = false;
+ } else if (status.status === 'FAILED') {
+ anyFailed = true;
+ this.updateProgress(`${status.table_name} import failed: ${status.error || 'Unknown error'}`, 'error');
+ } else if (status.status === 'COMPLETED') {
+ this.updateProgress(`${status.table_name} import completed`, 'success');
+ }
+ }
+ } catch (error) {
+ console.error('Batch status check error:', error);
+ }
+ }
+
+ if (allCompleted) {
+ clearInterval(this.pollInterval);
+ const message = anyFailed ? 'Batch import completed with some failures' : 'Batch import completed successfully!';
+ const type = anyFailed ? 'warning' : 'success';
+ this.updateProgress(message, type);
+ }
+ }
+
+ showResults(result) {
+ const resultsContent = document.getElementById('resultsContent');
+ const resultsDiv = document.getElementById('importResults');
+
+ let html = '';
+
+ // Summary
+ html += `
+
+
+
${result.total_rows}
+
Total Rows
+
+
+
${result.imported_rows}
+
Imported
+
+
+
${result.skipped_rows}
+
Skipped
+
+
+
${result.error_rows}
+
Errors
+
+
+ `;
+
+ // Errors
+ if (result.errors && result.errors.length > 0) {
+ html += '
';
+ html += '
Errors:
';
+ html += '
';
+ result.errors.forEach(error => {
+ html += `
${this.escapeHtml(error)}
`;
+ });
+ html += '
';
+ }
+
+ // Warnings
+ if (result.warnings && result.warnings.length > 0) {
+ html += '
';
+ html += '
Warnings:
';
+ html += '
';
+ result.warnings.forEach(warning => {
+ html += `
${this.escapeHtml(warning)}
`;
+ });
+ html += '
';
+ }
+
+ html += '
';
+
+ resultsContent.innerHTML = html;
+ resultsDiv.classList.remove('hidden');
+ }
+
+ escapeHtml(text) {
+ const div = document.createElement('div');
+ div.textContent = text;
+ return div.innerHTML;
+ }
+}
+
+// Initialize when DOM is loaded
+document.addEventListener('DOMContentLoaded', () => {
+ window.importManager = new ImportManager();
+});
\ No newline at end of file
diff --git a/templates/admin_import.html b/templates/admin_import.html
new file mode 100644
index 0000000..7de7591
--- /dev/null
+++ b/templates/admin_import.html
@@ -0,0 +1,133 @@
+
+
+
+
+
+ {{ title }}
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
CSV Data Import
+
+ Import CSV files converted from legacy .sc files into the database.
+ Please ensure your CSV files follow the correct format for each table.
+
+
+
+
+
+
Single File Import
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Batch Import (Multiple Files)
+
+ Import multiple CSV files at once. Files will be processed in dependency order
+ (ROLODEX → PHONE/FILES → LEDGER/QDROS).
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file