From 4030dbd88e4062bf575ae2804429f5c52434dc11 Mon Sep 17 00:00:00 2001 From: HotSwapp <47397945+HotSwapp@users.noreply.github.com> Date: Wed, 8 Oct 2025 09:41:38 -0500 Subject: [PATCH] Implement comprehensive CSV import system for legacy database migration - Added 5 new legacy models to app/models.py (FileType, FileNots, RolexV, FVarLkup, RVarLkup) - Created app/import_legacy.py with import functions for all legacy tables: * Reference tables: TRNSTYPE, TRNSLKUP, FOOTERS, FILESTAT, EMPLOYEE, GRUPLKUP, FILETYPE, FVARLKUP, RVARLKUP * Core tables: ROLODEX, PHONE, ROLEX_V, FILES, FILES_R, FILES_V, FILENOTS, LEDGER, DEPOSITS, PAYMENTS * Specialized: PLANINFO, QDROS, PENSIONS and all pension-related tables - Created app/sync_legacy_to_modern.py with sync functions to populate modern models from legacy data - Updated admin routes in app/main.py: * Extended process_csv_import to support all new import types * Added /admin/sync endpoint for syncing legacy to modern models * Updated get_import_type_from_filename to recognize all CSV file patterns - Enhanced app/templates/admin.html with: * Import Order Guide showing recommended import sequence * Sync to Modern Models section with confirmation dialog * Sync results display with detailed per-table statistics * Updated supported file formats list - All import functions use batch processing (500 rows), proper error handling, and structured logging - Sync functions maintain foreign key integrity and skip orphaned records with warnings --- app/import_legacy.py | 1615 ++++++++++++++++++++++++++++++++++ app/main.py | 207 ++++- app/models.py | 65 ++ app/sync_legacy_to_modern.py | 528 +++++++++++ app/templates/admin.html | 168 +++- delphi.db | Bin 417792 -> 417792 bytes 6 files changed, 2545 insertions(+), 38 deletions(-) create mode 100644 app/import_legacy.py create mode 100644 app/sync_legacy_to_modern.py diff --git a/app/import_legacy.py b/app/import_legacy.py new file mode 100644 index 0000000..f115ab9 --- /dev/null +++ b/app/import_legacy.py @@ -0,0 +1,1615 @@ +""" +Legacy CSV import functions for Delphi Database. + +This module provides import functions for all legacy database tables, +reading from old-csv files and populating the legacy SQLAlchemy models. +""" + +import csv +import os +from datetime import datetime +from decimal import Decimal, InvalidOperation +from typing import Dict, Any, Optional +from sqlalchemy.orm import Session +from sqlalchemy.exc import IntegrityError +import structlog + +from .models import ( + Rolodex, LegacyPhone, LegacyFile, FilesR, FilesV, FileNots, + Ledger, Deposits, LegacyPayment, TrnsType, TrnsLkup, + Footers, FileStat, Employee, GroupLkup, FileType, + Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath, + PensionSchedule, PensionSeparate, PensionResults, + RolexV, FVarLkup, RVarLkup +) + +logger = structlog.get_logger(__name__) + +# Batch size for commits +BATCH_SIZE = 500 + + +def open_text_with_fallbacks(file_path: str): + """ + Open a text file trying multiple encodings commonly seen in legacy CSVs. + + Returns a tuple of (file_object, encoding_used). 
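+
+    Typical usage, mirroring the import functions below (the file path shown
+    is illustrative only):
+
+        f, encoding = open_text_with_fallbacks("old-csv/ROLODEX.csv")
+        reader = csv.DictReader(f)
+        ...
+        f.close()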
+ """ + encodings = ["utf-8", "utf-8-sig", "cp1252", "windows-1252", "cp1250", "iso-8859-1", "latin-1"] + last_error = None + for enc in encodings: + try: + f = open(file_path, 'r', encoding=enc, errors='strict', newline='') + _ = f.read(1024) + f.seek(0) + logger.info("csv_open_encoding_selected", file=file_path, encoding=enc) + return f, enc + except Exception as e: + last_error = e + logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e)) + continue + + error_msg = f"Unable to open file '{file_path}' with any supported encodings" + if last_error: + error_msg += f". Last error: {str(last_error)}" + raise RuntimeError(error_msg) + + +def parse_date(date_str: str) -> Optional[datetime]: + """Parse date string in various formats, return None if blank/invalid.""" + if not date_str or not date_str.strip(): + return None + + date_str = date_str.strip() + + # Try common date formats + formats = [ + "%m/%d/%Y", + "%m/%d/%y", + "%Y-%m-%d", + "%m-%d-%Y", + "%m-%d-%y", + ] + + for fmt in formats: + try: + return datetime.strptime(date_str, fmt) + except ValueError: + continue + + logger.warning("date_parse_failed", date_string=date_str) + return None + + +def parse_decimal(value: str) -> Optional[Decimal]: + """Parse decimal string, return None if blank/invalid.""" + if not value or not value.strip(): + return None + + try: + return Decimal(value.strip()) + except (InvalidOperation, ValueError): + logger.warning("decimal_parse_failed", value=value) + return None + + +def clean_string(value: str) -> Optional[str]: + """Clean string value, return None if blank.""" + if not value or not value.strip(): + return None + return value.strip() + + +# ============================================================================ +# Reference Table Imports (should be imported first) +# ============================================================================ + +def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]: + """Import TRNSTYPE.csv → TrnsType model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + t_type = clean_string(row.get('T_Type')) + if not t_type: + continue + + record = TrnsType( + t_type=t_type, + t_type_l=clean_string(row.get('T_Type_L')), + header=clean_string(row.get('Header')), + footer=clean_string(row.get('Footer')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_trnstype_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_trnstype_failed", error=str(e)) + + return result + + +def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]: + """Import TRNSLKUP.csv → TrnsLkup model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + t_code = clean_string(row.get('T_Code')) + if not t_code: + continue + + record = TrnsLkup( 
+ t_code=t_code, + t_type=clean_string(row.get('T_Type')), + t_type_l=clean_string(row.get('T_Type_L')), + amount=parse_decimal(row.get('Amount')), + description=clean_string(row.get('Description')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_trnslkup_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_trnslkup_failed", error=str(e)) + + return result + + +def import_footers(db: Session, file_path: str) -> Dict[str, Any]: + """Import FOOTERS.csv → Footers model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + f_code = clean_string(row.get('F_Code')) + if not f_code: + continue + + record = Footers( + f_code=f_code, + f_footer=clean_string(row.get('F_Footer')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_footers_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_footers_failed", error=str(e)) + + return result + + +def import_filestat(db: Session, file_path: str) -> Dict[str, Any]: + """Import FILESTAT.csv → FileStat model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + status = clean_string(row.get('Status')) + if not status: + continue + + record = FileStat( + status=status, + definition=clean_string(row.get('Definition')), + send=clean_string(row.get('Send')), + footer_code=clean_string(row.get('Footer_Code')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_filestat_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_filestat_failed", error=str(e)) + + return result + + +def import_employee(db: Session, file_path: str) -> Dict[str, Any]: + """Import EMPLOYEE.csv → Employee model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + empl_num = clean_string(row.get('Empl_Num')) + if not empl_num: + continue + + record = Employee( + empl_num=empl_num, + empl_id=clean_string(row.get('Empl_Id')), + 
rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_employee_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_employee_failed", error=str(e)) + + return result + + +def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]: + """Import GRUPLKUP.csv → GroupLkup model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + code = clean_string(row.get('Code')) + if not code: + continue + + record = GroupLkup( + code=code, + description=clean_string(row.get('Description')), + title=clean_string(row.get('Title')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_gruplkup_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_gruplkup_failed", error=str(e)) + + return result + + +def import_filetype(db: Session, file_path: str) -> Dict[str, Any]: + """Import FILETYPE.csv → FileType model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_type = clean_string(row.get('File_Type')) + if not file_type: + continue + + record = FileType(file_type=file_type) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_filetype_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_filetype_failed", error=str(e)) + + return result + + +def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]: + """Import FVARLKUP.csv → FVarLkup model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + identifier = clean_string(row.get('Identifier')) + if not identifier: + continue + + record = FVarLkup( + identifier=identifier, + query=clean_string(row.get('Query')), + response=clean_string(row.get('Response')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + 
result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_fvarlkup_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_fvarlkup_failed", error=str(e)) + + return result + + +def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]: + """Import RVARLKUP.csv → RVarLkup model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + identifier = clean_string(row.get('Identifier')) + if not identifier: + continue + + record = RVarLkup( + identifier=identifier, + query=clean_string(row.get('Query')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_rvarlkup_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_rvarlkup_failed", error=str(e)) + + return result + + +# ============================================================================ +# Core Data Table Imports +# ============================================================================ + +def import_rolodex(db: Session, file_path: str) -> Dict[str, Any]: + """Import ROLODEX.csv → Rolodex model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + rolodex_id = clean_string(row.get('Id')) + if not rolodex_id: + continue + + record = Rolodex( + id=rolodex_id, + prefix=clean_string(row.get('Prefix')), + first=clean_string(row.get('First')), + middle=clean_string(row.get('Middle')), + last=clean_string(row.get('Last')), + suffix=clean_string(row.get('Suffix')), + title=clean_string(row.get('Title')), + a1=clean_string(row.get('A1')), + a2=clean_string(row.get('A2')), + a3=clean_string(row.get('A3')), + city=clean_string(row.get('City')), + abrev=clean_string(row.get('Abrev')), + st=clean_string(row.get('St')), + zip=clean_string(row.get('Zip')), + email=clean_string(row.get('Email')), + dob=parse_date(row.get('DOB')), + ss=clean_string(row.get('SS#')), + legal_status=clean_string(row.get('Legal_Status')), + group=clean_string(row.get('Group')), + memo=clean_string(row.get('Memo')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_rolodex_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_rolodex_failed", error=str(e)) + + return result + + +def import_phone(db: Session, file_path: str) -> Dict[str, Any]: + """Import PHONE.csv → LegacyPhone model.""" + 
result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + rolodex_id = clean_string(row.get('Id')) + phone = clean_string(row.get('Phone')) + + if not rolodex_id or not phone: + continue + + record = LegacyPhone( + id=rolodex_id, + phone=phone, + location=clean_string(row.get('Location')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_phone_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_phone_failed", error=str(e)) + + return result + + +def import_rolex_v(db: Session, file_path: str) -> Dict[str, Any]: + """Import ROLEX_V.csv → RolexV model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + rolodex_id = clean_string(row.get('Id')) + identifier = clean_string(row.get('Identifier')) + + if not rolodex_id or not identifier: + continue + + record = RolexV( + id=rolodex_id, + identifier=identifier, + response=clean_string(row.get('Response')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_rolex_v_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_rolex_v_failed", error=str(e)) + + return result + + +def import_files(db: Session, file_path: str) -> Dict[str, Any]: + """Import FILES.csv → LegacyFile model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + if not file_no: + continue + + record = LegacyFile( + file_no=file_no, + id=clean_string(row.get('Id')), + file_type=clean_string(row.get('File_Type')), + regarding=clean_string(row.get('Regarding')), + opened=parse_date(row.get('Opened')), + closed=parse_date(row.get('Closed')), + empl_num=clean_string(row.get('Empl_Num')), + rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')), + status=clean_string(row.get('Status')), + footer_code=clean_string(row.get('Footer_Code')), + opposing=clean_string(row.get('Opposing')), + hours=parse_decimal(row.get('Hours')), + hours_p=parse_decimal(row.get('Hours_P')), + trust_bal=parse_decimal(row.get('Trust_Bal')), + trust_bal_p=parse_decimal(row.get('Trust_Bal_P')), + hourly_fees=parse_decimal(row.get('Hourly_Fees')), + hourly_fees_p=parse_decimal(row.get('Hourly_Fees_P')), + flat_fees=parse_decimal(row.get('Flat_Fees')), + flat_fees_p=parse_decimal(row.get('Flat_Fees_P')), + 
disbursements=parse_decimal(row.get('Disbursements')), + disbursements_p=parse_decimal(row.get('Disbursements_P')), + credit_bal=parse_decimal(row.get('Credit_Bal')), + credit_bal_p=parse_decimal(row.get('Credit_Bal_P')), + total_charges=parse_decimal(row.get('Total_Charges')), + total_charges_p=parse_decimal(row.get('Total_Charges_P')), + amount_owing=parse_decimal(row.get('Amount_Owing')), + amount_owing_p=parse_decimal(row.get('Amount_Owing_P')), + transferable=parse_decimal(row.get('Transferable')), + memo=clean_string(row.get('Memo')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_files_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_files_failed", error=str(e)) + + return result + + +def import_files_r(db: Session, file_path: str) -> Dict[str, Any]: + """Import FILES_R.csv → FilesR model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + relationship = clean_string(row.get('Relationship')) + rolodex_id = clean_string(row.get('Rolodex_Id')) + + if not file_no or not relationship or not rolodex_id: + continue + + record = FilesR( + file_no=file_no, + relationship=relationship, + rolodex_id=rolodex_id + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_files_r_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_files_r_failed", error=str(e)) + + return result + + +def import_files_v(db: Session, file_path: str) -> Dict[str, Any]: + """Import FILES_V.csv → FilesV model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + identifier = clean_string(row.get('Identifier')) + + if not file_no or not identifier: + continue + + record = FilesV( + file_no=file_no, + identifier=identifier, + response=clean_string(row.get('Response')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_files_v_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_files_v_failed", error=str(e)) + + return result + + +def import_filenots(db: Session, file_path: 
str) -> Dict[str, Any]: + """Import FILENOTS.csv → FileNots model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + memo_date = parse_date(row.get('Memo_Date')) + + if not file_no or not memo_date: + continue + + record = FileNots( + file_no=file_no, + memo_date=memo_date, + memo_note=clean_string(row.get('Memo_Note')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_filenots_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_filenots_failed", error=str(e)) + + return result + + +def import_ledger(db: Session, file_path: str) -> Dict[str, Any]: + """Import LEDGER.csv → Ledger model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + item_no_str = clean_string(row.get('Item_No')) + + if not file_no or not item_no_str: + continue + + try: + item_no = int(item_no_str) + except ValueError: + result['errors'].append(f"Row {row_num}: Invalid Item_No '{item_no_str}'") + continue + + record = Ledger( + file_no=file_no, + date=parse_date(row.get('Date')), + item_no=item_no, + empl_num=clean_string(row.get('Empl_Num')), + t_code=clean_string(row.get('T_Code')), + t_type=clean_string(row.get('T_Type')), + t_type_l=clean_string(row.get('T_Type_L')), + quantity=parse_decimal(row.get('Quantity')), + rate=parse_decimal(row.get('Rate')), + amount=parse_decimal(row.get('Amount')), + billed=clean_string(row.get('Billed')), + note=clean_string(row.get('Note')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_ledger_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_ledger_failed", error=str(e)) + + return result + + +def import_deposits(db: Session, file_path: str) -> Dict[str, Any]: + """Import DEPOSITS.csv → Deposits model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + deposit_date = parse_date(row.get('Deposit_Date')) + + if not deposit_date: + continue + + record = Deposits( + deposit_date=deposit_date, + total=parse_decimal(row.get('Total')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + 
result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_deposits_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_deposits_failed", error=str(e)) + + return result + + +def import_payments(db: Session, file_path: str) -> Dict[str, Any]: + """Import PAYMENTS.csv → LegacyPayment model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + # LegacyPayment has auto-increment id, so we don't need to check for it + record = LegacyPayment( + deposit_date=parse_date(row.get('Deposit_Date')), + file_no=clean_string(row.get('File_No')), + rolodex_id=clean_string(row.get('Id')), + regarding=clean_string(row.get('Regarding')), + amount=parse_decimal(row.get('Amount')), + note=clean_string(row.get('Note')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_payments_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_payments_failed", error=str(e)) + + return result + + +# ============================================================================ +# Specialized Imports (QDRO, Pensions, Plan Info) +# ============================================================================ + +def import_planinfo(db: Session, file_path: str) -> Dict[str, Any]: + """Import PLANINFO.csv → PlanInfo model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + plan_id = clean_string(row.get('Plan_Id')) + if not plan_id: + continue + + record = PlanInfo( + plan_id=plan_id, + plan_name=clean_string(row.get('Plan_Name')), + plan_type=clean_string(row.get('Plan_Type')), + empl_id_no=clean_string(row.get('Empl_Id_No')), + plan_no=clean_string(row.get('Plan_No')), + nra=clean_string(row.get('NRA')), + era=clean_string(row.get('ERA')), + errf=clean_string(row.get('ERRF')), + colas=clean_string(row.get('COLAS')), + divided_by=clean_string(row.get('Divided_By')), + drafted=clean_string(row.get('Drafted')), + benefit_c=clean_string(row.get('Benefit_C')), + qdro_c=clean_string(row.get('QDRO_C')), + rev=clean_string(row.get('^REV')), + pa=clean_string(row.get('^PA')), + form_name=clean_string(row.get('Form_Name')), + drafted_on=parse_date(row.get('Drafted_On')), + memo=clean_string(row.get('Memo')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_planinfo_complete", **result) + + except Exception as e: + db.rollback() + 
result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_planinfo_failed", error=str(e)) + + return result + + +def import_qdros(db: Session, file_path: str) -> Dict[str, Any]: + """Import QDROS.csv → Qdros model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = Qdros( + file_no=file_no, + version=version, + plan_id=clean_string(row.get('Plan_Id')), + _1=clean_string(row.get('^1')), + _2=clean_string(row.get('^2')), + part=clean_string(row.get('^Part')), + altp=clean_string(row.get('^AltP')), + pet=clean_string(row.get('^Pet')), + res=clean_string(row.get('^Res')), + case_type=clean_string(row.get('Case_Type')), + case_code=clean_string(row.get('Case_Code')), + section=clean_string(row.get('Section')), + case_number=clean_string(row.get('Case_Number')), + judgment_date=parse_date(row.get('Judgment_Date')), + valuation_date=parse_date(row.get('Valuation_Date')), + married_on=parse_date(row.get('Married_On')), + percent_awarded=parse_decimal(row.get('Percent_Awarded')), + ven_city=clean_string(row.get('Ven_City')), + ven_cnty=clean_string(row.get('Ven_Cnty')), + ven_st=clean_string(row.get('Ven_St')), + draft_out=parse_date(row.get('Draft_Out')), + draft_apr=parse_date(row.get('Draft_Apr')), + final_out=parse_date(row.get('Final_Out')), + judge=clean_string(row.get('Judge')), + form_name=clean_string(row.get('Form_Name')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_qdros_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_qdros_failed", error=str(e)) + + return result + + +def import_pensions(db: Session, file_path: str) -> Dict[str, Any]: + """Import PENSIONS.csv → Pensions model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = Pensions( + file_no=file_no, + version=version, + plan_id=clean_string(row.get('Plan_Id')), + plan_name=clean_string(row.get('Plan_Name')), + title=clean_string(row.get('Title')), + first=clean_string(row.get('First')), + last=clean_string(row.get('Last')), + birth=parse_date(row.get('Birth')), + race=clean_string(row.get('Race')), + sex=clean_string(row.get('Sex')), + info=parse_date(row.get('Info')), + valu=parse_date(row.get('Valu')), + accrued=parse_decimal(row.get('Accrued')), + vested_per=parse_decimal(row.get('Vested_Per')), + start_age=parse_decimal(row.get('Start_Age')), + cola=parse_decimal(row.get('COLA')), + max_cola=parse_decimal(row.get('Max_COLA')), + withdrawal=parse_decimal(row.get('Withdrawal')), + pre_dr=parse_decimal(row.get('Pre_DR')), + 
post_dr=parse_decimal(row.get('Post_DR')), + tax_rate=parse_decimal(row.get('Tax_Rate')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_pensions_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_pensions_failed", error=str(e)) + + return result + + +def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]: + """Import Pensions/MARRIAGE.csv → PensionMarriage model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = PensionMarriage( + file_no=file_no, + version=version, + married_from=parse_date(row.get('Married_From')), + married_to=parse_date(row.get('Married_To')), + married_years=parse_decimal(row.get('Married_Years')), + service_from=parse_date(row.get('Service_From')), + service_to=parse_date(row.get('Service_To')), + service_years=parse_decimal(row.get('Service_Years')), + marital_pct=parse_decimal(row.get('Marital_%')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_pension_marriage_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_pension_marriage_failed", error=str(e)) + + return result + + +def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]: + """Import Pensions/DEATH.csv → PensionDeath model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = PensionDeath( + file_no=file_no, + version=version, + lump1=parse_decimal(row.get('Lump1')), + lump2=parse_decimal(row.get('Lump2')), + growth1=parse_decimal(row.get('Growth1')), + growth2=parse_decimal(row.get('Growth2')), + disc1=parse_decimal(row.get('Disc1')), + disc2=parse_decimal(row.get('Disc2')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_pension_death_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_pension_death_failed", 
error=str(e)) + + return result + + +def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]: + """Import Pensions/SCHEDULE.csv → PensionSchedule model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = PensionSchedule( + file_no=file_no, + version=version, + vests_on=parse_date(row.get('Vests_On')), + vests_at=parse_decimal(row.get('Vests_At')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_pension_schedule_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_pension_schedule_failed", error=str(e)) + + return result + + +def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]: + """Import Pensions/SEPARATE.csv → PensionSeparate model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = PensionSeparate( + file_no=file_no, + version=version, + separation_rate=parse_decimal(row.get('Separation_Rate')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_pension_separate_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_pension_separate_failed", error=str(e)) + + return result + + +def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]: + """Import Pensions/RESULTS.csv → PensionResults model.""" + result = {'success': 0, 'errors': [], 'total_rows': 0} + + try: + f, encoding = open_text_with_fallbacks(file_path) + reader = csv.DictReader(f) + + batch = [] + for row_num, row in enumerate(reader, start=2): + result['total_rows'] += 1 + + try: + # Note: RESULTS.csv might not have explicit file_no/version columns + # May need to derive from associated pension record + # For now, skip if missing + file_no = clean_string(row.get('File_No')) + version = clean_string(row.get('Version')) + + if not file_no or not version: + continue + + record = PensionResults( + file_no=file_no, + version=version, + accrued=parse_decimal(row.get('Accrued')), + start_age=parse_decimal(row.get('Start_Age')), + cola=parse_decimal(row.get('COLA')), + withdrawal=parse_decimal(row.get('Withdrawal')), + pre_dr=parse_decimal(row.get('Pre_DR')), + post_dr=parse_decimal(row.get('Post_DR')), + 
tax_rate=parse_decimal(row.get('Tax_Rate')), + age=parse_decimal(row.get('Age')), + years_from=parse_decimal(row.get('Years_From')), + life_exp=parse_decimal(row.get('Life_Exp')), + ev_monthly=parse_decimal(row.get('EV_Monthly')), + payments=parse_decimal(row.get('Payments')), + pay_out=parse_decimal(row.get('Pay_Out')), + fund_value=parse_decimal(row.get('Fund_Value')), + pv=parse_decimal(row.get('PV')), + mortality=parse_decimal(row.get('Mortality')), + pv_am=parse_decimal(row.get('PV_AM')), + pv_amt=parse_decimal(row.get('PV_AMT')), + pv_pre_db=parse_decimal(row.get('PV_Pre_DB')), + pv_annuity=parse_decimal(row.get('PV_Annuity')), + wv_at=parse_decimal(row.get('WV_AT')), + pv_plan=parse_decimal(row.get('PV_Plan')), + years_married=parse_decimal(row.get('Years_Married')), + years_service=parse_decimal(row.get('Years_Service')), + marr_per=parse_decimal(row.get('Marr_Per')), + marr_amt=parse_decimal(row.get('Marr_Amt')) + ) + batch.append(record) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Row {row_num}: {str(e)}") + + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + f.close() + logger.info("import_pension_results_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("import_pension_results_failed", error=str(e)) + + return result + diff --git a/app/main.py b/app/main.py index 4fe11a2..dce845f 100644 --- a/app/main.py +++ b/app/main.py @@ -49,6 +49,8 @@ from .schemas import ( FilesListResponse, LedgerListResponse, ) +from . import import_legacy +from . import sync_legacy_to_modern # Load environment variables load_dotenv() @@ -237,45 +239,77 @@ app.mount("/static", StaticFiles(directory="static"), name="static") def get_import_type_from_filename(filename: str) -> str: """ - Determine import type based on filename pattern. + Determine import type based on filename pattern for legacy CSV files. 
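+
+    Examples (illustrative filenames, matched by the rules below):
+        "ROLODEX.csv"  -> 'rolodex'
+        "FILES_R.CSV"  -> 'files_r'
+        "TRNSTYPE.csv" -> 'trnstype'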
Args: filename: Name of the uploaded CSV file Returns: - Import type string (client, phone, case, transaction, document, payment) + Import type string matching the import function keys """ filename_upper = filename.upper() # Strip extension and normalize base = filename_upper.rsplit('.', 1)[0] - # Support files saved with explicit type prefixes (e.g., CLIENT_.csv) - if base.startswith('CLIENT_'): - return 'client' - if base.startswith('PHONE_'): + # Reference tables + if 'TRNSTYPE' in base: + return 'trnstype' + if 'TRNSLKUP' in base: + return 'trnslkup' + if 'FOOTER' in base: + return 'footers' + if 'FILESTAT' in base: + return 'filestat' + if 'EMPLOYEE' in base: + return 'employee' + if 'GRUPLKUP' in base or 'GROUPLKUP' in base: + return 'gruplkup' + if 'FILETYPE' in base: + return 'filetype' + if 'FVARLKUP' in base: + return 'fvarlkup' + if 'RVARLKUP' in base: + return 'rvarlkup' + + # Core data tables + if 'ROLEX_V' in base or 'ROLEXV' in base: + return 'rolex_v' + if 'ROLODEX' in base or 'ROLEX' in base: + return 'rolodex' + if 'FILES_R' in base or 'FILESR' in base: + return 'files_r' + if 'FILES_V' in base or 'FILESV' in base: + return 'files_v' + if 'FILENOTS' in base or 'FILE_NOTS' in base: + return 'filenots' + if 'FILES' in base or 'FILE' in base: + return 'files' + if 'PHONE' in base: return 'phone' - if base.startswith('CASE_'): - return 'case' - if base.startswith('TRANSACTION_'): - return 'transaction' - if base.startswith('DOCUMENT_'): - return 'document' - if base.startswith('PAYMENT_'): - return 'payment' - - # Legacy/real file name patterns - if base.startswith('ROLODEX') or base.startswith('ROLEX') or 'ROLODEX' in base or 'ROLEX' in base: - return 'client' - if base.startswith('PHONE') or 'PHONE' in base: - return 'phone' - if base.startswith('FILES') or base.startswith('FILE') or 'FILES' in base: - return 'case' - if base.startswith('LEDGER') or 'LEDGER' in base or base.startswith('TRNSACTN') or 'TRNSACTN' in base: - return 'transaction' - if base.startswith('QDROS') or base.startswith('QDRO') or 'QDRO' in base: - return 'document' - if base.startswith('PAYMENTS') or base.startswith('DEPOSITS') or 'PAYMENT' in base or 'DEPOSIT' in base: - return 'payment' + if 'LEDGER' in base: + return 'ledger' + if 'DEPOSITS' in base or 'DEPOSIT' in base: + return 'deposits' + if 'PAYMENTS' in base or 'PAYMENT' in base: + return 'payments' + + # Specialized tables + if 'PLANINFO' in base or 'PLAN_INFO' in base: + return 'planinfo' + if 'QDROS' in base or 'QDRO' in base: + return 'qdros' + if 'MARRIAGE' in base: + return 'pension_marriage' + if 'DEATH' in base: + return 'pension_death' + if 'SCHEDULE' in base: + return 'pension_schedule' + if 'SEPARATE' in base: + return 'pension_separate' + if 'RESULTS' in base: + return 'pension_results' + if 'PENSIONS' in base or 'PENSION' in base: + return 'pensions' raise ValueError(f"Unknown file type for filename: {filename}") @@ -874,23 +908,49 @@ def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]: def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[str, Any]: """ - Process CSV import based on type. + Process CSV import based on type using legacy import functions. 
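+
+    Example (illustrative call; the type string normally comes from
+    get_import_type_from_filename):
+
+        result = process_csv_import(db, 'rolodex', '/tmp/uploads/ROLODEX.csv')
+        # result -> {'success': int, 'errors': [str, ...], 'total_rows': int}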
Args: db: Database session - import_type: Type of import (client, phone, case, transaction, document, payment) + import_type: Type of import file_path: Path to CSV file Returns: Dict with import results """ import_functions = { - 'client': import_rolodex_data, - 'phone': import_phone_data, - 'case': import_files_data, - 'transaction': import_ledger_data, - 'document': import_qdros_data, - 'payment': import_payments_data + # Reference tables (import first) + 'trnstype': import_legacy.import_trnstype, + 'trnslkup': import_legacy.import_trnslkup, + 'footers': import_legacy.import_footers, + 'filestat': import_legacy.import_filestat, + 'employee': import_legacy.import_employee, + 'gruplkup': import_legacy.import_gruplkup, + 'filetype': import_legacy.import_filetype, + 'fvarlkup': import_legacy.import_fvarlkup, + 'rvarlkup': import_legacy.import_rvarlkup, + + # Core data tables + 'rolodex': import_legacy.import_rolodex, + 'phone': import_legacy.import_phone, + 'rolex_v': import_legacy.import_rolex_v, + 'files': import_legacy.import_files, + 'files_r': import_legacy.import_files_r, + 'files_v': import_legacy.import_files_v, + 'filenots': import_legacy.import_filenots, + 'ledger': import_legacy.import_ledger, + 'deposits': import_legacy.import_deposits, + 'payments': import_legacy.import_payments, + + # Specialized tables + 'planinfo': import_legacy.import_planinfo, + 'qdros': import_legacy.import_qdros, + 'pensions': import_legacy.import_pensions, + 'pension_marriage': import_legacy.import_pension_marriage, + 'pension_death': import_legacy.import_pension_death, + 'pension_schedule': import_legacy.import_pension_schedule, + 'pension_separate': import_legacy.import_pension_separate, + 'pension_results': import_legacy.import_pension_results, } import_func = import_functions.get(import_type) @@ -1566,7 +1626,17 @@ async def admin_import_data( return RedirectResponse(url="/login", status_code=302) # Validate data type - valid_types = ['client', 'phone', 'case', 'transaction', 'document', 'payment'] + valid_types = [ + # Reference tables + 'trnstype', 'trnslkup', 'footers', 'filestat', 'employee', + 'gruplkup', 'filetype', 'fvarlkup', 'rvarlkup', + # Core data tables + 'rolodex', 'phone', 'rolex_v', 'files', 'files_r', 'files_v', + 'filenots', 'ledger', 'deposits', 'payments', + # Specialized tables + 'planinfo', 'qdros', 'pensions', 'pension_marriage', + 'pension_death', 'pension_schedule', 'pension_separate', 'pension_results' + ] if data_type not in valid_types: return templates.TemplateResponse("admin.html", { "request": request, @@ -1670,6 +1740,69 @@ async def admin_import_data( }) +@app.post("/admin/sync") +async def admin_sync_data( + request: Request, + db: Session = Depends(get_db) +): + """ + Sync legacy database models to modern application models. + + This route triggers the sync process to populate the simplified + modern models (Client, Phone, Case, Transaction, Payment, Document) + from the comprehensive legacy models. 
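+
+    The only form field read is "clear_existing"; when it is the string
+    "true", existing modern records are deleted before the sync runs.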
+    """
+    # Check authentication
+    user = get_current_user_from_session(request.session)
+    if not user:
+        return RedirectResponse(url="/login", status_code=302)
+
+    # Get form data for confirmation
+    form = await request.form()
+    clear_existing = form.get("clear_existing") == "true"
+
+    try:
+        logger.info(
+            "admin_sync_starting",
+            clear_existing=clear_existing,
+            username=user.username
+        )
+
+        # Run all sync functions
+        results = sync_legacy_to_modern.sync_all(db, clear_existing=clear_existing)
+
+        # Calculate totals
+        total_synced = sum(r['success'] for r in results.values() if r)
+        total_skipped = sum(r['skipped'] for r in results.values() if r)
+        total_errors = sum(len(r['errors']) for r in results.values() if r)
+
+        logger.info(
+            "admin_sync_complete",
+            total_synced=total_synced,
+            total_skipped=total_skipped,
+            total_errors=total_errors,
+            username=user.username
+        )
+
+        return templates.TemplateResponse("admin.html", {
+            "request": request,
+            "user": user,
+            "sync_results": results,
+            "total_synced": total_synced,
+            "total_skipped": total_skipped,
+            "total_sync_errors": total_errors,
+            "show_sync_results": True
+        })
+
+    except Exception as e:
+        logger.error("admin_sync_failed", error=str(e), username=user.username)
+        return templates.TemplateResponse("admin.html", {
+            "request": request,
+            "user": user,
+            "error": f"Sync failed: {str(e)}"
+        })
+
+
 @app.get("/admin")
 async def admin_panel(request: Request, db: Session = Depends(get_db)):
     """
diff --git a/app/models.py b/app/models.py
index 93fd068..c40a721 100644
--- a/app/models.py
+++ b/app/models.py
@@ -683,3 +683,68 @@ class PensionSeparate(Base):
     __table_args__ = (
         ForeignKeyConstraint(["file_no", "version"], ["pensions.file_no", "pensions.version"], ondelete="CASCADE"),
     )
+
+
+class FileType(Base):
+    """FILETYPE reference table for file/case types."""
+    __tablename__ = "filetype"
+
+    file_type = Column(String, primary_key=True)
+
+    def __repr__(self):
+        return f"<FileType(file_type='{self.file_type}')>"
+
+
+class FileNots(Base):
+    """FILENOTS table for file memos/notes."""
+    __tablename__ = "filenots"
+
+    file_no = Column(String, ForeignKey("files.file_no", ondelete="CASCADE"), primary_key=True)
+    memo_date = Column(Date, primary_key=True)
+    memo_note = Column(Text)
+
+    __table_args__ = (
+        Index("ix_filenots_file_no", "file_no"),
+    )
+
+    def __repr__(self):
+        return f"<FileNots(file_no='{self.file_no}', memo_date={self.memo_date})>"
+
+
+class RolexV(Base):
+    """ROLEX_V variables per rolodex entry."""
+    __tablename__ = "rolex_v"
+
+    id = Column(String, ForeignKey("rolodex.id", ondelete="CASCADE"), primary_key=True)
+    identifier = Column(String, primary_key=True)
+    response = Column(Text)
+
+    __table_args__ = (
+        Index("ix_rolex_v_id", "id"),
+    )
+
+    def __repr__(self):
+        return f"<RolexV(id='{self.id}', identifier='{self.identifier}')>"
+
+
+class FVarLkup(Base):
+    """FVARLKUP file variable lookup table."""
+    __tablename__ = "fvarlkup"
+
+    identifier = Column(String, primary_key=True)
+    query = Column(Text)
+    response = Column(Text)
+
+    def __repr__(self):
+        return f"<FVarLkup(identifier='{self.identifier}')>"
+
+
+class RVarLkup(Base):
+    """RVARLKUP rolodex variable lookup table."""
+    __tablename__ = "rvarlkup"
+
+    identifier = Column(String, primary_key=True)
+    query = Column(Text)
+
+    def __repr__(self):
+        return f"<RVarLkup(identifier='{self.identifier}')>"
diff --git a/app/sync_legacy_to_modern.py b/app/sync_legacy_to_modern.py
new file mode 100644
index 0000000..8078a4a
--- /dev/null
+++ b/app/sync_legacy_to_modern.py
@@ -0,0 +1,528 @@
+"""
+Sync functions to populate modern models from legacy database tables.
+
+This module provides functions to migrate data from the comprehensive legacy
+schema to the simplified modern application models.
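+
+Sync order matters: clients must be synced before phones and cases (both look
+up the modern Client by rolodex_id), and cases before transactions (which look
+up the modern Case by file_no). Records whose parent cannot be found are
+skipped and reported as errors.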
+""" + +from typing import Dict, Any +from sqlalchemy.orm import Session +from sqlalchemy.exc import IntegrityError +import structlog + +from .models import ( + # Legacy models + Rolodex, LegacyPhone, LegacyFile, Ledger, LegacyPayment, Qdros, + # Modern models + Client, Phone, Case, Transaction, Payment, Document +) + +logger = structlog.get_logger(__name__) + +BATCH_SIZE = 500 + + +def sync_clients(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Sync Rolodex → Client. + + Maps legacy rolodex entries to modern simplified client records. + """ + result = {'success': 0, 'errors': [], 'skipped': 0} + + try: + # Optionally clear existing modern client data + if clear_existing: + logger.info("sync_clients_clearing_existing") + db.query(Client).delete() + db.commit() + + # Query all rolodex entries + rolodex_entries = db.query(Rolodex).all() + logger.info("sync_clients_processing", count=len(rolodex_entries)) + + batch = [] + for rolex in rolodex_entries: + try: + # Build complete address from A1, A2, A3 + address_parts = [ + rolex.a1 or '', + rolex.a2 or '', + rolex.a3 or '' + ] + address = ', '.join(filter(None, address_parts)) + + # Create modern client record + client = Client( + rolodex_id=rolex.id, + last_name=rolex.last, + first_name=rolex.first, + middle_initial=rolex.middle, + company=rolex.title, # Using title as company name + address=address if address else None, + city=rolex.city, + state=rolex.abrev, + zip_code=rolex.zip + ) + batch.append(client) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Rolodex ID {rolex.id}: {str(e)}") + result['skipped'] += 1 + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + logger.info("sync_clients_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("sync_clients_failed", error=str(e)) + + return result + + +def sync_phones(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Sync LegacyPhone → Phone. + + Links phone numbers to modern client records via rolodex_id. 
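+
+    Mapping (see the loop below): LegacyPhone.phone -> Phone.phone_number and
+    LegacyPhone.location -> Phone.phone_type ('unknown' when blank); the legacy
+    table carries no extension data, so Phone.extension is left None.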
+ """ + result = {'success': 0, 'errors': [], 'skipped': 0} + + try: + # Optionally clear existing phone data + if clear_existing: + logger.info("sync_phones_clearing_existing") + db.query(Phone).delete() + db.commit() + + # Build lookup map: rolodex_id → client.id + clients = db.query(Client).all() + rolodex_to_client = {c.rolodex_id: c.id for c in clients} + logger.info("sync_phones_client_map", client_count=len(rolodex_to_client)) + + # Query all legacy phones + legacy_phones = db.query(LegacyPhone).all() + logger.info("sync_phones_processing", count=len(legacy_phones)) + + batch = [] + for lphone in legacy_phones: + try: + # Find corresponding modern client + client_id = rolodex_to_client.get(lphone.id) + if not client_id: + result['errors'].append(f"No client found for rolodex ID: {lphone.id}") + result['skipped'] += 1 + continue + + # Create modern phone record + phone = Phone( + client_id=client_id, + phone_type=lphone.location if lphone.location else 'unknown', + phone_number=lphone.phone, + extension=None + ) + batch.append(phone) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Phone {lphone.id}/{lphone.phone}: {str(e)}") + result['skipped'] += 1 + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + logger.info("sync_phones_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("sync_phones_failed", error=str(e)) + + return result + + +def sync_cases(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Sync LegacyFile → Case. + + Converts legacy file cabinet entries to modern case records. 
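+
+    Status mapping: a non-empty Closed date -> 'closed', a legacy status
+    containing "inactive" -> 'inactive', everything else -> 'active'.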
+ """ + result = {'success': 0, 'errors': [], 'skipped': 0} + + try: + # Optionally clear existing case data + if clear_existing: + logger.info("sync_cases_clearing_existing") + db.query(Case).delete() + db.commit() + + # Build lookup map: rolodex_id → client.id + clients = db.query(Client).all() + rolodex_to_client = {c.rolodex_id: c.id for c in clients} + logger.info("sync_cases_client_map", client_count=len(rolodex_to_client)) + + # Query all legacy files + legacy_files = db.query(LegacyFile).all() + logger.info("sync_cases_processing", count=len(legacy_files)) + + batch = [] + for lfile in legacy_files: + try: + # Find corresponding modern client + client_id = rolodex_to_client.get(lfile.id) + if not client_id: + result['errors'].append(f"No client found for rolodex ID: {lfile.id} (file {lfile.file_no})") + result['skipped'] += 1 + continue + + # Map legacy status to modern status + status = 'active' + if lfile.closed: + status = 'closed' + elif lfile.status and 'inactive' in lfile.status.lower(): + status = 'inactive' + + # Create modern case record + case = Case( + file_no=lfile.file_no, + client_id=client_id, + status=status, + case_type=lfile.file_type, + description=lfile.regarding, + open_date=lfile.opened, + close_date=lfile.closed + ) + batch.append(case) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"File {lfile.file_no}: {str(e)}") + result['skipped'] += 1 + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + logger.info("sync_cases_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("sync_cases_failed", error=str(e)) + + return result + + +def sync_transactions(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Sync Ledger → Transaction. + + Converts legacy ledger entries to modern transaction records. 
+ """ + result = {'success': 0, 'errors': [], 'skipped': 0} + + try: + # Optionally clear existing transaction data + if clear_existing: + logger.info("sync_transactions_clearing_existing") + db.query(Transaction).delete() + db.commit() + + # Build lookup map: file_no → case.id + cases = db.query(Case).all() + file_no_to_case = {c.file_no: c.id for c in cases} + logger.info("sync_transactions_case_map", case_count=len(file_no_to_case)) + + # Query all ledger entries + ledger_entries = db.query(Ledger).all() + logger.info("sync_transactions_processing", count=len(ledger_entries)) + + batch = [] + for ledger in ledger_entries: + try: + # Find corresponding modern case + case_id = file_no_to_case.get(ledger.file_no) + if not case_id: + result['errors'].append(f"No case found for file: {ledger.file_no}") + result['skipped'] += 1 + continue + + # Create modern transaction record with all ledger fields + transaction = Transaction( + case_id=case_id, + transaction_date=ledger.date, + transaction_type=ledger.t_type, + amount=float(ledger.amount) if ledger.amount else None, + description=ledger.note, + reference=str(ledger.item_no) if ledger.item_no else None, + # Ledger-specific fields + item_no=ledger.item_no, + employee_number=ledger.empl_num, + t_code=ledger.t_code, + t_type_l=ledger.t_type_l, + quantity=float(ledger.quantity) if ledger.quantity else None, + rate=float(ledger.rate) if ledger.rate else None, + billed=ledger.billed + ) + batch.append(transaction) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Ledger {ledger.file_no}/{ledger.item_no}: {str(e)}") + result['skipped'] += 1 + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + logger.info("sync_transactions_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("sync_transactions_failed", error=str(e)) + + return result + + +def sync_payments(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Sync LegacyPayment → Payment. + + Converts legacy payment entries to modern payment records. 
+ """ + result = {'success': 0, 'errors': [], 'skipped': 0} + + try: + # Optionally clear existing payment data + if clear_existing: + logger.info("sync_payments_clearing_existing") + db.query(Payment).delete() + db.commit() + + # Build lookup map: file_no → case.id + cases = db.query(Case).all() + file_no_to_case = {c.file_no: c.id for c in cases} + logger.info("sync_payments_case_map", case_count=len(file_no_to_case)) + + # Query all legacy payments + legacy_payments = db.query(LegacyPayment).all() + logger.info("sync_payments_processing", count=len(legacy_payments)) + + batch = [] + for lpay in legacy_payments: + try: + # Find corresponding modern case + if not lpay.file_no: + result['skipped'] += 1 + continue + + case_id = file_no_to_case.get(lpay.file_no) + if not case_id: + result['errors'].append(f"No case found for file: {lpay.file_no}") + result['skipped'] += 1 + continue + + # Create modern payment record + payment = Payment( + case_id=case_id, + payment_date=lpay.deposit_date, + payment_type='deposit', # Legacy doesn't distinguish + amount=float(lpay.amount) if lpay.amount else None, + description=lpay.note if lpay.note else lpay.regarding, + check_number=None # Not in legacy PAYMENTS table + ) + batch.append(payment) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"Payment {lpay.id}: {str(e)}") + result['skipped'] += 1 + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + logger.info("sync_payments_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("sync_payments_failed", error=str(e)) + + return result + + +def sync_documents(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Sync Qdros → Document. + + Converts QDRO entries to modern document records. 
+ """ + result = {'success': 0, 'errors': [], 'skipped': 0} + + try: + # Optionally clear existing document data + if clear_existing: + logger.info("sync_documents_clearing_existing") + db.query(Document).delete() + db.commit() + + # Build lookup map: file_no → case.id + cases = db.query(Case).all() + file_no_to_case = {c.file_no: c.id for c in cases} + logger.info("sync_documents_case_map", case_count=len(file_no_to_case)) + + # Query all QDRO entries + qdros = db.query(Qdros).all() + logger.info("sync_documents_processing", count=len(qdros)) + + batch = [] + for qdro in qdros: + try: + # Find corresponding modern case + case_id = file_no_to_case.get(qdro.file_no) + if not case_id: + result['errors'].append(f"No case found for file: {qdro.file_no}") + result['skipped'] += 1 + continue + + # Build description from QDRO fields + desc_parts = [] + if qdro.case_type: + desc_parts.append(f"Type: {qdro.case_type}") + if qdro.case_number: + desc_parts.append(f"Case#: {qdro.case_number}") + if qdro.plan_id: + desc_parts.append(f"Plan: {qdro.plan_id}") + + description = '; '.join(desc_parts) if desc_parts else None + + # Create modern document record + document = Document( + case_id=case_id, + document_type='QDRO', + file_name=qdro.form_name, + file_path=None, # Legacy doesn't have file paths + description=description, + uploaded_date=qdro.draft_out if qdro.draft_out else qdro.judgment_date + ) + batch.append(document) + + if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + batch = [] + + except Exception as e: + result['errors'].append(f"QDRO {qdro.file_no}/{qdro.version}: {str(e)}") + result['skipped'] += 1 + + # Save remaining batch + if batch: + db.bulk_save_objects(batch) + db.commit() + result['success'] += len(batch) + + logger.info("sync_documents_complete", **result) + + except Exception as e: + db.rollback() + result['errors'].append(f"Fatal error: {str(e)}") + logger.error("sync_documents_failed", error=str(e)) + + return result + + +def sync_all(db: Session, clear_existing: bool = False) -> Dict[str, Any]: + """ + Run all sync functions in proper order. + + Order matters due to foreign key dependencies: + 1. Clients (no dependencies) + 2. Phones (depends on Clients) + 3. Cases (depends on Clients) + 4. Transactions (depends on Cases) + 5. Payments (depends on Cases) + 6. 
Documents (depends on Cases) + """ + results = { + 'clients': None, + 'phones': None, + 'cases': None, + 'transactions': None, + 'payments': None, + 'documents': None + } + + logger.info("sync_all_starting", clear_existing=clear_existing) + + try: + results['clients'] = sync_clients(db, clear_existing) + logger.info("sync_all_clients_done", success=results['clients']['success']) + + results['phones'] = sync_phones(db, clear_existing) + logger.info("sync_all_phones_done", success=results['phones']['success']) + + results['cases'] = sync_cases(db, clear_existing) + logger.info("sync_all_cases_done", success=results['cases']['success']) + + results['transactions'] = sync_transactions(db, clear_existing) + logger.info("sync_all_transactions_done", success=results['transactions']['success']) + + results['payments'] = sync_payments(db, clear_existing) + logger.info("sync_all_payments_done", success=results['payments']['success']) + + results['documents'] = sync_documents(db, clear_existing) + logger.info("sync_all_documents_done", success=results['documents']['success']) + + logger.info("sync_all_complete") + + except Exception as e: + logger.error("sync_all_failed", error=str(e)) + raise + + return results + diff --git a/app/templates/admin.html b/app/templates/admin.html index 2050b68..13ad864 100644 --- a/app/templates/admin.html +++ b/app/templates/admin.html @@ -49,7 +49,8 @@
- Supported formats: ROLODEX*.csv, PHONE*.csv, FILES*.csv, LEDGER*.csv, QDROS*.csv, PAYMENTS*.csv + Supported formats: ROLODEX, PHONE, FILES, LEDGER, PAYMENTS, DEPOSITS, QDROS, PENSIONS, PLANINFO, + TRNSTYPE, TRNSLKUP, FOOTERS, FILESTAT, EMPLOYEE, GRUPLKUP, FILETYPE, and all related tables (*.csv)
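Every sync_* function in app/sync_legacy_to_modern.py above follows the same shape: resolve legacy keys to modern primary keys through an in-memory map, translate rows one at a time, and commit in batches of BATCH_SIZE. A condensed sketch of that pattern follows; LegacySource and ModernTarget are placeholder names for illustration, not models from this patch.

from sqlalchemy.orm import Session

# from .models import Client, LegacySource, ModernTarget   # LegacySource/ModernTarget are placeholders

BATCH_SIZE = 500

def sync_generic(db: Session) -> dict:
    """Illustrative only: the lookup-map plus batched-commit shape shared by the sync_* functions."""
    result = {'success': 0, 'errors': [], 'skipped': 0}

    # Resolve foreign keys once up front (e.g. rolodex_id -> Client.id).
    key_map = {c.rolodex_id: c.id for c in db.query(Client).all()}

    batch = []
    for row in db.query(LegacySource).all():
        parent_id = key_map.get(row.id)
        if not parent_id:
            # Orphaned legacy record: count it and keep going.
            result['skipped'] += 1
            continue
        batch.append(ModernTarget(client_id=parent_id))
        if len(batch) >= BATCH_SIZE:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
            batch = []

    if batch:  # flush the final partial batch
        db.bulk_save_objects(batch)
        db.commit()
        result['success'] += len(batch)
    return result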
+
+                {% if show_sync_results and sync_results %}
+                <div class="card mt-4">
+                    <div class="card-header">
+                        Sync Results
+                    </div>
+                    <div class="card-body">
+                        <div class="row text-center">
+                            <div class="col">
+                                <div class="h4">{{ total_synced or 0 }}</div>
+                                <div>Records Synced</div>
+                            </div>
+                            <div class="col">
+                                <div class="h4">{{ total_skipped or 0 }}</div>
+                                <div>Records Skipped</div>
+                            </div>
+                            <div class="col">
+                                <div class="h4">{{ total_sync_errors or 0 }}</div>
+                                <div>Errors</div>
+                            </div>
+                        </div>
+
+                        <h6 class="mt-3">Detailed Results by Table:</h6>
+                        <table class="table table-sm">
+                            <thead>
+                                <tr>
+                                    <th>Modern Table</th>
+                                    <th>Synced</th>
+                                    <th>Skipped</th>
+                                    <th>Errors</th>
+                                </tr>
+                            </thead>
+                            <tbody>
+                                {% for table_name, result in sync_results.items() %}
+                                <tr>
+                                    <td>{{ table_name.title() }}</td>
+                                    <td>{{ result.success }}</td>
+                                    <td>{{ result.skipped }}</td>
+                                    <td>{{ result.errors|length }}</td>
+                                </tr>
+                                {% if result.errors %}
+                                <tr>
+                                    <td colspan="4">
+                                        <details>
+                                            <summary>View Errors ({{ result.errors|length }})</summary>
+                                            {% for error in result.errors[:10] %}
+                                            <div>&bull; {{ error }}</div>
+                                            {% endfor %}
+                                            {% if result.errors|length > 10 %}
+                                            <div>&bull; ... and {{ result.errors|length - 10 }} more errors</div>
+                                            {% endif %}
+                                        </details>
+                                    </td>
+                                </tr>
+                                {% endif %}
+                                {% endfor %}
+                            </tbody>
+                        </table>
+                    </div>
+                </div>
+                {% endif %}
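The Sync Results block above reads show_sync_results, sync_results, and three precomputed totals from the template context. One plausible way for the sync route to build that context from the dict returned by sync_all (a sketch, not the handler actually added to app/main.py):

def build_sync_context(sync_results: dict) -> dict:
    """Collapse per-table results ({'success', 'skipped', 'errors'}) into the totals the template shows."""
    tables = [r for r in sync_results.values() if r]
    return {
        'show_sync_results': True,
        'sync_results': sync_results,
        'total_synced': sum(r['success'] for r in tables),
        'total_skipped': sum(r['skipped'] for r in tables),
        'total_sync_errors': sum(len(r['errors']) for r in tables),
    }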
@@ -425,5 +576,20 @@ document.addEventListener('DOMContentLoaded', function() { } }); }); + +// Sync confirmation function +function confirmSync() { + const clearCheckbox = document.getElementById('clearExisting'); + const clearExisting = clearCheckbox.checked; + + let message = "Are you sure you want to sync legacy data to modern models?"; + if (clearExisting) { + message += "\n\n⚠️ WARNING: This will DELETE all existing Client, Phone, Case, Transaction, Payment, and Document records before syncing!"; + } + + if (confirm(message)) { + document.getElementById('syncForm').submit(); + } +} {% endblock %} diff --git a/delphi.db b/delphi.db index 3d2018c65395763ee18c595e06a7a7f3e994ed04..9317d5597e6af00d27e2af9a931253b973f10b42 100644 GIT binary patch delta 185 zcmZoTAlYz0a)LBt#Y7or#)`&-tqF`v^cDE{*D>(_;s3_}f&Uf%6aIVrH~25{pW#2o ze}I1%{}%prn*|lR_}N&Q85#Nb+PBy<0x=U1GXpUT5VLOIV$T*Jz`?}Z$G~@-w-2bi ziI=&_jBh))BO4>5g`tTR5E&a985o)B8XD*tSSY}R^el`_Ee$QO$TXSpg0wR+PCpaR GW&!|pA~ajV^f$um_)m}d4CNtjc6JyyJ L8K<9)XEOx=4frR9
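For completeness, a minimal way to drive the sync outside the admin UI; the app.database import is an assumption, since the session factory is not part of this patch:

from app.database import SessionLocal              # assumed session factory, not shown in this patch
from app.sync_legacy_to_modern import sync_all

db = SessionLocal()
try:
    results = sync_all(db, clear_existing=False)   # same dependency order the /admin/sync endpoint relies on
    for table, result in results.items():
        print(f"{table}: {result['success']} synced, "
              f"{result['skipped']} skipped, {len(result['errors'])} errors")
finally:
    db.close()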