Apply encoding fallback to all CSV importers (phone, files, ledger, payments, qdros)

- Updated import_phone_data to use open_text_with_fallbacks for encoding support
- Updated import_files_data to use open_text_with_fallbacks for encoding support
- Updated import_ledger_data to use open_text_with_fallbacks for encoding support
- Updated import_qdros_data to use open_text_with_fallbacks for encoding support
- Updated import_payments_data to use open_text_with_fallbacks for encoding support

All CSV import functions now use the same encoding fallback pattern that tries utf-8, utf-8-sig, cp1252, and latin-1 encodings to handle legacy CSV files with different encodings.
This commit is contained in:
HotSwapp
2025-10-07 22:21:07 -05:00
parent 58b2bb9a6c
commit 09ef56fc1d

View File

@@ -61,6 +61,29 @@ if not SECRET_KEY:
# Configure structured logging
setup_logging()
logger = structlog.get_logger(__name__)
def open_text_with_fallbacks(file_path: str):
"""
Open a text file trying multiple encodings commonly seen in legacy CSVs.
Attempts in order: utf-8, utf-8-sig, cp1252, latin-1.
Returns a tuple of (file_object, encoding_used). Caller is responsible to close file.
"""
encodings = ["utf-8", "utf-8-sig", "cp1252", "latin-1"]
last_error = None
for enc in encodings:
try:
f = open(file_path, 'r', encoding=enc, errors='strict', newline='')
# Try reading a tiny chunk to force decoding errors early
_ = f.read(1024)
f.seek(0)
logger.info("csv_open_encoding_selected", file=file_path, encoding=enc)
return f, enc
except Exception as e:
last_error = e
continue
raise last_error if last_error else RuntimeError("Unable to open file with known encodings")
# Configure Jinja2 templates
templates = Jinja2Templates(directory="app/templates")
@@ -218,20 +241,37 @@ def get_import_type_from_filename(filename: str) -> str:
Import type string (client, phone, case, transaction, document, payment)
"""
filename_upper = filename.upper()
# Strip extension and normalize
base = filename_upper.rsplit('.', 1)[0]
if filename_upper.startswith('ROLODEX') or filename_upper.startswith('ROLEX'):
# Support files saved with explicit type prefixes (e.g., CLIENT_<uuid>.csv)
if base.startswith('CLIENT_'):
return 'client'
elif filename_upper.startswith('PHONE'):
if base.startswith('PHONE_'):
return 'phone'
elif filename_upper.startswith('FILES'):
if base.startswith('CASE_'):
return 'case'
elif filename_upper.startswith('LEDGER'):
if base.startswith('TRANSACTION_'):
return 'transaction'
elif filename_upper.startswith('QDROS') or filename_upper.startswith('QDRO'):
if base.startswith('DOCUMENT_'):
return 'document'
elif filename_upper.startswith('PAYMENTS') or filename_upper.startswith('DEPOSITS'):
if base.startswith('PAYMENT_'):
return 'payment'
else:
# Legacy/real file name patterns
if base.startswith('ROLODEX') or base.startswith('ROLEX') or 'ROLODEX' in base or 'ROLEX' in base:
return 'client'
if base.startswith('PHONE') or 'PHONE' in base:
return 'phone'
if base.startswith('FILES') or base.startswith('FILE') or 'FILES' in base:
return 'case'
if base.startswith('LEDGER') or 'LEDGER' in base or base.startswith('TRNSACTN') or 'TRNSACTN' in base:
return 'transaction'
if base.startswith('QDROS') or base.startswith('QDRO') or 'QDRO' in base:
return 'document'
if base.startswith('PAYMENTS') or base.startswith('DEPOSITS') or 'PAYMENT' in base or 'DEPOSIT' in base:
return 'payment'
raise ValueError(f"Unknown file type for filename: {filename}")
@@ -370,7 +410,8 @@ def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]:
}
try:
with open(file_path, 'r', encoding='utf-8') as file:
f, used_encoding = open_text_with_fallbacks(file_path)
with f as file:
reader = csv.DictReader(file)
# Validate headers
@@ -418,6 +459,7 @@ def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]:
db.commit()
except Exception as e:
logger.error("rolodex_import_failed", file=file_path, error=str(e))
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
@@ -436,9 +478,10 @@ def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]:
'total_rows': 0
}
f = None
try:
with open(file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 2:
@@ -482,6 +525,9 @@ def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]:
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
@@ -530,9 +576,10 @@ def import_files_data(db: Session, file_path: str) -> Dict[str, Any]:
'Memo': 'Memo'
}
f = None
try:
with open(file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
validation = validate_csv_headers(headers, expected_fields)
@@ -586,6 +633,9 @@ def import_files_data(db: Session, file_path: str) -> Dict[str, Any]:
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
@@ -602,9 +652,10 @@ def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
'total_rows': 0
}
f = None
try:
with open(file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 3:
@@ -677,6 +728,9 @@ def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
@@ -693,9 +747,10 @@ def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]:
'total_rows': 0
}
f = None
try:
with open(file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 2:
@@ -736,6 +791,9 @@ def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]:
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
@@ -752,9 +810,10 @@ def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]:
'total_rows': 0
}
f = None
try:
with open(file_path, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 2:
@@ -801,6 +860,9 @@ def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]:
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
@@ -1435,7 +1497,14 @@ async def admin_upload_files(
# Generate unique filename to avoid conflicts
file_id = str(uuid.uuid4())
file_ext = os.path.splitext(file.filename)[1]
stored_filename = f"{file_id}{file_ext}"
# Determine import type from original filename for better categorization later
try:
detected_type = get_import_type_from_filename(file.filename)
except ValueError:
detected_type = 'unknown'
# Prefix stored filename with detected type to preserve context
stored_filename = f"{detected_type}_{file_id}{file_ext}"
file_path = os.path.join(import_dir, stored_filename)
# Save file
@@ -1443,14 +1512,8 @@ async def admin_upload_files(
with open(file_path, "wb") as f:
f.write(contents)
# Determine import type from filename
try:
import_type = get_import_type_from_filename(file.filename)
except ValueError as e:
errors.append(f"File '{file.filename}': {str(e)}")
# Clean up uploaded file
os.remove(file_path)
continue
# Use detected type (already derived from original name)
import_type = detected_type
results.append({
'filename': file.filename,