From c3bbf927a557d296781836f7112f52fa3bf8720c Mon Sep 17 00:00:00 2001
From: HotSwapp <47397945+HotSwapp@users.noreply.github.com>
Date: Mon, 13 Oct 2025 09:35:35 -0500
Subject: [PATCH] Add duplicate handling for pension import functions

- Added duplicate detection and handling for pensions, pension_death,
  pension_separate, and pension_results imports
- Tracks (file_no, version) composite keys in-memory during import
- Checks database for existing records before insert
- Handles IntegrityError gracefully with fallback to row-by-row insertion
- Returns 'skipped' count in import results
- Prevents transaction rollback cascades that previously caused all
  subsequent rows to fail
- Consistent with existing rolodex duplicate handling pattern
---
 app/import_legacy.py | 252 +++++++++++++++++++++++++++++++++++++------
 1 file changed, 220 insertions(+), 32 deletions(-)

diff --git a/app/import_legacy.py b/app/import_legacy.py
index 0f8806b..73255e1 100644
--- a/app/import_legacy.py
+++ b/app/import_legacy.py
@@ -1653,13 +1653,16 @@ def import_qdros(db: Session, file_path: str) -> Dict[str, Any]:
 
 def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
     """Import PENSIONS.csv → Pensions model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
 
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
 
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
+
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
 
@@ -1670,6 +1673,23 @@ def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
             if not file_no or not version:
                 continue
 
+            # Create composite key for duplicate tracking
+            composite_key = (file_no, version)
+
+            # Skip if we've already processed this combination in current import
+            if composite_key in seen_in_import:
+                result['skipped'] += 1
+                continue
+
+            # Skip if it already exists in database
+            if db.query(Pensions).filter(
+                Pensions.file_no == file_no,
+                Pensions.version == version
+            ).first():
+                result['skipped'] += 1
+                seen_in_import.add(composite_key)
+                continue
+
             record = Pensions(
                 file_no=file_no,
                 version=version,
@@ -1694,20 +1714,47 @@ def import_pensions(db: Session, file_path: str) -> Dict[str, Any]:
                 tax_rate=parse_decimal(row.get('Tax_Rate'))
             )
             batch.append(record)
+            seen_in_import.add(composite_key)
 
             if len(batch) >= BATCH_SIZE:
-                db.bulk_save_objects(batch)
-                db.commit()
-                result['success'] += len(batch)
-                batch = []
+                try:
+                    db.bulk_save_objects(batch)
+                    db.commit()
+                    result['success'] += len(batch)
+                    batch = []
+                except IntegrityError:
+                    db.rollback()
+                    # Handle any remaining duplicates by inserting one at a time
+                    for record in batch:
+                        try:
+                            db.add(record)
+                            db.commit()
+                            result['success'] += 1
+                        except IntegrityError:
+                            db.rollback()
+                            result['skipped'] += 1
+                    batch = []
 
         except Exception as e:
             result['errors'].append(f"Row {row_num}: {str(e)}")
 
+    # Save remaining batch
     if batch:
-        db.bulk_save_objects(batch)
-        db.commit()
-        result['success'] += len(batch)
+        try:
+            db.bulk_save_objects(batch)
+            db.commit()
+            result['success'] += len(batch)
+        except IntegrityError:
+            db.rollback()
+            # Handle any remaining duplicates by inserting one at a time
+            for record in batch:
+                try:
+                    db.add(record)
+                    db.commit()
+                    result['success'] += 1
+                except IntegrityError:
+                    db.rollback()
+                    result['skipped'] += 1
 
     f.close()
     logger.info("import_pensions_complete", **result)
@@ -1779,13 +1826,16 @@ def import_pension_marriage(db: Session, file_path: str) -> Dict[str, Any]:
 
 def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
     """Import Pensions/DEATH.csv → PensionDeath model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
 
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
 
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
+
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
 
@@ -1796,6 +1846,23 @@ def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
             if not file_no or not version:
                 continue
 
+            # Create composite key for duplicate tracking
+            composite_key = (file_no, version)
+
+            # Skip if we've already processed this combination in current import
+            if composite_key in seen_in_import:
+                result['skipped'] += 1
+                continue
+
+            # Skip if it already exists in database
+            if db.query(PensionDeath).filter(
+                PensionDeath.file_no == file_no,
+                PensionDeath.version == version
+            ).first():
+                result['skipped'] += 1
+                seen_in_import.add(composite_key)
+                continue
+
             record = PensionDeath(
                 file_no=file_no,
                 version=version,
@@ -1807,20 +1874,47 @@ def import_pension_death(db: Session, file_path: str) -> Dict[str, Any]:
                 disc2=parse_decimal(row.get('Disc2'))
             )
             batch.append(record)
+            seen_in_import.add(composite_key)
 
             if len(batch) >= BATCH_SIZE:
-                db.bulk_save_objects(batch)
-                db.commit()
-                result['success'] += len(batch)
-                batch = []
+                try:
+                    db.bulk_save_objects(batch)
+                    db.commit()
+                    result['success'] += len(batch)
+                    batch = []
+                except IntegrityError:
+                    db.rollback()
+                    # Handle any remaining duplicates by inserting one at a time
+                    for record in batch:
+                        try:
+                            db.add(record)
+                            db.commit()
+                            result['success'] += 1
+                        except IntegrityError:
+                            db.rollback()
+                            result['skipped'] += 1
+                    batch = []
 
         except Exception as e:
             result['errors'].append(f"Row {row_num}: {str(e)}")
 
+    # Save remaining batch
     if batch:
-        db.bulk_save_objects(batch)
-        db.commit()
-        result['success'] += len(batch)
+        try:
+            db.bulk_save_objects(batch)
+            db.commit()
+            result['success'] += len(batch)
+        except IntegrityError:
+            db.rollback()
+            # Handle any remaining duplicates by inserting one at a time
+            for record in batch:
+                try:
+                    db.add(record)
+                    db.commit()
+                    result['success'] += 1
+                except IntegrityError:
+                    db.rollback()
+                    result['skipped'] += 1
 
     f.close()
     logger.info("import_pension_death_complete", **result)
@@ -1887,13 +1981,16 @@ def import_pension_schedule(db: Session, file_path: str) -> Dict[str, Any]:
 
 def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
     """Import Pensions/SEPARATE.csv → PensionSeparate model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
 
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
 
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
+
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
 
@@ -1904,26 +2001,70 @@ def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
             if not file_no or not version:
                 continue
 
+            # Create composite key for duplicate tracking
+            composite_key = (file_no, version)
+
+            # Skip if we've already processed this combination in current import
+            if composite_key in seen_in_import:
+                result['skipped'] += 1
+                continue
+
+            # Skip if it already exists in database
+            if db.query(PensionSeparate).filter(
+                PensionSeparate.file_no == file_no,
+                PensionSeparate.version == version
+            ).first():
+                result['skipped'] += 1
+                seen_in_import.add(composite_key)
+                continue
+
             record = PensionSeparate(
                 file_no=file_no,
                 version=version,
                 separation_rate=parse_decimal(row.get('Separation_Rate'))
             )
             batch.append(record)
+            seen_in_import.add(composite_key)
 
             if len(batch) >= BATCH_SIZE:
-                db.bulk_save_objects(batch)
-                db.commit()
-                result['success'] += len(batch)
-                batch = []
+                try:
+                    db.bulk_save_objects(batch)
+                    db.commit()
+                    result['success'] += len(batch)
+                    batch = []
+                except IntegrityError:
+                    db.rollback()
+                    # Handle any remaining duplicates by inserting one at a time
+                    for record in batch:
+                        try:
+                            db.add(record)
+                            db.commit()
+                            result['success'] += 1
+                        except IntegrityError:
+                            db.rollback()
+                            result['skipped'] += 1
+                    batch = []
 
         except Exception as e:
            result['errors'].append(f"Row {row_num}: {str(e)}")
 
+    # Save remaining batch
     if batch:
-        db.bulk_save_objects(batch)
-        db.commit()
-        result['success'] += len(batch)
+        try:
+            db.bulk_save_objects(batch)
+            db.commit()
+            result['success'] += len(batch)
+        except IntegrityError:
+            db.rollback()
+            # Handle any remaining duplicates by inserting one at a time
+            for record in batch:
+                try:
+                    db.add(record)
+                    db.commit()
+                    result['success'] += 1
+                except IntegrityError:
+                    db.rollback()
+                    result['skipped'] += 1
 
     f.close()
     logger.info("import_pension_separate_complete", **result)
@@ -1938,13 +2079,16 @@ def import_pension_separate(db: Session, file_path: str) -> Dict[str, Any]:
 
 def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
     """Import Pensions/RESULTS.csv → PensionResults model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'skipped': 0}
 
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
 
+        # Track (file_no, version) combinations we've seen in this import to handle duplicates
+        seen_in_import = set()
         batch = []
+
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
 
@@ -1958,6 +2102,23 @@ def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
             if not file_no or not version:
                 continue
 
+            # Create composite key for duplicate tracking
+            composite_key = (file_no, version)
+
+            # Skip if we've already processed this combination in current import
+            if composite_key in seen_in_import:
+                result['skipped'] += 1
+                continue
+
+            # Skip if it already exists in database
+            if db.query(PensionResults).filter(
+                PensionResults.file_no == file_no,
+                PensionResults.version == version
+            ).first():
+                result['skipped'] += 1
+                seen_in_import.add(composite_key)
+                continue
+
             record = PensionResults(
                 file_no=file_no,
                 version=version,
@@ -1989,20 +2150,47 @@ def import_pension_results(db: Session, file_path: str) -> Dict[str, Any]:
                 marr_amt=parse_decimal(row.get('Marr_Amt'))
             )
             batch.append(record)
+            seen_in_import.add(composite_key)
 
             if len(batch) >= BATCH_SIZE:
-                db.bulk_save_objects(batch)
-                db.commit()
-                result['success'] += len(batch)
-                batch = []
+                try:
+                    db.bulk_save_objects(batch)
+                    db.commit()
+                    result['success'] += len(batch)
+                    batch = []
+                except IntegrityError:
+                    db.rollback()
+                    # Handle any remaining duplicates by inserting one at a time
+                    for record in batch:
+                        try:
+                            db.add(record)
+                            db.commit()
+                            result['success'] += 1
+                        except IntegrityError:
+                            db.rollback()
+                            result['skipped'] += 1
+                    batch = []
 
         except Exception as e:
             result['errors'].append(f"Row {row_num}: {str(e)}")
 
+    # Save remaining batch
     if batch:
-        db.bulk_save_objects(batch)
-        db.commit()
-        result['success'] += len(batch)
+        try:
+            db.bulk_save_objects(batch)
+            db.commit()
+            result['success'] += len(batch)
+        except IntegrityError:
+            db.rollback()
+            # Handle any remaining duplicates by inserting one at a time
+            for record in batch:
+                try:
+                    db.add(record)
+                    db.commit()
+                    result['success'] += 1
+                except IntegrityError:
+                    db.rollback()
+                    result['skipped'] += 1
 
     f.close()
     logger.info("import_pension_results_complete", **result)
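
All four importers above repeat the same pattern: skip in-file duplicates via
an in-memory set of (file_no, version) keys, skip rows already present in the
database, and fall back from a bulk insert to row-by-row commits when an
IntegrityError still gets through. For review, here is a minimal,
self-contained sketch of that flow, assuming a SQLAlchemy model with a unique
constraint on (file_no, version); the Pension model, engine, and import_rows
helper below are hypothetical stand-ins for illustration, not code from
app/import_legacy.py.

    from sqlalchemy import Column, Integer, String, UniqueConstraint, create_engine
    from sqlalchemy.exc import IntegrityError
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Pension(Base):
        # Hypothetical stand-in for the Pensions model used by the patch
        __tablename__ = "pensions"
        id = Column(Integer, primary_key=True)
        file_no = Column(String, nullable=False)
        version = Column(String, nullable=False)
        __table_args__ = (UniqueConstraint("file_no", "version"),)

    def import_rows(db: Session, rows, batch_size=2):
        # Same flow as the patch: in-memory dedupe, DB existence check,
        # bulk insert with a row-by-row fallback on IntegrityError.
        result = {"success": 0, "skipped": 0}
        seen_in_import = set()
        batch = []

        def flush():
            try:
                db.bulk_save_objects(batch)
                db.commit()
                result["success"] += len(batch)
            except IntegrityError:
                db.rollback()
                # A duplicate escaped both skip checks: retry one row at a
                # time so a single conflict cannot fail the whole batch.
                for record in batch:
                    try:
                        db.add(record)
                        db.commit()
                        result["success"] += 1
                    except IntegrityError:
                        db.rollback()
                        result["skipped"] += 1
            batch.clear()

        for row in rows:
            key = (row["file_no"], row["version"])
            if key in seen_in_import:          # duplicate within this file
                result["skipped"] += 1
                continue
            if db.query(Pension).filter_by(file_no=key[0], version=key[1]).first():
                result["skipped"] += 1         # already in the database
                seen_in_import.add(key)
                continue
            batch.append(Pension(file_no=key[0], version=key[1]))
            seen_in_import.add(key)
            if len(batch) >= batch_size:
                flush()
        if batch:
            flush()
        return result

    if __name__ == "__main__":
        engine = create_engine("sqlite://")
        Base.metadata.create_all(engine)
        with Session(engine) as db:
            rows = [{"file_no": "A1", "version": "1"},
                    {"file_no": "A1", "version": "1"},  # in-file duplicate
                    {"file_no": "B2", "version": "1"}]
            print(import_rows(db, rows))  # {'success': 2, 'skipped': 1}

Run against an in-memory SQLite database, the in-file duplicate is counted in
'skipped' while both unique rows commit. The fallback branch only fires when a
constraint violation escapes both skip checks, for example under concurrent
imports, which is exactly the case where the old code's single failed bulk
insert used to cascade into failures for every subsequent row.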