- Changed encoding fallback order to prioritize iso-8859-1/latin-1 over cp1252
- Increased encoding test sample from 1 KB to 10 KB to catch issues deeper in files
- Added proper file handle cleanup on encoding failures
- Resolves 'charmap codec can't decode byte 0x9d' error in rolodex import
- Tested successfully with a rolodex file containing 52,100 rows
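A minimal sketch of the fallback described above (illustrative only: `read_rolodex_text` and its exact shape are assumptions, not the actual import code; only the encoding order, the 10 KB probe, and the handle cleanup come from the notes):

def read_rolodex_text(path):
    """Return (text, encoding), trying utf-8, then iso-8859-1/latin-1, then cp1252."""
    for encoding in ("utf-8", "iso-8859-1", "cp1252"):
        try:
            with open(path, "r", encoding=encoding) as f:
                f.read(10 * 1024)  # probe the first ~10 KB so bad bytes deeper in the file surface early
                f.seek(0)
                return f.read(), encoding
        except UnicodeDecodeError:
            continue  # the with-block has already closed the handle; try the next codec
    raise ValueError(f"Could not decode {path} with any supported encoding")

Note that iso-8859-1 maps every byte value, so the chain effectively ends there; putting it ahead of cp1252 is what avoids the charmap failure on 0x9d, a code point cp1252 leaves undefined.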
"""
|
|
Sync functions to populate modern models from legacy database tables.
|
|
|
|
This module provides functions to migrate data from the comprehensive legacy
|
|
schema to the simplified modern application models.
|
|
"""
|
|
|
|
from typing import Dict, Any
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.exc import IntegrityError
|
|
import structlog
|
|
|
|
from .models import (
|
|
# Legacy models
|
|
Rolodex, LegacyPhone, LegacyFile, Ledger, LegacyPayment, Qdros,
|
|
# Modern models
|
|
Client, Phone, Case, Transaction, Payment, Document
|
|
)
|
|
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
BATCH_SIZE = 500
|
|
|
|
|
|
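# Note: every sync_* function below follows the same pattern: optionally clear
# the target table, build an in-memory lookup map where a foreign key has to be
# resolved, iterate the legacy rows, and flush new ORM objects with
# bulk_save_objects()/commit() every BATCH_SIZE rows. Per-row failures are
# recorded in result['errors'] and counted as skipped so a single bad record
# does not abort the whole sync.

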
def sync_clients(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Rolodex → Client.

    Maps legacy rolodex entries to modern simplified client records.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing modern client data
        if clear_existing:
            logger.info("sync_clients_clearing_existing")
            db.query(Client).delete()
            db.commit()

        # Query all rolodex entries
        rolodex_entries = db.query(Rolodex).all()
        logger.info("sync_clients_processing", count=len(rolodex_entries))

        batch = []
        for rolex in rolodex_entries:
            try:
                # Build complete address from A1, A2, A3
                address_parts = [
                    rolex.a1 or '',
                    rolex.a2 or '',
                    rolex.a3 or ''
                ]
                address = ', '.join(filter(None, address_parts))

                # Create modern client record
                client = Client(
                    rolodex_id=rolex.id,
                    last_name=rolex.last,
                    first_name=rolex.first,
                    middle_initial=rolex.middle,
                    company=rolex.title,  # Using title as company name
                    address=address if address else None,
                    city=rolex.city,
                    state=rolex.abrev,
                    zip_code=rolex.zip
                )
                batch.append(client)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                result['errors'].append(f"Rolodex ID {rolex.id}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_clients_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_clients_failed", error=str(e))

    return result


def sync_phones(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPhone → Phone.

    Links phone numbers to modern client records via rolodex_id.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing phone data
        if clear_existing:
            logger.info("sync_phones_clearing_existing")
            db.query(Phone).delete()
            db.commit()

        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_phones_client_map", client_count=len(rolodex_to_client))

        # Query all legacy phones
        legacy_phones = db.query(LegacyPhone).all()
        logger.info("sync_phones_processing", count=len(legacy_phones))

        batch = []
        for lphone in legacy_phones:
            try:
                # Find corresponding modern client
                client_id = rolodex_to_client.get(lphone.id)
                if not client_id:
                    result['errors'].append(f"No client found for rolodex ID: {lphone.id}")
                    result['skipped'] += 1
                    continue

                # Create modern phone record
                phone = Phone(
                    client_id=client_id,
                    phone_type=lphone.location if lphone.location else 'unknown',
                    phone_number=lphone.phone,
                    extension=None
                )
                batch.append(phone)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                result['errors'].append(f"Phone {lphone.id}/{lphone.phone}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_phones_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_phones_failed", error=str(e))

    return result


def sync_cases(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyFile → Case.

    Converts legacy file cabinet entries to modern case records.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing case data
        if clear_existing:
            logger.info("sync_cases_clearing_existing")
            db.query(Case).delete()
            db.commit()

        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_cases_client_map", client_count=len(rolodex_to_client))

        # Query all legacy files
        legacy_files = db.query(LegacyFile).all()
        logger.info("sync_cases_processing", count=len(legacy_files))

        batch = []
        for lfile in legacy_files:
            try:
                # Find corresponding modern client
                client_id = rolodex_to_client.get(lfile.id)
                if not client_id:
                    result['errors'].append(f"No client found for rolodex ID: {lfile.id} (file {lfile.file_no})")
                    result['skipped'] += 1
                    continue

                # Map legacy status to modern status
                status = 'active'
                if lfile.closed:
                    status = 'closed'
                elif lfile.status and 'inactive' in lfile.status.lower():
                    status = 'inactive'

                # Create modern case record
                case = Case(
                    file_no=lfile.file_no,
                    client_id=client_id,
                    status=status,
                    case_type=lfile.file_type,
                    description=lfile.regarding,
                    open_date=lfile.opened,
                    close_date=lfile.closed
                )
                batch.append(case)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                result['errors'].append(f"File {lfile.file_no}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_cases_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_cases_failed", error=str(e))

    return result


def sync_transactions(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Ledger → Transaction.

    Converts legacy ledger entries to modern transaction records.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing transaction data
        if clear_existing:
            logger.info("sync_transactions_clearing_existing")
            db.query(Transaction).delete()
            db.commit()

        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_transactions_case_map", case_count=len(file_no_to_case))

        # Query all ledger entries
        ledger_entries = db.query(Ledger).all()
        logger.info("sync_transactions_processing", count=len(ledger_entries))

        batch = []
        for ledger in ledger_entries:
            try:
                # Find corresponding modern case
                case_id = file_no_to_case.get(ledger.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {ledger.file_no}")
                    result['skipped'] += 1
                    continue

                # Create modern transaction record with all ledger fields
                transaction = Transaction(
                    case_id=case_id,
                    transaction_date=ledger.date,
                    transaction_type=ledger.t_type,
                    amount=float(ledger.amount) if ledger.amount else None,
                    description=ledger.note,
                    reference=str(ledger.item_no) if ledger.item_no else None,
                    # Ledger-specific fields
                    item_no=ledger.item_no,
                    employee_number=ledger.empl_num,
                    t_code=ledger.t_code,
                    t_type_l=ledger.t_type_l,
                    quantity=float(ledger.quantity) if ledger.quantity else None,
                    rate=float(ledger.rate) if ledger.rate else None,
                    billed=ledger.billed
                )
                batch.append(transaction)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                result['errors'].append(f"Ledger {ledger.file_no}/{ledger.item_no}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_transactions_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_transactions_failed", error=str(e))

    return result


def sync_payments(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPayment → Payment.

    Converts legacy payment entries to modern payment records.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing payment data
        if clear_existing:
            logger.info("sync_payments_clearing_existing")
            db.query(Payment).delete()
            db.commit()

        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_payments_case_map", case_count=len(file_no_to_case))

        # Query all legacy payments
        legacy_payments = db.query(LegacyPayment).all()
        logger.info("sync_payments_processing", count=len(legacy_payments))

        batch = []
        for lpay in legacy_payments:
            try:
                # Find corresponding modern case
                if not lpay.file_no:
                    result['skipped'] += 1
                    continue

                case_id = file_no_to_case.get(lpay.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {lpay.file_no}")
                    result['skipped'] += 1
                    continue

                # Create modern payment record
                payment = Payment(
                    case_id=case_id,
                    payment_date=lpay.deposit_date,
                    payment_type='deposit',  # Legacy doesn't distinguish
                    amount=float(lpay.amount) if lpay.amount else None,
                    description=lpay.note if lpay.note else lpay.regarding,
                    check_number=None  # Not in legacy PAYMENTS table
                )
                batch.append(payment)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                result['errors'].append(f"Payment {lpay.id}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_payments_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_payments_failed", error=str(e))

    return result


def sync_documents(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Qdros → Document.

    Converts QDRO entries to modern document records.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing document data
        if clear_existing:
            logger.info("sync_documents_clearing_existing")
            db.query(Document).delete()
            db.commit()

        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_documents_case_map", case_count=len(file_no_to_case))

        # Query all QDRO entries
        qdros = db.query(Qdros).all()
        logger.info("sync_documents_processing", count=len(qdros))

        batch = []
        for qdro in qdros:
            try:
                # Find corresponding modern case
                case_id = file_no_to_case.get(qdro.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {qdro.file_no}")
                    result['skipped'] += 1
                    continue

                # Build description from QDRO fields
                desc_parts = []
                if qdro.case_type:
                    desc_parts.append(f"Type: {qdro.case_type}")
                if qdro.case_number:
                    desc_parts.append(f"Case#: {qdro.case_number}")
                if qdro.plan_id:
                    desc_parts.append(f"Plan: {qdro.plan_id}")

                description = '; '.join(desc_parts) if desc_parts else None

                # Create modern document record
                document = Document(
                    case_id=case_id,
                    document_type='QDRO',
                    file_name=qdro.form_name,
                    file_path=None,  # Legacy doesn't have file paths
                    description=description,
                    uploaded_date=qdro.draft_out if qdro.draft_out else qdro.judgment_date
                )
                batch.append(document)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                result['errors'].append(f"QDRO {qdro.file_no}/{qdro.version}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_documents_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_documents_failed", error=str(e))

    return result


def sync_all(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Run all sync functions in proper order.

    Order matters due to foreign key dependencies:
    1. Clients (no dependencies)
    2. Phones (depends on Clients)
    3. Cases (depends on Clients)
    4. Transactions (depends on Cases)
    5. Payments (depends on Cases)
    6. Documents (depends on Cases)
    """
    results = {
        'clients': None,
        'phones': None,
        'cases': None,
        'transactions': None,
        'payments': None,
        'documents': None
    }

    logger.info("sync_all_starting", clear_existing=clear_existing)

    try:
        results['clients'] = sync_clients(db, clear_existing)
        logger.info("sync_all_clients_done", success=results['clients']['success'])

        results['phones'] = sync_phones(db, clear_existing)
        logger.info("sync_all_phones_done", success=results['phones']['success'])

        results['cases'] = sync_cases(db, clear_existing)
        logger.info("sync_all_cases_done", success=results['cases']['success'])

        results['transactions'] = sync_transactions(db, clear_existing)
        logger.info("sync_all_transactions_done", success=results['transactions']['success'])

        results['payments'] = sync_payments(db, clear_existing)
        logger.info("sync_all_payments_done", success=results['payments']['success'])

        results['documents'] = sync_documents(db, clear_existing)
        logger.info("sync_all_documents_done", success=results['documents']['success'])

        logger.info("sync_all_complete")

    except Exception as e:
        logger.error("sync_all_failed", error=str(e))
        raise

    return results


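# Usage sketch (illustrative only, not part of the module's API): running the
# full sync from a one-off script. `SessionLocal` and the import paths below
# are assumptions; adapt them to the real project layout.
#
#     from app.database import SessionLocal   # hypothetical session factory
#     from app.sync import sync_all           # hypothetical module path
#
#     db = SessionLocal()
#     try:
#         results = sync_all(db, clear_existing=True)
#         for name, res in results.items():
#             print(f"{name}: {res['success']} synced, {len(res['errors'])} errors")
#     finally:
#         db.close()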