delphi-database-v2/app/import_legacy.py
HotSwapp 4030dbd88e Implement comprehensive CSV import system for legacy database migration
- Added 5 new legacy models to app/models.py (FileType, FileNots, RolexV, FVarLkup, RVarLkup)
- Created app/import_legacy.py with import functions for all legacy tables:
  * Reference tables: TRNSTYPE, TRNSLKUP, FOOTERS, FILESTAT, EMPLOYEE, GRUPLKUP, FILETYPE, FVARLKUP, RVARLKUP
  * Core tables: ROLODEX, PHONE, ROLEX_V, FILES, FILES_R, FILES_V, FILENOTS, LEDGER, DEPOSITS, PAYMENTS
  * Specialized: PLANINFO, QDROS, PENSIONS and all pension-related tables
- Created app/sync_legacy_to_modern.py with sync functions to populate modern models from legacy data
- Updated admin routes in app/main.py:
  * Extended process_csv_import to support all new import types
  * Added /admin/sync endpoint for syncing legacy to modern models
  * Updated get_import_type_from_filename to recognize all CSV file patterns
- Enhanced app/templates/admin.html with:
  * Import Order Guide showing recommended import sequence
  * Sync to Modern Models section with confirmation dialog
  * Sync results display with detailed per-table statistics
  * Updated supported file formats list
- All import functions use batch processing (500 rows), proper error handling, and structured logging
- Sync functions maintain foreign key integrity and skip orphaned records with warnings
2025-10-08 09:41:38 -05:00

1616 lines
56 KiB
Python

"""
Legacy CSV import functions for Delphi Database.
This module provides import functions for all legacy database tables,
reading from old-csv files and populating the legacy SQLAlchemy models.
"""
import csv
import os
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
import structlog
from .models import (
    Rolodex, LegacyPhone, LegacyFile, FilesR, FilesV, FileNots,
    Ledger, Deposits, LegacyPayment, TrnsType, TrnsLkup,
    Footers, FileStat, Employee, GroupLkup, FileType,
    Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath,
    PensionSchedule, PensionSeparate, PensionResults,
    RolexV, FVarLkup, RVarLkup
)
logger = structlog.get_logger(__name__)
# Batch size for commits
BATCH_SIZE = 500
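# Note on the batching strategy used by every importer below: rows are
# accumulated and flushed with bulk_save_objects()/commit() every BATCH_SIZE
# rows. This keeps memory bounded and avoids one giant transaction, but it
# also means a run that fails partway through leaves the already-committed
# batches in place; the rollback in each fatal-error handler only discards
# the uncommitted tail. Retry a failed import against a truncated table.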
def open_text_with_fallbacks(file_path: str):
    """
    Open a text file trying multiple encodings commonly seen in legacy CSVs.
    Returns a tuple of (file_object, encoding_used).
    """
    encodings = ["utf-8", "utf-8-sig", "cp1252", "windows-1252", "cp1250", "iso-8859-1", "latin-1"]
    last_error = None
    for enc in encodings:
        try:
            f = open(file_path, 'r', encoding=enc, errors='strict', newline='')
            _ = f.read(1024)
            f.seek(0)
            logger.info("csv_open_encoding_selected", file=file_path, encoding=enc)
            return f, enc
        except Exception as e:
            last_error = e
            logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e))
            continue
    error_msg = f"Unable to open file '{file_path}' with any supported encoding"
    if last_error:
        error_msg += f". Last error: {str(last_error)}"
    raise RuntimeError(error_msg)
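# Caveat: the probe above reads only the first 1024 bytes, so a file whose
# head decodes cleanly can still raise UnicodeDecodeError deeper in the
# stream while csv.DictReader iterates. That surfaces in the importers'
# fatal-error handlers below; batches committed before the failure are kept
# (see the note on BATCH_SIZE above).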
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
    """Parse date string in various formats, return None if blank/invalid."""
    if not date_str or not date_str.strip():
        return None
    date_str = date_str.strip()
    # Try common date formats
    formats = [
        "%m/%d/%Y",
        "%m/%d/%y",
        "%Y-%m-%d",
        "%m-%d-%Y",
        "%m-%d-%y",
    ]
    for fmt in formats:
        try:
            return datetime.strptime(date_str, fmt)
        except ValueError:
            continue
    logger.warning("date_parse_failed", date_string=date_str)
    return None
def parse_decimal(value: Optional[str]) -> Optional[Decimal]:
    """Parse decimal string, return None if blank/invalid."""
    if not value or not value.strip():
        return None
    try:
        return Decimal(value.strip())
    except (InvalidOperation, ValueError):
        logger.warning("decimal_parse_failed", value=value)
        return None
def clean_string(value: Optional[str]) -> Optional[str]:
    """Clean string value, return None if blank."""
    if not value or not value.strip():
        return None
    return value.strip()
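# Illustrative behavior of the helpers above (what the current
# implementations return, shown for quick reference; the two-digit-year
# pivot follows Python's strptime convention):
#   parse_date("1/5/1999")    -> datetime(1999, 1, 5)
#   parse_date("12/31/69")    -> datetime(1969, 12, 31)  (%y maps 69-99 to 1900s)
#   parse_date("Jan 5 1999")  -> None  (format not in the list; warning logged)
#   parse_decimal(" 42.50 ")  -> Decimal("42.50")
#   parse_decimal("1,234.56") -> None  (commas are not stripped; warning logged)
#   clean_string("  QDRO  ")  -> "QDRO"
#   clean_string("   ")       -> None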
# ============================================================================
# Reference Table Imports (should be imported first)
# ============================================================================
def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]:
    """Import TRNSTYPE.csv → TrnsType model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                t_type = clean_string(row.get('T_Type'))
                if not t_type:
                    continue
                record = TrnsType(
                    t_type=t_type,
                    t_type_l=clean_string(row.get('T_Type_L')),
                    header=clean_string(row.get('Header')),
                    footer=clean_string(row.get('Footer'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_trnstype_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_trnstype_failed", error=str(e))
    return result
def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import TRNSLKUP.csv → TrnsLkup model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                t_code = clean_string(row.get('T_Code'))
                if not t_code:
                    continue
                record = TrnsLkup(
                    t_code=t_code,
                    t_type=clean_string(row.get('T_Type')),
                    t_type_l=clean_string(row.get('T_Type_L')),
                    amount=parse_decimal(row.get('Amount')),
                    description=clean_string(row.get('Description'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_trnslkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_trnslkup_failed", error=str(e))
    return result
def import_footers(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FOOTERS.csv → Footers model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                f_code = clean_string(row.get('F_Code'))
                if not f_code:
                    continue
                record = Footers(
                    f_code=f_code,
                    f_footer=clean_string(row.get('F_Footer'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_footers_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_footers_failed", error=str(e))
    return result
def import_filestat(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILESTAT.csv → FileStat model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                status = clean_string(row.get('Status'))
                if not status:
                    continue
                record = FileStat(
                    status=status,
                    definition=clean_string(row.get('Definition')),
                    send=clean_string(row.get('Send')),
                    footer_code=clean_string(row.get('Footer_Code'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_filestat_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filestat_failed", error=str(e))
    return result
def import_employee(db: Session, file_path: str) -> Dict[str, Any]:
    """Import EMPLOYEE.csv → Employee model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                empl_num = clean_string(row.get('Empl_Num'))
                if not empl_num:
                    continue
                record = Employee(
                    empl_num=empl_num,
                    empl_id=clean_string(row.get('Empl_Id')),
                    rate_per_hour=parse_decimal(row.get('Rate_Per_Hour'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_employee_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_employee_failed", error=str(e))
    return result
def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import GRUPLKUP.csv → GroupLkup model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                code = clean_string(row.get('Code'))
                if not code:
                    continue
                record = GroupLkup(
                    code=code,
                    description=clean_string(row.get('Description')),
                    title=clean_string(row.get('Title'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_gruplkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_gruplkup_failed", error=str(e))
    return result
def import_filetype(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILETYPE.csv → FileType model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_type = clean_string(row.get('File_Type'))
                if not file_type:
                    continue
                record = FileType(file_type=file_type)
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_filetype_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filetype_failed", error=str(e))
    return result
def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FVARLKUP.csv → FVarLkup model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                identifier = clean_string(row.get('Identifier'))
                if not identifier:
                    continue
                record = FVarLkup(
                    identifier=identifier,
                    query=clean_string(row.get('Query')),
                    response=clean_string(row.get('Response'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_fvarlkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_fvarlkup_failed", error=str(e))
    return result
def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import RVARLKUP.csv → RVarLkup model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                identifier = clean_string(row.get('Identifier'))
                if not identifier:
                    continue
                record = RVarLkup(
                    identifier=identifier,
                    query=clean_string(row.get('Query'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_rvarlkup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rvarlkup_failed", error=str(e))
    return result
# ============================================================================
# Core Data Table Imports
# ============================================================================
def import_rolodex(db: Session, file_path: str) -> Dict[str, Any]:
    """Import ROLODEX.csv → Rolodex model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                rolodex_id = clean_string(row.get('Id'))
                if not rolodex_id:
                    continue
                record = Rolodex(
                    id=rolodex_id,
                    prefix=clean_string(row.get('Prefix')),
                    first=clean_string(row.get('First')),
                    middle=clean_string(row.get('Middle')),
                    last=clean_string(row.get('Last')),
                    suffix=clean_string(row.get('Suffix')),
                    title=clean_string(row.get('Title')),
                    a1=clean_string(row.get('A1')),
                    a2=clean_string(row.get('A2')),
                    a3=clean_string(row.get('A3')),
                    city=clean_string(row.get('City')),
                    abrev=clean_string(row.get('Abrev')),
                    st=clean_string(row.get('St')),
                    zip=clean_string(row.get('Zip')),
                    email=clean_string(row.get('Email')),
                    dob=parse_date(row.get('DOB')),
                    ss=clean_string(row.get('SS#')),
                    legal_status=clean_string(row.get('Legal_Status')),
                    group=clean_string(row.get('Group')),
                    memo=clean_string(row.get('Memo'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_rolodex_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rolodex_failed", error=str(e))
    return result
def import_phone(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PHONE.csv → LegacyPhone model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                rolodex_id = clean_string(row.get('Id'))
                phone = clean_string(row.get('Phone'))
                if not rolodex_id or not phone:
                    continue
                record = LegacyPhone(
                    id=rolodex_id,
                    phone=phone,
                    location=clean_string(row.get('Location'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_phone_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_phone_failed", error=str(e))
    return result
def import_rolex_v(db: Session, file_path: str) -> Dict[str, Any]:
    """Import ROLEX_V.csv → RolexV model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                rolodex_id = clean_string(row.get('Id'))
                identifier = clean_string(row.get('Identifier'))
                if not rolodex_id or not identifier:
                    continue
                record = RolexV(
                    id=rolodex_id,
                    identifier=identifier,
                    response=clean_string(row.get('Response'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_rolex_v_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rolex_v_failed", error=str(e))
    return result
def import_files(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES.csv → LegacyFile model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                if not file_no:
                    continue
                record = LegacyFile(
                    file_no=file_no,
                    id=clean_string(row.get('Id')),
                    file_type=clean_string(row.get('File_Type')),
                    regarding=clean_string(row.get('Regarding')),
                    opened=parse_date(row.get('Opened')),
                    closed=parse_date(row.get('Closed')),
                    empl_num=clean_string(row.get('Empl_Num')),
                    rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')),
                    status=clean_string(row.get('Status')),
                    footer_code=clean_string(row.get('Footer_Code')),
                    opposing=clean_string(row.get('Opposing')),
                    hours=parse_decimal(row.get('Hours')),
                    hours_p=parse_decimal(row.get('Hours_P')),
                    trust_bal=parse_decimal(row.get('Trust_Bal')),
                    trust_bal_p=parse_decimal(row.get('Trust_Bal_P')),
                    hourly_fees=parse_decimal(row.get('Hourly_Fees')),
                    hourly_fees_p=parse_decimal(row.get('Hourly_Fees_P')),
                    flat_fees=parse_decimal(row.get('Flat_Fees')),
                    flat_fees_p=parse_decimal(row.get('Flat_Fees_P')),
                    disbursements=parse_decimal(row.get('Disbursements')),
                    disbursements_p=parse_decimal(row.get('Disbursements_P')),
                    credit_bal=parse_decimal(row.get('Credit_Bal')),
                    credit_bal_p=parse_decimal(row.get('Credit_Bal_P')),
                    total_charges=parse_decimal(row.get('Total_Charges')),
                    total_charges_p=parse_decimal(row.get('Total_Charges_P')),
                    amount_owing=parse_decimal(row.get('Amount_Owing')),
                    amount_owing_p=parse_decimal(row.get('Amount_Owing_P')),
                    transferable=parse_decimal(row.get('Transferable')),
                    memo=clean_string(row.get('Memo'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_files_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_failed", error=str(e))
    return result
def import_files_r(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES_R.csv → FilesR model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                relationship = clean_string(row.get('Relationship'))
                rolodex_id = clean_string(row.get('Rolodex_Id'))
                if not file_no or not relationship or not rolodex_id:
                    continue
                record = FilesR(
                    file_no=file_no,
                    relationship=relationship,
                    rolodex_id=rolodex_id
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_files_r_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_r_failed", error=str(e))
    return result
def import_files_v(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES_V.csv → FilesV model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                identifier = clean_string(row.get('Identifier'))
                if not file_no or not identifier:
                    continue
                record = FilesV(
                    file_no=file_no,
                    identifier=identifier,
                    response=clean_string(row.get('Response'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_files_v_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_v_failed", error=str(e))
    return result
def import_filenots(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILENOTS.csv → FileNots model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                memo_date = parse_date(row.get('Memo_Date'))
                if not file_no or not memo_date:
                    continue
                record = FileNots(
                    file_no=file_no,
                    memo_date=memo_date,
                    memo_note=clean_string(row.get('Memo_Note'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_filenots_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filenots_failed", error=str(e))
    return result
def import_ledger(db: Session, file_path: str) -> Dict[str, Any]:
    """Import LEDGER.csv → Ledger model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                item_no_str = clean_string(row.get('Item_No'))
                if not file_no or not item_no_str:
                    continue
                try:
                    item_no = int(item_no_str)
                except ValueError:
                    result['errors'].append(f"Row {row_num}: Invalid Item_No '{item_no_str}'")
                    continue
                record = Ledger(
                    file_no=file_no,
                    date=parse_date(row.get('Date')),
                    item_no=item_no,
                    empl_num=clean_string(row.get('Empl_Num')),
                    t_code=clean_string(row.get('T_Code')),
                    t_type=clean_string(row.get('T_Type')),
                    t_type_l=clean_string(row.get('T_Type_L')),
                    quantity=parse_decimal(row.get('Quantity')),
                    rate=parse_decimal(row.get('Rate')),
                    amount=parse_decimal(row.get('Amount')),
                    billed=clean_string(row.get('Billed')),
                    note=clean_string(row.get('Note'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_ledger_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_ledger_failed", error=str(e))
    return result
def import_deposits(db: Session, file_path: str) -> Dict[str, Any]:
    """Import DEPOSITS.csv → Deposits model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                deposit_date = parse_date(row.get('Deposit_Date'))
                if not deposit_date:
                    continue
                record = Deposits(
                    deposit_date=deposit_date,
                    total=parse_decimal(row.get('Total'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_deposits_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_deposits_failed", error=str(e))
    return result
def import_payments(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PAYMENTS.csv → LegacyPayment model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                # LegacyPayment has auto-increment id, so we don't need to check for it
                record = LegacyPayment(
                    deposit_date=parse_date(row.get('Deposit_Date')),
                    file_no=clean_string(row.get('File_No')),
                    rolodex_id=clean_string(row.get('Id')),
                    regarding=clean_string(row.get('Regarding')),
                    amount=parse_decimal(row.get('Amount')),
                    note=clean_string(row.get('Note'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_payments_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_payments_failed", error=str(e))
    return result
# ============================================================================
# Specialized Imports (QDRO, Pensions, Plan Info)
# ============================================================================
def import_planinfo(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PLANINFO.csv → PlanInfo model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                plan_id = clean_string(row.get('Plan_Id'))
                if not plan_id:
                    continue
                record = PlanInfo(
                    plan_id=plan_id,
                    plan_name=clean_string(row.get('Plan_Name')),
                    plan_type=clean_string(row.get('Plan_Type')),
                    empl_id_no=clean_string(row.get('Empl_Id_No')),
                    plan_no=clean_string(row.get('Plan_No')),
                    nra=clean_string(row.get('NRA')),
                    era=clean_string(row.get('ERA')),
                    errf=clean_string(row.get('ERRF')),
                    colas=clean_string(row.get('COLAS')),
                    divided_by=clean_string(row.get('Divided_By')),
                    drafted=clean_string(row.get('Drafted')),
                    benefit_c=clean_string(row.get('Benefit_C')),
                    qdro_c=clean_string(row.get('QDRO_C')),
                    rev=clean_string(row.get('^REV')),
                    pa=clean_string(row.get('^PA')),
                    form_name=clean_string(row.get('Form_Name')),
                    drafted_on=parse_date(row.get('Drafted_On')),
                    memo=clean_string(row.get('Memo'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_planinfo_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_planinfo_failed", error=str(e))
    return result
def import_qdros(db: Session, file_path: str) -> Dict[str, Any]:
    """Import QDROS.csv → Qdros model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = Qdros(
                    file_no=file_no,
                    version=version,
                    plan_id=clean_string(row.get('Plan_Id')),
                    _1=clean_string(row.get('^1')),
                    _2=clean_string(row.get('^2')),
                    part=clean_string(row.get('^Part')),
                    altp=clean_string(row.get('^AltP')),
                    pet=clean_string(row.get('^Pet')),
                    res=clean_string(row.get('^Res')),
                    case_type=clean_string(row.get('Case_Type')),
                    case_code=clean_string(row.get('Case_Code')),
                    section=clean_string(row.get('Section')),
                    case_number=clean_string(row.get('Case_Number')),
                    judgment_date=parse_date(row.get('Judgment_Date')),
                    valuation_date=parse_date(row.get('Valuation_Date')),
                    married_on=parse_date(row.get('Married_On')),
                    percent_awarded=parse_decimal(row.get('Percent_Awarded')),
                    ven_city=clean_string(row.get('Ven_City')),
                    ven_cnty=clean_string(row.get('Ven_Cnty')),
                    ven_st=clean_string(row.get('Ven_St')),
                    draft_out=parse_date(row.get('Draft_Out')),
                    draft_apr=parse_date(row.get('Draft_Apr')),
                    final_out=parse_date(row.get('Final_Out')),
                    judge=clean_string(row.get('Judge')),
                    form_name=clean_string(row.get('Form_Name'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_qdros_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_qdros_failed", error=str(e))
    return result
def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PENSIONS.csv → Pensions model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = Pensions(
                    file_no=file_no,
                    version=version,
                    plan_id=clean_string(row.get('Plan_Id')),
                    plan_name=clean_string(row.get('Plan_Name')),
                    title=clean_string(row.get('Title')),
                    first=clean_string(row.get('First')),
                    last=clean_string(row.get('Last')),
                    birth=parse_date(row.get('Birth')),
                    race=clean_string(row.get('Race')),
                    sex=clean_string(row.get('Sex')),
                    info=parse_date(row.get('Info')),
                    valu=parse_date(row.get('Valu')),
                    accrued=parse_decimal(row.get('Accrued')),
                    vested_per=parse_decimal(row.get('Vested_Per')),
                    start_age=parse_decimal(row.get('Start_Age')),
                    cola=parse_decimal(row.get('COLA')),
                    max_cola=parse_decimal(row.get('Max_COLA')),
                    withdrawal=parse_decimal(row.get('Withdrawal')),
                    pre_dr=parse_decimal(row.get('Pre_DR')),
                    post_dr=parse_decimal(row.get('Post_DR')),
                    tax_rate=parse_decimal(row.get('Tax_Rate'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_pensions_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pensions_failed", error=str(e))
    return result
def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/MARRIAGE.csv → PensionMarriage model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = PensionMarriage(
                    file_no=file_no,
                    version=version,
                    married_from=parse_date(row.get('Married_From')),
                    married_to=parse_date(row.get('Married_To')),
                    married_years=parse_decimal(row.get('Married_Years')),
                    service_from=parse_date(row.get('Service_From')),
                    service_to=parse_date(row.get('Service_To')),
                    service_years=parse_decimal(row.get('Service_Years')),
                    marital_pct=parse_decimal(row.get('Marital_%'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_pension_marriage_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_marriage_failed", error=str(e))
    return result
def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/DEATH.csv → PensionDeath model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = PensionDeath(
                    file_no=file_no,
                    version=version,
                    lump1=parse_decimal(row.get('Lump1')),
                    lump2=parse_decimal(row.get('Lump2')),
                    growth1=parse_decimal(row.get('Growth1')),
                    growth2=parse_decimal(row.get('Growth2')),
                    disc1=parse_decimal(row.get('Disc1')),
                    disc2=parse_decimal(row.get('Disc2'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_pension_death_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_death_failed", error=str(e))
    return result
def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/SCHEDULE.csv → PensionSchedule model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = PensionSchedule(
                    file_no=file_no,
                    version=version,
                    vests_on=parse_date(row.get('Vests_On')),
                    vests_at=parse_decimal(row.get('Vests_At'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_pension_schedule_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_schedule_failed", error=str(e))
    return result
def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/SEPARATE.csv → PensionSeparate model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = PensionSeparate(
                    file_no=file_no,
                    version=version,
                    separation_rate=parse_decimal(row.get('Separation_Rate'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_pension_separate_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_separate_failed", error=str(e))
    return result
def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
    """Import Pensions/RESULTS.csv → PensionResults model."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                # Note: RESULTS.csv might not have explicit file_no/version columns
                # May need to derive from associated pension record
                # For now, skip if missing
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))
                if not file_no or not version:
                    continue
                record = PensionResults(
                    file_no=file_no,
                    version=version,
                    accrued=parse_decimal(row.get('Accrued')),
                    start_age=parse_decimal(row.get('Start_Age')),
                    cola=parse_decimal(row.get('COLA')),
                    withdrawal=parse_decimal(row.get('Withdrawal')),
                    pre_dr=parse_decimal(row.get('Pre_DR')),
                    post_dr=parse_decimal(row.get('Post_DR')),
                    tax_rate=parse_decimal(row.get('Tax_Rate')),
                    age=parse_decimal(row.get('Age')),
                    years_from=parse_decimal(row.get('Years_From')),
                    life_exp=parse_decimal(row.get('Life_Exp')),
                    ev_monthly=parse_decimal(row.get('EV_Monthly')),
                    payments=parse_decimal(row.get('Payments')),
                    pay_out=parse_decimal(row.get('Pay_Out')),
                    fund_value=parse_decimal(row.get('Fund_Value')),
                    pv=parse_decimal(row.get('PV')),
                    mortality=parse_decimal(row.get('Mortality')),
                    pv_am=parse_decimal(row.get('PV_AM')),
                    pv_amt=parse_decimal(row.get('PV_AMT')),
                    pv_pre_db=parse_decimal(row.get('PV_Pre_DB')),
                    pv_annuity=parse_decimal(row.get('PV_Annuity')),
                    wv_at=parse_decimal(row.get('WV_AT')),
                    pv_plan=parse_decimal(row.get('PV_Plan')),
                    years_married=parse_decimal(row.get('Years_Married')),
                    years_service=parse_decimal(row.get('Years_Service')),
                    marr_per=parse_decimal(row.get('Marr_Per')),
                    marr_amt=parse_decimal(row.get('Marr_Amt'))
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_pension_results_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_results_failed", error=str(e))
    return result
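# ---------------------------------------------------------------------------
# Illustrative driver (sketch only; not called anywhere -- the real entry
# point is process_csv_import in app/main.py). It shows the dependency order
# recommended by the admin Import Order Guide: reference tables first, then
# core tables, then the specialized plan/QDRO/pension tables. The
# session_factory argument is assumed to be something like
# app.database.SessionLocal; adjust names and paths to your deployment.
# ---------------------------------------------------------------------------
def _example_import_all(session_factory, csv_dir: str) -> Dict[str, Dict[str, Any]]:
    """Sketch: run every importer against csv_dir in dependency order."""
    ordered = [
        # Reference tables (no foreign-key dependencies)
        ("TRNSTYPE.csv", import_trnstype),
        ("TRNSLKUP.csv", import_trnslkup),
        ("FOOTERS.csv", import_footers),
        ("FILESTAT.csv", import_filestat),
        ("EMPLOYEE.csv", import_employee),
        ("GRUPLKUP.csv", import_gruplkup),
        ("FILETYPE.csv", import_filetype),
        ("FVARLKUP.csv", import_fvarlkup),
        ("RVARLKUP.csv", import_rvarlkup),
        # Core tables (reference the lookups and each other's keys)
        ("ROLODEX.csv", import_rolodex),
        ("PHONE.csv", import_phone),
        ("ROLEX_V.csv", import_rolex_v),
        ("FILES.csv", import_files),
        ("FILES_R.csv", import_files_r),
        ("FILES_V.csv", import_files_v),
        ("FILENOTS.csv", import_filenots),
        ("LEDGER.csv", import_ledger),
        ("DEPOSITS.csv", import_deposits),
        ("PAYMENTS.csv", import_payments),
        # Specialized tables (pension sub-tables such as Pensions/MARRIAGE.csv
        # would follow the same pattern with their own import_* functions)
        ("PLANINFO.csv", import_planinfo),
        ("QDROS.csv", import_qdros),
        ("PENSIONS.csv", import_pensions),
    ]
    results: Dict[str, Dict[str, Any]] = {}
    db = session_factory()
    try:
        for filename, import_fn in ordered:
            path = os.path.join(csv_dir, filename)
            if os.path.exists(path):
                results[filename] = import_fn(db, path)
    finally:
        db.close()
    return results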