""" Legacy CSV import functions for Delphi Database. This module provides import functions for all legacy database tables, reading from old-csv files and populating the legacy SQLAlchemy models. """ import csv import os from datetime import datetime from decimal import Decimal, InvalidOperation from typing import Dict, Any, Optional from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError import structlog from .models import ( Rolodex, LegacyPhone, LegacyFile, FilesR, FilesV, FileNots, Ledger, Deposits, LegacyPayment, TrnsType, TrnsLkup, Footers, FileStat, Employee, GroupLkup, FileType, Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath, PensionSchedule, PensionSeparate, PensionResults, RolexV, FVarLkup, RVarLkup ) logger = structlog.get_logger(__name__) # Batch size for commits BATCH_SIZE = 500 def open_text_with_fallbacks(file_path: str): """ Open a text file trying multiple encodings commonly seen in legacy CSVs. Returns a tuple of (file_object, encoding_used). """ # First try strict mode with common encodings # Try latin-1/iso-8859-1 earlier as they are more forgiving and commonly used in legacy data encodings = ["utf-8", "utf-8-sig", "iso-8859-1", "latin-1", "cp1252", "windows-1252", "cp1250"] last_error = None for enc in encodings: try: f = open(file_path, 'r', encoding=enc, errors='strict', newline='') # Read more than 1KB to catch encoding issues deeper in the file # Many legacy CSVs have issues beyond the first few rows _ = f.read(10240) # Read 10KB to test f.seek(0) logger.info("csv_open_encoding_selected", file=file_path, encoding=enc) return f, enc except Exception as e: last_error = e logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e)) try: f.close() except: pass continue # If strict mode fails, try with error replacement for robustness logger.warning("strict_encoding_failed", file=file_path, trying_with_replace=True) try: # Try UTF-8 with error replacement first (most common case) f = open(file_path, 'r', encoding='utf-8', errors='replace', newline='') _ = f.read(1024) f.seek(0) logger.info("csv_open_encoding_with_replace", file=file_path, encoding="utf-8-replace") return f, "utf-8-replace" except Exception as e: logger.warning("utf8_replace_failed", file=file_path, error=str(e)) # Final fallback: use latin-1 with replace (handles any byte sequence) try: f = open(file_path, 'r', encoding='latin-1', errors='replace', newline='') _ = f.read(1024) f.seek(0) logger.info("csv_open_encoding_fallback", file=file_path, encoding="latin-1-replace") return f, "latin-1-replace" except Exception as e: last_error = e error_msg = f"Unable to open file '{file_path}' with any supported encodings" if last_error: error_msg += f". Last error: {str(last_error)}" raise RuntimeError(error_msg) def parse_date(date_str: str) -> Optional[datetime]: """Parse date string in various formats, return None if blank/invalid.""" if not date_str or not date_str.strip(): return None date_str = date_str.strip() # Try common date formats formats = [ "%m/%d/%Y", "%m/%d/%y", "%Y-%m-%d", "%m-%d-%Y", "%m-%d-%y", ] for fmt in formats: try: return datetime.strptime(date_str, fmt) except ValueError: continue logger.warning("date_parse_failed", date_string=date_str) return None def parse_decimal(value: str) -> Optional[Decimal]: """Parse decimal string, return None if blank/invalid.""" if not value or not value.strip(): return None try: return Decimal(value.strip()) except (InvalidOperation, ValueError): logger.warning("decimal_parse_failed", value=value) return None def clean_string(value: str) -> Optional[str]: """Clean string value, return None if blank.""" if not value or not value.strip(): return None return value.strip() # ============================================================================ # Reference Table Imports (should be imported first) # ============================================================================ def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]: """Import TRNSTYPE.csv → TrnsType model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: t_type = clean_string(row.get('T_Type')) if not t_type: continue record = TrnsType( t_type=t_type, t_type_l=clean_string(row.get('T_Type_L')), header=clean_string(row.get('Header')), footer=clean_string(row.get('Footer')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") # Save remaining batch if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_trnstype_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_trnstype_failed", error=str(e)) return result def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]: """Import TRNSLKUP.csv → TrnsLkup model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: t_code = clean_string(row.get('T_Code')) if not t_code: continue record = TrnsLkup( t_code=t_code, t_type=clean_string(row.get('T_Type')), t_type_l=clean_string(row.get('T_Type_L')), amount=parse_decimal(row.get('Amount')), description=clean_string(row.get('Description')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_trnslkup_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_trnslkup_failed", error=str(e)) return result def import_footers(db: Session, file_path: str) -> Dict[str, Any]: """Import FOOTERS.csv → Footers model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: f_code = clean_string(row.get('F_Code')) if not f_code: continue record = Footers( f_code=f_code, f_footer=clean_string(row.get('F_Footer')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_footers_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_footers_failed", error=str(e)) return result def import_filestat(db: Session, file_path: str) -> Dict[str, Any]: """Import FILESTAT.csv → FileStat model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: status = clean_string(row.get('Status')) if not status: continue record = FileStat( status=status, definition=clean_string(row.get('Definition')), send=clean_string(row.get('Send')), footer_code=clean_string(row.get('Footer_Code')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_filestat_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_filestat_failed", error=str(e)) return result def import_employee(db: Session, file_path: str) -> Dict[str, Any]: """Import EMPLOYEE.csv → Employee model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: empl_num = clean_string(row.get('Empl_Num')) if not empl_num: continue record = Employee( empl_num=empl_num, empl_id=clean_string(row.get('Empl_Id')), rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_employee_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_employee_failed", error=str(e)) return result def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]: """Import GRUPLKUP.csv → GroupLkup model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: code = clean_string(row.get('Code')) if not code: continue record = GroupLkup( code=code, description=clean_string(row.get('Description')), title=clean_string(row.get('Title')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_gruplkup_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_gruplkup_failed", error=str(e)) return result def import_filetype(db: Session, file_path: str) -> Dict[str, Any]: """Import FILETYPE.csv → FileType model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] # Track seen file types in-memory to avoid duplicates within the same CSV seen_in_batch = set() for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_type = clean_string(row.get('File_Type')) if not file_type: continue # Skip if we've already queued this file_type in current batch if file_type in seen_in_batch: continue # Skip if it already exists in DB (prevents UNIQUE violations when re-importing) if db.query(FileType).filter(FileType.file_type == file_type).first(): continue record = FileType(file_type=file_type) batch.append(record) seen_in_batch.add(file_type) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_filetype_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_filetype_failed", error=str(e)) return result def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]: """Import FVARLKUP.csv → FVarLkup model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: identifier = clean_string(row.get('Identifier')) if not identifier: continue record = FVarLkup( identifier=identifier, query=clean_string(row.get('Query')), response=clean_string(row.get('Response')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_fvarlkup_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_fvarlkup_failed", error=str(e)) return result def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]: """Import RVARLKUP.csv → RVarLkup model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: identifier = clean_string(row.get('Identifier')) if not identifier: continue record = RVarLkup( identifier=identifier, query=clean_string(row.get('Query')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_rvarlkup_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_rvarlkup_failed", error=str(e)) return result # ============================================================================ # Core Data Table Imports # ============================================================================ def import_rolodex(db: Session, file_path: str) -> Dict[str, Any]: """Import ROLODEX.csv → Rolodex model.""" result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) # Track IDs we've seen in this import to handle duplicates seen_in_import = set() batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: rolodex_id = clean_string(row.get('Id')) if not rolodex_id: continue # Skip if we've already processed this ID in current import if rolodex_id in seen_in_import: result['skipped'] += 1 continue # Skip if it already exists in database if db.query(Rolodex).filter(Rolodex.id == rolodex_id).first(): result['skipped'] += 1 seen_in_import.add(rolodex_id) continue record = Rolodex( id=rolodex_id, prefix=clean_string(row.get('Prefix')), first=clean_string(row.get('First')), middle=clean_string(row.get('Middle')), last=clean_string(row.get('Last')), suffix=clean_string(row.get('Suffix')), title=clean_string(row.get('Title')), a1=clean_string(row.get('A1')), a2=clean_string(row.get('A2')), a3=clean_string(row.get('A3')), city=clean_string(row.get('City')), abrev=clean_string(row.get('Abrev')), st=clean_string(row.get('St')), zip=clean_string(row.get('Zip')), email=clean_string(row.get('Email')), dob=parse_date(row.get('DOB')), ss=clean_string(row.get('SS#')), legal_status=clean_string(row.get('Legal_Status')), group=clean_string(row.get('Group')), memo=clean_string(row.get('Memo')) ) batch.append(record) seen_in_import.add(rolodex_id) if len(batch) >= BATCH_SIZE: try: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except IntegrityError as ie: db.rollback() # Handle any remaining duplicates by inserting one at a time for record in batch: try: db.add(record) db.commit() result['success'] += 1 except IntegrityError: db.rollback() result['skipped'] += 1 batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") # Save remaining batch if batch: try: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) except IntegrityError: db.rollback() # Handle any remaining duplicates by inserting one at a time for record in batch: try: db.add(record) db.commit() result['success'] += 1 except IntegrityError: db.rollback() result['skipped'] += 1 f.close() logger.info("import_rolodex_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_rolodex_failed", error=str(e)) return result def import_phone(db: Session, file_path: str) -> Dict[str, Any]: """Import PHONE.csv → LegacyPhone model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: rolodex_id = clean_string(row.get('Id')) phone = clean_string(row.get('Phone')) if not rolodex_id or not phone: continue record = LegacyPhone( id=rolodex_id, phone=phone, location=clean_string(row.get('Location')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_phone_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_phone_failed", error=str(e)) return result def import_rolex_v(db: Session, file_path: str) -> Dict[str, Any]: """Import ROLEX_V.csv → RolexV model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: rolodex_id = clean_string(row.get('Id')) identifier = clean_string(row.get('Identifier')) if not rolodex_id or not identifier: continue record = RolexV( id=rolodex_id, identifier=identifier, response=clean_string(row.get('Response')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_rolex_v_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_rolex_v_failed", error=str(e)) return result def import_files(db: Session, file_path: str) -> Dict[str, Any]: """Import FILES.csv → LegacyFile model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) if not file_no: continue record = LegacyFile( file_no=file_no, id=clean_string(row.get('Id')), file_type=clean_string(row.get('File_Type')), regarding=clean_string(row.get('Regarding')), opened=parse_date(row.get('Opened')), closed=parse_date(row.get('Closed')), empl_num=clean_string(row.get('Empl_Num')), rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')), status=clean_string(row.get('Status')), footer_code=clean_string(row.get('Footer_Code')), opposing=clean_string(row.get('Opposing')), hours=parse_decimal(row.get('Hours')), hours_p=parse_decimal(row.get('Hours_P')), trust_bal=parse_decimal(row.get('Trust_Bal')), trust_bal_p=parse_decimal(row.get('Trust_Bal_P')), hourly_fees=parse_decimal(row.get('Hourly_Fees')), hourly_fees_p=parse_decimal(row.get('Hourly_Fees_P')), flat_fees=parse_decimal(row.get('Flat_Fees')), flat_fees_p=parse_decimal(row.get('Flat_Fees_P')), disbursements=parse_decimal(row.get('Disbursements')), disbursements_p=parse_decimal(row.get('Disbursements_P')), credit_bal=parse_decimal(row.get('Credit_Bal')), credit_bal_p=parse_decimal(row.get('Credit_Bal_P')), total_charges=parse_decimal(row.get('Total_Charges')), total_charges_p=parse_decimal(row.get('Total_Charges_P')), amount_owing=parse_decimal(row.get('Amount_Owing')), amount_owing_p=parse_decimal(row.get('Amount_Owing_P')), transferable=parse_decimal(row.get('Transferable')), memo=clean_string(row.get('Memo')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_files_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_files_failed", error=str(e)) return result def import_files_r(db: Session, file_path: str) -> Dict[str, Any]: """Import FILES_R.csv → FilesR model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) relationship = clean_string(row.get('Relationship')) rolodex_id = clean_string(row.get('Rolodex_Id')) if not file_no or not relationship or not rolodex_id: continue record = FilesR( file_no=file_no, relationship=relationship, rolodex_id=rolodex_id ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_files_r_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_files_r_failed", error=str(e)) return result def import_files_v(db: Session, file_path: str) -> Dict[str, Any]: """Import FILES_V.csv → FilesV model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) identifier = clean_string(row.get('Identifier')) if not file_no or not identifier: continue record = FilesV( file_no=file_no, identifier=identifier, response=clean_string(row.get('Response')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_files_v_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_files_v_failed", error=str(e)) return result def import_filenots(db: Session, file_path: str) -> Dict[str, Any]: """Import FILENOTS.csv → FileNots model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) memo_date = parse_date(row.get('Memo_Date')) if not file_no or not memo_date: continue record = FileNots( file_no=file_no, memo_date=memo_date, memo_note=clean_string(row.get('Memo_Note')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_filenots_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_filenots_failed", error=str(e)) return result def import_ledger(db: Session, file_path: str) -> Dict[str, Any]: """Import LEDGER.csv → Ledger model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) item_no_str = clean_string(row.get('Item_No')) if not file_no or not item_no_str: continue try: item_no = int(item_no_str) except ValueError: result['errors'].append(f"Row {row_num}: Invalid Item_No '{item_no_str}'") continue record = Ledger( file_no=file_no, date=parse_date(row.get('Date')), item_no=item_no, empl_num=clean_string(row.get('Empl_Num')), t_code=clean_string(row.get('T_Code')), t_type=clean_string(row.get('T_Type')), t_type_l=clean_string(row.get('T_Type_L')), quantity=parse_decimal(row.get('Quantity')), rate=parse_decimal(row.get('Rate')), amount=parse_decimal(row.get('Amount')), billed=clean_string(row.get('Billed')), note=clean_string(row.get('Note')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_ledger_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_ledger_failed", error=str(e)) return result def import_deposits(db: Session, file_path: str) -> Dict[str, Any]: """Import DEPOSITS.csv → Deposits model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: deposit_date = parse_date(row.get('Deposit_Date')) if not deposit_date: continue record = Deposits( deposit_date=deposit_date, total=parse_decimal(row.get('Total')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_deposits_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_deposits_failed", error=str(e)) return result def import_payments(db: Session, file_path: str) -> Dict[str, Any]: """Import PAYMENTS.csv → LegacyPayment model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: # LegacyPayment has auto-increment id, so we don't need to check for it record = LegacyPayment( deposit_date=parse_date(row.get('Deposit_Date')), file_no=clean_string(row.get('File_No')), rolodex_id=clean_string(row.get('Id')), regarding=clean_string(row.get('Regarding')), amount=parse_decimal(row.get('Amount')), note=clean_string(row.get('Note')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_payments_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_payments_failed", error=str(e)) return result # ============================================================================ # Specialized Imports (QDRO, Pensions, Plan Info) # ============================================================================ def import_planinfo(db: Session, file_path: str) -> Dict[str, Any]: """Import PLANINFO.csv → PlanInfo model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: plan_id = clean_string(row.get('Plan_Id')) if not plan_id: continue record = PlanInfo( plan_id=plan_id, plan_name=clean_string(row.get('Plan_Name')), plan_type=clean_string(row.get('Plan_Type')), empl_id_no=clean_string(row.get('Empl_Id_No')), plan_no=clean_string(row.get('Plan_No')), nra=clean_string(row.get('NRA')), era=clean_string(row.get('ERA')), errf=clean_string(row.get('ERRF')), colas=clean_string(row.get('COLAS')), divided_by=clean_string(row.get('Divided_By')), drafted=clean_string(row.get('Drafted')), benefit_c=clean_string(row.get('Benefit_C')), qdro_c=clean_string(row.get('QDRO_C')), rev=clean_string(row.get('^REV')), pa=clean_string(row.get('^PA')), form_name=clean_string(row.get('Form_Name')), drafted_on=parse_date(row.get('Drafted_On')), memo=clean_string(row.get('Memo')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_planinfo_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_planinfo_failed", error=str(e)) return result def import_qdros(db: Session, file_path: str) -> Dict[str, Any]: """Import QDROS.csv → Qdros model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = Qdros( file_no=file_no, version=version, plan_id=clean_string(row.get('Plan_Id')), _1=clean_string(row.get('^1')), _2=clean_string(row.get('^2')), part=clean_string(row.get('^Part')), altp=clean_string(row.get('^AltP')), pet=clean_string(row.get('^Pet')), res=clean_string(row.get('^Res')), case_type=clean_string(row.get('Case_Type')), case_code=clean_string(row.get('Case_Code')), section=clean_string(row.get('Section')), case_number=clean_string(row.get('Case_Number')), judgment_date=parse_date(row.get('Judgment_Date')), valuation_date=parse_date(row.get('Valuation_Date')), married_on=parse_date(row.get('Married_On')), percent_awarded=parse_decimal(row.get('Percent_Awarded')), ven_city=clean_string(row.get('Ven_City')), ven_cnty=clean_string(row.get('Ven_Cnty')), ven_st=clean_string(row.get('Ven_St')), draft_out=parse_date(row.get('Draft_Out')), draft_apr=parse_date(row.get('Draft_Apr')), final_out=parse_date(row.get('Final_Out')), judge=clean_string(row.get('Judge')), form_name=clean_string(row.get('Form_Name')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_qdros_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_qdros_failed", error=str(e)) return result def import_pensions(db: Session, file_path: str) -> Dict[str, Any]: """Import PENSIONS.csv → Pensions model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = Pensions( file_no=file_no, version=version, plan_id=clean_string(row.get('Plan_Id')), plan_name=clean_string(row.get('Plan_Name')), title=clean_string(row.get('Title')), first=clean_string(row.get('First')), last=clean_string(row.get('Last')), birth=parse_date(row.get('Birth')), race=clean_string(row.get('Race')), sex=clean_string(row.get('Sex')), info=parse_date(row.get('Info')), valu=parse_date(row.get('Valu')), accrued=parse_decimal(row.get('Accrued')), vested_per=parse_decimal(row.get('Vested_Per')), start_age=parse_decimal(row.get('Start_Age')), cola=parse_decimal(row.get('COLA')), max_cola=parse_decimal(row.get('Max_COLA')), withdrawal=parse_decimal(row.get('Withdrawal')), pre_dr=parse_decimal(row.get('Pre_DR')), post_dr=parse_decimal(row.get('Post_DR')), tax_rate=parse_decimal(row.get('Tax_Rate')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_pensions_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_pensions_failed", error=str(e)) return result def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]: """Import Pensions/MARRIAGE.csv → PensionMarriage model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = PensionMarriage( file_no=file_no, version=version, married_from=parse_date(row.get('Married_From')), married_to=parse_date(row.get('Married_To')), married_years=parse_decimal(row.get('Married_Years')), service_from=parse_date(row.get('Service_From')), service_to=parse_date(row.get('Service_To')), service_years=parse_decimal(row.get('Service_Years')), marital_pct=parse_decimal(row.get('Marital_%')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_pension_marriage_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_pension_marriage_failed", error=str(e)) return result def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]: """Import Pensions/DEATH.csv → PensionDeath model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = PensionDeath( file_no=file_no, version=version, lump1=parse_decimal(row.get('Lump1')), lump2=parse_decimal(row.get('Lump2')), growth1=parse_decimal(row.get('Growth1')), growth2=parse_decimal(row.get('Growth2')), disc1=parse_decimal(row.get('Disc1')), disc2=parse_decimal(row.get('Disc2')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_pension_death_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_pension_death_failed", error=str(e)) return result def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]: """Import Pensions/SCHEDULE.csv → PensionSchedule model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = PensionSchedule( file_no=file_no, version=version, vests_on=parse_date(row.get('Vests_On')), vests_at=parse_decimal(row.get('Vests_At')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_pension_schedule_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_pension_schedule_failed", error=str(e)) return result def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]: """Import Pensions/SEPARATE.csv → PensionSeparate model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = PensionSeparate( file_no=file_no, version=version, separation_rate=parse_decimal(row.get('Separation_Rate')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_pension_separate_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_pension_separate_failed", error=str(e)) return result def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]: """Import Pensions/RESULTS.csv → PensionResults model.""" result = {'success': 0, 'errors': [], 'total_rows': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: # Note: RESULTS.csv might not have explicit file_no/version columns # May need to derive from associated pension record # For now, skip if missing file_no = clean_string(row.get('File_No')) version = clean_string(row.get('Version')) if not file_no or not version: continue record = PensionResults( file_no=file_no, version=version, accrued=parse_decimal(row.get('Accrued')), start_age=parse_decimal(row.get('Start_Age')), cola=parse_decimal(row.get('COLA')), withdrawal=parse_decimal(row.get('Withdrawal')), pre_dr=parse_decimal(row.get('Pre_DR')), post_dr=parse_decimal(row.get('Post_DR')), tax_rate=parse_decimal(row.get('Tax_Rate')), age=parse_decimal(row.get('Age')), years_from=parse_decimal(row.get('Years_From')), life_exp=parse_decimal(row.get('Life_Exp')), ev_monthly=parse_decimal(row.get('EV_Monthly')), payments=parse_decimal(row.get('Payments')), pay_out=parse_decimal(row.get('Pay_Out')), fund_value=parse_decimal(row.get('Fund_Value')), pv=parse_decimal(row.get('PV')), mortality=parse_decimal(row.get('Mortality')), pv_am=parse_decimal(row.get('PV_AM')), pv_amt=parse_decimal(row.get('PV_AMT')), pv_pre_db=parse_decimal(row.get('PV_Pre_DB')), pv_annuity=parse_decimal(row.get('PV_Annuity')), wv_at=parse_decimal(row.get('WV_AT')), pv_plan=parse_decimal(row.get('PV_Plan')), years_married=parse_decimal(row.get('Years_Married')), years_service=parse_decimal(row.get('Years_Service')), marr_per=parse_decimal(row.get('Marr_Per')), marr_amt=parse_decimal(row.get('Marr_Amt')) ) batch.append(record) if len(batch) >= BATCH_SIZE: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") if batch: db.bulk_save_objects(batch) db.commit() result['success'] += len(batch) f.close() logger.info("import_pension_results_complete", **result) except Exception as e: db.rollback() result['errors'].append(f"Fatal error: {str(e)}") logger.error("import_pension_results_failed", error=str(e)) return result