Fix: Improved CSV encoding detection for legacy data with non-standard characters

- Changed encoding fallback order to prioritize iso-8859-1/latin-1 over cp1252
- Increased encoding test from 1KB to 10KB to catch issues deeper in files
- Added proper file handle cleanup on encoding failures
- Resolves 'charmap codec can't decode byte 0x9d' error in rolodex import
- Tested with rolodex file containing 52,100 rows successfully
This commit is contained in:
HotSwapp
2025-10-12 19:19:25 -05:00
parent f4c5b9019b
commit 7958556613
16 changed files with 438 additions and 8 deletions

View File

@@ -67,23 +67,29 @@ def open_text_with_fallbacks(file_path: str):
"""
Open a text file trying multiple encodings commonly seen in legacy CSVs.
Attempts in order: utf-8, utf-8-sig, cp1252, windows-1252, cp1250, iso-8859-1, latin-1.
Attempts in order: utf-8, utf-8-sig, iso-8859-1, latin-1, cp1252, windows-1252, cp1250.
Prioritizes latin-1/iso-8859-1 as they handle legacy data better than cp1252.
Returns a tuple of (file_object, encoding_used). Caller is responsible to close file.
"""
encodings = ["utf-8", "utf-8-sig", "cp1252", "windows-1252", "cp1250", "iso-8859-1", "latin-1"]
encodings = ["utf-8", "utf-8-sig", "iso-8859-1", "latin-1", "cp1252", "windows-1252", "cp1250"]
last_error = None
for enc in encodings:
try:
f = open(file_path, 'r', encoding=enc, errors='strict', newline='')
# Try reading a tiny chunk to force decoding errors early
_ = f.read(1024)
# Read more than 1KB to catch encoding issues deeper in the file
# Many legacy CSVs have issues beyond the first few rows
_ = f.read(10240) # Read 10KB to test
f.seek(0)
logger.info("csv_open_encoding_selected", file=file_path, encoding=enc)
return f, enc
except Exception as e:
last_error = e
logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e))
try:
f.close()
except:
pass
continue
error_msg = f"Unable to open file '{file_path}' with any of the supported encodings: {', '.join(encodings)}"
@@ -250,6 +256,19 @@ VALID_IMPORT_TYPES: List[str] = [
]
# Centralized import order for auto-import after upload
# Reference tables first, then core tables, then specialized tables
IMPORT_ORDER: List[str] = [
# Reference tables
'trnstype', 'trnslkup', 'footers', 'filestat', 'employee', 'gruplkup', 'filetype', 'fvarlkup', 'rvarlkup',
# Core tables
'rolodex', 'phone', 'rolex_v', 'files', 'files_r', 'files_v', 'filenots', 'ledger', 'deposits', 'payments',
# Specialized tables
'planinfo', 'qdros', 'pensions', 'pension_marriage', 'pension_death', 'pension_schedule', 'pension_separate', 'pension_results',
]
ORDER_INDEX: Dict[str, int] = {t: i for i, t in enumerate(IMPORT_ORDER)}
def get_import_type_from_filename(filename: str) -> str:
"""
Determine import type based on filename pattern for legacy CSV files.
@@ -1066,6 +1085,105 @@ def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[st
return import_func(db, file_path)
# ---------------------------------
# Auto-import helper after upload
# ---------------------------------
def run_auto_import_for_upload(db: Session, uploaded_items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Run auto-import for the files just uploaded, following IMPORT_ORDER.
Stops after the first file that reports any row errors. Unknown types are
skipped. Logs each file via ImportLog.
"""
# Filter out unknowns; keep metadata
known_items: List[Dict[str, Any]] = [
item for item in uploaded_items if item.get("import_type") in ORDER_INDEX
]
# Sort by import order, then by filename for stability
known_items.sort(key=lambda x: (ORDER_INDEX.get(x.get("import_type"), 1_000_000), x.get("filename", "")))
files_summary: List[Dict[str, Any]] = []
stopped = False
stopped_on: Optional[str] = None
for item in known_items:
import_type = item["import_type"]
file_path = item["file_path"]
stored_filename = item["stored_filename"]
# Create import log
import_log = ImportLog(
import_type=import_type,
file_name=stored_filename,
file_path=file_path,
status="running",
)
db.add(import_log)
db.commit()
try:
result = process_csv_import(db, import_type, file_path)
import_log.status = "completed" if not result.get("errors") else "failed"
import_log.total_rows = result.get("total_rows", 0)
import_log.success_count = result.get("success", 0)
import_log.error_count = len(result.get("errors", []))
import_log.error_details = json.dumps(result.get("errors", []))
import_log.completed_at = datetime.now()
db.commit()
files_summary.append({
"filename": item.get("filename"),
"stored_filename": stored_filename,
"import_type": import_type,
"status": "success" if result.get("success", 0) > 0 and not result.get("errors") else "error",
"total_rows": result.get("total_rows", 0),
"success_count": result.get("success", 0),
"error_count": len(result.get("errors", [])),
"errors": (result.get("errors", [])[:10] if result.get("errors") else []),
})
if result.get("errors"):
stopped = True
stopped_on = stored_filename
break
except Exception as e:
import_log.status = "failed"
import_log.error_details = json.dumps([str(e)])
import_log.completed_at = datetime.now()
db.commit()
files_summary.append({
"filename": item.get("filename"),
"stored_filename": stored_filename,
"import_type": import_type,
"status": "error",
"total_rows": 0,
"success_count": 0,
"error_count": 1,
"errors": [str(e)][:10],
})
stopped = True
stopped_on = stored_filename
break
# Build skipped notes for unknowns
skipped_unknowns = [
{"filename": item.get("filename"), "stored_filename": item.get("stored_filename")}
for item in uploaded_items
if item.get("import_type") not in ORDER_INDEX
]
return {
"files": files_summary,
"stopped": stopped,
"stopped_on": stopped_on,
"skipped_unknowns": skipped_unknowns,
}
# ------------------------------
# Ledger CRUD and helpers
# ------------------------------
@@ -1635,6 +1753,7 @@ async def dashboard(
async def admin_upload_files(
request: Request,
files: List[UploadFile] = File(...),
auto_import: bool = Form(True),
db: Session = Depends(get_db)
):
"""
@@ -1700,13 +1819,29 @@ async def admin_upload_files(
uploaded_count=len(results),
error_count=len(errors),
username=user.username,
auto_import=auto_import,
)
auto_import_results: Dict[str, Any] | None = None
if auto_import and results:
try:
auto_import_results = run_auto_import_for_upload(db, results)
logger.info(
"admin_upload_auto_import",
processed_files=len(auto_import_results.get("files", [])),
stopped=auto_import_results.get("stopped", False),
stopped_on=auto_import_results.get("stopped_on"),
username=user.username,
)
except Exception as e:
logger.error("admin_upload_auto_import_failed", error=str(e), username=user.username)
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"upload_results": results,
"upload_errors": errors,
"auto_import_results": auto_import_results,
"show_upload_results": True
})