Implement comprehensive CSV import system for legacy database migration
- Added 5 new legacy models to app/models.py (FileType, FileNots, RolexV, FVarLkup, RVarLkup) - Created app/import_legacy.py with import functions for all legacy tables: * Reference tables: TRNSTYPE, TRNSLKUP, FOOTERS, FILESTAT, EMPLOYEE, GRUPLKUP, FILETYPE, FVARLKUP, RVARLKUP * Core tables: ROLODEX, PHONE, ROLEX_V, FILES, FILES_R, FILES_V, FILENOTS, LEDGER, DEPOSITS, PAYMENTS * Specialized: PLANINFO, QDROS, PENSIONS and all pension-related tables - Created app/sync_legacy_to_modern.py with sync functions to populate modern models from legacy data - Updated admin routes in app/main.py: * Extended process_csv_import to support all new import types * Added /admin/sync endpoint for syncing legacy to modern models * Updated get_import_type_from_filename to recognize all CSV file patterns - Enhanced app/templates/admin.html with: * Import Order Guide showing recommended import sequence * Sync to Modern Models section with confirmation dialog * Sync results display with detailed per-table statistics * Updated supported file formats list - All import functions use batch processing (500 rows), proper error handling, and structured logging - Sync functions maintain foreign key integrity and skip orphaned records with warnings
This commit is contained in:
528
app/sync_legacy_to_modern.py
Normal file
528
app/sync_legacy_to_modern.py
Normal file
@@ -0,0 +1,528 @@
|
||||
"""
|
||||
Sync functions to populate modern models from legacy database tables.
|
||||
|
||||
This module provides functions to migrate data from the comprehensive legacy
|
||||
schema to the simplified modern application models.
|
||||
"""
|
||||
|
||||
from typing import Dict, Any
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
import structlog
|
||||
|
||||
from .models import (
|
||||
# Legacy models
|
||||
Rolodex, LegacyPhone, LegacyFile, Ledger, LegacyPayment, Qdros,
|
||||
# Modern models
|
||||
Client, Phone, Case, Transaction, Payment, Document
|
||||
)
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
BATCH_SIZE = 500
|
||||
|
||||
|
||||
def sync_clients(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Rolodex → Client.

    Maps legacy rolodex entries to modern simplified client records.

    Args:
        db: Active SQLAlchemy session.
        clear_existing: When True, delete all existing Client rows first.

    Returns:
        Dict with 'success' (rows written), 'errors' (list of messages),
        and 'skipped' (rows not migrated) keys.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing modern client data
        if clear_existing:
            logger.info("sync_clients_clearing_existing")
            db.query(Client).delete()
            db.commit()

        # Query all rolodex entries
        rolodex_entries = db.query(Rolodex).all()
        logger.info("sync_clients_processing", count=len(rolodex_entries))

        batch = []
        for rolex in rolodex_entries:
            try:
                # Build complete address from A1, A2, A3, skipping blanks
                address_parts = [
                    rolex.a1 or '',
                    rolex.a2 or '',
                    rolex.a3 or ''
                ]
                address = ', '.join(filter(None, address_parts))

                # Create modern client record
                client = Client(
                    rolodex_id=rolex.id,
                    last_name=rolex.last,
                    first_name=rolex.first,
                    middle_initial=rolex.middle,
                    company=rolex.title,  # Using title as company name
                    address=address if address else None,
                    city=rolex.city,
                    state=rolex.abrev,
                    zip_code=rolex.zip
                )
                batch.append(client)

                # Flush in batches to bound memory and transaction size
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                # Roll back so a failed flush/commit does not leave the
                # session in a pending-rollback state, which would make
                # every subsequent commit in this loop fail too.
                db.rollback()
                result['errors'].append(f"Rolodex ID {rolex.id}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_clients_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_clients_failed", error=str(e))

    return result
|
||||
|
||||
|
||||
def sync_phones(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPhone → Phone.

    Links phone numbers to modern client records via rolodex_id.

    Args:
        db: Active SQLAlchemy session.
        clear_existing: When True, delete all existing Phone rows first.

    Returns:
        Dict with 'success', 'errors', and 'skipped' keys.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing phone data
        if clear_existing:
            logger.info("sync_phones_clearing_existing")
            db.query(Phone).delete()
            db.commit()

        # Build lookup map: rolodex_id → client.id (must run after
        # sync_clients so every rolodex id has a modern counterpart)
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_phones_client_map", client_count=len(rolodex_to_client))

        # Query all legacy phones
        legacy_phones = db.query(LegacyPhone).all()
        logger.info("sync_phones_processing", count=len(legacy_phones))

        batch = []
        for lphone in legacy_phones:
            try:
                # Find corresponding modern client; skip orphans with a warning
                client_id = rolodex_to_client.get(lphone.id)
                if not client_id:
                    result['errors'].append(f"No client found for rolodex ID: {lphone.id}")
                    result['skipped'] += 1
                    continue

                # Create modern phone record
                phone = Phone(
                    client_id=client_id,
                    phone_type=lphone.location if lphone.location else 'unknown',
                    phone_number=lphone.phone,
                    extension=None
                )
                batch.append(phone)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                # Roll back so a failed flush/commit does not poison the
                # session for the remaining iterations.
                db.rollback()
                result['errors'].append(f"Phone {lphone.id}/{lphone.phone}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_phones_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_phones_failed", error=str(e))

    return result
|
||||
|
||||
|
||||
def sync_cases(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyFile → Case.

    Converts legacy file cabinet entries to modern case records.

    Args:
        db: Active SQLAlchemy session.
        clear_existing: When True, delete all existing Case rows first.

    Returns:
        Dict with 'success', 'errors', and 'skipped' keys.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing case data
        if clear_existing:
            logger.info("sync_cases_clearing_existing")
            db.query(Case).delete()
            db.commit()

        # Build lookup map: rolodex_id → client.id
        clients = db.query(Client).all()
        rolodex_to_client = {c.rolodex_id: c.id for c in clients}
        logger.info("sync_cases_client_map", client_count=len(rolodex_to_client))

        # Query all legacy files
        legacy_files = db.query(LegacyFile).all()
        logger.info("sync_cases_processing", count=len(legacy_files))

        batch = []
        for lfile in legacy_files:
            try:
                # Find corresponding modern client; skip orphans with a warning
                client_id = rolodex_to_client.get(lfile.id)
                if not client_id:
                    result['errors'].append(f"No client found for rolodex ID: {lfile.id} (file {lfile.file_no})")
                    result['skipped'] += 1
                    continue

                # Map legacy status to modern status: a closed marker wins,
                # then an explicit 'inactive' status string, else active.
                status = 'active'
                if lfile.closed:
                    status = 'closed'
                elif lfile.status and 'inactive' in lfile.status.lower():
                    status = 'inactive'

                # Create modern case record
                case = Case(
                    file_no=lfile.file_no,
                    client_id=client_id,
                    status=status,
                    case_type=lfile.file_type,
                    description=lfile.regarding,
                    open_date=lfile.opened,
                    close_date=lfile.closed
                )
                batch.append(case)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                # Roll back so a failed flush/commit does not poison the
                # session for the remaining iterations.
                db.rollback()
                result['errors'].append(f"File {lfile.file_no}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_cases_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_cases_failed", error=str(e))

    return result
|
||||
|
||||
|
||||
def sync_transactions(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Ledger → Transaction.

    Converts legacy ledger entries to modern transaction records.

    Args:
        db: Active SQLAlchemy session.
        clear_existing: When True, delete all existing Transaction rows first.

    Returns:
        Dict with 'success', 'errors', and 'skipped' keys.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing transaction data
        if clear_existing:
            logger.info("sync_transactions_clearing_existing")
            db.query(Transaction).delete()
            db.commit()

        # Build lookup map: file_no → case.id (must run after sync_cases)
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_transactions_case_map", case_count=len(file_no_to_case))

        # Query all ledger entries
        ledger_entries = db.query(Ledger).all()
        logger.info("sync_transactions_processing", count=len(ledger_entries))

        batch = []
        for ledger in ledger_entries:
            try:
                # Find corresponding modern case; skip orphans with a warning
                case_id = file_no_to_case.get(ledger.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {ledger.file_no}")
                    result['skipped'] += 1
                    continue

                # Create modern transaction record with all ledger fields
                transaction = Transaction(
                    case_id=case_id,
                    transaction_date=ledger.date,
                    transaction_type=ledger.t_type,
                    amount=float(ledger.amount) if ledger.amount else None,
                    description=ledger.note,
                    reference=str(ledger.item_no) if ledger.item_no else None,
                    # Ledger-specific fields
                    item_no=ledger.item_no,
                    employee_number=ledger.empl_num,
                    t_code=ledger.t_code,
                    t_type_l=ledger.t_type_l,
                    quantity=float(ledger.quantity) if ledger.quantity else None,
                    rate=float(ledger.rate) if ledger.rate else None,
                    billed=ledger.billed
                )
                batch.append(transaction)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                # Roll back so a failed flush/commit does not poison the
                # session for the remaining iterations.
                db.rollback()
                result['errors'].append(f"Ledger {ledger.file_no}/{ledger.item_no}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_transactions_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_transactions_failed", error=str(e))

    return result
|
||||
|
||||
|
||||
def sync_payments(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync LegacyPayment → Payment.

    Converts legacy payment entries to modern payment records.

    Args:
        db: Active SQLAlchemy session.
        clear_existing: When True, delete all existing Payment rows first.

    Returns:
        Dict with 'success', 'errors', and 'skipped' keys.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing payment data
        if clear_existing:
            logger.info("sync_payments_clearing_existing")
            db.query(Payment).delete()
            db.commit()

        # Build lookup map: file_no → case.id (must run after sync_cases)
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_payments_case_map", case_count=len(file_no_to_case))

        # Query all legacy payments
        legacy_payments = db.query(LegacyPayment).all()
        logger.info("sync_payments_processing", count=len(legacy_payments))

        batch = []
        for lpay in legacy_payments:
            try:
                # Payments without a file number cannot be linked to a case;
                # skip silently (counted in 'skipped' but not an error).
                if not lpay.file_no:
                    result['skipped'] += 1
                    continue

                # Find corresponding modern case; skip orphans with a warning
                case_id = file_no_to_case.get(lpay.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {lpay.file_no}")
                    result['skipped'] += 1
                    continue

                # Create modern payment record
                payment = Payment(
                    case_id=case_id,
                    payment_date=lpay.deposit_date,
                    payment_type='deposit',  # Legacy doesn't distinguish
                    amount=float(lpay.amount) if lpay.amount else None,
                    description=lpay.note if lpay.note else lpay.regarding,
                    check_number=None  # Not in legacy PAYMENTS table
                )
                batch.append(payment)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                # Roll back so a failed flush/commit does not poison the
                # session for the remaining iterations.
                db.rollback()
                result['errors'].append(f"Payment {lpay.id}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_payments_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_payments_failed", error=str(e))

    return result
|
||||
|
||||
|
||||
def sync_documents(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Sync Qdros → Document.

    Converts QDRO entries to modern document records.

    Args:
        db: Active SQLAlchemy session.
        clear_existing: When True, delete all existing Document rows first.

    Returns:
        Dict with 'success', 'errors', and 'skipped' keys.
    """
    result = {'success': 0, 'errors': [], 'skipped': 0}

    try:
        # Optionally clear existing document data
        if clear_existing:
            logger.info("sync_documents_clearing_existing")
            db.query(Document).delete()
            db.commit()

        # Build lookup map: file_no → case.id (must run after sync_cases)
        cases = db.query(Case).all()
        file_no_to_case = {c.file_no: c.id for c in cases}
        logger.info("sync_documents_case_map", case_count=len(file_no_to_case))

        # Query all QDRO entries
        qdros = db.query(Qdros).all()
        logger.info("sync_documents_processing", count=len(qdros))

        batch = []
        for qdro in qdros:
            try:
                # Find corresponding modern case; skip orphans with a warning
                case_id = file_no_to_case.get(qdro.file_no)
                if not case_id:
                    result['errors'].append(f"No case found for file: {qdro.file_no}")
                    result['skipped'] += 1
                    continue

                # Build description from whichever QDRO fields are populated
                desc_parts = []
                if qdro.case_type:
                    desc_parts.append(f"Type: {qdro.case_type}")
                if qdro.case_number:
                    desc_parts.append(f"Case#: {qdro.case_number}")
                if qdro.plan_id:
                    desc_parts.append(f"Plan: {qdro.plan_id}")

                description = '; '.join(desc_parts) if desc_parts else None

                # Create modern document record
                document = Document(
                    case_id=case_id,
                    document_type='QDRO',
                    file_name=qdro.form_name,
                    file_path=None,  # Legacy doesn't have file paths
                    description=description,
                    uploaded_date=qdro.draft_out if qdro.draft_out else qdro.judgment_date
                )
                batch.append(document)

                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []

            except Exception as e:
                # Roll back so a failed flush/commit does not poison the
                # session for the remaining iterations.
                db.rollback()
                result['errors'].append(f"QDRO {qdro.file_no}/{qdro.version}: {str(e)}")
                result['skipped'] += 1

        # Save remaining batch
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)

        logger.info("sync_documents_complete", **result)

    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("sync_documents_failed", error=str(e))

    return result
|
||||
|
||||
|
||||
def sync_all(db: Session, clear_existing: bool = False) -> Dict[str, Any]:
    """
    Run every sync step in dependency order and collect per-step results.

    Order matters due to foreign key dependencies:
    1. Clients (no dependencies)
    2. Phones (depends on Clients)
    3. Cases (depends on Clients)
    4. Transactions (depends on Cases)
    5. Payments (depends on Cases)
    6. Documents (depends on Cases)

    Args:
        db: Active SQLAlchemy session.
        clear_existing: Forwarded to each sync step.

    Returns:
        Dict mapping step name → that step's result dict (None if a prior
        step raised before it ran).

    Raises:
        Re-raises any exception thrown by an individual sync step.
    """
    # Declarative pipeline: (result key, sync function), in FK order.
    steps = (
        ('clients', sync_clients),
        ('phones', sync_phones),
        ('cases', sync_cases),
        ('transactions', sync_transactions),
        ('payments', sync_payments),
        ('documents', sync_documents),
    )
    results: Dict[str, Any] = {name: None for name, _ in steps}

    logger.info("sync_all_starting", clear_existing=clear_existing)

    try:
        for name, step in steps:
            outcome = step(db, clear_existing)
            results[name] = outcome
            logger.info(f"sync_all_{name}_done", success=outcome['success'])

        logger.info("sync_all_complete")

    except Exception as e:
        logger.error("sync_all_failed", error=str(e))
        raise

    return results
|
||||
|
||||
Reference in New Issue
Block a user