Fix UNIQUE constraint errors in reference table imports with upsert logic

- Implement upsert (INSERT or UPDATE) logic for all reference table imports;
  the general pattern is sketched below
- Update these functions: import_trnstype, import_trnslkup, import_footers,
  import_filestat, import_employee, import_gruplkup, import_filetype,
  import_fvarlkup, import_rvarlkup
- Check whether a record exists before inserting; update it if it does
- Make imports idempotent so they can safely be re-run without errors
- Track inserted vs. updated counts in the result dict
- Maintain batch-commit performance for large imports
- Fix sqlite3.IntegrityError when re-importing CSV files
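
All nine importers apply the same check-then-update-or-insert pattern. A minimal,
generic sketch of that pattern follows, assuming SQLAlchemy 1.4+ on SQLite;
ExampleModel and upsert_rows are illustrative stand-ins rather than names from
this commit, and while BATCH_SIZE mirrors the constant in the import module, its
value of 500 is assumed (the real functions also read CSV rows, run clean_string,
and track total_rows):

    from typing import Any, Dict, Iterable

    from sqlalchemy import Column, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()
    BATCH_SIZE = 500  # assumed value; the real constant is defined in the import module


    class ExampleModel(Base):
        """Hypothetical stand-in for TrnsType, Footers, etc. (single natural key)."""
        __tablename__ = "example"
        code = Column(String, primary_key=True)
        description = Column(String)


    def upsert_rows(db: Session, rows: Iterable[Dict[str, str]]) -> Dict[str, Any]:
        """Query-then-update-or-insert per row, committed in batches."""
        result = {'success': 0, 'errors': [], 'updated': 0, 'inserted': 0}
        for row_num, row in enumerate(rows, start=2):
            try:
                code = (row.get('code') or '').strip()
                if not code:
                    continue
                # Check if record already exists (autoflush makes rows added
                # earlier in this loop visible to the query)
                existing = db.query(ExampleModel).filter(ExampleModel.code == code).first()
                if existing:
                    existing.description = row.get('description')
                    result['updated'] += 1
                else:
                    db.add(ExampleModel(code=code, description=row.get('description')))
                    result['inserted'] += 1
                result['success'] += 1
                # Commit in batches for performance
                if result['success'] % BATCH_SIZE == 0:
                    db.commit()
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                db.rollback()
        # Commit any remaining changes
        db.commit()
        return result


    # Purely illustrative wiring: re-running the same rows updates in place
    # instead of raising sqlite3.IntegrityError.
    engine = create_engine("sqlite:///:memory:")
    Base.metadata.create_all(engine)
    with Session(engine) as db:
        rows = [{'code': 'A', 'description': 'first'}]
        print(upsert_rows(db, rows))  # inserted: 1, updated: 0
        print(upsert_rows(db, rows))  # inserted: 0, updated: 1

On SQLite this could also be done natively with INSERT ... ON CONFLICT DO UPDATE
(exposed in SQLAlchemy as sqlalchemy.dialects.sqlite.insert(...).on_conflict_do_update(...)),
which avoids the per-row SELECT; the query-first approach used in this commit keeps
the per-row error reporting and the inserted/updated counters straightforward.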
Author: HotSwapp
Date:   2025-10-12 21:36:28 -05:00
Commit: 22e99d27ed (parent ad1c75d759)
2 changed files with 335 additions and 158 deletions


@@ -135,14 +135,13 @@ def clean_string(value: str) -> Optional[str]:
 # ============================================================================
 def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import TRNSTYPE.csv → TrnsType model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import TRNSTYPE.csv → TrnsType model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -151,28 +150,38 @@ def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]:
                 if not t_type:
                     continue
-                record = TrnsType(
-                    t_type=t_type,
-                    t_type_l=clean_string(row.get('T_Type_L')),
-                    header=clean_string(row.get('Header')),
-                    footer=clean_string(row.get('Footer'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(TrnsType).filter(TrnsType.t_type == t_type).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.t_type_l = clean_string(row.get('T_Type_L'))
+                    existing.header = clean_string(row.get('Header'))
+                    existing.footer = clean_string(row.get('Footer'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = TrnsType(
+                        t_type=t_type,
+                        t_type_l=clean_string(row.get('T_Type_L')),
+                        header=clean_string(row.get('Header')),
+                        footer=clean_string(row.get('Footer'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        # Save remaining batch
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_trnstype_complete", **result)
@@ -186,14 +195,13 @@ def import_trnstype(db: Session, file_path: str) -> Dict[str, Any]:
 def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import TRNSLKUP.csv → TrnsLkup model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import TRNSLKUP.csv → TrnsLkup model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -202,28 +210,40 @@ def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]:
                 if not t_code:
                     continue
-                record = TrnsLkup(
-                    t_code=t_code,
-                    t_type=clean_string(row.get('T_Type')),
-                    t_type_l=clean_string(row.get('T_Type_L')),
-                    amount=parse_decimal(row.get('Amount')),
-                    description=clean_string(row.get('Description'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(TrnsLkup).filter(TrnsLkup.t_code == t_code).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.t_type = clean_string(row.get('T_Type'))
+                    existing.t_type_l = clean_string(row.get('T_Type_L'))
+                    existing.amount = parse_decimal(row.get('Amount'))
+                    existing.description = clean_string(row.get('Description'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = TrnsLkup(
+                        t_code=t_code,
+                        t_type=clean_string(row.get('T_Type')),
+                        t_type_l=clean_string(row.get('T_Type_L')),
+                        amount=parse_decimal(row.get('Amount')),
+                        description=clean_string(row.get('Description'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_trnslkup_complete", **result)
@@ -237,14 +257,13 @@ def import_trnslkup(db: Session, file_path: str) -> Dict[str, Any]:
 def import_footers(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import FOOTERS.csv → Footers model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import FOOTERS.csv → Footers model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -253,25 +272,34 @@ def import_footers(db: Session, file_path: str) -> Dict[str, Any]:
                 if not f_code:
                     continue
-                record = Footers(
-                    f_code=f_code,
-                    f_footer=clean_string(row.get('F_Footer'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(Footers).filter(Footers.f_code == f_code).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.f_footer = clean_string(row.get('F_Footer'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = Footers(
+                        f_code=f_code,
+                        f_footer=clean_string(row.get('F_Footer'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_footers_complete", **result)
@@ -285,14 +313,13 @@ def import_footers(db: Session, file_path: str) -> Dict[str, Any]:
 def import_filestat(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import FILESTAT.csv → FileStat model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import FILESTAT.csv → FileStat model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -301,27 +328,38 @@ def import_filestat(db: Session, file_path: str) -> Dict[str, Any]:
                 if not status:
                     continue
-                record = FileStat(
-                    status=status,
-                    definition=clean_string(row.get('Definition')),
-                    send=clean_string(row.get('Send')),
-                    footer_code=clean_string(row.get('Footer_Code'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(FileStat).filter(FileStat.status == status).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.definition = clean_string(row.get('Definition'))
+                    existing.send = clean_string(row.get('Send'))
+                    existing.footer_code = clean_string(row.get('Footer_Code'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = FileStat(
+                        status=status,
+                        definition=clean_string(row.get('Definition')),
+                        send=clean_string(row.get('Send')),
+                        footer_code=clean_string(row.get('Footer_Code'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_filestat_complete", **result)
@@ -335,14 +373,13 @@ def import_filestat(db: Session, file_path: str) -> Dict[str, Any]:
 def import_employee(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import EMPLOYEE.csv → Employee model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import EMPLOYEE.csv → Employee model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -351,26 +388,36 @@ def import_employee(db: Session, file_path: str) -> Dict[str, Any]:
                 if not empl_num:
                     continue
-                record = Employee(
-                    empl_num=empl_num,
-                    empl_id=clean_string(row.get('Empl_Id')),
-                    rate_per_hour=parse_decimal(row.get('Rate_Per_Hour'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(Employee).filter(Employee.empl_num == empl_num).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.empl_id = clean_string(row.get('Empl_Id'))
+                    existing.rate_per_hour = parse_decimal(row.get('Rate_Per_Hour'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = Employee(
+                        empl_num=empl_num,
+                        empl_id=clean_string(row.get('Empl_Id')),
+                        rate_per_hour=parse_decimal(row.get('Rate_Per_Hour'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_employee_complete", **result)
@@ -384,14 +431,13 @@ def import_employee(db: Session, file_path: str) -> Dict[str, Any]:
 def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import GRUPLKUP.csv → GroupLkup model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import GRUPLKUP.csv → GroupLkup model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -400,26 +446,36 @@ def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]:
                 if not code:
                     continue
-                record = GroupLkup(
-                    code=code,
-                    description=clean_string(row.get('Description')),
-                    title=clean_string(row.get('Title'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(GroupLkup).filter(GroupLkup.code == code).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.description = clean_string(row.get('Description'))
+                    existing.title = clean_string(row.get('Title'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = GroupLkup(
+                        code=code,
+                        description=clean_string(row.get('Description')),
+                        title=clean_string(row.get('Title'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_gruplkup_complete", **result)
@@ -433,14 +489,13 @@ def import_gruplkup(db: Session, file_path: str) -> Dict[str, Any]:
 def import_filetype(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import FILETYPE.csv → FileType model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import FILETYPE.csv → FileType model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         # Track seen file types in-memory to avoid duplicates within the same CSV
         seen_in_batch = set()
         for row_num, row in enumerate(reader, start=2):
@@ -451,31 +506,34 @@ def import_filetype(db: Session, file_path: str) -> Dict[str, Any]:
                 if not file_type:
                     continue
-                # Skip if we've already queued this file_type in current batch
+                # Skip if we've already processed this file_type in current import
                 if file_type in seen_in_batch:
                     continue
-                # Skip if it already exists in DB (prevents UNIQUE violations when re-importing)
-                if db.query(FileType).filter(FileType.file_type == file_type).first():
-                    continue
-                record = FileType(file_type=file_type)
-                batch.append(record)
                 seen_in_batch.add(file_type)
+                # Check if it already exists in DB
+                existing = db.query(FileType).filter(FileType.file_type == file_type).first()
+                if existing:
+                    # No fields to update for FileType (only has PK), just count it
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = FileType(file_type=file_type)
+                    db.add(record)
+                    result['inserted'] += 1
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_filetype_complete", **result)
@@ -489,14 +547,13 @@ def import_filetype(db: Session, file_path: str) -> Dict[str, Any]:
 def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import FVARLKUP.csv → FVarLkup model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import FVARLKUP.csv → FVarLkup model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
     try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -505,26 +562,36 @@ def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
                 if not identifier:
                     continue
-                record = FVarLkup(
-                    identifier=identifier,
-                    query=clean_string(row.get('Query')),
-                    response=clean_string(row.get('Response'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(FVarLkup).filter(FVarLkup.identifier == identifier).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.query = clean_string(row.get('Query'))
+                    existing.response = clean_string(row.get('Response'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = FVarLkup(
+                        identifier=identifier,
+                        query=clean_string(row.get('Query')),
+                        response=clean_string(row.get('Response'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_fvarlkup_complete", **result)
@@ -538,14 +605,13 @@ def import_fvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
 def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
-    """Import RVARLKUP.csv → RVarLkup model."""
-    result = {'success': 0, 'errors': [], 'total_rows': 0}
+    """Import RVARLKUP.csv → RVarLkup model with upsert logic."""
+    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
         f, encoding = open_text_with_fallbacks(file_path)
         reader = csv.DictReader(f)
-        batch = []
         for row_num, row in enumerate(reader, start=2):
             result['total_rows'] += 1
@@ -554,25 +620,34 @@ def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
                 if not identifier:
                     continue
-                record = RVarLkup(
-                    identifier=identifier,
-                    query=clean_string(row.get('Query'))
-                )
-                batch.append(record)
+                # Check if record already exists
+                existing = db.query(RVarLkup).filter(RVarLkup.identifier == identifier).first()
-                if len(batch) >= BATCH_SIZE:
-                    db.bulk_save_objects(batch)
+                if existing:
+                    # Update existing record
+                    existing.query = clean_string(row.get('Query'))
+                    result['updated'] += 1
+                else:
+                    # Insert new record
+                    record = RVarLkup(
+                        identifier=identifier,
+                        query=clean_string(row.get('Query'))
+                    )
+                    db.add(record)
+                    result['inserted'] += 1
+                result['success'] += 1
+                # Commit in batches for performance
+                if result['success'] % BATCH_SIZE == 0:
                     db.commit()
-                    result['success'] += len(batch)
-                    batch = []
             except Exception as e:
                 result['errors'].append(f"Row {row_num}: {str(e)}")
                 db.rollback()
-        if batch:
-            db.bulk_save_objects(batch)
-            db.commit()
-            result['success'] += len(batch)
+        # Commit any remaining changes
+        db.commit()
         f.close()
         logger.info("import_rvarlkup_complete", **result)