Files
delphi-database-v2/app/sync_legacy_to_modern.py
HotSwapp 7958556613 Fix: Improved CSV encoding detection for legacy data with non-standard characters
- Changed encoding fallback order to prioritize iso-8859-1/latin-1 over cp1252
- Increased encoding test from 1KB to 10KB to catch issues deeper in files
- Added proper file handle cleanup on encoding failures
- Resolves 'charmap codec can't decode byte 0x9d' error in rolodex import
- Tested with rolodex file containing 52,100 rows successfully
2025-10-12 19:19:25 -05:00

531 lines
18 KiB
Python

"""
Sync functions to populate modern models from legacy database tables.
This module provides functions to migrate data from the comprehensive legacy
schema to the simplified modern application models.
"""
from typing import Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
import structlog
from .models import (
# Legacy models
Rolodex, LegacyPhone, LegacyFile, Ledger, LegacyPayment, Qdros,
# Modern models
Client, Phone, Case, Transaction, Payment, Document
)
logger = structlog.get_logger(__name__)
BATCH_SIZE = 500
def sync_clients(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Rolodex → Client.

    Maps legacy rolodex entries to modern simplified client records.

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: When True, delete all existing Client rows first.

    Returns:
        Dict with 'success' (rows saved), 'errors' (list of messages)
        and 'skipped' (rows not migrated).
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}
    try:
        # Optionally clear existing modern client data
        if clear_existing:
            logger.info("sync_clients_clearing_existing")
            db.query(Client).delete()
            db.commit()
        # Query all rolodex entries
        rolodex_entries = db.query(Rolodex).all()
        logger.info("sync_clients_processing", count=len(rolodex_entries))
        batch = []
        for rolex in rolodex_entries:
            try:
                # Build complete address from A1, A2, A3
                address_parts = [
                    rolex.a1 or '',
                    rolex.a2 or '',
                    rolex.a3 or ''
                ]
                address = ', '.join(filter(None, address_parts))
                # Create modern client record
                client = Client(
                    rolodex_id=rolex.id,
                    last_name=rolex.last,
                    first_name=rolex.first,
                    middle_initial=rolex.middle,
                    company=rolex.title,  # Using title as company name
                    address=address if address else None,
                    city=rolex.city,
                    state=rolex.abrev,
                    zip_code=rolex.zip
                )
                batch.append(client)
            except Exception as e:
                result['errors'].append(f"Rolodex ID {rolex.id}: {str(e)}")
                result['skipped'] += 1
                continue
            # Flush outside the per-record try: a failed bulk save must be
            # reported as a batch error (with rollback and correct skip
            # count) rather than mis-attributed to a single record while
            # leaving the session in a failed state and the batch un-cleared.
            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    db.rollback()
                    result['errors'].append(f"Batch save failed: {str(e)}")
                    result['skipped'] += len(batch)
                batch = []
        # Save remaining batch
        if batch:
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                db.rollback()
                result['errors'].append(f"Batch save failed: {str(e)}")
                result['skipped'] += len(batch)
        logger.info("sync_clients_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_clients_failed", error=str(e))
    return result
def sync_phones(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPhone → Phone.

    Links phone numbers to modern client records via rolodex_id.

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: When True, delete all existing Phone rows first.

    Returns:
        Dict with 'success' (rows saved), 'errors' (list of messages)
        and 'skipped' (rows not migrated).
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}
    try:
        # Optionally clear existing phone data
        if clear_existing:
            logger.info("sync_phones_clearing_existing")
            db.query(Phone).delete()
            db.commit()
        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_phones_client_map", client_count=len(rolodex_to_client))
        # Query all legacy phones
        legacy_phones = db.query(LegacyPhone).all()
        logger.info("sync_phones_processing", count=len(legacy_phones))
        batch = []
        for lphone in legacy_phones:
            try:
                # Find corresponding modern client (legacy phone rows are
                # keyed by rolodex id — presumably lphone.id IS that key;
                # verify against the legacy schema)
                client_id = rolodex_to_client.get(lphone.id)
                if not client_id:
                    result['errors'].append(f"No client found for rolodex ID: {lphone.id}")
                    result['skipped'] += 1
                    continue
                # Create modern phone record
                phone = Phone(
                    client_id=client_id,
                    phone_type=lphone.location if lphone.location else 'unknown',
                    phone_number=lphone.phone,
                    extension=None
                )
                batch.append(phone)
            except Exception as e:
                result['errors'].append(f"Phone {lphone.id}/{lphone.phone}: {str(e)}")
                result['skipped'] += 1
                continue
            # Flush outside the per-record try so a failed bulk save is
            # handled as a batch error with rollback, instead of being
            # attributed to one record and poisoning the session.
            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    db.rollback()
                    result['errors'].append(f"Batch save failed: {str(e)}")
                    result['skipped'] += len(batch)
                batch = []
        # Save remaining batch
        if batch:
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                db.rollback()
                result['errors'].append(f"Batch save failed: {str(e)}")
                result['skipped'] += len(batch)
        logger.info("sync_phones_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_phones_failed", error=str(e))
    return result
def sync_cases(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyFile → Case.

    Converts legacy file cabinet entries to modern case records.

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: When True, delete all existing Case rows first.

    Returns:
        Dict with 'success' (rows saved), 'errors' (list of messages)
        and 'skipped' (rows not migrated).
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}
    try:
        # Optionally clear existing case data
        if clear_existing:
            logger.info("sync_cases_clearing_existing")
            db.query(Case).delete()
            db.commit()
        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_cases_client_map", client_count=len(rolodex_to_client))
        # Query all legacy files
        legacy_files = db.query(LegacyFile).all()
        logger.info("sync_cases_processing", count=len(legacy_files))
        batch = []
        for lfile in legacy_files:
            try:
                # Find corresponding modern client
                client_id = rolodex_to_client.get(lfile.id)
                if not client_id:
                    result['errors'].append(f"No client found for rolodex ID: {lfile.id} (file {lfile.file_no})")
                    result['skipped'] += 1
                    continue
                # Map legacy status to modern status; a truthy 'closed'
                # (close date) wins over an 'inactive' status string
                status = 'active'
                if lfile.closed:
                    status = 'closed'
                elif lfile.status and 'inactive' in lfile.status.lower():
                    status = 'inactive'
                # Create modern case record
                case = Case(
                    file_no=lfile.file_no,
                    client_id=client_id,
                    status=status,
                    case_type=lfile.file_type,
                    description=lfile.regarding,
                    open_date=lfile.opened,
                    close_date=lfile.closed
                )
                batch.append(case)
            except Exception as e:
                result['errors'].append(f"File {lfile.file_no}: {str(e)}")
                result['skipped'] += 1
                continue
            # Flush outside the per-record try so a failed bulk save is
            # handled as a batch error with rollback, instead of being
            # attributed to one record and poisoning the session.
            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    db.rollback()
                    result['errors'].append(f"Batch save failed: {str(e)}")
                    result['skipped'] += len(batch)
                batch = []
        # Save remaining batch
        if batch:
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                db.rollback()
                result['errors'].append(f"Batch save failed: {str(e)}")
                result['skipped'] += len(batch)
        logger.info("sync_cases_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_cases_failed", error=str(e))
    return result
def sync_transactions(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Ledger → Transaction.

    Converts legacy ledger entries to modern transaction records.

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: When True, delete all existing Transaction rows first.

    Returns:
        Dict with 'success' (rows saved), 'errors' (list of messages)
        and 'skipped' (rows not migrated).
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}
    try:
        # Optionally clear existing transaction data
        if clear_existing:
            logger.info("sync_transactions_clearing_existing")
            db.query(Transaction).delete()
            db.commit()
        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_transactions_case_map", case_count=len(file_no_to_case))
        # Query all ledger entries
        ledger_entries = db.query(Ledger).all()
        logger.info("sync_transactions_processing", count=len(ledger_entries))
        batch = []
        for ledger in ledger_entries:
            try:
                # Find corresponding modern case
                case_id = file_no_to_case.get(ledger.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {ledger.file_no}")
                    result['skipped'] += 1
                    continue
                # Create modern transaction record with all ledger fields
                transaction = Transaction(
                    case_id=case_id,
                    transaction_date=ledger.date,
                    transaction_type=ledger.t_type,
                    amount=float(ledger.amount) if ledger.amount else None,
                    description=ledger.note,
                    reference=str(ledger.item_no) if ledger.item_no else None,
                    # Ledger-specific fields
                    item_no=ledger.item_no,
                    employee_number=ledger.empl_num,
                    t_code=ledger.t_code,
                    t_type_l=ledger.t_type_l,
                    quantity=float(ledger.quantity) if ledger.quantity else None,
                    rate=float(ledger.rate) if ledger.rate else None,
                    billed=ledger.billed
                )
                batch.append(transaction)
            except Exception as e:
                result['errors'].append(f"Ledger {ledger.file_no}/{ledger.item_no}: {str(e)}")
                result['skipped'] += 1
                continue
            # Flush outside the per-record try so a failed bulk save is
            # handled as a batch error with rollback, instead of being
            # attributed to one record and poisoning the session.
            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    db.rollback()
                    result['errors'].append(f"Batch save failed: {str(e)}")
                    result['skipped'] += len(batch)
                batch = []
        # Save remaining batch
        if batch:
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                db.rollback()
                result['errors'].append(f"Batch save failed: {str(e)}")
                result['skipped'] += len(batch)
        logger.info("sync_transactions_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_transactions_failed", error=str(e))
    return result
def sync_payments(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPayment → Payment.

    Converts legacy payment entries to modern payment records.

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: When True, delete all existing Payment rows first.

    Returns:
        Dict with 'success' (rows saved), 'errors' (list of messages)
        and 'skipped' (rows not migrated).
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}
    try:
        # Optionally clear existing payment data
        if clear_existing:
            logger.info("sync_payments_clearing_existing")
            db.query(Payment).delete()
            db.commit()
        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_payments_case_map", case_count=len(file_no_to_case))
        # Query all legacy payments
        legacy_payments = db.query(LegacyPayment).all()
        logger.info("sync_payments_processing", count=len(legacy_payments))
        batch = []
        for lpay in legacy_payments:
            try:
                # Payments with no file reference are silently skipped
                # (nothing to attach them to)
                if not lpay.file_no:
                    result['skipped'] += 1
                    continue
                # Find corresponding modern case
                case_id = file_no_to_case.get(lpay.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {lpay.file_no}")
                    result['skipped'] += 1
                    continue
                # Create modern payment record
                payment = Payment(
                    case_id=case_id,
                    payment_date=lpay.deposit_date,
                    payment_type='deposit',  # Legacy doesn't distinguish
                    amount=float(lpay.amount) if lpay.amount else None,
                    description=lpay.note if lpay.note else lpay.regarding,
                    check_number=None  # Not in legacy PAYMENTS table
                )
                batch.append(payment)
            except Exception as e:
                result['errors'].append(f"Payment {lpay.id}: {str(e)}")
                result['skipped'] += 1
                continue
            # Flush outside the per-record try so a failed bulk save is
            # handled as a batch error with rollback, instead of being
            # attributed to one record and poisoning the session.
            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    db.rollback()
                    result['errors'].append(f"Batch save failed: {str(e)}")
                    result['skipped'] += len(batch)
                batch = []
        # Save remaining batch
        if batch:
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                db.rollback()
                result['errors'].append(f"Batch save failed: {str(e)}")
                result['skipped'] += len(batch)
        logger.info("sync_payments_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_payments_failed", error=str(e))
    return result
def sync_documents(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Qdros → Document.

    Converts QDRO entries to modern document records.

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: When True, delete all existing Document rows first.

    Returns:
        Dict with 'success' (rows saved), 'errors' (list of messages)
        and 'skipped' (rows not migrated).
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}
    try:
        # Optionally clear existing document data
        if clear_existing:
            logger.info("sync_documents_clearing_existing")
            db.query(Document).delete()
            db.commit()
        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_documents_case_map", case_count=len(file_no_to_case))
        # Query all QDRO entries
        qdros = db.query(Qdros).all()
        logger.info("sync_documents_processing", count=len(qdros))
        batch = []
        for qdro in qdros:
            try:
                # Find corresponding modern case
                case_id = file_no_to_case.get(qdro.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {qdro.file_no}")
                    result['skipped'] += 1
                    continue
                # Build description from QDRO fields
                desc_parts = []
                if qdro.case_type:
                    desc_parts.append(f"Type: {qdro.case_type}")
                if qdro.case_number:
                    desc_parts.append(f"Case#: {qdro.case_number}")
                if qdro.plan_id:
                    desc_parts.append(f"Plan: {qdro.plan_id}")
                description = '; '.join(desc_parts) if desc_parts else None
                # Create modern document record
                document = Document(
                    case_id=case_id,
                    document_type='QDRO',
                    file_name=qdro.form_name,
                    file_path=None,  # Legacy doesn't have file paths
                    description=description,
                    uploaded_date=qdro.draft_out if qdro.draft_out else qdro.judgment_date
                )
                batch.append(document)
            except Exception as e:
                result['errors'].append(f"QDRO {qdro.file_no}/{qdro.version}: {str(e)}")
                result['skipped'] += 1
                continue
            # Flush outside the per-record try so a failed bulk save is
            # handled as a batch error with rollback, instead of being
            # attributed to one record and poisoning the session.
            if len(batch) >= BATCH_SIZE:
                try:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                except Exception as e:
                    db.rollback()
                    result['errors'].append(f"Batch save failed: {str(e)}")
                    result['skipped'] += len(batch)
                batch = []
        # Save remaining batch
        if batch:
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result['success'] += len(batch)
            except Exception as e:
                db.rollback()
                result['errors'].append(f"Batch save failed: {str(e)}")
                result['skipped'] += len(batch)
        logger.info("sync_documents_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_documents_failed", error=str(e))
    return result
def sync_all(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Run every sync function in dependency order.

    Order matters due to foreign key dependencies:
      1. Clients (no dependencies)
      2. Phones (depends on Clients)
      3. Cases (depends on Clients)
      4. Transactions (depends on Cases)
      5. Payments (depends on Cases)
      6. Documents (depends on Cases)

    Args:
        db: Active SQLAlchemy session bound to the target database.
        clear_existing: Forwarded to each individual sync function.

    Returns:
        Mapping of entity name → that sync function's result dict; entries
        stay None for steps not reached when an earlier step raised.
    """
    pipeline = [
        ('clients', sync_clients),
        ('phones', sync_phones),
        ('cases', sync_cases),
        ('transactions', sync_transactions),
        ('payments', sync_payments),
        ('documents', sync_documents),
    ]
    results: Dict[str, Any] = {name: None for name, _ in pipeline}
    logger.info("sync_all_starting", clear_existing=clear_existing)
    try:
        for name, step in pipeline:
            results[name] = step(db, clear_existing)
            logger.info(f"sync_all_{name}_done", success=results[name]['success'])
        logger.info("sync_all_complete")
    except Exception as e:
        logger.error("sync_all_failed", error=str(e))
        raise
    return results