- Enhanced open_text_with_fallbacks() function to handle problematic bytes
- Added CP1250 encoding to fallback list for better character set support
- Added graceful error handling with replacement characters for edge cases
- Ensures rolodex CSV import works with legacy encoding issues

Fixes: 'charmap' codec can't decode byte 0x9d error during rolodex import
1650 lines
58 KiB
Python
1650 lines
58 KiB
Python
"""
|
|
Legacy CSV import functions for Delphi Database.
|
|
|
|
This module provides import functions for all legacy database tables,
|
|
reading from old-csv files and populating the legacy SQLAlchemy models.
|
|
"""
|
|
|
|
import csv
|
|
import os
|
|
from datetime import datetime
|
|
from decimal import Decimal, InvalidOperation
|
|
from typing import Dict, Any, Optional
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.exc import IntegrityError
|
|
import structlog
|
|
|
|
from .models import (
|
|
Rolodex, LegacyPhone, LegacyFile, FilesR, FilesV, FileNots,
|
|
Ledger, Deposits, LegacyPayment, TrnsType, TrnsLkup,
|
|
Footers, FileStat, Employee, GroupLkup, FileType,
|
|
Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath,
|
|
PensionSchedule, PensionSeparate, PensionResults,
|
|
RolexV, FVarLkup, RVarLkup
|
|
)
|
|
|
|
# Module-level structured logger shared by every import function.
logger = structlog.get_logger(__name__)

# Batch size for commits: import functions accumulate records and call
# bulk_save_objects() + commit() once this many are pending.
BATCH_SIZE = 500
|
|
|
|
|
|
def _open_validated(file_path: str, encoding: str, errors: str):
    """Open *file_path* as text and decode it end-to-end before returning it.

    Reading the whole file (in chunks) surfaces undecodable bytes that sit
    beyond the first kilobyte, so a returned handle can be iterated without a
    late ``UnicodeDecodeError``. The handle is rewound to the start on
    success and closed before any exception propagates.
    """
    f = open(file_path, 'r', encoding=encoding, errors=errors, newline='')
    try:
        while f.read(65536):
            pass
        f.seek(0)
    except Exception:
        # Never leak the descriptor of a rejected attempt.
        f.close()
        raise
    return f


def open_text_with_fallbacks(file_path: str):
    """
    Open a text file trying multiple encodings commonly seen in legacy CSVs.

    Strict decoding is attempted first with UTF-8 (``utf-8-sig`` when the
    file starts with a UTF-8 byte-order mark, so the BOM is not left attached
    to the first CSV header name), then cp1252, cp1250 and iso-8859-1. If
    every strict attempt fails, UTF-8 and then latin-1 are tried with
    ``errors='replace'``.

    Args:
        file_path: Path of the file to open.

    Returns a tuple of (file_object, encoding_used). The file object is
    opened with ``newline=''`` and positioned at the start; the caller is
    responsible for closing it.

    Raises:
        RuntimeError: If the file cannot be opened by any attempt.
    """
    # Sniff for a UTF-8 BOM. Plain "utf-8" decodes a BOM file without error
    # but keeps U+FEFF in the data, so "utf-8-sig" must win for such files.
    has_bom = False
    try:
        with open(file_path, 'rb') as raw:
            has_bom = raw.read(3) == b'\xef\xbb\xbf'
    except OSError:
        # Unreadable file: the attempts below log and report the failure.
        pass

    # First try strict mode with common encodings.
    # "windows-1252" and "latin-1" are aliases of cp1252 / iso-8859-1, so
    # retrying them could never change the outcome.
    encodings = ["utf-8-sig" if has_bom else "utf-8", "cp1252", "cp1250", "iso-8859-1"]
    last_error = None
    for enc in encodings:
        try:
            f = _open_validated(file_path, enc, 'strict')
        except Exception as e:
            last_error = e
            logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e))
            continue
        logger.info("csv_open_encoding_selected", file=file_path, encoding=enc)
        return f, enc

    # If strict mode fails, try with error replacement for robustness
    logger.warning("strict_encoding_failed", file=file_path, trying_with_replace=True)
    try:
        # Try UTF-8 with error replacement first (most common case)
        f = _open_validated(file_path, 'utf-8', 'replace')
    except Exception as e:
        logger.warning("utf8_replace_failed", file=file_path, error=str(e))
    else:
        logger.info("csv_open_encoding_with_replace", file=file_path, encoding="utf-8-replace")
        return f, "utf-8-replace"

    # Final fallback: use latin-1 with replace (handles any byte sequence)
    try:
        f = _open_validated(file_path, 'latin-1', 'replace')
    except Exception as e:
        last_error = e
    else:
        logger.info("csv_open_encoding_fallback", file=file_path, encoding="latin-1-replace")
        return f, "latin-1-replace"

    error_msg = f"Unable to open file '{file_path}' with any supported encodings"
    if last_error:
        error_msg += f". Last error: {str(last_error)}"
    raise RuntimeError(error_msg)
|
|
|
|
|
|
def parse_date(date_str: str) -> Optional[datetime]:
    """Convert a legacy date string to a ``datetime``.

    Layouts are tried in this order: MM/DD/YYYY, MM/DD/YY, YYYY-MM-DD,
    MM-DD-YYYY, MM-DD-YY. Surrounding whitespace is ignored.

    Returns ``None`` for empty or whitespace-only input, and also (after
    logging a ``date_parse_failed`` warning) when no layout matches.
    """
    text = date_str.strip() if date_str else ""
    if not text:
        return None

    for layout in ("%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%m-%d-%Y", "%m-%d-%y"):
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            # Not this layout; fall through to the next candidate.
            pass
        else:
            return parsed

    logger.warning("date_parse_failed", date_string=text)
    return None
|
|
|
|
|
|
def parse_decimal(value: str) -> Optional[Decimal]:
    """Convert a numeric string to ``Decimal``.

    Surrounding whitespace is ignored. Returns ``None`` for empty or
    whitespace-only input, and also (after logging a
    ``decimal_parse_failed`` warning) when the text is not a valid decimal.
    """
    stripped = value.strip() if value else ""
    if stripped == "":
        return None

    try:
        number = Decimal(stripped)
    except (InvalidOperation, ValueError):
        # The warning carries the raw, unstripped input.
        logger.warning("decimal_parse_failed", value=value)
        return None
    return number
|
|
|
|
|
|
def clean_string(value: str) -> Optional[str]:
    """Return *value* without surrounding whitespace, or ``None`` if nothing is left."""
    if value:
        trimmed = value.strip()
        if trimmed:
            return trimmed
    return None
|
|
|
|
|
|
# ============================================================================
|
|
# Reference Table Imports (should be imported first)
|
|
# ============================================================================
|
|
|
|
def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]:
    """Import TRNSTYPE.csv → TrnsType model.

    Rows with a blank ``T_Type`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    t_type = clean_string(row.get('T_Type'))
                    if not t_type:
                        continue

                    batch.append(TrnsType(
                        t_type=t_type,
                        t_type_l=clean_string(row.get('T_Type_L')),
                        header=clean_string(row.get('Header')),
                        footer=clean_string(row.get('Footer'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_trnstype_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_trnstype_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import TRNSLKUP.csv → TrnsLkup model.

    Rows with a blank ``T_Code`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    t_code = clean_string(row.get('T_Code'))
                    if not t_code:
                        continue

                    batch.append(TrnsLkup(
                        t_code=t_code,
                        t_type=clean_string(row.get('T_Type')),
                        t_type_l=clean_string(row.get('T_Type_L')),
                        amount=parse_decimal(row.get('Amount')),
                        description=clean_string(row.get('Description'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_trnslkup_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_trnslkup_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_footers(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FOOTERS.csv → Footers model.

    Rows with a blank ``F_Code`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    f_code = clean_string(row.get('F_Code'))
                    if not f_code:
                        continue

                    batch.append(Footers(
                        f_code=f_code,
                        f_footer=clean_string(row.get('F_Footer'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_footers_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_footers_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_filestat(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILESTAT.csv → FileStat model.

    Rows with a blank ``Status`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    status = clean_string(row.get('Status'))
                    if not status:
                        continue

                    batch.append(FileStat(
                        status=status,
                        definition=clean_string(row.get('Definition')),
                        send=clean_string(row.get('Send')),
                        footer_code=clean_string(row.get('Footer_Code'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_filestat_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filestat_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_employee(db: Session, file_path: str) -> Dict[str, Any]:
    """Import EMPLOYEE.csv → Employee model.

    Rows with a blank ``Empl_Num`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    empl_num = clean_string(row.get('Empl_Num'))
                    if not empl_num:
                        continue

                    batch.append(Employee(
                        empl_num=empl_num,
                        empl_id=clean_string(row.get('Empl_Id')),
                        rate_per_hour=parse_decimal(row.get('Rate_Per_Hour'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_employee_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_employee_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import GRUPLKUP.csv → GroupLkup model.

    Rows with a blank ``Code`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    code = clean_string(row.get('Code'))
                    if not code:
                        continue

                    batch.append(GroupLkup(
                        code=code,
                        description=clean_string(row.get('Description')),
                        title=clean_string(row.get('Title'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_gruplkup_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_gruplkup_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_filetype(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILETYPE.csv → FileType model.

    Rows with a blank ``File_Type`` are skipped, as are values already
    queued earlier in this run and values already present in the database.
    Records are written with ``bulk_save_objects`` and committed every
    ``BATCH_SIZE`` records.

    Args:
        db: Session used to query, save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # Track seen file types in-memory to avoid duplicates within the
            # same CSV (kept for the whole file, not reset per batch).
            seen_in_batch = set()
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    file_type = clean_string(row.get('File_Type'))
                    if not file_type:
                        continue

                    # Skip if we've already queued this file_type
                    if file_type in seen_in_batch:
                        continue

                    # Skip if it already exists in DB (prevents UNIQUE
                    # violations when re-importing)
                    if db.query(FileType).filter(FileType.file_type == file_type).first():
                        continue

                    batch.append(FileType(file_type=file_type))
                    seen_in_batch.add(file_type)
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_filetype_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filetype_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FVARLKUP.csv → FVarLkup model.

    Rows with a blank ``Identifier`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    identifier = clean_string(row.get('Identifier'))
                    if not identifier:
                        continue

                    batch.append(FVarLkup(
                        identifier=identifier,
                        query=clean_string(row.get('Query')),
                        response=clean_string(row.get('Response'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_fvarlkup_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_fvarlkup_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import RVARLKUP.csv → RVarLkup model.

    Rows with a blank ``Identifier`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    identifier = clean_string(row.get('Identifier'))
                    if not identifier:
                        continue

                    batch.append(RVarLkup(
                        identifier=identifier,
                        query=clean_string(row.get('Query'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_rvarlkup_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rvarlkup_failed", error=str(e))

    return result
|
|
|
|
|
|
# ============================================================================
|
|
# Core Data Table Imports
|
|
# ============================================================================
|
|
|
|
def import_rolodex(db: Session, file_path: str) -> Dict[str, Any]:
    """Import ROLODEX.csv → Rolodex model.

    Rows with a blank ``Id`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    rolodex_id = clean_string(row.get('Id'))
                    if not rolodex_id:
                        continue

                    batch.append(Rolodex(
                        id=rolodex_id,
                        prefix=clean_string(row.get('Prefix')),
                        first=clean_string(row.get('First')),
                        middle=clean_string(row.get('Middle')),
                        last=clean_string(row.get('Last')),
                        suffix=clean_string(row.get('Suffix')),
                        title=clean_string(row.get('Title')),
                        a1=clean_string(row.get('A1')),
                        a2=clean_string(row.get('A2')),
                        a3=clean_string(row.get('A3')),
                        city=clean_string(row.get('City')),
                        abrev=clean_string(row.get('Abrev')),
                        st=clean_string(row.get('St')),
                        zip=clean_string(row.get('Zip')),
                        email=clean_string(row.get('Email')),
                        dob=parse_date(row.get('DOB')),
                        ss=clean_string(row.get('SS#')),
                        legal_status=clean_string(row.get('Legal_Status')),
                        group=clean_string(row.get('Group')),
                        memo=clean_string(row.get('Memo'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_rolodex_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rolodex_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_phone(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PHONE.csv → LegacyPhone model.

    Rows with a blank ``Id`` or ``Phone`` are skipped. Records are written
    with ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    rolodex_id = clean_string(row.get('Id'))
                    phone = clean_string(row.get('Phone'))

                    if not rolodex_id or not phone:
                        continue

                    batch.append(LegacyPhone(
                        id=rolodex_id,
                        phone=phone,
                        location=clean_string(row.get('Location'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_phone_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_phone_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_rolex_v(db: Session, file_path: str) -> Dict[str, Any]:
    """Import ROLEX_V.csv → RolexV model.

    Rows with a blank ``Id`` or ``Identifier`` are skipped. Records are
    written with ``bulk_save_objects`` and committed every ``BATCH_SIZE``
    records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    rolodex_id = clean_string(row.get('Id'))
                    identifier = clean_string(row.get('Identifier'))

                    if not rolodex_id or not identifier:
                        continue

                    batch.append(RolexV(
                        id=rolodex_id,
                        identifier=identifier,
                        response=clean_string(row.get('Response'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_rolex_v_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_rolex_v_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_files(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES.csv → LegacyFile model.

    Rows with a blank ``File_No`` are skipped. Records are written with
    ``bulk_save_objects`` and committed every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    file_no = clean_string(row.get('File_No'))
                    if not file_no:
                        continue

                    batch.append(LegacyFile(
                        file_no=file_no,
                        id=clean_string(row.get('Id')),
                        file_type=clean_string(row.get('File_Type')),
                        regarding=clean_string(row.get('Regarding')),
                        opened=parse_date(row.get('Opened')),
                        closed=parse_date(row.get('Closed')),
                        empl_num=clean_string(row.get('Empl_Num')),
                        rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')),
                        status=clean_string(row.get('Status')),
                        footer_code=clean_string(row.get('Footer_Code')),
                        opposing=clean_string(row.get('Opposing')),
                        hours=parse_decimal(row.get('Hours')),
                        hours_p=parse_decimal(row.get('Hours_P')),
                        trust_bal=parse_decimal(row.get('Trust_Bal')),
                        trust_bal_p=parse_decimal(row.get('Trust_Bal_P')),
                        hourly_fees=parse_decimal(row.get('Hourly_Fees')),
                        hourly_fees_p=parse_decimal(row.get('Hourly_Fees_P')),
                        flat_fees=parse_decimal(row.get('Flat_Fees')),
                        flat_fees_p=parse_decimal(row.get('Flat_Fees_P')),
                        disbursements=parse_decimal(row.get('Disbursements')),
                        disbursements_p=parse_decimal(row.get('Disbursements_P')),
                        credit_bal=parse_decimal(row.get('Credit_Bal')),
                        credit_bal_p=parse_decimal(row.get('Credit_Bal_P')),
                        total_charges=parse_decimal(row.get('Total_Charges')),
                        total_charges_p=parse_decimal(row.get('Total_Charges_P')),
                        amount_owing=parse_decimal(row.get('Amount_Owing')),
                        amount_owing_p=parse_decimal(row.get('Amount_Owing_P')),
                        transferable=parse_decimal(row.get('Transferable')),
                        memo=clean_string(row.get('Memo'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_files_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_files_r(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES_R.csv → FilesR model.

    Rows with a blank ``File_No``, ``Relationship`` or ``Rolodex_Id`` are
    skipped. Records are written with ``bulk_save_objects`` and committed
    every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    file_no = clean_string(row.get('File_No'))
                    relationship = clean_string(row.get('Relationship'))
                    rolodex_id = clean_string(row.get('Rolodex_Id'))

                    if not file_no or not relationship or not rolodex_id:
                        continue

                    batch.append(FilesR(
                        file_no=file_no,
                        relationship=relationship,
                        rolodex_id=rolodex_id
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_files_r_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_r_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_files_v(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILES_V.csv → FilesV model.

    Rows with a blank ``File_No`` or ``Identifier`` are skipped. Records are
    written with ``bulk_save_objects`` and committed every ``BATCH_SIZE``
    records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    file_no = clean_string(row.get('File_No'))
                    identifier = clean_string(row.get('Identifier'))

                    if not file_no or not identifier:
                        continue

                    batch.append(FilesV(
                        file_no=file_no,
                        identifier=identifier,
                        response=clean_string(row.get('Response'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_files_v_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_files_v_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_filenots(db: Session, file_path: str) -> Dict[str, Any]:
    """Import FILENOTS.csv → FileNots model.

    Rows with a blank ``File_No`` or a blank/unparseable ``Memo_Date`` are
    skipped. Records are written with ``bulk_save_objects`` and committed
    every ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    file_no = clean_string(row.get('File_No'))
                    memo_date = parse_date(row.get('Memo_Date'))

                    if not file_no or not memo_date:
                        continue

                    batch.append(FileNots(
                        file_no=file_no,
                        memo_date=memo_date,
                        memo_note=clean_string(row.get('Memo_Note'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_filenots_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_filenots_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_ledger(db: Session, file_path: str) -> Dict[str, Any]:
    """Import LEDGER.csv → Ledger model.

    Rows with a blank ``File_No`` or ``Item_No`` are skipped; rows whose
    ``Item_No`` is not an integer are skipped and reported in ``errors``.
    Records are written with ``bulk_save_objects`` and committed every
    ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    file_no = clean_string(row.get('File_No'))
                    item_no_str = clean_string(row.get('Item_No'))

                    if not file_no or not item_no_str:
                        continue

                    try:
                        item_no = int(item_no_str)
                    except ValueError:
                        result['errors'].append(f"Row {row_num}: Invalid Item_No '{item_no_str}'")
                        continue

                    batch.append(Ledger(
                        file_no=file_no,
                        date=parse_date(row.get('Date')),
                        item_no=item_no,
                        empl_num=clean_string(row.get('Empl_Num')),
                        t_code=clean_string(row.get('T_Code')),
                        t_type=clean_string(row.get('T_Type')),
                        t_type_l=clean_string(row.get('T_Type_L')),
                        quantity=parse_decimal(row.get('Quantity')),
                        rate=parse_decimal(row.get('Rate')),
                        amount=parse_decimal(row.get('Amount')),
                        billed=clean_string(row.get('Billed')),
                        note=clean_string(row.get('Note'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_ledger_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_ledger_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_deposits(db: Session, file_path: str) -> Dict[str, Any]:
    """Import DEPOSITS.csv → Deposits model.

    Rows with a blank or unparseable ``Deposit_Date`` are skipped. Records
    are written with ``bulk_save_objects`` and committed every
    ``BATCH_SIZE`` records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    deposit_date = parse_date(row.get('Deposit_Date'))

                    if not deposit_date:
                        continue

                    batch.append(Deposits(
                        deposit_date=deposit_date,
                        total=parse_decimal(row.get('Total'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_deposits_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_deposits_failed", error=str(e))

    return result
|
|
|
|
|
|
def import_payments(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PAYMENTS.csv → LegacyPayment model.

    Every data row produces a record; no column is required. Records are
    written with ``bulk_save_objects`` and committed every ``BATCH_SIZE``
    records.

    Args:
        db: Session used to save and commit the records.
        file_path: Path to the CSV file to read.

    Returns:
        Dict with ``success`` (records committed), ``errors`` (list of
        message strings) and ``total_rows`` (data rows read). Exceptions are
        caught and reported through ``errors``.
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        f, _ = open_text_with_fallbacks(file_path)
        # `with` closes the CSV even if a flush or the final commit raises.
        with f:
            batch = []
            # start=2: line 1 is the header consumed by DictReader.
            for row_num, row in enumerate(csv.DictReader(f), start=2):
                result['total_rows'] += 1

                try:
                    # LegacyPayment has auto-increment id, so we don't need
                    # to check for it
                    batch.append(LegacyPayment(
                        deposit_date=parse_date(row.get('Deposit_Date')),
                        file_no=clean_string(row.get('File_No')),
                        rolodex_id=clean_string(row.get('Id')),
                        regarding=clean_string(row.get('Regarding')),
                        amount=parse_decimal(row.get('Amount')),
                        note=clean_string(row.get('Note'))
                    ))
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
                    continue

                if len(batch) >= BATCH_SIZE:
                    try:
                        db.bulk_save_objects(batch)
                        db.commit()
                        result['success'] += len(batch)
                    except Exception as e:
                        # Roll back so the session is usable again, and drop
                        # the batch so it is not retried on every later row.
                        db.rollback()
                        result['errors'].append(f"Row {row_num}: {str(e)}")
                    batch = []

            # Save remaining batch
            if batch:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)

        logger.info("import_payments_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_payments_failed", error=str(e))

    return result
|
|
|
|
|
|
# ============================================================================
|
|
# Specialized Imports (QDRO, Pensions, Plan Info)
|
|
# ============================================================================
|
|
|
|
def import_planinfo(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PLANINFO.csv → PlanInfo model.

    Rows without a Plan_Id are skipped: they count towards 'total_rows' but
    produce neither a record nor an error. Records are written in batches of
    BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                plan_id = clean_string(row.get('Plan_Id'))
                if not plan_id:
                    continue

                record = PlanInfo(
                    plan_id=plan_id,
                    plan_name=clean_string(row.get('Plan_Name')),
                    plan_type=clean_string(row.get('Plan_Type')),
                    empl_id_no=clean_string(row.get('Empl_Id_No')),
                    plan_no=clean_string(row.get('Plan_No')),
                    nra=clean_string(row.get('NRA')),
                    era=clean_string(row.get('ERA')),
                    errf=clean_string(row.get('ERRF')),
                    colas=clean_string(row.get('COLAS')),
                    divided_by=clean_string(row.get('Divided_By')),
                    drafted=clean_string(row.get('Drafted')),
                    benefit_c=clean_string(row.get('Benefit_C')),
                    qdro_c=clean_string(row.get('QDRO_C')),
                    rev=clean_string(row.get('^REV')),
                    pa=clean_string(row.get('^PA')),
                    form_name=clean_string(row.get('Form_Name')),
                    drafted_on=parse_date(row.get('Drafted_On')),
                    memo=clean_string(row.get('Memo'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_planinfo_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_planinfo_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_qdros(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import QDROS.csv → Qdros model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = Qdros(
                    file_no=file_no,
                    version=version,
                    plan_id=clean_string(row.get('Plan_Id')),
                    _1=clean_string(row.get('^1')),
                    _2=clean_string(row.get('^2')),
                    part=clean_string(row.get('^Part')),
                    altp=clean_string(row.get('^AltP')),
                    pet=clean_string(row.get('^Pet')),
                    res=clean_string(row.get('^Res')),
                    case_type=clean_string(row.get('Case_Type')),
                    case_code=clean_string(row.get('Case_Code')),
                    section=clean_string(row.get('Section')),
                    case_number=clean_string(row.get('Case_Number')),
                    judgment_date=parse_date(row.get('Judgment_Date')),
                    valuation_date=parse_date(row.get('Valuation_Date')),
                    married_on=parse_date(row.get('Married_On')),
                    percent_awarded=parse_decimal(row.get('Percent_Awarded')),
                    ven_city=clean_string(row.get('Ven_City')),
                    ven_cnty=clean_string(row.get('Ven_Cnty')),
                    ven_st=clean_string(row.get('Ven_St')),
                    draft_out=parse_date(row.get('Draft_Out')),
                    draft_apr=parse_date(row.get('Draft_Apr')),
                    final_out=parse_date(row.get('Final_Out')),
                    judge=clean_string(row.get('Judge')),
                    form_name=clean_string(row.get('Form_Name'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_qdros_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_qdros_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PENSIONS.csv → Pensions model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = Pensions(
                    file_no=file_no,
                    version=version,
                    plan_id=clean_string(row.get('Plan_Id')),
                    plan_name=clean_string(row.get('Plan_Name')),
                    title=clean_string(row.get('Title')),
                    first=clean_string(row.get('First')),
                    last=clean_string(row.get('Last')),
                    birth=parse_date(row.get('Birth')),
                    race=clean_string(row.get('Race')),
                    sex=clean_string(row.get('Sex')),
                    info=parse_date(row.get('Info')),
                    valu=parse_date(row.get('Valu')),
                    accrued=parse_decimal(row.get('Accrued')),
                    vested_per=parse_decimal(row.get('Vested_Per')),
                    start_age=parse_decimal(row.get('Start_Age')),
                    cola=parse_decimal(row.get('COLA')),
                    max_cola=parse_decimal(row.get('Max_COLA')),
                    withdrawal=parse_decimal(row.get('Withdrawal')),
                    pre_dr=parse_decimal(row.get('Pre_DR')),
                    post_dr=parse_decimal(row.get('Post_DR')),
                    tax_rate=parse_decimal(row.get('Tax_Rate'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_pensions_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pensions_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import Pensions/MARRIAGE.csv → PensionMarriage model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = PensionMarriage(
                    file_no=file_no,
                    version=version,
                    married_from=parse_date(row.get('Married_From')),
                    married_to=parse_date(row.get('Married_To')),
                    married_years=parse_decimal(row.get('Married_Years')),
                    service_from=parse_date(row.get('Service_From')),
                    service_to=parse_date(row.get('Service_To')),
                    service_years=parse_decimal(row.get('Service_Years')),
                    marital_pct=parse_decimal(row.get('Marital_%'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_pension_marriage_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_marriage_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import Pensions/DEATH.csv → PensionDeath model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = PensionDeath(
                    file_no=file_no,
                    version=version,
                    lump1=parse_decimal(row.get('Lump1')),
                    lump2=parse_decimal(row.get('Lump2')),
                    growth1=parse_decimal(row.get('Growth1')),
                    growth2=parse_decimal(row.get('Growth2')),
                    disc1=parse_decimal(row.get('Disc1')),
                    disc2=parse_decimal(row.get('Disc2'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_pension_death_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_death_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import Pensions/SCHEDULE.csv → PensionSchedule model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = PensionSchedule(
                    file_no=file_no,
                    version=version,
                    vests_on=parse_date(row.get('Vests_On')),
                    vests_at=parse_decimal(row.get('Vests_At'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_pension_schedule_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_schedule_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import Pensions/SEPARATE.csv → PensionSeparate model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = PensionSeparate(
                    file_no=file_no,
                    version=version,
                    separation_rate=parse_decimal(row.get('Separation_Rate'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_pension_separate_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_separate_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|
|
|
|
def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import Pensions/RESULTS.csv → PensionResults model.

    Rows missing File_No or Version are skipped: they count towards
    'total_rows' but produce neither a record nor an error. Records are
    written in batches of BATCH_SIZE, with a commit after every batch.

    Args:
        db: SQLAlchemy session used for the bulk saves, commits and rollbacks.
        file_path: Path of the CSV file; opened via open_text_with_fallbacks().

    Returns:
        Dict with 'success' (number of records committed), 'errors' (list of
        per-row, per-batch and fatal error messages) and 'total_rows'
        (number of data rows read).
    """
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    f = None

    try:
        f, _ = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        batch = []
        # Numbering starts at 2: DictReader consumes the first line as the header.
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                # Note: RESULTS.csv might not have explicit file_no/version columns
                # May need to derive from associated pension record
                # For now, skip if missing
                file_no = clean_string(row.get('File_No'))
                version = clean_string(row.get('Version'))

                if not file_no or not version:
                    continue

                record = PensionResults(
                    file_no=file_no,
                    version=version,
                    accrued=parse_decimal(row.get('Accrued')),
                    start_age=parse_decimal(row.get('Start_Age')),
                    cola=parse_decimal(row.get('COLA')),
                    withdrawal=parse_decimal(row.get('Withdrawal')),
                    pre_dr=parse_decimal(row.get('Pre_DR')),
                    post_dr=parse_decimal(row.get('Post_DR')),
                    tax_rate=parse_decimal(row.get('Tax_Rate')),
                    age=parse_decimal(row.get('Age')),
                    years_from=parse_decimal(row.get('Years_From')),
                    life_exp=parse_decimal(row.get('Life_Exp')),
                    ev_monthly=parse_decimal(row.get('EV_Monthly')),
                    payments=parse_decimal(row.get('Payments')),
                    pay_out=parse_decimal(row.get('Pay_Out')),
                    fund_value=parse_decimal(row.get('Fund_Value')),
                    pv=parse_decimal(row.get('PV')),
                    mortality=parse_decimal(row.get('Mortality')),
                    pv_am=parse_decimal(row.get('PV_AM')),
                    pv_amt=parse_decimal(row.get('PV_AMT')),
                    pv_pre_db=parse_decimal(row.get('PV_Pre_DB')),
                    pv_annuity=parse_decimal(row.get('PV_Annuity')),
                    wv_at=parse_decimal(row.get('WV_AT')),
                    pv_plan=parse_decimal(row.get('PV_Plan')),
                    years_married=parse_decimal(row.get('Years_Married')),
                    years_service=parse_decimal(row.get('Years_Service')),
                    marr_per=parse_decimal(row.get('Marr_Per')),
                    marr_amt=parse_decimal(row.get('Marr_Amt'))
                )
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                continue

            batch.append(record)
            if len(batch) < BATCH_SIZE:
                continue

            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                # Roll back so the session is usable for the next batch; without
                # this every following row would re-trigger the same failure.
                db.rollback()
                result['errors'].append(
                    f"Row {row_num}: batch commit failed: {str(e)}"
                )
            # The batch is dropped whether it was saved or failed.
            batch = []

        # Flush the final partial batch; a failure here is reported as fatal below.
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("import_pension_results_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_pension_results_failed", error=str(e))

    finally:
        # Close the file on every path, including fatal errors.
        if f is not None:
            f.close()

    return result
|
|
|