Files
delphi-database-v2/app/sync_legacy_to_modern.py
HotSwapp 4030dbd88e Implement comprehensive CSV import system for legacy database migration
- Added 5 new legacy models to app/models.py (FileType, FileNots, RolexV, FVarLkup, RVarLkup)
- Created app/import_legacy.py with import functions for all legacy tables:
  * Reference tables: TRNSTYPE, TRNSLKUP, FOOTERS, FILESTAT, EMPLOYEE, GRUPLKUP, FILETYPE, FVARLKUP, RVARLKUP
  * Core tables: ROLODEX, PHONE, ROLEX_V, FILES, FILES_R, FILES_V, FILENOTS, LEDGER, DEPOSITS, PAYMENTS
  * Specialized: PLANINFO, QDROS, PENSIONS and all pension-related tables
- Created app/sync_legacy_to_modern.py with sync functions to populate modern models from legacy data
- Updated admin routes in app/main.py:
  * Extended process_csv_import to support all new import types
  * Added /admin/sync endpoint for syncing legacy to modern models
  * Updated get_import_type_from_filename to recognize all CSV file patterns
- Enhanced app/templates/admin.html with:
  * Import Order Guide showing recommended import sequence
  * Sync to Modern Models section with confirmation dialog
  * Sync results display with detailed per-table statistics
  * Updated supported file formats list
- All import functions use batch processing (500 rows), proper error handling, and structured logging
- Sync functions maintain foreign key integrity and skip orphaned records with warnings
2025-10-08 09:41:38 -05:00

529 lines
18 KiB
Python

"""
Sync functions to populate modern models from legacy database tables.
This module provides functions to migrate data from the comprehensive legacy
schema to the simplified modern application models.
"""
from typing import Dict, Any
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
import structlog
from .models import (
# Legacy models
Rolodex, LegacyPhone, LegacyFile, Ledger, LegacyPayment, Qdros,
# Modern models
Client, Phone, Case, Transaction, Payment, Document
)
# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
# Number of modern records accumulated before each bulk save + commit.
BATCH_SIZE = 500
def sync_clients(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Rolodex → Client.

    Maps legacy rolodex entries to modern simplified client records and
    saves them in batches of ``BATCH_SIZE``.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        clear_existing: When True, delete every existing Client row before
            syncing.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (rows not saved) and
        ``errors`` (list of messages). Failures are collected in ``errors``
        rather than raised.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    def save_batch(records):
        """Persist one batch; on failure roll back and count it as skipped."""
        try:
            db.bulk_save_objects(records)
            db.commit()
        except Exception as e:
            # A failed flush leaves the session unusable until rolled back,
            # so recover here instead of poisoning every later batch.
            db.rollback()
            result['errors'].append(
                f"Batch of {len(records)} clients failed: {str(e)}"
            )
            result['skipped'] += len(records)
        else:
            result['success'] += len(records)

    try:
        # Optionally clear existing modern client data
        if clear_existing:
            logger.info("sync_clients_clearing_existing")
            db.query(Client).delete()
            db.commit()

        # Query all rolodex entries
        rolodex_entries = db.query(Rolodex).all()
        logger.info("sync_clients_processing", count=len(rolodex_entries))

        batch = []
        for rolex in rolodex_entries:
            try:
                # Build complete address from A1, A2, A3, dropping blanks
                address_parts = [rolex.a1, rolex.a2, rolex.a3]
                address = ', '.join(part for part in address_parts if part)

                # Create modern client record
                batch.append(Client(
                    rolodex_id=rolex.id,
                    last_name=rolex.last,
                    first_name=rolex.first,
                    middle_initial=rolex.middle,
                    company=rolex.title,  # Using title as company name
                    address=address if address else None,
                    city=rolex.city,
                    state=rolex.abrev,
                    zip_code=rolex.zip,
                ))
            except Exception as e:
                result['errors'].append(f"Rolodex ID {rolex.id}: {str(e)}")
                result['skipped'] += 1
                continue

            # Saving happens outside the per-row try so a batch failure is
            # never misreported as a single bad row.
            if len(batch) >= BATCH_SIZE:
                save_batch(batch)
                batch = []

        # Save remaining batch
        if batch:
            save_batch(batch)

        logger.info("sync_clients_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_clients_failed", error=str(e))
    return result
def sync_phones(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPhone → Phone.

    Links phone numbers to modern client records via rolodex_id. Legacy
    phones whose id has no matching Client.rolodex_id are skipped with an
    error message.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        clear_existing: When True, delete every existing Phone row before
            syncing.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (rows not saved) and
        ``errors`` (list of messages). Failures are collected in ``errors``
        rather than raised.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    def save_batch(records):
        """Persist one batch; on failure roll back and count it as skipped."""
        try:
            db.bulk_save_objects(records)
            db.commit()
        except Exception as e:
            # A failed flush leaves the session unusable until rolled back,
            # so recover here instead of poisoning every later batch.
            db.rollback()
            result['errors'].append(
                f"Batch of {len(records)} phones failed: {str(e)}"
            )
            result['skipped'] += len(records)
        else:
            result['success'] += len(records)

    try:
        # Optionally clear existing phone data
        if clear_existing:
            logger.info("sync_phones_clearing_existing")
            db.query(Phone).delete()
            db.commit()

        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_phones_client_map", client_count=len(rolodex_to_client))

        # Query all legacy phones
        legacy_phones = db.query(LegacyPhone).all()
        logger.info("sync_phones_processing", count=len(legacy_phones))

        batch = []
        for lphone in legacy_phones:
            try:
                # Find corresponding modern client
                client_id = rolodex_to_client.get(lphone.id)
                if client_id is None:
                    result['errors'].append(f"No client found for rolodex ID: {lphone.id}")
                    result['skipped'] += 1
                    continue

                # Create modern phone record
                batch.append(Phone(
                    client_id=client_id,
                    phone_type=lphone.location or 'unknown',
                    phone_number=lphone.phone,
                    extension=None,
                ))
            except Exception as e:
                result['errors'].append(f"Phone {lphone.id}/{lphone.phone}: {str(e)}")
                result['skipped'] += 1
                continue

            # Saving happens outside the per-row try so a batch failure is
            # never misreported as a single bad row.
            if len(batch) >= BATCH_SIZE:
                save_batch(batch)
                batch = []

        # Save remaining batch
        if batch:
            save_batch(batch)

        logger.info("sync_phones_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_phones_failed", error=str(e))
    return result
def sync_cases(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyFile → Case.

    Converts legacy file cabinet entries to modern case records. Status is
    'closed' when the legacy file has a closed value, 'inactive' when its
    status text contains "inactive", otherwise 'active'. Files whose id has
    no matching Client.rolodex_id are skipped with an error message.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        clear_existing: When True, delete every existing Case row before
            syncing.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (rows not saved) and
        ``errors`` (list of messages). Failures are collected in ``errors``
        rather than raised.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    def save_batch(records):
        """Persist one batch; on failure roll back and count it as skipped."""
        try:
            db.bulk_save_objects(records)
            db.commit()
        except Exception as e:
            # A failed flush leaves the session unusable until rolled back,
            # so recover here instead of poisoning every later batch.
            db.rollback()
            result['errors'].append(
                f"Batch of {len(records)} cases failed: {str(e)}"
            )
            result['skipped'] += len(records)
        else:
            result['success'] += len(records)

    try:
        # Optionally clear existing case data
        if clear_existing:
            logger.info("sync_cases_clearing_existing")
            db.query(Case).delete()
            db.commit()

        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_cases_client_map", client_count=len(rolodex_to_client))

        # Query all legacy files
        legacy_files = db.query(LegacyFile).all()
        logger.info("sync_cases_processing", count=len(legacy_files))

        batch = []
        for lfile in legacy_files:
            try:
                # Find corresponding modern client
                client_id = rolodex_to_client.get(lfile.id)
                if client_id is None:
                    result['errors'].append(f"No client found for rolodex ID: {lfile.id} (file {lfile.file_no})")
                    result['skipped'] += 1
                    continue

                # Map legacy status to modern status
                if lfile.closed:
                    status = 'closed'
                elif lfile.status and 'inactive' in lfile.status.lower():
                    status = 'inactive'
                else:
                    status = 'active'

                # Create modern case record
                batch.append(Case(
                    file_no=lfile.file_no,
                    client_id=client_id,
                    status=status,
                    case_type=lfile.file_type,
                    description=lfile.regarding,
                    open_date=lfile.opened,
                    close_date=lfile.closed,
                ))
            except Exception as e:
                result['errors'].append(f"File {lfile.file_no}: {str(e)}")
                result['skipped'] += 1
                continue

            # Saving happens outside the per-row try so a batch failure is
            # never misreported as a single bad row.
            if len(batch) >= BATCH_SIZE:
                save_batch(batch)
                batch = []

        # Save remaining batch
        if batch:
            save_batch(batch)

        logger.info("sync_cases_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_cases_failed", error=str(e))
    return result
def sync_transactions(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Ledger → Transaction.

    Converts legacy ledger entries to modern transaction records. Entries
    whose file_no has no matching Case are skipped with an error message.
    Zero amounts, quantities and rates are preserved as 0.0; only missing
    (None or empty) values become None.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        clear_existing: When True, delete every existing Transaction row
            before syncing.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (rows not saved) and
        ``errors`` (list of messages). Failures are collected in ``errors``
        rather than raised.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    def to_float(value):
        """Convert a legacy numeric to float; None/blank stays None, 0 stays 0."""
        if value is None or value == '':
            return None
        return float(value)

    def save_batch(records):
        """Persist one batch; on failure roll back and count it as skipped."""
        try:
            db.bulk_save_objects(records)
            db.commit()
        except Exception as e:
            # A failed flush leaves the session unusable until rolled back,
            # so recover here instead of poisoning every later batch.
            db.rollback()
            result['errors'].append(
                f"Batch of {len(records)} transactions failed: {str(e)}"
            )
            result['skipped'] += len(records)
        else:
            result['success'] += len(records)

    try:
        # Optionally clear existing transaction data
        if clear_existing:
            logger.info("sync_transactions_clearing_existing")
            db.query(Transaction).delete()
            db.commit()

        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_transactions_case_map", case_count=len(file_no_to_case))

        # Query all ledger entries
        ledger_entries = db.query(Ledger).all()
        logger.info("sync_transactions_processing", count=len(ledger_entries))

        batch = []
        for ledger in ledger_entries:
            try:
                # Find corresponding modern case
                case_id = file_no_to_case.get(ledger.file_no)
                if case_id is None:
                    result['errors'].append(f"No case found for file: {ledger.file_no}")
                    result['skipped'] += 1
                    continue

                # An item number of 0 is still a value and keeps its reference
                item_no = ledger.item_no
                reference = str(item_no) if item_no not in (None, '') else None

                # Create modern transaction record with all ledger fields
                batch.append(Transaction(
                    case_id=case_id,
                    transaction_date=ledger.date,
                    transaction_type=ledger.t_type,
                    amount=to_float(ledger.amount),
                    description=ledger.note,
                    reference=reference,
                    # Ledger-specific fields
                    item_no=item_no,
                    employee_number=ledger.empl_num,
                    t_code=ledger.t_code,
                    t_type_l=ledger.t_type_l,
                    quantity=to_float(ledger.quantity),
                    rate=to_float(ledger.rate),
                    billed=ledger.billed,
                ))
            except Exception as e:
                result['errors'].append(f"Ledger {ledger.file_no}/{ledger.item_no}: {str(e)}")
                result['skipped'] += 1
                continue

            # Saving happens outside the per-row try so a batch failure is
            # never misreported as a single bad row.
            if len(batch) >= BATCH_SIZE:
                save_batch(batch)
                batch = []

        # Save remaining batch
        if batch:
            save_batch(batch)

        logger.info("sync_transactions_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_transactions_failed", error=str(e))
    return result
def sync_payments(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPayment → Payment.

    Converts legacy payment entries to modern payment records. Payments
    without a file_no are skipped silently; payments whose file_no has no
    matching Case are skipped with an error message. A zero amount is
    preserved as 0.0; only a missing (None or empty) amount becomes None.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        clear_existing: When True, delete every existing Payment row before
            syncing.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (rows not saved) and
        ``errors`` (list of messages). Failures are collected in ``errors``
        rather than raised.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    def to_float(value):
        """Convert a legacy numeric to float; None/blank stays None, 0 stays 0."""
        if value is None or value == '':
            return None
        return float(value)

    def save_batch(records):
        """Persist one batch; on failure roll back and count it as skipped."""
        try:
            db.bulk_save_objects(records)
            db.commit()
        except Exception as e:
            # A failed flush leaves the session unusable until rolled back,
            # so recover here instead of poisoning every later batch.
            db.rollback()
            result['errors'].append(
                f"Batch of {len(records)} payments failed: {str(e)}"
            )
            result['skipped'] += len(records)
        else:
            result['success'] += len(records)

    try:
        # Optionally clear existing payment data
        if clear_existing:
            logger.info("sync_payments_clearing_existing")
            db.query(Payment).delete()
            db.commit()

        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_payments_case_map", case_count=len(file_no_to_case))

        # Query all legacy payments
        legacy_payments = db.query(LegacyPayment).all()
        logger.info("sync_payments_processing", count=len(legacy_payments))

        batch = []
        for lpay in legacy_payments:
            try:
                # Payments not tied to any file cannot be linked to a case
                if not lpay.file_no:
                    result['skipped'] += 1
                    continue

                # Find corresponding modern case
                case_id = file_no_to_case.get(lpay.file_no)
                if case_id is None:
                    result['errors'].append(f"No case found for file: {lpay.file_no}")
                    result['skipped'] += 1
                    continue

                # Create modern payment record
                batch.append(Payment(
                    case_id=case_id,
                    payment_date=lpay.deposit_date,
                    payment_type='deposit',  # Legacy doesn't distinguish
                    amount=to_float(lpay.amount),
                    description=lpay.note if lpay.note else lpay.regarding,
                    check_number=None,  # Not in legacy PAYMENTS table
                ))
            except Exception as e:
                result['errors'].append(f"Payment {lpay.id}: {str(e)}")
                result['skipped'] += 1
                continue

            # Saving happens outside the per-row try so a batch failure is
            # never misreported as a single bad row.
            if len(batch) >= BATCH_SIZE:
                save_batch(batch)
                batch = []

        # Save remaining batch
        if batch:
            save_batch(batch)

        logger.info("sync_payments_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_payments_failed", error=str(e))
    return result
def sync_documents(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Qdros → Document.

    Converts QDRO entries to modern document records of type 'QDRO'. The
    description is assembled from the QDRO's case type, case number and
    plan id; the uploaded date falls back from draft_out to judgment_date.
    Entries whose file_no has no matching Case are skipped with an error
    message.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        clear_existing: When True, delete every existing Document row before
            syncing.

    Returns:
        Dict with ``success`` (rows saved), ``skipped`` (rows not saved) and
        ``errors`` (list of messages). Failures are collected in ``errors``
        rather than raised.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    def save_batch(records):
        """Persist one batch; on failure roll back and count it as skipped."""
        try:
            db.bulk_save_objects(records)
            db.commit()
        except Exception as e:
            # A failed flush leaves the session unusable until rolled back,
            # so recover here instead of poisoning every later batch.
            db.rollback()
            result['errors'].append(
                f"Batch of {len(records)} documents failed: {str(e)}"
            )
            result['skipped'] += len(records)
        else:
            result['success'] += len(records)

    try:
        # Optionally clear existing document data
        if clear_existing:
            logger.info("sync_documents_clearing_existing")
            db.query(Document).delete()
            db.commit()

        # Build lookup map: file_no → case.id
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_documents_case_map", case_count=len(file_no_to_case))

        # Query all QDRO entries
        qdros = db.query(Qdros).all()
        logger.info("sync_documents_processing", count=len(qdros))

        batch = []
        for qdro in qdros:
            try:
                # Find corresponding modern case
                case_id = file_no_to_case.get(qdro.file_no)
                if case_id is None:
                    result['errors'].append(f"No case found for file: {qdro.file_no}")
                    result['skipped'] += 1
                    continue

                # Build description from whichever QDRO fields are present
                labelled_fields = (
                    ("Type", qdro.case_type),
                    ("Case#", qdro.case_number),
                    ("Plan", qdro.plan_id),
                )
                desc_parts = [
                    f"{label}: {value}" for label, value in labelled_fields if value
                ]
                description = '; '.join(desc_parts) if desc_parts else None

                # Create modern document record
                batch.append(Document(
                    case_id=case_id,
                    document_type='QDRO',
                    file_name=qdro.form_name,
                    file_path=None,  # Legacy doesn't have file paths
                    description=description,
                    uploaded_date=qdro.draft_out if qdro.draft_out else qdro.judgment_date,
                ))
            except Exception as e:
                result['errors'].append(f"QDRO {qdro.file_no}/{qdro.version}: {str(e)}")
                result['skipped'] += 1
                continue

            # Saving happens outside the per-row try so a batch failure is
            # never misreported as a single bad row.
            if len(batch) >= BATCH_SIZE:
                save_batch(batch)
                batch = []

        # Save remaining batch
        if batch:
            save_batch(batch)

        logger.info("sync_documents_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_documents_failed", error=str(e))
    return result
def sync_all(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Run all sync functions in proper order.

    Order matters due to foreign key dependencies:
    1. Clients (no dependencies)
    2. Phones (depends on Clients)
    3. Cases (depends on Clients)
    4. Transactions (depends on Cases)
    5. Payments (depends on Cases)
    6. Documents (depends on Cases)

    When ``clear_existing`` is True, all modern tables are emptied up front
    in the reverse of that order (children before parents), so parent rows
    are never deleted while dependent rows still reference them.

    Args:
        db: SQLAlchemy session passed to every sync step.
        clear_existing: When True, wipe the modern tables before syncing.

    Returns:
        Dict keyed by 'clients', 'phones', 'cases', 'transactions',
        'payments' and 'documents', each holding that step's result dict.

    Raises:
        Exception: Any exception escaping the clear or a sync step is logged,
            the session is rolled back, and the exception is re-raised.
    """
    results = {
        'clients': None,
        'phones': None,
        'cases': None,
        'transactions': None,
        'payments': None,
        'documents': None
    }
    logger.info("sync_all_starting", clear_existing=clear_existing)
    try:
        if clear_existing:
            # Delete dependents first; the per-step clears below then find
            # their tables already empty.
            logger.info("sync_all_clearing_existing")
            for model in (Document, Payment, Transaction, Case, Phone, Client):
                db.query(model).delete()
            db.commit()

        results['clients'] = sync_clients(db, clear_existing)
        logger.info("sync_all_clients_done", success=results['clients']['success'])
        results['phones'] = sync_phones(db, clear_existing)
        logger.info("sync_all_phones_done", success=results['phones']['success'])
        results['cases'] = sync_cases(db, clear_existing)
        logger.info("sync_all_cases_done", success=results['cases']['success'])
        results['transactions'] = sync_transactions(db, clear_existing)
        logger.info("sync_all_transactions_done", success=results['transactions']['success'])
        results['payments'] = sync_payments(db, clear_existing)
        logger.info("sync_all_payments_done", success=results['payments']['success'])
        results['documents'] = sync_documents(db, clear_existing)
        logger.info("sync_all_documents_done", success=results['documents']['success'])
        logger.info("sync_all_complete")
    except Exception as e:
        # Leave the session usable for the caller before propagating.
        db.rollback()
        logger.error("sync_all_failed", error=str(e))
        raise
    return results