Files
delphi-database-v2/app/import_legacy.py
2025-10-14 07:56:13 -05:00

2279 lines
86 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Legacy CSV import functions for Delphi Database.
This module provides import functions for all legacy database tables,
reading from old-csv files and populating the legacy SQLAlchemy models.
"""
import csv
import os
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
import structlog
from .models import (
Rolodex, LegacyPhone, LegacyFile, FilesR, FilesV, FileNots,
Ledger, Deposits, LegacyPayment, TrnsType, TrnsLkup,
Footers, FileStat, Employee, GroupLkup, FileType,
Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath,
PensionSchedule, PensionSeparate, PensionResults,
RolexV, FVarLkup, RVarLkup, States, Printers, Setup
)
# Module-level structlog logger; the functions below log named events with
# keyword fields (file, encoding, counters, error text).
logger = structlog.get_logger(__name__)
# Batch size for commits: the import functions below commit to the database
# once this many rows have accumulated.
BATCH_SIZE = 500
def _open_with_replace(file_path: str, encoding: str):
    """Open *file_path* with ``errors='replace'`` and probe the first 50 KB.

    The handle is rewound before it is returned, and it is closed again if
    the probe read or the rewind fails so that no descriptor is leaked.
    """
    handle = open(file_path, 'r', encoding=encoding, errors='replace', newline='')
    try:
        handle.read(51200)
        handle.seek(0)
    except Exception:
        handle.close()
        raise
    return handle


def open_text_with_fallbacks(file_path: str):
    """
    Open a text file trying multiple encodings commonly seen in legacy CSVs.

    Each candidate encoding is first checked in strict mode against the first
    50 KB of the file. The first candidate that decodes cleanly is then used
    to re-open the file with ``errors='replace'`` so that an undecodable byte
    further down becomes U+FFFD instead of aborting the CSV iteration.

    A file that decodes as UTF-8 but starts with a byte order mark is opened
    as ``utf-8-sig`` so the BOM does not end up glued to the first CSV header
    name.

    Args:
        file_path: Path of the file to open.

    Returns:
        A tuple of (file_object, encoding_used). The caller owns the file
        object and must close it.

    Raises:
        RuntimeError: If the file cannot be opened with any strategy.
    """
    # latin-1/iso-8859-1 map every byte value, so strict decoding with them
    # cannot fail; the entries after them are kept for parity with the
    # original list.
    encodings = ["utf-8", "utf-8-sig", "iso-8859-1", "latin-1", "cp1252", "windows-1252", "cp1250"]
    last_error = None
    for enc in encodings:
        try:
            # Strict probe only; the context manager closes this handle even
            # when decoding fails half-way through the sample.
            with open(file_path, 'r', encoding=enc, errors='strict', newline='') as probe:
                sample = probe.read(51200)
            # Plain utf-8 accepts a BOM as a regular character; switch to the
            # BOM-stripping codec so the first header name stays intact.
            selected = "utf-8-sig" if enc == "utf-8" and sample.startswith("\ufeff") else enc
            # Re-open for the real CSV pass using a forgiving error strategy.
            f = open(file_path, 'r', encoding=selected, errors='replace', newline='')
            logger.info("csv_open_encoding_selected", file=file_path, encoding=selected)
            return f, selected
        except Exception as e:
            last_error = e
            logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e))
    # If strict mode fails, try with error replacement for robustness
    logger.warning("strict_encoding_failed", file=file_path, trying_with_replace=True)
    try:
        # Try UTF-8 with error replacement first (most common case)
        f = _open_with_replace(file_path, 'utf-8')
        logger.info("csv_open_encoding_with_replace", file=file_path, encoding="utf-8-replace")
        return f, "utf-8-replace"
    except Exception as e:
        logger.warning("utf8_replace_failed", file=file_path, error=str(e))
    # Final fallback: use latin-1 with replace (handles any byte sequence)
    try:
        f = _open_with_replace(file_path, 'latin-1')
        logger.info("csv_open_encoding_fallback", file=file_path, encoding="latin-1-replace")
        return f, "latin-1-replace"
    except Exception as e:
        last_error = e
    error_msg = f"Unable to open file '{file_path}' with any supported encodings"
    if last_error:
        error_msg += f". Last error: {str(last_error)}"
    raise RuntimeError(error_msg) from last_error
def parse_date(date_str: str) -> Optional[datetime]:
    """Convert a legacy date string into a ``datetime``.

    Surrounding whitespace is ignored. The layouts ``MM/DD/YYYY``,
    ``MM/DD/YY``, ``YYYY-MM-DD``, ``MM-DD-YYYY`` and ``MM-DD-YY`` are tried in
    that order and the first match wins.

    Returns:
        The parsed ``datetime``, or ``None`` when the input is empty/blank or
        matches none of the layouts (the latter is logged as
        ``date_parse_failed``).
    """
    text = date_str.strip() if date_str else ""
    if not text:
        return None
    for layout in ("%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%m-%d-%Y", "%m-%d-%y"):
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            # Not this layout; fall through to the next candidate.
            continue
        return parsed
    logger.warning("date_parse_failed", date_string=text)
    return None
def parse_decimal(value: str) -> Optional[Decimal]:
    """Convert a numeric string into a ``Decimal``.

    Surrounding whitespace is ignored.

    Returns:
        The parsed ``Decimal``, or ``None`` when the input is empty/blank or
        is not a valid decimal literal (the latter is logged as
        ``decimal_parse_failed`` with the original, unstripped value).
    """
    if not value:
        return None
    text = value.strip()
    if not text:
        return None
    try:
        number = Decimal(text)
    except (InvalidOperation, ValueError):
        logger.warning("decimal_parse_failed", value=value)
        return None
    return number
def clean_string(value: Optional[str]) -> Optional[str]:
    """Return a sanitized string or None if blank/only junk.

    The following are removed, in one pass over the text:

    * Unicode replacement characters (U+FFFD), which decoding with
      ``errors="replace"`` substitutes for undecodable bytes
    * ASCII control characters (0x00-0x1F, 0x7F); this includes embedded
      tabs and newlines

    Leading/trailing whitespace is then stripped.

    Args:
        value: Raw field text, or ``None`` for a missing field.

    Returns:
        The cleaned text, or ``None`` when the input is ``None``/empty or
        nothing remains after cleaning.
    """
    if not value:
        return None
    # The replacement character is written as an escape so that it cannot be
    # lost or mangled by editors and viewers.
    kept = "".join(
        ch for ch in value
        if ch >= " " and ch not in ("\x7f", "\ufffd")
    )
    return kept.strip() or None
# ============================================================================
# Reference Table Imports (should be imported first)
# ============================================================================
def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]:
    """Import TRNSTYPE.csv → TrnsType model with upsert logic.

    Rows are keyed on ``T_Type``. An existing ``TrnsType`` row has its
    ``t_type_l``, ``header`` and ``footer`` overwritten; otherwise a new row
    is added. Rows with a blank key are ignored. Changes are committed every
    ``BATCH_SIZE`` successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    t_type = clean_string(row.get('T_Type'))
                    if not t_type:
                        continue
                    # Non-key columns, shared by the insert and update paths.
                    values = {
                        't_type_l': clean_string(row.get('T_Type_L')),
                        'header': clean_string(row.get('Header')),
                        'footer': clean_string(row.get('Footer')),
                    }
                    existing = db.query(TrnsType).filter(TrnsType.t_type == t_type).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(TrnsType(t_type=t_type, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_trnstype_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_trnstype_failed", error=str(e))
    return result
def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import TRNSLKUP.csv → TrnsLkup model with upsert logic.

    Rows are keyed on ``T_Code``. An existing ``TrnsLkup`` row has its
    ``t_type``, ``t_type_l``, ``amount`` and ``description`` overwritten;
    otherwise a new row is added. Rows with a blank key are ignored. Changes
    are committed every ``BATCH_SIZE`` successful rows and once at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    t_code = clean_string(row.get('T_Code'))
                    if not t_code:
                        continue
                    # Non-key columns, shared by the insert and update paths.
                    values = {
                        't_type': clean_string(row.get('T_Type')),
                        't_type_l': clean_string(row.get('T_Type_L')),
                        'amount': parse_decimal(row.get('Amount')),
                        'description': clean_string(row.get('Description')),
                    }
                    existing = db.query(TrnsLkup).filter(TrnsLkup.t_code == t_code).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(TrnsLkup(t_code=t_code, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_trnslkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_trnslkup_failed", error=str(e))
    return result
def import_footers(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FOOTERS.csv → Footers model with upsert logic.

    Rows are keyed on ``F_Code``. An existing ``Footers`` row has its
    ``f_footer`` overwritten; otherwise a new row is added. Rows with a blank
    key are ignored. Changes are committed every ``BATCH_SIZE`` successful
    rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    f_code = clean_string(row.get('F_Code'))
                    if not f_code:
                        continue
                    f_footer = clean_string(row.get('F_Footer'))
                    existing = db.query(Footers).filter(Footers.f_code == f_code).first()
                    if existing:
                        existing.f_footer = f_footer
                        result['updated'] += 1
                    else:
                        db.add(Footers(f_code=f_code, f_footer=f_footer))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_footers_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_footers_failed", error=str(e))
    return result
def import_filestat(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILESTAT.csv → FileStat model with upsert logic.

    Rows are keyed on ``Status``. An existing ``FileStat`` row has its
    ``definition``, ``send`` and ``footer_code`` overwritten; otherwise a new
    row is added. Rows with a blank key are ignored. Changes are committed
    every ``BATCH_SIZE`` successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    status = clean_string(row.get('Status'))
                    if not status:
                        continue
                    # Non-key columns, shared by the insert and update paths.
                    values = {
                        'definition': clean_string(row.get('Definition')),
                        'send': clean_string(row.get('Send')),
                        'footer_code': clean_string(row.get('Footer_Code')),
                    }
                    existing = db.query(FileStat).filter(FileStat.status == status).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(FileStat(status=status, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_filestat_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filestat_failed", error=str(e))
    return result
def import_employee(db: Session, file_path: str) -> Dict[str, Any]:
    """Import EMPLOYEE.csv → Employee model with upsert logic.

    Rows are keyed on ``Empl_Num``. An existing ``Employee`` row has its
    ``empl_id`` and ``rate_per_hour`` overwritten; otherwise a new row is
    added. Rows with a blank key are ignored. Changes are committed every
    ``BATCH_SIZE`` successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    empl_num = clean_string(row.get('Empl_Num'))
                    if not empl_num:
                        continue
                    # Non-key columns, shared by the insert and update paths.
                    values = {
                        'empl_id': clean_string(row.get('Empl_Id')),
                        'rate_per_hour': parse_decimal(row.get('Rate_Per_Hour')),
                    }
                    existing = db.query(Employee).filter(Employee.empl_num == empl_num).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(Employee(empl_num=empl_num, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_employee_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_employee_failed", error=str(e))
    return result
def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import GRUPLKUP.csv → GroupLkup model with upsert logic.

    Rows are keyed on ``Code``. An existing ``GroupLkup`` row has its
    ``description`` and ``title`` overwritten; otherwise a new row is added.
    Rows with a blank key are ignored. Changes are committed every
    ``BATCH_SIZE`` successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    code = clean_string(row.get('Code'))
                    if not code:
                        continue
                    # Non-key columns, shared by the insert and update paths.
                    values = {
                        'description': clean_string(row.get('Description')),
                        'title': clean_string(row.get('Title')),
                    }
                    existing = db.query(GroupLkup).filter(GroupLkup.code == code).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(GroupLkup(code=code, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_gruplkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_gruplkup_failed", error=str(e))
    return result
def import_filetype(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILETYPE.csv → FileType model with upsert logic.

    Rows are keyed on ``File_Type``, the only column written. A value already
    present in the database is counted as ``updated`` without any change; a
    new value is inserted. Blank values and repeats of a value already seen
    in this file are ignored. Changes are committed every ``BATCH_SIZE``
    successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # Track seen file types in-memory to avoid duplicates within the same CSV
            seen_in_batch = set()
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    file_type = clean_string(row.get('File_Type'))
                    if not file_type or file_type in seen_in_batch:
                        continue
                    seen_in_batch.add(file_type)
                    existing = db.query(FileType).filter(FileType.file_type == file_type).first()
                    if existing:
                        # No fields to update for FileType (only has PK), just count it
                        result['updated'] += 1
                    else:
                        db.add(FileType(file_type=file_type))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_filetype_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filetype_failed", error=str(e))
    return result
def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FVARLKUP.csv → FVarLkup model with upsert logic.

    Rows are keyed on ``Identifier``. An existing ``FVarLkup`` row has its
    ``query`` and ``response`` overwritten; otherwise a new row is added.
    Rows with a blank key are ignored. Changes are committed every
    ``BATCH_SIZE`` successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    identifier = clean_string(row.get('Identifier'))
                    if not identifier:
                        continue
                    # Non-key columns, shared by the insert and update paths.
                    values = {
                        'query': clean_string(row.get('Query')),
                        'response': clean_string(row.get('Response')),
                    }
                    existing = db.query(FVarLkup).filter(FVarLkup.identifier == identifier).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(FVarLkup(identifier=identifier, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_fvarlkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_fvarlkup_failed", error=str(e))
    return result
def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import RVARLKUP.csv → RVarLkup model with upsert logic.

    Rows are keyed on ``Identifier``. An existing ``RVarLkup`` row has its
    ``query`` overwritten; otherwise a new row is added. Rows with a blank
    key are ignored. Changes are committed every ``BATCH_SIZE`` successful
    rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    identifier = clean_string(row.get('Identifier'))
                    if not identifier:
                        continue
                    query_text = clean_string(row.get('Query'))
                    existing = db.query(RVarLkup).filter(RVarLkup.identifier == identifier).first()
                    if existing:
                        existing.query = query_text
                        result['updated'] += 1
                    else:
                        db.add(RVarLkup(identifier=identifier, query=query_text))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_rvarlkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rvarlkup_failed", error=str(e))
    return result
def import_states(db: Session, file_path: str) -> Dict[str, Any]:
    """Import STATES.csv → States model with upsert logic.

    Rows are keyed on ``Abrev``. An existing ``States`` row has its ``st``
    overwritten; otherwise a new row is added. Rows with a blank key are
    ignored. Changes are committed every ``BATCH_SIZE`` successful rows and
    once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    abrev = clean_string(row.get('Abrev'))
                    if not abrev:
                        continue
                    st = clean_string(row.get('St'))
                    existing = db.query(States).filter(States.abrev == abrev).first()
                    if existing:
                        existing.st = st
                        result['updated'] += 1
                    else:
                        db.add(States(abrev=abrev, st=st))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_states_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_states_failed", error=str(e))
    return result
def import_printers(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PRINTERS.csv → Printers model with upsert logic.

    Rows are keyed on ``Number``, which must parse as an integer; a row with
    a non-integer value is reported in ``errors`` and skipped, and a row with
    a blank value is ignored. An existing ``Printers`` row has all of its
    text columns overwritten; otherwise a new row is added. Changes are
    committed every ``BATCH_SIZE`` successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated`` and
        an ``errors`` list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    # CSV text columns; each maps to the model attribute with the same name
    # in lower case (e.g. 'Page_Break' -> page_break).
    text_columns = (
        'Name', 'Port', 'Page_Break', 'Setup_St', 'Phone_Book',
        'Rolodex_Info', 'Envelope', 'File_Cabinet', 'Accounts',
        'Statements', 'Calendar', 'Reset_St', 'B_Underline',
        'E_Underline', 'B_Bold', 'E_Bold',
    )
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    number_str = clean_string(row.get('Number'))
                    if not number_str:
                        continue
                    try:
                        number = int(number_str)
                    except ValueError:
                        result['errors'].append(f"Row {row_num}: Invalid Number '{number_str}'")
                        continue
                    values = {col.lower(): clean_string(row.get(col)) for col in text_columns}
                    existing = db.query(Printers).filter(Printers.number == number).first()
                    if existing:
                        for attr, val in values.items():
                            setattr(existing, attr, val)
                        result['updated'] += 1
                    else:
                        db.add(Printers(number=number, **values))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_printers_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_printers_failed", error=str(e))
    return result
def import_setup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import SETUP.csv → Setup model (clears and re-inserts).

    The CSV is read and parsed completely before the table is touched. The
    existing ``Setup`` rows are then deleted and the new rows inserted in a
    single transaction, so an unreadable file or a failed insert leaves the
    previous setup in place instead of an empty table.

    ``Default_Printer`` is stored as an integer; a non-integer value is
    reported in ``errors`` and the row is still imported with
    ``default_printer=None``.

    Args:
        db: SQLAlchemy session used for the delete, inserts and commit.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success`` and an ``errors`` list.
        Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    # CSV text columns; each maps to the model attribute with the same name
    # in lower case (e.g. 'L_Head1' -> l_head1).
    text_columns = ('Appl_Title',) + tuple(f'L_Head{i}' for i in range(1, 11))
    try:
        f, _ = open_text_with_fallbacks(file_path)
        records = []
        # The context manager closes the CSV handle even if parsing aborts.
        with f:
            reader = csv.DictReader(f)
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    # Parse default_printer as integer if present
                    default_printer = None
                    default_printer_str = clean_string(row.get('Default_Printer'))
                    if default_printer_str:
                        try:
                            default_printer = int(default_printer_str)
                        except ValueError:
                            result['errors'].append(f"Row {row_num}: Invalid Default_Printer '{default_printer_str}'")
                    values = {col.lower(): clean_string(row.get(col)) for col in text_columns}
                    records.append(Setup(default_printer=default_printer, **values))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
        # Replace the table contents atomically: the delete is only committed
        # together with the new rows (typically a single row).
        db.query(Setup).delete()
        if records:
            db.bulk_save_objects(records)
        db.commit()
        result['success'] += len(records)
        logger.info("import_setup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_setup_failed", error=str(e))
    return result
# ============================================================================
# Core Data Table Imports
# ============================================================================
def import_rolodex(db: Session, file_path: str) -> Dict[str, Any]:
    """Import ROLODEX.csv → Rolodex model.

    Insert-only: an ``Id`` that already exists in the database, or that was
    already seen earlier in this file, is counted as ``skipped``. Rows with a
    blank ``Id`` are ignored. New rows are saved in batches of
    ``BATCH_SIZE``; if a batch hits an ``IntegrityError`` it is retried one
    record at a time and the offending records are counted as ``skipped``.
    Any other database error during a save aborts the import and is reported
    as a fatal error.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``skipped`` and an ``errors``
        list. Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
    # CSV text columns; each maps to the model attribute with the same name
    # in lower case (e.g. 'Legal_Status' -> legal_status).
    text_columns = (
        'Prefix', 'First', 'Middle', 'Last', 'Suffix', 'Title',
        'A1', 'A2', 'A3', 'City', 'Abrev', 'St', 'Zip', 'Email',
        'Legal_Status', 'Group', 'Memo',
    )

    def _flush(records):
        """Save *records* in bulk, retrying one by one on IntegrityError."""
        try:
            db.bulk_save_objects(records)
            db.commit()
            result['success'] += len(records)
            return
        except IntegrityError:
            db.rollback()
        # Handle any remaining duplicates by inserting one at a time
        for record in records:
            try:
                db.add(record)
                db.commit()
                result['success'] += 1
            except IntegrityError:
                db.rollback()
                result['skipped'] += 1

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # Track IDs we've seen in this import to handle duplicates
            seen_in_import = set()
            batch = []
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    rolodex_id = clean_string(row.get('Id'))
                    if not rolodex_id:
                        continue
                    # Skip if we've already processed this ID in current import
                    if rolodex_id in seen_in_import:
                        result['skipped'] += 1
                        continue
                    # Skip if it already exists in database
                    if db.query(Rolodex).filter(Rolodex.id == rolodex_id).first():
                        result['skipped'] += 1
                        seen_in_import.add(rolodex_id)
                        continue
                    values = {col.lower(): clean_string(row.get(col)) for col in text_columns}
                    batch.append(Rolodex(
                        id=rolodex_id,
                        dob=parse_date(row.get('DOB')),
                        ss=clean_string(row.get('SS#')),
                        **values,
                    ))
                    seen_in_import.add(rolodex_id)
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # Batched records are not attached to the session, so
                    # this only resets the session after a failed lookup.
                    db.rollback()
                    continue
                # Flushing happens outside the per-row handler so a failed
                # save is not misreported as a problem with this row.
                if len(batch) >= BATCH_SIZE:
                    _flush(batch)
                    batch = []
            # Save remaining batch
            if batch:
                _flush(batch)
        logger.info("import_rolodex_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rolodex_failed", error=str(e))
    return result
def import_phone(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PHONE.csv → LegacyPhone model with upsert logic.

    Rows are keyed on the ``(Id, Phone)`` pair. An existing ``LegacyPhone``
    row has its ``location`` overwritten; otherwise a new row is added. Rows
    missing ``Id`` or ``Phone`` are counted in ``skipped_no_id`` /
    ``skipped_no_phone``, and a pair already seen earlier in this file is
    counted in ``skipped``. Changes are committed every ``BATCH_SIZE``
    successful rows and once more at the end.

    Args:
        db: SQLAlchemy session used for lookups, inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success``, ``inserted``, ``updated``, the
        three skip counters and an ``errors`` list. Exceptions are caught and
        reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0, 'skipped': 0, 'skipped_no_phone': 0, 'skipped_no_id': 0}
    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            # Track seen combinations in this import to handle duplicates within the CSV
            seen_in_import = set()
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    rolodex_id = clean_string(row.get('Id'))
                    phone = clean_string(row.get('Phone'))
                    # Skip rows with missing required fields (phone is part of PK, cannot be NULL)
                    if not rolodex_id:
                        result['skipped_no_id'] += 1
                        continue
                    if not phone:
                        result['skipped_no_phone'] += 1
                        continue
                    composite_key = (rolodex_id, phone)
                    if composite_key in seen_in_import:
                        result['skipped'] += 1
                        continue
                    seen_in_import.add(composite_key)
                    location = clean_string(row.get('Location'))
                    existing = db.query(LegacyPhone).filter(
                        LegacyPhone.id == rolodex_id,
                        LegacyPhone.phone == phone
                    ).first()
                    if existing:
                        existing.location = location
                        result['updated'] += 1
                    else:
                        db.add(LegacyPhone(id=rolodex_id, phone=phone, location=location))
                        result['inserted'] += 1
                    result['success'] += 1
                    # Commit in batches for performance
                    if result['success'] % BATCH_SIZE == 0:
                        db.commit()
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    # NOTE(review): rollback() discards every uncommitted row
                    # of the current batch, not just this one, while the
                    # counters above still include them.
                    db.rollback()
        # Commit any remaining changes
        db.commit()
        logger.info("import_phone_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_phone_failed", error=str(e))
    return result
def import_rolex_v(db: Session, file_path: str) -> Dict[str, Any]:
    """Import ROLEX_V.csv → RolexV model.

    Insert-only. Rows missing ``Id`` or ``Identifier`` are ignored. Records
    are saved in batches of ``BATCH_SIZE``; if a batch hits an
    ``IntegrityError`` the session is rolled back and the batch is retried
    one record at a time, with each rejected record reported in ``errors``
    under its CSV row number. Any other database error during a save aborts
    the import and is reported as a fatal error.

    Args:
        db: SQLAlchemy session used for inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success`` and an ``errors`` list.
        Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    def _flush(pending):
        """Save (row_num, record) pairs in bulk, isolating rejected rows."""
        try:
            db.bulk_save_objects([record for _, record in pending])
            db.commit()
            result['success'] += len(pending)
            return
        except IntegrityError:
            # Roll back so the session is usable again before retrying.
            db.rollback()
        for pending_row, record in pending:
            try:
                db.add(record)
                db.commit()
                result['success'] += 1
            except IntegrityError as e:
                db.rollback()
                result['errors'].append(f"Row {pending_row}: {str(e)}")

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            batch = []
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    rolodex_id = clean_string(row.get('Id'))
                    identifier = clean_string(row.get('Identifier'))
                    if not rolodex_id or not identifier:
                        continue
                    batch.append((row_num, RolexV(
                        id=rolodex_id,
                        identifier=identifier,
                        response=clean_string(row.get('Response'))
                    )))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue
                # Flushing happens outside the per-row handler so a failed
                # save is not misreported as a problem with this row.
                if len(batch) >= BATCH_SIZE:
                    _flush(batch)
                    batch = []
            if batch:
                _flush(batch)
        logger.info("import_rolex_v_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rolex_v_failed", error=str(e))
    return result
def import_files(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES.csv → LegacyFile model.

    Insert-only. Rows with a blank ``File_No`` are ignored. Text columns go
    through ``clean_string``, ``Opened``/``Closed`` through ``parse_date``
    and the numeric columns through ``parse_decimal``. Records are saved in
    batches of ``BATCH_SIZE``; if a batch hits an ``IntegrityError`` the
    session is rolled back and the batch is retried one record at a time,
    with each rejected record reported in ``errors`` under its CSV row
    number. Any other database error during a save aborts the import and is
    reported as a fatal error.

    Args:
        db: SQLAlchemy session used for inserts and commits.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``total_rows``, ``success`` and an ``errors`` list.
        Exceptions are caught and reported in ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    # CSV columns grouped by parser; each maps to the model attribute with
    # the same name in lower case (e.g. 'Trust_Bal_P' -> trust_bal_p).
    text_columns = (
        'Id', 'File_Type', 'Regarding', 'Empl_Num', 'Status',
        'Footer_Code', 'Opposing', 'Memo',
    )
    date_columns = ('Opened', 'Closed')
    decimal_columns = (
        'Rate_Per_Hour', 'Hours', 'Hours_P', 'Trust_Bal', 'Trust_Bal_P',
        'Hourly_Fees', 'Hourly_Fees_P', 'Flat_Fees', 'Flat_Fees_P',
        'Disbursements', 'Disbursements_P', 'Credit_Bal', 'Credit_Bal_P',
        'Total_Charges', 'Total_Charges_P', 'Amount_Owing', 'Amount_Owing_P',
        'Transferable',
    )

    def _flush(pending):
        """Save (row_num, record) pairs in bulk, isolating rejected rows."""
        try:
            db.bulk_save_objects([record for _, record in pending])
            db.commit()
            result['success'] += len(pending)
            return
        except IntegrityError:
            # Roll back so the session is usable again before retrying.
            db.rollback()
        for pending_row, record in pending:
            try:
                db.add(record)
                db.commit()
                result['success'] += 1
            except IntegrityError as e:
                db.rollback()
                result['errors'].append(f"Row {pending_row}: {str(e)}")

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # The context manager closes the CSV handle even if the import
        # aborts part-way through.
        with f:
            reader = csv.DictReader(f)
            batch = []
            # DictReader consumes line 1 as the header, so data starts at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    file_no = clean_string(row.get('File_No'))
                    if not file_no:
                        continue
                    values = {col.lower(): clean_string(row.get(col)) for col in text_columns}
                    values.update((col.lower(), parse_date(row.get(col))) for col in date_columns)
                    values.update((col.lower(), parse_decimal(row.get(col))) for col in decimal_columns)
                    batch.append((row_num, LegacyFile(file_no=file_no, **values)))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue
                # Flushing happens outside the per-row handler so a failed
                # save is not misreported as a problem with this row.
                if len(batch) >= BATCH_SIZE:
                    _flush(batch)
                    batch = []
            if batch:
                _flush(batch)
        logger.info("import_files_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_failed", error=str(e))
    return result
def import_files_r(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES_R.csv → FilesR model.

    Rows missing File_No, Relationship or Rolodex_Id are skipped without an
    error entry (they still count toward ``total_rows``). Records are saved
    in batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                relationship = clean_string(row.get('Relationship'))
                rolodex_id = clean_string(row.get('Rolodex_Id'))
                if not file_no or not relationship or not rolodex_id:
                    continue
                batch.append(FilesR(
                    file_no=file_no,
                    relationship=relationship,
                    rolodex_id=rolodex_id,
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_files_r_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_r_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_files_v(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES_V.csv → FilesV model.

    Rows missing File_No or Identifier are skipped without an error entry
    (they still count toward ``total_rows``). Records are saved in batches
    of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                identifier = clean_string(row.get('Identifier'))
                if not file_no or not identifier:
                    continue
                batch.append(FilesV(
                    file_no=file_no,
                    identifier=identifier,
                    response=clean_string(row.get('Response')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_files_v_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_v_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_filenots(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILENOTS.csv → FileNots model.

    Rows with an empty File_No, or for which ``parse_date`` yields a falsy
    Memo_Date, are skipped without an error entry (they still count toward
    ``total_rows``). Records are saved in batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                memo_date = parse_date(row.get('Memo_Date'))
                if not file_no or not memo_date:
                    continue
                batch.append(FileNots(
                    file_no=file_no,
                    memo_date=memo_date,
                    memo_note=clean_string(row.get('Memo_Note')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_filenots_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filenots_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_ledger(db: Session, file_path: str) -> Dict[str, Any]:
    """Import LEDGER.csv → Ledger model.

    Rows missing File_No or Item_No are skipped without an error entry.
    Rows whose Item_No is not an integer are skipped and reported in
    ``errors``. Records are saved in batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                item_no_str = clean_string(row.get('Item_No'))
                if not file_no or not item_no_str:
                    continue
                try:
                    item_no = int(item_no_str)
                except ValueError:
                    result['errors'].append(f"Row {row_num}: Invalid Item_No '{item_no_str}'")
                    continue
                batch.append(Ledger(
                    file_no=file_no,
                    date=parse_date(row.get('Date')),
                    item_no=item_no,
                    empl_num=clean_string(row.get('Empl_Num')),
                    t_code=clean_string(row.get('T_Code')),
                    t_type=clean_string(row.get('T_Type')),
                    t_type_l=clean_string(row.get('T_Type_L')),
                    quantity=parse_decimal(row.get('Quantity')),
                    rate=parse_decimal(row.get('Rate')),
                    amount=parse_decimal(row.get('Amount')),
                    billed=clean_string(row.get('Billed')),
                    note=clean_string(row.get('Note')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_ledger_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_ledger_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_deposits(db: Session, file_path: str) -> Dict[str, Any]:
    """Import DEPOSITS.csv → Deposits model.

    Rows for which ``parse_date`` yields a falsy Deposit_Date are skipped
    without an error entry (they still count toward ``total_rows``).
    Records are saved in batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                deposit_date = parse_date(row.get('Deposit_Date'))
                if not deposit_date:
                    continue
                batch.append(Deposits(
                    deposit_date=deposit_date,
                    total=parse_decimal(row.get('Total')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_deposits_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_deposits_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_payments(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PAYMENTS.csv → LegacyPayment model.

    Every data row produces a record; no column is treated as required.
    The CSV ``Id`` column is stored as ``rolodex_id``. Records are saved in
    batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                # Original author's note: LegacyPayment has an auto-increment
                # id, so no key column needs to be checked here (the model is
                # not visible in this module).
                batch.append(LegacyPayment(
                    deposit_date=parse_date(row.get('Deposit_Date')),
                    file_no=clean_string(row.get('File_No')),
                    rolodex_id=clean_string(row.get('Id')),
                    regarding=clean_string(row.get('Regarding')),
                    amount=parse_decimal(row.get('Amount')),
                    note=clean_string(row.get('Note')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_payments_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_payments_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
# ============================================================================
# Specialized Imports (QDRO, Pensions, Plan Info)
# ============================================================================
def import_planinfo(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PLANINFO.csv → PlanInfo model.

    Acts as an upsert keyed on Plan_Id:

    * Rows with an empty Plan_Id are skipped and reported in ``errors``.
    * Rows whose Plan_Id is already known (present in the table when the
      import started, or seen earlier in this file) update the stored
      record in place when one can be loaded. If it cannot be loaded yet
      (an earlier row of this file that is still waiting in the unsaved
      batch), the row is skipped without an error entry.
    * All other rows are inserted in batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for queries, inserts, commits and
            rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows inserted; in-place updates are not
        counted), ``errors`` (list of messages) and ``total_rows`` (data
        rows read). Failures are caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    def plan_fields(row) -> Dict[str, Any]:
        """Map one CSV row to the non-key PlanInfo attribute values."""
        return {
            'plan_name': clean_string(row.get('Plan_Name')),
            'plan_type': clean_string(row.get('Plan_Type')),
            'empl_id_no': clean_string(row.get('Empl_Id_No')),
            'plan_no': clean_string(row.get('Plan_No')),
            'nra': clean_string(row.get('NRA')),
            'era': clean_string(row.get('ERA')),
            'errf': clean_string(row.get('ERRF')),
            'colas': clean_string(row.get('COLAS')),
            'divided_by': clean_string(row.get('Divided_By')),
            'drafted': clean_string(row.get('Drafted')),
            'benefit_c': clean_string(row.get('Benefit_C')),
            'qdro_c': clean_string(row.get('QDRO_C')),
            'rev': clean_string(row.get('^REV')),
            'pa': clean_string(row.get('^PA')),
            'form_name': clean_string(row.get('Form_Name')),
            'drafted_on': parse_date(row.get('Drafted_On')),
            'memo': clean_string(row.get('Memo')),
        }

    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        # Fetch once to avoid many round-trips
        existing_ids: set[str] = {
            pid for (pid,) in db.query(PlanInfo.plan_id).all()
        }
        batch: list[PlanInfo] = []
        has_pending_updates = False
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                plan_id = clean_string(row.get('Plan_Id'))
                if not plan_id:
                    # Only an empty/missing plan_id is rejected here; it is
                    # recorded so the user can review the row later.
                    result['errors'].append(
                        f"Row {row_num}: skipped due to invalid plan_id '{plan_id}'"
                    )
                    continue
                if plan_id in existing_ids:
                    # Update existing record in place (UPSERT)
                    rec = db.query(PlanInfo).filter_by(plan_id=plan_id).first()
                    if rec:
                        for attr, value in plan_fields(row).items():
                            setattr(rec, attr, value)
                        has_pending_updates = True
                    continue
                batch.append(PlanInfo(plan_id=plan_id, **plan_fields(row)))
                # Track to prevent duplicates within same import
                existing_ids.add(plan_id)
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Commit in-place updates first, then the remaining inserts.
        if has_pending_updates:
            db.commit()
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_planinfo_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_planinfo_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_qdros(db: Session, file_path: str) -> Dict[str, Any]:
    """Import QDROS.csv → Qdros model.

    Rows missing File_No or Version are skipped without an error entry
    (they still count toward ``total_rows``). CSV columns whose names start
    with ``^`` are stored under the corresponding plain attribute names.
    Records are saved in batches of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                batch.append(Qdros(
                    file_no=file_no,
                    version=version,
                    plan_id=clean_string(row.get('Plan_Id')),
                    _1=clean_string(row.get('^1')),
                    _2=clean_string(row.get('^2')),
                    part=clean_string(row.get('^Part')),
                    altp=clean_string(row.get('^AltP')),
                    pet=clean_string(row.get('^Pet')),
                    res=clean_string(row.get('^Res')),
                    case_type=clean_string(row.get('Case_Type')),
                    case_code=clean_string(row.get('Case_Code')),
                    section=clean_string(row.get('Section')),
                    case_number=clean_string(row.get('Case_Number')),
                    judgment_date=parse_date(row.get('Judgment_Date')),
                    valuation_date=parse_date(row.get('Valuation_Date')),
                    married_on=parse_date(row.get('Married_On')),
                    percent_awarded=parse_decimal(row.get('Percent_Awarded')),
                    ven_city=clean_string(row.get('Ven_City')),
                    ven_cnty=clean_string(row.get('Ven_Cnty')),
                    ven_st=clean_string(row.get('Ven_St')),
                    draft_out=parse_date(row.get('Draft_Out')),
                    draft_apr=parse_date(row.get('Draft_Apr')),
                    final_out=parse_date(row.get('Final_Out')),
                    judge=clean_string(row.get('Judge')),
                    form_name=clean_string(row.get('Form_Name')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_qdros_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_qdros_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PENSIONS.csv → Pensions model.

    Rows missing File_No or Version are skipped without being counted.
    A (File_No, Version) pair that was already seen in this file, or that
    already exists in the table, is counted in ``skipped``. New records are
    saved in batches of BATCH_SIZE; when a batch hits an IntegrityError it
    is retried one record at a time and the offending records are counted
    as skipped.

    Args:
        db: SQLAlchemy session used for queries, inserts, commits and
            rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (duplicates),
        ``errors`` (list of messages) and ``total_rows`` (data rows read).
        Failures are caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}

    def save_batch(pending) -> None:
        """Bulk-insert pending records; on IntegrityError insert one by one."""
        try:
            db.bulk_save_objects(pending)
            db.commit()
            result['success'] += len(pending)
        except IntegrityError:
            db.rollback()
            # Handle any remaining duplicates by inserting one at a time
            for item in pending:
                try:
                    db.add(item)
                    db.commit()
                    result['success'] += 1
                except IntegrityError:
                    db.rollback()
                    result['skipped'] += 1

    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        # Track (file_no, version) combinations we've seen in this import to handle duplicates
        seen_in_import = set()
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                composite_key = (file_no, version)
                # Skip if we've already processed this combination in current import
                if composite_key in seen_in_import:
                    result['skipped'] += 1
                    continue
                # Skip if it already exists in database
                if db.query(Pensions).filter(
                    Pensions.file_no == file_no,
                    Pensions.version == version
                ).first():
                    result['skipped'] += 1
                    seen_in_import.add(composite_key)
                    continue
                batch.append(Pensions(
                    file_no=file_no,
                    version=version,
                    plan_id=clean_string(row.get('Plan_Id')),
                    plan_name=clean_string(row.get('Plan_Name')),
                    title=clean_string(row.get('Title')),
                    first=clean_string(row.get('First')),
                    last=clean_string(row.get('Last')),
                    birth=parse_date(row.get('Birth')),
                    race=clean_string(row.get('Race')),
                    sex=clean_string(row.get('Sex')),
                    info=parse_date(row.get('Info')),
                    valu=parse_date(row.get('Valu')),
                    accrued=parse_decimal(row.get('Accrued')),
                    vested_per=parse_decimal(row.get('Vested_Per')),
                    start_age=parse_decimal(row.get('Start_Age')),
                    cola=parse_decimal(row.get('COLA')),
                    max_cola=parse_decimal(row.get('Max_COLA')),
                    withdrawal=parse_decimal(row.get('Withdrawal')),
                    pre_dr=parse_decimal(row.get('Pre_DR')),
                    post_dr=parse_decimal(row.get('Post_DR')),
                    tax_rate=parse_decimal(row.get('Tax_Rate')),
                ))
                seen_in_import.add(composite_key)
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    save_batch(batch)
                except Exception as e:
                    # Anything other than IntegrityError: roll back so the
                    # session stays usable, and drop the batch so it is not
                    # retried on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save remaining batch; a non-integrity failure here is fatal below.
        if batch:
            save_batch(batch)
        logger.info("import_pensions_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pensions_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/MARRIAGE.csv → PensionMarriage model.

    Rows missing File_No or Version are skipped without an error entry
    (they still count toward ``total_rows``). Records are saved in batches
    of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                batch.append(PensionMarriage(
                    file_no=file_no,
                    version=version,
                    married_from=parse_date(row.get('Married_From')),
                    married_to=parse_date(row.get('Married_To')),
                    married_years=parse_decimal(row.get('Married_Years')),
                    service_from=parse_date(row.get('Service_From')),
                    service_to=parse_date(row.get('Service_To')),
                    service_years=parse_decimal(row.get('Service_Years')),
                    marital_pct=parse_decimal(row.get('Marital_%')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_pension_marriage_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_marriage_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/DEATH.csv → PensionDeath model.

    Rows missing File_No or Version are skipped without being counted.
    A (File_No, Version) pair that was already seen in this file, or that
    already exists in the table, is counted in ``skipped``. New records are
    saved in batches of BATCH_SIZE; when a batch hits an IntegrityError it
    is retried one record at a time and the offending records are counted
    as skipped.

    Args:
        db: SQLAlchemy session used for queries, inserts, commits and
            rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (duplicates),
        ``errors`` (list of messages) and ``total_rows`` (data rows read).
        Failures are caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}

    def save_batch(pending) -> None:
        """Bulk-insert pending records; on IntegrityError insert one by one."""
        try:
            db.bulk_save_objects(pending)
            db.commit()
            result['success'] += len(pending)
        except IntegrityError:
            db.rollback()
            # Handle any remaining duplicates by inserting one at a time
            for item in pending:
                try:
                    db.add(item)
                    db.commit()
                    result['success'] += 1
                except IntegrityError:
                    db.rollback()
                    result['skipped'] += 1

    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        # Track (file_no, version) combinations we've seen in this import to handle duplicates
        seen_in_import = set()
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                composite_key = (file_no, version)
                # Skip if we've already processed this combination in current import
                if composite_key in seen_in_import:
                    result['skipped'] += 1
                    continue
                # Skip if it already exists in database
                if db.query(PensionDeath).filter(
                    PensionDeath.file_no == file_no,
                    PensionDeath.version == version
                ).first():
                    result['skipped'] += 1
                    seen_in_import.add(composite_key)
                    continue
                batch.append(PensionDeath(
                    file_no=file_no,
                    version=version,
                    lump1=parse_decimal(row.get('Lump1')),
                    lump2=parse_decimal(row.get('Lump2')),
                    growth1=parse_decimal(row.get('Growth1')),
                    growth2=parse_decimal(row.get('Growth2')),
                    disc1=parse_decimal(row.get('Disc1')),
                    disc2=parse_decimal(row.get('Disc2')),
                ))
                seen_in_import.add(composite_key)
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    save_batch(batch)
                except Exception as e:
                    # Anything other than IntegrityError: roll back so the
                    # session stays usable, and drop the batch so it is not
                    # retried on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save remaining batch; a non-integrity failure here is fatal below.
        if batch:
            save_batch(batch)
        logger.info("import_pension_death_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_death_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/SCHEDULE.csv → PensionSchedule model.

    Rows missing File_No or Version are skipped without an error entry
    (they still count toward ``total_rows``). Records are saved in batches
    of BATCH_SIZE.

    Args:
        db: SQLAlchemy session used for inserts, commits and rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``errors`` (list of messages)
        and ``total_rows`` (data rows read). Failures are caught and
        reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                batch.append(PensionSchedule(
                    file_no=file_no,
                    version=version,
                    vests_on=parse_date(row.get('Vests_On')),
                    vests_at=parse_decimal(row.get('Vests_At')),
                ))
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    # A failed flush leaves the session unusable until it is
                    # rolled back; the batch is dropped so it is not retried
                    # on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save whatever is left; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        logger.info("import_pension_schedule_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_schedule_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/SEPARATE.csv → PensionSeparate model.

    Rows missing File_No or Version are skipped without being counted.
    A (File_No, Version) pair that was already seen in this file, or that
    already exists in the table, is counted in ``skipped``. New records are
    saved in batches of BATCH_SIZE; when a batch hits an IntegrityError it
    is retried one record at a time and the offending records are counted
    as skipped.

    Args:
        db: SQLAlchemy session used for queries, inserts, commits and
            rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (duplicates),
        ``errors`` (list of messages) and ``total_rows`` (data rows read).
        Failures are caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}

    def save_batch(pending) -> None:
        """Bulk-insert pending records; on IntegrityError insert one by one."""
        try:
            db.bulk_save_objects(pending)
            db.commit()
            result['success'] += len(pending)
        except IntegrityError:
            db.rollback()
            # Handle any remaining duplicates by inserting one at a time
            for item in pending:
                try:
                    db.add(item)
                    db.commit()
                    result['success'] += 1
                except IntegrityError:
                    db.rollback()
                    result['skipped'] += 1

    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        # Track (file_no, version) combinations we've seen in this import to handle duplicates
        seen_in_import = set()
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                composite_key = (file_no, version)
                # Skip if we've already processed this combination in current import
                if composite_key in seen_in_import:
                    result['skipped'] += 1
                    continue
                # Skip if it already exists in database
                if db.query(PensionSeparate).filter(
                    PensionSeparate.file_no == file_no,
                    PensionSeparate.version == version
                ).first():
                    result['skipped'] += 1
                    seen_in_import.add(composite_key)
                    continue
                batch.append(PensionSeparate(
                    file_no=file_no,
                    version=version,
                    separation_rate=parse_decimal(row.get('Separation_Rate')),
                ))
                seen_in_import.add(composite_key)
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    save_batch(batch)
                except Exception as e:
                    # Anything other than IntegrityError: roll back so the
                    # session stays usable, and drop the batch so it is not
                    # retried on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save remaining batch; a non-integrity failure here is fatal below.
        if batch:
            save_batch(batch)
        logger.info("import_pension_separate_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_separate_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result
def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/RESULTS.csv → PensionResults model.

    Rows missing File_No or Version are skipped without being counted.
    A (File_No, Version) pair that was already seen in this file, or that
    already exists in the table, is counted in ``skipped``. New records are
    saved in batches of BATCH_SIZE; when a batch hits an IntegrityError it
    is retried one record at a time and the offending records are counted
    as skipped.

    Args:
        db: SQLAlchemy session used for queries, inserts, commits and
            rollbacks.
        file_path: Path of the CSV file to read.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (duplicates),
        ``errors`` (list of messages) and ``total_rows`` (data rows read).
        Failures are caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}

    def save_batch(pending) -> None:
        """Bulk-insert pending records; on IntegrityError insert one by one."""
        try:
            db.bulk_save_objects(pending)
            db.commit()
            result['success'] += len(pending)
        except IntegrityError:
            db.rollback()
            # Handle any remaining duplicates by inserting one at a time
            for item in pending:
                try:
                    db.add(item)
                    db.commit()
                    result['success'] += 1
                except IntegrityError:
                    db.rollback()
                    result['skipped'] += 1

    f = None
    try:
        f, _encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        # Track (file_no, version) combinations we've seen in this import to handle duplicates
        seen_in_import = set()
        batch = []
        # start=2: DictReader consumes the first line as the header row.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                # Note: RESULTS.csv might not have explicit file_no/version columns
                # May need to derive from associated pension record
                # For now, skip if missing
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                composite_key = (file_no, version)
                # Skip if we've already processed this combination in current import
                if composite_key in seen_in_import:
                    result['skipped'] += 1
                    continue
                # Skip if it already exists in database
                if db.query(PensionResults).filter(
                    PensionResults.file_no == file_no,
                    PensionResults.version == version
                ).first():
                    result['skipped'] += 1
                    seen_in_import.add(composite_key)
                    continue
                batch.append(PensionResults(
                    file_no=file_no,
                    version=version,
                    accrued=parse_decimal(row.get('Accrued')),
                    start_age=parse_decimal(row.get('Start_Age')),
                    cola=parse_decimal(row.get('COLA')),
                    withdrawal=parse_decimal(row.get('Withdrawal')),
                    pre_dr=parse_decimal(row.get('Pre_DR')),
                    post_dr=parse_decimal(row.get('Post_DR')),
                    tax_rate=parse_decimal(row.get('Tax_Rate')),
                    age=parse_decimal(row.get('Age')),
                    years_from=parse_decimal(row.get('Years_From')),
                    life_exp=parse_decimal(row.get('Life_Exp')),
                    ev_monthly=parse_decimal(row.get('EV_Monthly')),
                    payments=parse_decimal(row.get('Payments')),
                    pay_out=parse_decimal(row.get('Pay_Out')),
                    fund_value=parse_decimal(row.get('Fund_Value')),
                    pv=parse_decimal(row.get('PV')),
                    mortality=parse_decimal(row.get('Mortality')),
                    pv_am=parse_decimal(row.get('PV_AM')),
                    pv_amt=parse_decimal(row.get('PV_AMT')),
                    pv_pre_db=parse_decimal(row.get('PV_Pre_DB')),
                    pv_annuity=parse_decimal(row.get('PV_Annuity')),
                    wv_at=parse_decimal(row.get('WV_AT')),
                    pv_plan=parse_decimal(row.get('PV_Plan')),
                    years_married=parse_decimal(row.get('Years_Married')),
                    years_service=parse_decimal(row.get('Years_Service')),
                    marr_per=parse_decimal(row.get('Marr_Per')),
                    marr_amt=parse_decimal(row.get('Marr_Amt')),
                ))
                seen_in_import.add(composite_key)
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            if len(batch) >= BATCH_SIZE:
                try:
                    save_batch(batch)
                except Exception as e:
                    # Anything other than IntegrityError: roll back so the
                    # session stays usable, and drop the batch so it is not
                    # retried on every following row.
                    db.rollback()
                    result['errors'].append(
                        f"Row {row_num}: batch save failed: {str(e)}"
                    )
                batch = []

        # Save remaining batch; a non-integrity failure here is fatal below.
        if batch:
            save_batch(batch)
        logger.info("import_pension_results_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_results_failed", error=str(e))
    finally:
        # Close the CSV handle on every path, including fatal errors.
        if f is not None:
            f.close()
    return result