From 22e99d27edd52a88fb510f7b2ba066a7d3c1f163 Mon Sep 17 00:00:00 2001 From: HotSwapp <47397945+HotSwapp@users.noreply.github.com> Date: Sun, 12 Oct 2025 21:36:28 -0500 Subject: [PATCH] Fix UNIQUE constraint errors in reference table imports with upsert logic - Implement upsert (INSERT or UPDATE) logic for all reference table imports - Fixed functions: import_trnstype, import_trnslkup, import_footers, import_filestat, import_employee, import_gruplkup, import_filetype, import_fvarlkup, import_rvarlkup - Now checks if record exists before inserting; updates if exists - Makes imports idempotent - can safely re-run without errors - Added tracking of inserted vs updated counts in result dict - Maintains batch commit performance for large imports - Fixes sqlite3.IntegrityError when re-importing CSV files --- app/import_legacy.py | 391 ++++++++++++++++++++++++++----------------- docs/UPSERT_FIX.md | 102 +++++++++++ 2 files changed, 335 insertions(+), 158 deletions(-) create mode 100644 docs/UPSERT_FIX.md diff --git a/app/import_legacy.py b/app/import_legacy.py index 9f0b4d1..30a5743 100644 --- a/app/import_legacy.py +++ b/app/import_legacy.py @@ -135,14 +135,13 @@ def clean_string(value: str) -> Optional[str]: # ============================================================================ def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]: - """Import TRNSTYPE.csv → TrnsType model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import TRNSTYPE.csv → TrnsType model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -151,28 +150,38 @@ def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]: if not t_type: continue - record = TrnsType( - t_type=t_type, - t_type_l=clean_string(row.get('T_Type_L')), - header=clean_string(row.get('Header')), - footer=clean_string(row.get('Footer')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(TrnsType).filter(TrnsType.t_type == t_type).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.t_type_l = clean_string(row.get('T_Type_L')) + existing.header = clean_string(row.get('Header')) + existing.footer = clean_string(row.get('Footer')) + result['updated'] += 1 + else: + # Insert new record + record = TrnsType( + t_type=t_type, + t_type_l=clean_string(row.get('T_Type_L')), + header=clean_string(row.get('Header')), + footer=clean_string(row.get('Footer')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - # Save remaining batch - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_trnstype_complete", **result) @@ -186,14 +195,13 @@ def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]: def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]: - """Import TRNSLKUP.csv → TrnsLkup model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import TRNSLKUP.csv → TrnsLkup model with upsert logic.""" + 
result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -202,28 +210,40 @@ def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]: if not t_code: continue - record = TrnsLkup( - t_code=t_code, - t_type=clean_string(row.get('T_Type')), - t_type_l=clean_string(row.get('T_Type_L')), - amount=parse_decimal(row.get('Amount')), - description=clean_string(row.get('Description')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(TrnsLkup).filter(TrnsLkup.t_code == t_code).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.t_type = clean_string(row.get('T_Type')) + existing.t_type_l = clean_string(row.get('T_Type_L')) + existing.amount = parse_decimal(row.get('Amount')) + existing.description = clean_string(row.get('Description')) + result['updated'] += 1 + else: + # Insert new record + record = TrnsLkup( + t_code=t_code, + t_type=clean_string(row.get('T_Type')), + t_type_l=clean_string(row.get('T_Type_L')), + amount=parse_decimal(row.get('Amount')), + description=clean_string(row.get('Description')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_trnslkup_complete", **result) @@ -237,14 +257,13 @@ def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]: def import_footers(db: Session, file_path: str) -> Dict[str, Any]: - """Import FOOTERS.csv → Footers model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import FOOTERS.csv → Footers model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -253,25 +272,34 @@ def import_footers(db: Session, file_path: str) -> Dict[str, Any]: if not f_code: continue - record = Footers( - f_code=f_code, - f_footer=clean_string(row.get('F_Footer')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(Footers).filter(Footers.f_code == f_code).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.f_footer = clean_string(row.get('F_Footer')) + result['updated'] += 1 + else: + # Insert new record + record = Footers( + f_code=f_code, + f_footer=clean_string(row.get('F_Footer')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_footers_complete", **result) @@ -285,14 +313,13 @@ def 
import_footers(db: Session, file_path: str) -> Dict[str, Any]: def import_filestat(db: Session, file_path: str) -> Dict[str, Any]: - """Import FILESTAT.csv → FileStat model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import FILESTAT.csv → FileStat model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -301,27 +328,38 @@ def import_filestat(db: Session, file_path: str) -> Dict[str, Any]: if not status: continue - record = FileStat( - status=status, - definition=clean_string(row.get('Definition')), - send=clean_string(row.get('Send')), - footer_code=clean_string(row.get('Footer_Code')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(FileStat).filter(FileStat.status == status).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.definition = clean_string(row.get('Definition')) + existing.send = clean_string(row.get('Send')) + existing.footer_code = clean_string(row.get('Footer_Code')) + result['updated'] += 1 + else: + # Insert new record + record = FileStat( + status=status, + definition=clean_string(row.get('Definition')), + send=clean_string(row.get('Send')), + footer_code=clean_string(row.get('Footer_Code')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_filestat_complete", **result) @@ -335,14 +373,13 @@ def import_filestat(db: Session, file_path: str) -> Dict[str, Any]: def import_employee(db: Session, file_path: str) -> Dict[str, Any]: - """Import EMPLOYEE.csv → Employee model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import EMPLOYEE.csv → Employee model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -351,26 +388,36 @@ def import_employee(db: Session, file_path: str) -> Dict[str, Any]: if not empl_num: continue - record = Employee( - empl_num=empl_num, - empl_id=clean_string(row.get('Empl_Id')), - rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(Employee).filter(Employee.empl_num == empl_num).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.empl_id = clean_string(row.get('Empl_Id')) + existing.rate_per_hour = parse_decimal(row.get('Rate_Per_Hour')) + result['updated'] += 1 + else: + # Insert new record + record = Employee( + empl_num=empl_num, + empl_id=clean_string(row.get('Empl_Id')), + rate_per_hour=parse_decimal(row.get('Rate_Per_Hour')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() 
- result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_employee_complete", **result) @@ -384,14 +431,13 @@ def import_employee(db: Session, file_path: str) -> Dict[str, Any]: def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]: - """Import GRUPLKUP.csv → GroupLkup model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import GRUPLKUP.csv → GroupLkup model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -400,26 +446,36 @@ def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]: if not code: continue - record = GroupLkup( - code=code, - description=clean_string(row.get('Description')), - title=clean_string(row.get('Title')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(GroupLkup).filter(GroupLkup.code == code).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.description = clean_string(row.get('Description')) + existing.title = clean_string(row.get('Title')) + result['updated'] += 1 + else: + # Insert new record + record = GroupLkup( + code=code, + description=clean_string(row.get('Description')), + title=clean_string(row.get('Title')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_gruplkup_complete", **result) @@ -433,14 +489,13 @@ def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]: def import_filetype(db: Session, file_path: str) -> Dict[str, Any]: - """Import FILETYPE.csv → FileType model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import FILETYPE.csv → FileType model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] # Track seen file types in-memory to avoid duplicates within the same CSV seen_in_batch = set() for row_num, row in enumerate(reader, start=2): @@ -451,31 +506,34 @@ def import_filetype(db: Session, file_path: str) -> Dict[str, Any]: if not file_type: continue - # Skip if we've already queued this file_type in current batch + # Skip if we've already processed this file_type in current import if file_type in seen_in_batch: continue - - # Skip if it already exists in DB (prevents UNIQUE violations when re-importing) - if db.query(FileType).filter(FileType.file_type == file_type).first(): - continue - - record = FileType(file_type=file_type) - batch.append(record) seen_in_batch.add(file_type) + + # Check if it already exists in DB + existing = db.query(FileType).filter(FileType.file_type == file_type).first() + if existing: + # No fields to update for 
FileType (only has PK), just count it + result['updated'] += 1 + else: + # Insert new record + record = FileType(file_type=file_type) + db.add(record) + result['inserted'] += 1 - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_filetype_complete", **result) @@ -489,14 +547,13 @@ def import_filetype(db: Session, file_path: str) -> Dict[str, Any]: def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]: - """Import FVARLKUP.csv → FVarLkup model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import FVARLKUP.csv → FVarLkup model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -505,26 +562,36 @@ def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]: if not identifier: continue - record = FVarLkup( - identifier=identifier, - query=clean_string(row.get('Query')), - response=clean_string(row.get('Response')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(FVarLkup).filter(FVarLkup.identifier == identifier).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + if existing: + # Update existing record + existing.query = clean_string(row.get('Query')) + existing.response = clean_string(row.get('Response')) + result['updated'] += 1 + else: + # Insert new record + record = FVarLkup( + identifier=identifier, + query=clean_string(row.get('Query')), + response=clean_string(row.get('Response')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_fvarlkup_complete", **result) @@ -538,14 +605,13 @@ def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]: def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]: - """Import RVARLKUP.csv → RVarLkup model.""" - result = {'success': 0, 'errors': [], 'total_rows': 0} + """Import RVARLKUP.csv → RVarLkup model with upsert logic.""" + result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0} try: f, encoding = open_text_with_fallbacks(file_path) reader = csv.DictReader(f) - batch = [] for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 @@ -554,25 +620,34 @@ def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]: if not identifier: continue - record = RVarLkup( - identifier=identifier, - query=clean_string(row.get('Query')) - ) - batch.append(record) + # Check if record already exists + existing = db.query(RVarLkup).filter(RVarLkup.identifier == identifier).first() - if len(batch) >= BATCH_SIZE: - db.bulk_save_objects(batch) + 
if existing: + # Update existing record + existing.query = clean_string(row.get('Query')) + result['updated'] += 1 + else: + # Insert new record + record = RVarLkup( + identifier=identifier, + query=clean_string(row.get('Query')) + ) + db.add(record) + result['inserted'] += 1 + + result['success'] += 1 + + # Commit in batches for performance + if result['success'] % BATCH_SIZE == 0: db.commit() - result['success'] += len(batch) - batch = [] except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") + db.rollback() - if batch: - db.bulk_save_objects(batch) - db.commit() - result['success'] += len(batch) + # Commit any remaining changes + db.commit() f.close() logger.info("import_rvarlkup_complete", **result) diff --git a/docs/UPSERT_FIX.md b/docs/UPSERT_FIX.md new file mode 100644 index 0000000..0253403 --- /dev/null +++ b/docs/UPSERT_FIX.md @@ -0,0 +1,102 @@ +# Upsert Fix for Reference Table Imports + +## Issue +When attempting to re-import CSV files for reference tables (like `trnstype`, `trnslkup`, `footers`, etc.), the application encountered UNIQUE constraint errors because the import functions tried to insert duplicate records: + +``` +Fatal error: (sqlite3.IntegrityError) UNIQUE constraint failed: trnstype.t_type +``` + +## Root Cause +The original import functions used `bulk_save_objects()` which only performs INSERT operations. When the same CSV was imported multiple times (e.g., during development, testing, or data refresh), the function attempted to insert records with primary keys that already existed in the database. + +## Solution +Implemented **upsert logic** (INSERT or UPDATE) for all reference table import functions: + +### Modified Functions +1. `import_trnstype()` - Transaction types +2. `import_trnslkup()` - Transaction lookup codes +3. `import_footers()` - Footer templates +4. `import_filestat()` - File status definitions +5. `import_employee()` - Employee records +6. `import_gruplkup()` - Group lookup codes +7. `import_filetype()` - File type definitions +8. `import_fvarlkup()` - File variable lookups +9. `import_rvarlkup()` - Rolodex variable lookups + +### Key Changes + +#### Before (Insert-Only) +```python +record = TrnsType( + t_type=t_type, + t_type_l=clean_string(row.get('T_Type_L')), + header=clean_string(row.get('Header')), + footer=clean_string(row.get('Footer')) +) +batch.append(record) +if len(batch) >= BATCH_SIZE: + db.bulk_save_objects(batch) + db.commit() +``` + +#### After (Upsert Logic) +```python +# Check if record already exists +existing = db.query(TrnsType).filter(TrnsType.t_type == t_type).first() + +if existing: + # Update existing record + existing.t_type_l = clean_string(row.get('T_Type_L')) + existing.header = clean_string(row.get('Header')) + existing.footer = clean_string(row.get('Footer')) + result['updated'] += 1 +else: + # Insert new record + record = TrnsType( + t_type=t_type, + t_type_l=clean_string(row.get('T_Type_L')), + header=clean_string(row.get('Header')), + footer=clean_string(row.get('Footer')) + ) + db.add(record) + result['inserted'] += 1 + +result['success'] += 1 + +# Commit in batches for performance +if result['success'] % BATCH_SIZE == 0: + db.commit() +``` + +## Benefits + +1. **Idempotent Imports**: Can safely re-run imports without errors +2. **Data Updates**: Automatically updates existing records with new data from CSV +3. 
**Better Tracking**: Result dictionaries now include:
+   - `inserted`: Count of new records added
+   - `updated`: Count of existing records updated
+   - `success`: Total successful operations
+4. **Error Handling**: Individual row errors don't block the entire import
+
+## Testing
+
+To verify the fix works:
+
+1. Import a CSV file (e.g., `trnstype.csv`)
+2. Import the same file again
+3. The second import should succeed, with its `updated` count matching the first import's `inserted` count
+
+## Performance Considerations
+
+- Still uses batch commits (every `BATCH_SIZE` operations)
+- Individual existence checks are needed to decide between insert and update, which is what prevents the constraint violations
+- For large datasets this is slightly slower than a bulk insert, but it trades a little speed for reliability
+
+## Future Enhancements
+
+Consider implementing database-specific upsert statements for better performance:
+- SQLite: `INSERT ... ON CONFLICT DO UPDATE` (native upsert since SQLite 3.24; `INSERT OR REPLACE` also works but deletes and re-inserts the row)
+- PostgreSQL: `INSERT ... ON CONFLICT DO UPDATE`
+- MySQL: `INSERT ... ON DUPLICATE KEY UPDATE`
+
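As a sketch of that dialect-specific route (illustrative only, not part of this patch), the per-row SELECT could be replaced with SQLAlchemy's SQLite-dialect `insert()` and its `on_conflict_do_update()` clause. The helper name `upsert_trnstype_row` is made up for the example, and it assumes SQLite 3.24+ and SQLAlchemy 1.4+:

```python
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.orm import Session

from app.import_legacy import TrnsType, clean_string  # assumed import path


def upsert_trnstype_row(db: Session, row: dict) -> None:
    """Hypothetical sketch: upsert one TRNSTYPE.csv row in a single statement."""
    stmt = sqlite_insert(TrnsType.__table__).values(
        t_type=clean_string(row.get('T_Type')),
        t_type_l=clean_string(row.get('T_Type_L')),
        header=clean_string(row.get('Header')),
        footer=clean_string(row.get('Footer')),
    )
    # On a primary-key collision, update the non-key columns instead of failing.
    stmt = stmt.on_conflict_do_update(
        index_elements=['t_type'],
        set_={
            't_type_l': stmt.excluded.t_type_l,
            'header': stmt.excluded.header,
            'footer': stmt.excluded.footer,
        },
    )
    db.execute(stmt)
```

With this shape the database resolves the insert-or-update decision in one statement, so the Python-side existence check (and its race window under concurrent writers) goes away, while commits can still be batched exactly as the patch does.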
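For the re-import check described under Testing, a minimal pytest-style sketch could look like the following. The `db_session` fixture (assumed to yield a Session bound to an empty test database) and the sample CSV contents are inventions for the example; `tmp_path` is pytest's built-in temporary-directory fixture:

```python
from app.import_legacy import import_trnstype  # assumed import path


def test_reimport_is_idempotent(tmp_path, db_session):
    # Hypothetical one-row CSV using the column headers the importer reads.
    csv_path = tmp_path / "TRNSTYPE.csv"
    csv_path.write_text("T_Type,T_Type_L,Header,Footer\nINV,Invoice,H1,F1\n")

    first = import_trnstype(db_session, str(csv_path))
    second = import_trnstype(db_session, str(csv_path))

    # First run inserts; second run only updates, with no errors.
    assert first['inserted'] == 1 and first['updated'] == 0
    assert second['inserted'] == 0 and second['updated'] == first['inserted']
    assert second['errors'] == []
```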