"""
Main Import Service - coordinates all CSV importers
"""
import csv
import io
import logging
import time
from enum import Enum
from typing import Dict, List, Any, Optional, Union

from sqlalchemy.orm import Session

from .base import ImportResult
from .rolodex_importer import RolodexCSVImporter
from .phone_importer import PhoneCSVImporter
from .files_importer import FilesCSVImporter
from .ledger_importer import LedgerCSVImporter
from .qdros_importer import QdrosCSVImporter
from .generic_importer import GenericCSVImporter

logger = logging.getLogger(__name__)


class TableType(Enum):
    """Supported table types for import.

    Each member's ``value`` is the canonical lowercase table name, which is
    how CSV imports are matched to a table (``TableType(name.lower())``).
    """

    # Core tables with inter-table dependencies
    ROLODEX = "rolodex"
    PHONE = "phone"
    FILES = "files"
    LEDGER = "ledger"
    QDROS = "qdros"
    # Generic table types for all other CSV files
    GRUPLKUP = "gruplkup"
    EMPLOYEE = "employee"
    SETUP = "setup"
    FILETYPE = "filetype"
    TRNSTYPE = "trnstype"
    TRNSACTN = "trnsactn"
    TRNSLKUP = "trnslkup"
    RVARLKUP = "rvarlkup"
    FVARLKUP = "fvarlkup"
    FILENOTS = "filenots"
    DEPOSITS = "deposits"
    PAYMENTS = "payments"
    PENSIONS = "pensions"
    PLANINFO = "planinfo"
    # Form tables
    NUMBERAL = "numberal"
    INX_LKUP = "inx_lkup"
    FORM_LST = "form_lst"
    FORM_INX = "form_inx"
    LIFETABL = "lifetabl"
    # Pension tables
    MARRIAGE = "marriage"
    DEATH = "death"
    SEPARATE = "separate"
    SCHEDULE = "schedule"


class ImportService:
    """Main service for handling CSV imports.

    Wraps the per-table importer classes and provides single-file imports,
    dependency-ordered batch imports, and header-only validation.
    """

    # Tables whose failure aborts a batch import: other tables in the
    # recommended order depend on them (see batch_import).
    _CRITICAL_TABLES = frozenset({TableType.ROLODEX, TableType.FILES})

    def __init__(self, db_session: Session):
        """
        Args:
            db_session: SQLAlchemy session used by every importer this
                service creates.
        """
        self.db_session = db_session
        # All tables currently use GenericCSVImporter: the legacy CSV files
        # vary in structure, and the generic importer avoids the FK issues
        # the dedicated importers run into.  To give a table stricter
        # handling again, override its entry here with a dedicated class
        # (RolodexCSVImporter, PhoneCSVImporter, ...).
        self._importers = {table_type: GenericCSVImporter for table_type in TableType}

    def _create_importer(self, table_type: TableType, import_id: Optional[str]):
        """Instantiate the importer registered for *table_type*.

        GenericCSVImporter takes the target table name as an extra
        constructor argument; dedicated importers already know their table.
        The canonical lowercase name (``table_type.value``) is passed so the
        caller's casing of the table name does not matter.
        """
        importer_class = self._importers[table_type]
        if importer_class is GenericCSVImporter:
            return importer_class(self.db_session, table_type.value, import_id)
        return importer_class(self.db_session, import_id)

    def get_supported_tables(self) -> List[str]:
        """Get list of supported table names."""
        return [table.value for table in TableType]

    def get_table_schema(self, table_name: str) -> Optional[Dict[str, Any]]:
        """Get schema information for a table, or None if unsupported."""
        try:
            table_type = TableType(table_name.lower())
        except ValueError:
            return None

        # Throwaway instance used only to read its schema attributes.
        temp_importer = self._create_importer(table_type, "temp_schema_check")
        return {
            "table_name": temp_importer.table_name,
            "required_fields": temp_importer.required_fields,
            "field_mapping": temp_importer.field_mapping,
            "sample_headers": list(temp_importer.field_mapping.keys()),
        }

    def import_csv(
        self,
        table_name: str,
        csv_content: str,
        encoding: str = "utf-8",
        import_id: Optional[str] = None,
    ) -> ImportResult:
        """Import CSV data to specified table.

        Args:
            table_name: Target table name (case-insensitive, see TableType).
            csv_content: Raw CSV text.
            encoding: Encoding hint forwarded to the importer.
            import_id: Optional identifier; the importer generates one when
                None.

        Returns:
            The importer's ImportResult.  An unsupported table or an
            unexpected exception yields a failed ImportResult rather than
            raising.
        """
        try:
            table_type = TableType(table_name.lower())
        except ValueError:
            result = ImportResult()
            result.add_error(f"Unsupported table: {table_name}")
            return result

        importer = self._create_importer(table_type, import_id)
        logger.info(f"Starting CSV import for table: {table_name} (import_id: {importer.import_id})")

        try:
            result = importer.process_csv_content(csv_content, encoding)
        except Exception as e:
            logger.error(f"Unexpected error during import to {table_name}: {str(e)}")
            result = ImportResult()
            result.add_error(f"Unexpected error: {str(e)}")
            return result

        if result.success:
            logger.info(
                f"Successfully imported {result.imported_rows} rows to {table_name}"
            )
        else:
            logger.warning(
                f"Import failed for {table_name}: {len(result.errors)} errors"
            )
        return result

    @staticmethod
    def _result_key(results: Dict[str, ImportResult], table_name: str) -> str:
        """Return *table_name*, suffixed with _2, _3, ... if already used."""
        key = table_name
        counter = 1
        while key in results:
            counter += 1
            key = f"{table_name}_{counter}"
        return key

    def _import_table_files(
        self,
        table_name: str,
        table_imports: List[Dict[str, Any]],
        results: Dict[str, ImportResult],
        stop_on_failure: bool = False,
    ) -> bool:
        """Import every queued CSV for one table, recording each result.

        Results are stored into *results* under a unique key (the table
        name, suffixed for repeats).

        Returns:
            False if *stop_on_failure* is set and an import failed (the
            remaining files for the table are skipped); True otherwise.
        """
        for import_data in table_imports:
            result = self.import_csv(
                table_name,
                import_data["csv_content"],
                import_data.get("encoding", "utf-8"),
            )
            results[self._result_key(results, table_name)] = result

            if stop_on_failure and not result.success:
                return False

            # Small delay to reduce database lock contention
            time.sleep(0.1)
        return True

    def batch_import(
        self,
        imports: List[Dict[str, Any]]
    ) -> Dict[str, ImportResult]:
        """
        Import multiple CSV files in a batch.

        Tables are processed in dependency order (e.g. ROLODEX before PHONE
        and FILES).  If an import for a critical table (ROLODEX, FILES)
        fails, the rest of the batch is abandoned, since dependent tables
        could not import correctly anyway.

        Args:
            imports: List of dicts with keys: table_name, csv_content,
                and optionally encoding (defaults to "utf-8").

        Returns:
            Dict mapping table names to ImportResult objects; repeated
            imports of the same table get keys suffixed _2, _3, ...
        """
        results: Dict[str, ImportResult] = {}

        # Recommended import order (dependencies first)
        import_order = [
            # Core tables with dependencies
            TableType.ROLODEX,   # No dependencies
            TableType.PHONE,     # Depends on ROLODEX
            TableType.FILES,     # Depends on ROLODEX
            TableType.LEDGER,    # Depends on FILES
            TableType.QDROS,     # Depends on FILES
            # Lookup and reference tables (no dependencies)
            TableType.GRUPLKUP,
            TableType.EMPLOYEE,
            TableType.SETUP,
            TableType.FILETYPE,
            TableType.TRNSTYPE,
            TableType.TRNSACTN,
            TableType.TRNSLKUP,
            TableType.RVARLKUP,
            TableType.FVARLKUP,
            TableType.FILENOTS,
            TableType.PLANINFO,
            # Financial tables
            TableType.DEPOSITS,
            TableType.PAYMENTS,
            TableType.PENSIONS,
            # Form tables
            TableType.NUMBERAL,
            TableType.INX_LKUP,
            TableType.FORM_LST,
            TableType.FORM_INX,
            TableType.LIFETABL,
            # Pension tables
            TableType.MARRIAGE,
            TableType.DEATH,
            TableType.SEPARATE,
            TableType.SCHEDULE,
        ]

        # Group imports by normalized table name
        imports_by_table: Dict[str, List[Dict[str, Any]]] = {}
        for import_data in imports:
            imports_by_table.setdefault(
                import_data["table_name"].lower(), []
            ).append(import_data)

        processed_tables = set()

        # First pass: tables with a known dependency ordering.
        for table_type in import_order:
            table_name = table_type.value
            if table_name not in imports_by_table:
                continue
            processed_tables.add(table_name)

            completed = self._import_table_files(
                table_name,
                imports_by_table[table_name],
                results,
                stop_on_failure=table_type in self._CRITICAL_TABLES,
            )
            if not completed:
                # NOTE: the original code only broke out of the inner loop
                # here, so the batch kept running despite this message.
                # Now the whole batch really stops.
                logger.error(f"Critical import failed for {table_name}, stopping batch")
                return results

        # Second pass: any remaining tables not in the explicit order.
        for table_name, table_imports in imports_by_table.items():
            if table_name in processed_tables:
                continue
            logger.info(f"Processing table {table_name} (not in explicit order)")
            self._import_table_files(table_name, table_imports, results)

        return results

    def validate_csv_headers(self, table_name: str, csv_content: str) -> ImportResult:
        """Validate CSV headers without importing data."""
        try:
            table_type = TableType(table_name.lower())
        except ValueError:
            result = ImportResult()
            result.add_error(f"Unsupported table: {table_name}")
            return result

        importer = self._create_importer(table_type, "validation_check")

        try:
            # Parse headers only -- no rows are processed.
            delimiter = importer.detect_delimiter(csv_content)
            csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=delimiter)
            headers = csv_reader.fieldnames or []

            result = ImportResult()
            if not headers:
                result.add_error("No headers found in CSV file")
                return result

            result.success = importer.validate_headers(headers)
            if result.success:
                result.add_warning(f"Headers validated successfully for {table_name}")
            return result

        except Exception as e:
            result = ImportResult()
            result.add_error(f"Error validating headers: {str(e)}")
            return result