Add duplicate handling for pension import functions

- Added duplicate detection and handling for pensions, pension_death, pension_separate, and pension_results imports
- Tracks (file_no, version) composite keys in-memory during import
- Checks database for existing records before insert
- Handles IntegrityError gracefully with fallback to row-by-row insertion
- Returns 'skipped' count in import results
- Prevents transaction rollback cascades that previously caused all subsequent rows to fail
- Consistent with existing rolodex duplicate handling pattern
Author: HotSwapp
Date: 2025-10-13 09:35:35 -05:00
commit c3bbf927a5
parent 69f1043be3

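As a quick orientation before the diff, here is a minimal standalone sketch of the duplicate-detection pattern the commit applies to all four importers. The model and column names (`Pensions`, `file_no`, `version`) come from the diff itself; the helper name `is_duplicate` is hypothetical and exists only for illustration:

```python
from typing import Set, Tuple

from sqlalchemy.orm import Session


def is_duplicate(db: Session, model, file_no: str, version: str,
                 seen_in_import: Set[Tuple[str, str]]) -> bool:
    """Sketch: True if (file_no, version) was already imported or persisted."""
    composite_key = (file_no, version)
    # Duplicate within the current import run (in-memory, no query needed)
    if composite_key in seen_in_import:
        return True
    # Duplicate already persisted in the database
    if db.query(model).filter(
        model.file_no == file_no,
        model.version == version,
    ).first():
        seen_in_import.add(composite_key)
        return True
    return False
```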

@@ -1653,13 +1653,16 @@ def import_qdros(db: Session, file_path: str) -> Dict[str, Any]:
 def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
     """Import PENSIONS.csv → Pensions model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -1670,6 +1673,23 @@ def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
                 if not file_no or not version:
                     continue
+                # Create composite key for duplicate tracking
+                composite_key = (file_no, version)
+                # Skip if we've already processed this combination in current import
+                if composite_key in seen_in_import:
+                    result['skipped'] += 1
+                    continue
+                # Skip if it already exists in database
+                if db.query(Pensions).filter(
+                    Pensions.file_no == file_no,
+                    Pensions.version == version
+                ).first():
+                    result['skipped'] += 1
+                    seen_in_import.add(composite_key)
+                    continue
                 record = Pensions(
                     file_no=file_no,
                     version=version,
@@ -1694,20 +1714,47 @@ def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
                     tax_rate=parse_decimal(row.get('Tax_Rate'))
                 )
                 batch.append(record)
+                seen_in_import.add(composite_key)
                 if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
-                    db.commit()
-                    result['success'] += len(batch)
-                    batch = []
+                    try:
+                        db.bulk_save_objects(batch)
+                        db.commit()
+                        result['success'] += len(batch)
+                        batch = []
+                    except IntegrityError as ie:
+                        db.rollback()
+                        # Handle any remaining duplicates by inserting one at a time
+                        for record in batch:
+                            try:
+                                db.add(record)
+                                db.commit()
+                                result['success'] += 1
+                            except IntegrityError:
+                                db.rollback()
+                                result['skipped'] += 1
+                        batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
+        # Save remaining batch
         if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+            try:
+                db.bulk_save_objects(batch)
+                db.commit()
+                result['success'] += len(batch)
+            except IntegrityError:
+                db.rollback()
+                # Handle any remaining duplicates by inserting one at a time
+                for record in batch:
+                    try:
+                        db.add(record)
+                        db.commit()
+                        result['success'] += 1
+                    except IntegrityError:
+                        db.rollback()
+                        result['skipped'] += 1
         f.close()
         logger.info("import_pensions_complete", **result)
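The batch-flush logic above is repeated verbatim in each importer. Factored out for clarity, the fallback reads roughly as below; `flush_batch` is a hypothetical name, and the committed code inlines this logic rather than sharing a helper:

```python
from typing import Any, Dict, List

from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session


def flush_batch(db: Session, batch: List[Any], result: Dict[str, Any]) -> None:
    """Sketch: bulk-insert a batch, falling back to row-by-row on conflict."""
    try:
        db.bulk_save_objects(batch)
        db.commit()
        result['success'] += len(batch)
    except IntegrityError:
        db.rollback()
        # Retry one row at a time so a single duplicate cannot fail the batch
        for record in batch:
            try:
                db.add(record)
                db.commit()
                result['success'] += 1
            except IntegrityError:
                db.rollback()
                result['skipped'] += 1
```

Because the rollback happens per row in the fallback path, one conflicting record no longer poisons the transaction for every row that follows it, which is the cascade the commit message describes.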
@@ -1779,13 +1826,16 @@ def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]:
 def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
     """Import Pensions/DEATH.csv → PensionDeath model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -1796,6 +1846,23 @@ def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
                 if not file_no or not version:
                     continue
+                # Create composite key for duplicate tracking
+                composite_key = (file_no, version)
+                # Skip if we've already processed this combination in current import
+                if composite_key in seen_in_import:
+                    result['skipped'] += 1
+                    continue
+                # Skip if it already exists in database
+                if db.query(PensionDeath).filter(
+                    PensionDeath.file_no == file_no,
+                    PensionDeath.version == version
+                ).first():
+                    result['skipped'] += 1
+                    seen_in_import.add(composite_key)
+                    continue
                 record = PensionDeath(
                     file_no=file_no,
                     version=version,
@@ -1807,20 +1874,47 @@ def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
                     disc2=parse_decimal(row.get('Disc2'))
                 )
                 batch.append(record)
+                seen_in_import.add(composite_key)
                 if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
-                    db.commit()
-                    result['success'] += len(batch)
-                    batch = []
+                    try:
+                        db.bulk_save_objects(batch)
+                        db.commit()
+                        result['success'] += len(batch)
+                        batch = []
+                    except IntegrityError as ie:
+                        db.rollback()
+                        # Handle any remaining duplicates by inserting one at a time
+                        for record in batch:
+                            try:
+                                db.add(record)
+                                db.commit()
+                                result['success'] += 1
+                            except IntegrityError:
+                                db.rollback()
+                                result['skipped'] += 1
+                        batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
+        # Save remaining batch
         if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+            try:
+                db.bulk_save_objects(batch)
+                db.commit()
+                result['success'] += len(batch)
+            except IntegrityError:
+                db.rollback()
+                # Handle any remaining duplicates by inserting one at a time
+                for record in batch:
+                    try:
+                        db.add(record)
+                        db.commit()
+                        result['success'] += 1
+                    except IntegrityError:
+                        db.rollback()
+                        result['skipped'] += 1
         f.close()
         logger.info("import_pension_death_complete", **result)
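Note that the existence check issues one SELECT per CSV row. That is reasonable at this scale, but a possible optimization (not part of this commit) would be to preload the existing keys in a single query and seed the in-memory set, shown here against `PensionDeath` purely as an example:

```python
# Hypothetical optimization, not in this commit: one query instead of one
# SELECT per row. Seeding seen_in_import makes every duplicate hit the
# in-memory set, so the per-row database check could be dropped entirely.
seen_in_import = {
    (file_no, version)
    for file_no, version in db.query(PensionDeath.file_no, PensionDeath.version)
}
```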
@@ -1887,13 +1981,16 @@ def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]:
 def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
     """Import Pensions/SEPARATE.csv → PensionSeparate model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -1904,26 +2001,70 @@ def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
                 if not file_no or not version:
                     continue
+                # Create composite key for duplicate tracking
+                composite_key = (file_no, version)
+                # Skip if we've already processed this combination in current import
+                if composite_key in seen_in_import:
+                    result['skipped'] += 1
+                    continue
+                # Skip if it already exists in database
+                if db.query(PensionSeparate).filter(
+                    PensionSeparate.file_no == file_no,
+                    PensionSeparate.version == version
+                ).first():
+                    result['skipped'] += 1
+                    seen_in_import.add(composite_key)
+                    continue
                 record = PensionSeparate(
                     file_no=file_no,
                     version=version,
                     separation_rate=parse_decimal(row.get('Separation_Rate'))
                 )
                 batch.append(record)
+                seen_in_import.add(composite_key)
                 if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
-                    db.commit()
-                    result['success'] += len(batch)
-                    batch = []
+                    try:
+                        db.bulk_save_objects(batch)
+                        db.commit()
+                        result['success'] += len(batch)
+                        batch = []
+                    except IntegrityError as ie:
+                        db.rollback()
+                        # Handle any remaining duplicates by inserting one at a time
+                        for record in batch:
+                            try:
+                                db.add(record)
+                                db.commit()
+                                result['success'] += 1
+                            except IntegrityError:
+                                db.rollback()
+                                result['skipped'] += 1
+                        batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
+        # Save remaining batch
         if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+            try:
+                db.bulk_save_objects(batch)
+                db.commit()
+                result['success'] += len(batch)
+            except IntegrityError:
+                db.rollback()
+                # Handle any remaining duplicates by inserting one at a time
+                for record in batch:
+                    try:
+                        db.add(record)
+                        db.commit()
+                        result['success'] += 1
+                    except IntegrityError:
+                        db.rollback()
+                        result['skipped'] += 1
         f.close()
         logger.info("import_pension_separate_complete", **result)
@@ -1938,13 +2079,16 @@ def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
 def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
     """Import Pensions/RESULTS.csv → PensionResults model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -1958,6 +2102,23 @@ def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
                 if not file_no or not version:
                     continue
+                # Create composite key for duplicate tracking
+                composite_key = (file_no, version)
+                # Skip if we've already processed this combination in current import
+                if composite_key in seen_in_import:
+                    result['skipped'] += 1
+                    continue
+                # Skip if it already exists in database
+                if db.query(PensionResults).filter(
+                    PensionResults.file_no == file_no,
+                    PensionResults.version == version
+                ).first():
+                    result['skipped'] += 1
+                    seen_in_import.add(composite_key)
+                    continue
                 record = PensionResults(
                     file_no=file_no,
                     version=version,
@@ -1989,20 +2150,47 @@ def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
                     marr_amt=parse_decimal(row.get('Marr_Amt'))
                 )
                 batch.append(record)
+                seen_in_import.add(composite_key)
                 if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
-                    db.commit()
-                    result['success'] += len(batch)
-                    batch = []
+                    try:
+                        db.bulk_save_objects(batch)
+                        db.commit()
+                        result['success'] += len(batch)
+                        batch = []
+                    except IntegrityError as ie:
+                        db.rollback()
+                        # Handle any remaining duplicates by inserting one at a time
+                        for record in batch:
+                            try:
+                                db.add(record)
+                                db.commit()
+                                result['success'] += 1
+                            except IntegrityError:
+                                db.rollback()
+                                result['skipped'] += 1
+                        batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
+        # Save remaining batch
         if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+            try:
+                db.bulk_save_objects(batch)
+                db.commit()
+                result['success'] += len(batch)
+            except IntegrityError:
+                db.rollback()
+                # Handle any remaining duplicates by inserting one at a time
+                for record in batch:
+                    try:
+                        db.add(record)
+                        db.commit()
+                        result['success'] += 1
+                    except IntegrityError:
+                        db.rollback()
+                        result['skipped'] += 1
         f.close()
         logger.info("import_pension_results_complete", **result)
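For reference, a sketch of how the new 'skipped' count surfaces to a caller; the `SessionLocal` session factory and file path are assumptions for illustration, not part of this diff:

```python
# Assumes a configured SQLAlchemy session factory; the path is illustrative.
db = SessionLocal()
result = import_pensions(db, "exports/PENSIONS.csv")
print(f"imported={result['success']} skipped={result['skipped']} "
      f"errors={len(result['errors'])} total={result['total_rows']}")
```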