From f7644a4f67a778558b4705dfb07376451aa109ae Mon Sep 17 00:00:00 2001 From: HotSwapp <47397945+HotSwapp@users.noreply.github.com> Date: Sun, 21 Sep 2025 20:37:13 -0500 Subject: [PATCH] working on new system for importing --- app/api/flexible.py | 281 --- app/api/import_data.py | 3193 --------------------------------- app/auth/security.py | 26 + app/database/base.py | 10 +- app/import_export/__init__.py | 6 + app/main.py | 20 +- app/models/flexible.py | 37 - static/js/flexible.js | 314 ---- static/js/main.js | 30 +- templates/flexible.html | 113 -- templates/import.html | 1849 ------------------- 11 files changed, 60 insertions(+), 5819 deletions(-) delete mode 100644 app/api/flexible.py delete mode 100644 app/api/import_data.py create mode 100644 app/import_export/__init__.py delete mode 100644 app/models/flexible.py delete mode 100644 static/js/flexible.js delete mode 100644 templates/flexible.html delete mode 100644 templates/import.html diff --git a/app/api/flexible.py b/app/api/flexible.py deleted file mode 100644 index d13e327..0000000 --- a/app/api/flexible.py +++ /dev/null @@ -1,281 +0,0 @@ -""" -Flexible Imports admin API: list, filter, and export unmapped rows captured during CSV imports. -""" -from typing import Optional, Dict, Any, List -from datetime import datetime -import csv -import io - -from fastapi import APIRouter, Depends, Query, HTTPException -from fastapi.responses import StreamingResponse -from sqlalchemy.orm import Session -from sqlalchemy import func, or_, cast, String - -from app.database.base import get_db -from app.auth.security import get_admin_user -from app.models.flexible import FlexibleImport - - -router = APIRouter(prefix="/flexible", tags=["flexible"]) - - -@router.get("/imports") -async def list_flexible_imports( - file_type: Optional[str] = Query(None, description="Filter by CSV file type (e.g., FILES.csv)"), - target_table: Optional[str] = Query(None, description="Filter by target model table name"), - q: Optional[str] = Query(None, description="Quick text search across file type, target table, and unmapped data"), - has_keys: Optional[List[str]] = Query( - None, - description="Filter rows where extra_data (or its 'unmapped' payload) contains these keys. Repeat param for multiple keys.", - ), - skip: int = Query(0, ge=0), - limit: int = Query(50, ge=1, le=500), - db: Session = Depends(get_db), - current_user=Depends(get_admin_user), -): - """List flexible import rows with optional filtering, quick search, and pagination.""" - query = db.query(FlexibleImport) - if file_type: - query = query.filter(FlexibleImport.file_type == file_type) - if target_table: - query = query.filter(FlexibleImport.target_table == target_table) - if q: - pattern = f"%{q.strip()}%" - # Search across file_type, target_table, and serialized JSON extra_data - query = query.filter( - or_( - FlexibleImport.file_type.ilike(pattern), - FlexibleImport.target_table.ilike(pattern), - cast(FlexibleImport.extra_data, String).ilike(pattern), - ) - ) - - # Filter by key presence inside JSON payload by string matching of the serialized JSON - # This is DB-agnostic and works across SQLite/Postgres, though not as precise as JSON operators. - if has_keys: - for k in [k for k in has_keys if k is not None and str(k).strip() != ""]: - key = str(k).strip() - # Look for the JSON key token followed by a colon, e.g. 
"key": - query = query.filter(cast(FlexibleImport.extra_data, String).ilike(f'%"{key}":%')) - - total = query.count() - items = ( - query.order_by(FlexibleImport.id.desc()) - .offset(skip) - .limit(limit) - .all() - ) - - def serialize(item: FlexibleImport) -> Dict[str, Any]: - return { - "id": item.id, - "file_type": item.file_type, - "target_table": item.target_table, - "primary_key_field": item.primary_key_field, - "primary_key_value": item.primary_key_value, - "extra_data": item.extra_data, - } - - return { - "total": total, - "skip": skip, - "limit": limit, - "items": [serialize(i) for i in items], - } - - -@router.get("/options") -async def flexible_options( - db: Session = Depends(get_db), - current_user=Depends(get_admin_user), -): - """Return distinct file types and target tables for filter dropdowns.""" - file_types: List[str] = [ - ft for (ft,) in db.query(func.distinct(FlexibleImport.file_type)).order_by(FlexibleImport.file_type.asc()).all() - if ft is not None - ] - target_tables: List[str] = [ - tt for (tt,) in db.query(func.distinct(FlexibleImport.target_table)).order_by(FlexibleImport.target_table.asc()).all() - if tt is not None and tt != "" - ] - return {"file_types": file_types, "target_tables": target_tables} - - -@router.get("/export") -async def export_unmapped_csv( - file_type: Optional[str] = Query(None, description="Filter by CSV file type (e.g., FILES.csv)"), - target_table: Optional[str] = Query(None, description="Filter by target model table name"), - has_keys: Optional[List[str]] = Query( - None, - description="Filter rows where extra_data (or its 'unmapped' payload) contains these keys. Repeat param for multiple keys.", - ), - db: Session = Depends(get_db), - current_user=Depends(get_admin_user), -): - """Export unmapped rows as CSV for review. Includes basic metadata columns and unmapped fields. - - If FlexibleImport.extra_data contains a nested 'unmapped' dict, those keys are exported. - Otherwise, all keys of extra_data are exported. 
- """ - query = db.query(FlexibleImport) - if file_type: - query = query.filter(FlexibleImport.file_type == file_type) - if target_table: - query = query.filter(FlexibleImport.target_table == target_table) - if has_keys: - for k in [k for k in has_keys if k is not None and str(k).strip() != ""]: - key = str(k).strip() - query = query.filter(cast(FlexibleImport.extra_data, String).ilike(f'%"{key}":%')) - - rows: List[FlexibleImport] = query.order_by(FlexibleImport.id.asc()).all() - if not rows: - raise HTTPException(status_code=404, detail="No matching flexible imports to export") - - # Determine union of unmapped keys across all rows - unmapped_keys: List[str] = [] - key_set = set() - for r in rows: - data = r.extra_data or {} - payload = data.get("unmapped") if isinstance(data, dict) and isinstance(data.get("unmapped"), dict) else data - if isinstance(payload, dict): - for k in payload.keys(): - if k not in key_set: - key_set.add(k) - unmapped_keys.append(k) - - # Prepare CSV - meta_headers = [ - "id", - "file_type", - "target_table", - "primary_key_field", - "primary_key_value", - ] - fieldnames = meta_headers + unmapped_keys - - output = io.StringIO() - writer = csv.DictWriter(output, fieldnames=fieldnames) - writer.writeheader() - - for r in rows: - row_out: Dict[str, Any] = { - "id": r.id, - "file_type": r.file_type, - "target_table": r.target_table or "", - "primary_key_field": r.primary_key_field or "", - "primary_key_value": r.primary_key_value or "", - } - data = r.extra_data or {} - payload = data.get("unmapped") if isinstance(data, dict) and isinstance(data.get("unmapped"), dict) else data - if isinstance(payload, dict): - for k in unmapped_keys: - v = payload.get(k) - # Normalize lists/dicts to JSON strings for CSV safety - if isinstance(v, (dict, list)): - try: - import json as _json - row_out[k] = _json.dumps(v, ensure_ascii=False) - except Exception: - row_out[k] = str(v) - else: - row_out[k] = v if v is not None else "" - writer.writerow(row_out) - - output.seek(0) - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filename_parts = ["flexible_unmapped"] - if file_type: - filename_parts.append(file_type.replace("/", "-").replace(" ", "_")) - if target_table: - filename_parts.append(target_table.replace("/", "-").replace(" ", "_")) - filename = "_".join(filename_parts) + f"_{timestamp}.csv" - - return StreamingResponse( - iter([output.getvalue()]), - media_type="text/csv", - headers={ - "Content-Disposition": f"attachment; filename=\"{filename}\"", - }, - ) - - -@router.get("/export/{row_id}") -async def export_single_row_csv( - row_id: int, - db: Session = Depends(get_db), - current_user=Depends(get_admin_user), -): - """Export a single flexible import row as CSV. - - Includes metadata columns plus keys from the row's unmapped payload. - If FlexibleImport.extra_data contains a nested 'unmapped' dict, those keys are exported; - otherwise, all keys of extra_data are exported. 
- """ - row: Optional[FlexibleImport] = ( - db.query(FlexibleImport).filter(FlexibleImport.id == row_id).first() - ) - if not row: - raise HTTPException(status_code=404, detail="Flexible import row not found") - - data = row.extra_data or {} - payload = ( - data.get("unmapped") - if isinstance(data, dict) and isinstance(data.get("unmapped"), dict) - else data - ) - - unmapped_keys: List[str] = [] - if isinstance(payload, dict): - for k in payload.keys(): - unmapped_keys.append(k) - - meta_headers = [ - "id", - "file_type", - "target_table", - "primary_key_field", - "primary_key_value", - ] - fieldnames = meta_headers + unmapped_keys - - output = io.StringIO() - writer = csv.DictWriter(output, fieldnames=fieldnames) - writer.writeheader() - - row_out: Dict[str, Any] = { - "id": row.id, - "file_type": row.file_type, - "target_table": row.target_table or "", - "primary_key_field": row.primary_key_field or "", - "primary_key_value": row.primary_key_value or "", - } - if isinstance(payload, dict): - for k in unmapped_keys: - v = payload.get(k) - if isinstance(v, (dict, list)): - try: - import json as _json - row_out[k] = _json.dumps(v, ensure_ascii=False) - except Exception: - row_out[k] = str(v) - else: - row_out[k] = v if v is not None else "" - - writer.writerow(row_out) - output.seek(0) - - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = ( - f"flexible_row_{row.id}_{row.file_type.replace('/', '-').replace(' ', '_')}_{timestamp}.csv" - if row.file_type - else f"flexible_row_{row.id}_{timestamp}.csv" - ) - - return StreamingResponse( - iter([output.getvalue()]), - media_type="text/csv", - headers={ - "Content-Disposition": f"attachment; filename=\"{filename}\"", - }, - ) - diff --git a/app/api/import_data.py b/app/api/import_data.py deleted file mode 100644 index 3bf7bf9..0000000 --- a/app/api/import_data.py +++ /dev/null @@ -1,3193 +0,0 @@ -""" -Data import API endpoints for CSV file uploads with auto-discovery mapping. 
-""" -import csv -import io -import zipfile -import re -import os -from pathlib import Path -from difflib import SequenceMatcher -from datetime import datetime, date, timezone -from decimal import Decimal -from typing import List, Dict, Any, Optional, Tuple -from fastapi import APIRouter, Depends, HTTPException, UploadFile, File as UploadFileForm, Form, Query, WebSocket -from fastapi.responses import StreamingResponse -from sqlalchemy.orm import Session -from app.database.base import get_db, SessionLocal -from app.auth.security import get_current_user -from app.models.user import User -from app.models.rolodex import Rolodex, Phone -from app.models.files import File -from app.models.ledger import Ledger -from app.models.qdro import QDRO -from app.models.pensions import Pension, PensionSchedule, MarriageHistory, DeathBenefit, SeparationAgreement, LifeTable, NumberTable, PensionResult -from app.models.lookups import Employee, FileType, FileStatus, TransactionType, TransactionCode, State, GroupLookup, Footer, PlanInfo, FormIndex, FormList, PrinterSetup, SystemSetup, FormKeyword -from app.models.additional import Payment, Deposit, FileNote, FormVariable, ReportVariable -from app.models.flexible import FlexibleImport -from app.models.audit import ImportAudit, ImportAuditFile -from app.config import settings -from app.utils.logging import import_logger -from app.middleware.websocket_middleware import get_websocket_manager -from app.services.websocket_pool import WebSocketMessage - -router = APIRouter(tags=["import"]) - -# WebSocket manager for import progress notifications -websocket_manager = get_websocket_manager() - -# Common encodings to try for legacy CSV files (order matters) -ENCODINGS = [ - 'utf-8-sig', - 'utf-8', - 'windows-1252', - 'iso-8859-1', - 'cp1252', -] - -# Unified import order used across batch operations -IMPORT_ORDER = [ - "STATES.csv", "GRUPLKUP.csv", "EMPLOYEE.csv", "FILETYPE.csv", "FOOTERS.csv", "FILESTAT.csv", - "TRNSTYPE.csv", "TRNSLKUP.csv", "SETUP.csv", "PRINTERS.csv", - "INX_LKUP.csv", - "ROLODEX.csv", "PHONE.csv", "FILES.csv", "LEDGER.csv", "TRNSACTN.csv", - "QDROS.csv", "PENSIONS.csv", "SCHEDULE.csv", "MARRIAGE.csv", "DEATH.csv", "SEPARATE.csv", "LIFETABL.csv", "NUMBERAL.csv", "PLANINFO.csv", "RESULTS.csv", "PAYMENTS.csv", "DEPOSITS.csv", - "FILENOTS.csv", "FORM_INX.csv", "FORM_LST.csv", "FVARLKUP.csv", "RVARLKUP.csv" -] - - -# CSV to Model mapping -CSV_MODEL_MAPPING = { - "ROLODEX.csv": Rolodex, - "ROLEX_V.csv": Rolodex, # Legacy/view alias - "PHONE.csv": Phone, - "FILES.csv": File, - "FILES_R.csv": File, # Legacy/report alias - "FILES_V.csv": File, # Legacy/view alias - "LEDGER.csv": Ledger, - "QDROS.csv": QDRO, - "PENSIONS.csv": Pension, - "SCHEDULE.csv": PensionSchedule, - "MARRIAGE.csv": MarriageHistory, - "DEATH.csv": DeathBenefit, - "SEPARATE.csv": SeparationAgreement, - "LIFETABL.csv": LifeTable, - "NUMBERAL.csv": NumberTable, - "EMPLOYEE.csv": Employee, - "FILETYPE.csv": FileType, - "FILESTAT.csv": FileStatus, - "TRNSTYPE.csv": TransactionType, - "TRNSLKUP.csv": TransactionCode, - "STATES.csv": State, - "GRUPLKUP.csv": GroupLookup, - "FOOTERS.csv": Footer, - "PLANINFO.csv": PlanInfo, - # Legacy alternate names from export directories - "FORM_INX.csv": FormIndex, - "FORM_LST.csv": FormList, - "PRINTERS.csv": PrinterSetup, - "SETUP.csv": SystemSetup, - # Additional models for complete legacy coverage - "DEPOSITS.csv": Deposit, - "FILENOTS.csv": FileNote, - "FVARLKUP.csv": FormVariable, - "RVARLKUP.csv": ReportVariable, - "PAYMENTS.csv": Payment, - 
"TRNSACTN.csv": Ledger, # Maps to existing Ledger model (same structure) - "INX_LKUP.csv": FormKeyword, - "RESULTS.csv": PensionResult -} - - -# ----------------------------- -# Progress aggregation helpers -# ----------------------------- -def _aggregate_batch_progress(db: Session, audit: ImportAudit) -> Dict[str, Any]: - """Compute progress summary for the given audit row.""" - processed_files = db.query(ImportAuditFile).filter(ImportAuditFile.audit_id == audit.id).count() - successful_files = db.query(ImportAuditFile).filter( - ImportAuditFile.audit_id == audit.id, - ImportAuditFile.status.in_(["success", "completed_with_errors", "skipped"]) - ).count() - failed_files = db.query(ImportAuditFile).filter( - ImportAuditFile.audit_id == audit.id, - ImportAuditFile.status == "failed" - ).count() - - total_files = audit.total_files or 0 - percent_complete: float = 0.0 - if total_files > 0: - try: - percent_complete = (processed_files / total_files) * 100.0 - except Exception: - percent_complete = 0.0 - - data = { - "audit_id": audit.id, - "status": audit.status, - "total_files": total_files, - "processed_files": processed_files, - "successful_files": successful_files, - "failed_files": failed_files, - "started_at": audit.started_at.isoformat() if audit.started_at else None, - "finished_at": audit.finished_at.isoformat() if audit.finished_at else None, - "percent": percent_complete, - "message": audit.message, - } - - try: - last_file = ( - db.query(ImportAuditFile) - .filter(ImportAuditFile.audit_id == audit.id) - .order_by(ImportAuditFile.id.desc()) - .first() - ) - if last_file: - data["last_file"] = { - "file_type": last_file.file_type, - "status": last_file.status, - "imported_count": last_file.imported_count, - "errors": last_file.errors, - "message": last_file.message, - "created_at": last_file.created_at.isoformat() if last_file.created_at else None, - } - except Exception: - pass - - return data - - -async def _broadcast_import_progress(db: Session, audit_id: int) -> None: - """Broadcast current progress for audit_id to its WebSocket topic.""" - audit = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first() - if not audit: - return - payload = _aggregate_batch_progress(db, audit) - topic = f"import_batch_progress_{audit_id}" - try: - await websocket_manager.broadcast_to_topic( - topic=topic, - message_type="progress", - data=payload, - ) - except Exception: - # Non-fatal; continue API flow - pass - -# Minimal CSV template definitions (headers + one sample row) used for template downloads -CSV_IMPORT_TEMPLATES: Dict[str, Dict[str, List[str]]] = { - "FILES.csv": { - "headers": ["File_No", "Id", "Empl_Num", "File_Type", "Opened", "Status", "Rate_Per_Hour"], - "sample": ["F-001", "CLIENT-1", "EMP01", "CIVIL", "2024-01-01", "ACTIVE", "150"], - }, - "LEDGER.csv": { - "headers": ["File_No", "Date", "Empl_Num", "T_Code", "T_Type", "Amount"], - "sample": ["F-001", "2024-01-15", "EMP01", "FEE", "1", "500.00"], - }, - "PAYMENTS.csv": { - "headers": ["Deposit_Date", "Amount"], - "sample": ["2024-01-15", "1500.00"], - }, - # Additional templates for convenience - "TRNSACTN.csv": { - # Same structure as LEDGER.csv - "headers": ["File_No", "Date", "Empl_Num", "T_Code", "T_Type", "Amount"], - "sample": ["F-002", "2024-02-10", "EMP02", "FEE", "1", "250.00"], - }, - "DEPOSITS.csv": { - "headers": ["Deposit_Date", "Total"], - "sample": ["2024-02-10", "1500.00"], - }, - "ROLODEX.csv": { - # Minimal common contact fields - "headers": ["Id", "Last", "First", "A1", "City", "Abrev", "Zip", 
"Email"], - "sample": ["CLIENT-1", "Smith", "John", "123 Main St", "Denver", "CO", "80202", "john.smith@example.com"], - }, -} - -def _generate_csv_template_bytes(file_type: str) -> bytes: - """Return CSV template content for the given file type as bytes. - - Raises HTTPException if unsupported. - """ - key = (file_type or "").strip() - if key not in CSV_IMPORT_TEMPLATES: - raise HTTPException(status_code=400, detail=f"Unsupported template type: {file_type}. Choose one of: {list(CSV_IMPORT_TEMPLATES.keys())}") - - cfg = CSV_IMPORT_TEMPLATES[key] - output = io.StringIO() - writer = csv.writer(output) - writer.writerow(cfg["headers"]) - writer.writerow(cfg["sample"]) - output.seek(0) - return output.getvalue().encode("utf-8") - -# Field mappings for CSV columns to database fields -# Legacy header synonyms used as hints only (not required). Auto-discovery will work without exact matches. -REQUIRED_MODEL_FIELDS: Dict[str, List[str]] = { - # Files: core identifiers and billing/status fields used throughout the app - "FILES.csv": [ - "file_no", - "id", - "empl_num", - "file_type", - "opened", - "status", - "rate_per_hour", - ], - # Ledger: core transaction fields - "LEDGER.csv": [ - "file_no", - "date", - "empl_num", - "t_code", - "t_type", - "amount", - ], - # Payments: deposit date and amount are the only strictly required model fields - "PAYMENTS.csv": [ - "deposit_date", - "amount", - ], -} - -FIELD_MAPPINGS = { - "ROLODEX.csv": { - "Id": "id", - "Prefix": "prefix", - "First": "first", - "Middle": "middle", - "Last": "last", - "Suffix": "suffix", - "Title": "title", - "A1": "a1", - "A2": "a2", - "A3": "a3", - "City": "city", - "Abrev": "abrev", - "St": None, # Full state name - skip this field as model only has abrev - "Zip": "zip", - "Email": "email", - "DOB": "dob", - "SS#": "ss_number", - "Legal_Status": "legal_status", - "Group": "group", - "Memo": "memo" - }, - "PHONE.csv": { - "Id": "rolodex_id", - "Phone": "phone", - "Location": "location" - }, - "FILES.csv": { - "File_No": "file_no", - "Id": "id", - "File_Type": "file_type", - "Regarding": "regarding", - "Opened": "opened", - "Closed": "closed", - "Empl_Num": "empl_num", - "Rate_Per_Hour": "rate_per_hour", - "Status": "status", - "Footer_Code": "footer_code", - "Opposing": "opposing", - "Hours": "hours", - "Hours_P": "hours_p", - "Trust_Bal": "trust_bal", - "Trust_Bal_P": "trust_bal_p", - "Hourly_Fees": "hourly_fees", - "Hourly_Fees_P": "hourly_fees_p", - "Flat_Fees": "flat_fees", - "Flat_Fees_P": "flat_fees_p", - "Disbursements": "disbursements", - "Disbursements_P": "disbursements_p", - "Credit_Bal": "credit_bal", - "Credit_Bal_P": "credit_bal_p", - "Total_Charges": "total_charges", - "Total_Charges_P": "total_charges_p", - "Amount_Owing": "amount_owing", - "Amount_Owing_P": "amount_owing_p", - "Transferable": "transferable", - "Memo": "memo" - }, - "LEDGER.csv": { - "File_No": "file_no", - "Date": "date", - "Item_No": "item_no", - "Empl_Num": "empl_num", - "T_Code": "t_code", - "T_Type": "t_type", - "T_Type_L": "t_type_l", - "Quantity": "quantity", - "Rate": "rate", - "Amount": "amount", - "Billed": "billed", - "Note": "note" - }, - "QDROS.csv": { - "File_No": "file_no", - "Version": "version", - "Plan_Id": "plan_id", - "^1": "field1", - "^2": "field2", - "^Part": "part", - "^AltP": "altp", - "^Pet": "pet", - "^Res": "res", - "Case_Type": "case_type", - "Case_Code": "case_code", - "Section": "section", - "Case_Number": "case_number", - "Judgment_Date": "judgment_date", - "Valuation_Date": "valuation_date", - "Married_On": 
"married_on", - "Percent_Awarded": "percent_awarded", - "Ven_City": "ven_city", - "Ven_Cnty": "ven_cnty", - "Ven_St": "ven_st", - "Draft_Out": "draft_out", - "Draft_Apr": "draft_apr", - "Final_Out": "final_out", - "Judge": "judge", - "Form_Name": "form_name", - # Extended workflow/document fields (present in new exports or manual CSVs) - "Status": "status", - "Content": "content", - "Notes": "notes", - "Approval_Status": "approval_status", - "Approved_Date": "approved_date", - "Filed_Date": "filed_date" - }, - "PENSIONS.csv": { - "File_No": "file_no", - "Version": "version", - "Plan_Id": "plan_id", - "Plan_Name": "plan_name", - "Title": "title", - "First": "first", - "Last": "last", - "Birth": "birth", - "Race": "race", - "Sex": "sex", - "Info": "info", - "Valu": "valu", - "Accrued": "accrued", - "Vested_Per": "vested_per", - "Start_Age": "start_age", - "COLA": "cola", - "Max_COLA": "max_cola", - "Withdrawal": "withdrawal", - "Pre_DR": "pre_dr", - "Post_DR": "post_dr", - "Tax_Rate": "tax_rate" - }, - "EMPLOYEE.csv": { - "Empl_Num": "empl_num", - "Empl_Id": "initials", # Map employee ID to initials field - "Rate_Per_Hour": "rate_per_hour", - # Optional extended fields when present in enhanced exports - "First": "first_name", - "First_Name": "first_name", - "Last": "last_name", - "Last_Name": "last_name", - "Title": "title", - "Email": "email", - "Phone": "phone", - "Active": "active" - }, - "STATES.csv": { - "Abrev": "abbreviation", - "St": "name" - }, - "GRUPLKUP.csv": { - "Code": "group_code", - "Description": "description", - "Title": "title" - }, - "TRNSLKUP.csv": { - "T_Code": "t_code", - "T_Type": "t_type", - # "T_Type_L": not a field in TransactionCode model - "Amount": "default_rate", - "Description": "description" - }, - "TRNSTYPE.csv": { - "T_Type": "t_type", - "T_Type_L": "debit_credit", # D=Debit, C=Credit - "Header": "description", - "Footer": "footer_code" - }, - "FILETYPE.csv": { - "File_Type": "type_code", - "Description": "description", - "Default_Rate": "default_rate" - }, - "FILESTAT.csv": { - "Status": "status_code", - "Status_Code": "status_code", - "Definition": "description", - "Description": "description", - "Send": "send", - "Footer_Code": "footer_code", - "Sort_Order": "sort_order" - }, - "FOOTERS.csv": { - "F_Code": "footer_code", - "F_Footer": "content" - # Description is optional - not required for footers - }, - "PLANINFO.csv": { - "Plan_Id": "plan_id", - "Plan_Name": "plan_name", - "Plan_Type": "plan_type", - "Sponsor": "sponsor", - "Administrator": "administrator", - "Address1": "address1", - "Address2": "address2", - "City": "city", - "State": "state", - "Zip_Code": "zip_code", - "Phone": "phone", - "Notes": "notes" - }, - "INX_LKUP.csv": { - "Keyword": "keyword", - "Description": "description" - }, - "FORM_INX.csv": { - "Name": "form_id", - "Keyword": "keyword" - }, - "FORM_LST.csv": { - "Name": "form_id", - "Memo": "content", - "Status": "status" - }, - "PRINTERS.csv": { - # Legacy variants - "Printer_Name": "printer_name", - "Description": "description", - "Driver": "driver", - "Port": "port", - "Default_Printer": "default_printer", - # Observed legacy headers from export - "Number": "number", - "Name": "printer_name", - "Page_Break": "page_break", - "Setup_St": "setup_st", - "Reset_St": "reset_st", - "B_Underline": "b_underline", - "E_Underline": "e_underline", - "B_Bold": "b_bold", - "E_Bold": "e_bold", - # Optional report toggles - "Phone_Book": "phone_book", - "Rolodex_Info": "rolodex_info", - "Envelope": "envelope", - "File_Cabinet": "file_cabinet", 
- "Accounts": "accounts", - "Statements": "statements", - "Calendar": "calendar", - }, - "SETUP.csv": { - "Setting_Key": "setting_key", - "Setting_Value": "setting_value", - "Description": "description", - "Setting_Type": "setting_type" - }, - "SCHEDULE.csv": { - "File_No": "file_no", - "Version": "version", - "Vests_On": "vests_on", - "Vests_At": "vests_at" - }, - "MARRIAGE.csv": { - "File_No": "file_no", - "Version": "version", - "Married_From": "married_from", - "Married_To": "married_to", - "Married_Years": "married_years", - "Service_From": "service_from", - "Service_To": "service_to", - "Service_Years": "service_years", - "Marital_%": "marital_percent" - }, - "DEATH.csv": { - "File_No": "file_no", - "Version": "version", - "Beneficiary_Name": "beneficiary_name", - "Benefit_Amount": "benefit_amount", - "Benefit_Type": "benefit_type", - "Notes": "notes", - "Lump1": "lump1", - "Lump2": "lump2", - "Growth1": "growth1", - "Growth2": "growth2", - "Disc1": "disc1", - "Disc2": "disc2" - }, - "SEPARATE.csv": { - "File_No": "file_no", - "Version": "version", - "Agreement_Date": "agreement_date", - "Terms": "terms", - "Notes": "notes", - "Separation_Rate": "terms" - }, - "LIFETABL.csv": { - "AGE": "age", - "LE_AA": "le_aa", - "NA_AA": "na_aa", - "LE_AM": "le_am", - "NA_AM": "na_am", - "LE_AF": "le_af", - "NA_AF": "na_af", - "LE_WA": "le_wa", - "NA_WA": "na_wa", - "LE_WM": "le_wm", - "NA_WM": "na_wm", - "LE_WF": "le_wf", - "NA_WF": "na_wf", - "LE_BA": "le_ba", - "NA_BA": "na_ba", - "LE_BM": "le_bm", - "NA_BM": "na_bm", - "LE_BF": "le_bf", - "NA_BF": "na_bf", - "LE_HA": "le_ha", - "NA_HA": "na_ha", - "LE_HM": "le_hm", - "NA_HM": "na_hm", - "LE_HF": "le_hf", - "NA_HF": "na_hf" - }, - "NUMBERAL.csv": { - "Month": "month", - "NA_AA": "na_aa", - "NA_AM": "na_am", - "NA_AF": "na_af", - "NA_WA": "na_wa", - "NA_WM": "na_wm", - "NA_WF": "na_wf", - "NA_BA": "na_ba", - "NA_BM": "na_bm", - "NA_BF": "na_bf", - "NA_HA": "na_ha", - "NA_HM": "na_hm", - "NA_HF": "na_hf" - }, - "RESULTS.csv": { - "Accrued": "accrued", - "Start_Age": "start_age", - "COLA": "cola", - "Withdrawal": "withdrawal", - "Pre_DR": "pre_dr", - "Post_DR": "post_dr", - "Tax_Rate": "tax_rate", - "Age": "age", - "Years_From": "years_from", - "Life_Exp": "life_exp", - "EV_Monthly": "ev_monthly", - "Payments": "payments", - "Pay_Out": "pay_out", - "Fund_Value": "fund_value", - "PV": "pv", - "Mortality": "mortality", - "PV_AM": "pv_am", - "PV_AMT": "pv_amt", - "PV_Pre_DB": "pv_pre_db", - "PV_Annuity": "pv_annuity", - "WV_AT": "wv_at", - "PV_Plan": "pv_plan", - "Years_Married": "years_married", - "Years_Service": "years_service", - "Marr_Per": "marr_per", - "Marr_Amt": "marr_amt" - }, - # Additional CSV file mappings - "DEPOSITS.csv": { - "Deposit_Date": "deposit_date", - "Total": "total" - }, - "FILENOTS.csv": { - "File_No": "file_no", - "Memo_Date": "memo_date", - "Memo_Note": "memo_note" - }, - "FVARLKUP.csv": { - "Identifier": "identifier", - "Query": "query", - "Response": "response" - }, - "RVARLKUP.csv": { - "Identifier": "identifier", - "Query": "query" - }, - "PAYMENTS.csv": { - "Deposit_Date": "deposit_date", - "File_No": "file_no", - "Id": "client_id", - "Regarding": "regarding", - "Amount": "amount", - "Note": "note" - }, - "TRNSACTN.csv": { - # Maps to Ledger model - same structure as LEDGER.csv - "File_No": "file_no", - "Date": "date", - "Item_No": "item_no", - "Empl_Num": "empl_num", - "T_Code": "t_code", - "T_Type": "t_type", - "T_Type_L": "t_type_l", - "Quantity": "quantity", - "Rate": "rate", - "Amount": "amount", - "Billed": 
"billed", - "Note": "note" - }, - "EMPLOYEE.csv": { - "Empl_Num": "empl_num", - "Empl_Id": "initials", # Map employee ID to initials field - "Rate_Per_Hour": "rate_per_hour", - # Note: first_name, last_name, title, active, email, phone will need manual entry or separate import - # as they're not present in the legacy CSV structure - }, - "QDROS.csv": { - "File_No": "file_no", - "Version": "version", - "Plan_Id": "plan_id", - "^1": "field1", - "^2": "field2", - "^Part": "part", - "^AltP": "altp", - "^Pet": "pet", - "^Res": "res", - "Case_Type": "case_type", - "Case_Code": "case_code", - "Section": "section", - "Case_Number": "case_number", - "Judgment_Date": "judgment_date", - "Valuation_Date": "valuation_date", - "Married_On": "married_on", - "Percent_Awarded": "percent_awarded", - "Ven_City": "ven_city", - "Ven_Cnty": "ven_cnty", - "Ven_St": "ven_st", - "Draft_Out": "draft_out", - "Draft_Apr": "draft_apr", - "Final_Out": "final_out", - "Judge": "judge", - "Form_Name": "form_name" - } -} - - -def parse_date(date_str: str) -> Optional[date]: - """Parse date string in various formats""" - if not date_str or date_str.strip() == "": - return None - - date_formats = [ - "%Y-%m-%d", - "%m/%d/%Y", - "%d/%m/%Y", - "%m-%d-%Y", - "%d-%m-%Y", - "%Y/%m/%d" - ] - - for fmt in date_formats: - try: - return datetime.strptime(date_str.strip(), fmt).date() - except ValueError: - continue - - return None - - - -def make_json_safe(value: Any) -> Any: - """Recursively convert values to JSON-serializable types. - - - date/datetime -> ISO string - - Decimal -> float - - dict/list -> recurse - """ - if isinstance(value, (datetime, date)): - return value.isoformat() - if isinstance(value, Decimal): - try: - return float(value) - except Exception: - return str(value) - if isinstance(value, dict): - return {k: make_json_safe(v) for k, v in value.items()} - if isinstance(value, list): - return [make_json_safe(v) for v in value] - return value - - -def parse_csv_robust(csv_content: str) -> Tuple[List[Dict[str, str]], List[str]]: - """Parse CSV text robustly by handling broken newlines in unquoted fields. 
- - Returns tuple of (rows_as_dicts, headers) - """ - lines = (csv_content or "").strip().split('\n') - if not lines or (len(lines) == 1 and not lines[0].strip()): - return [], [] - - # Parse headers using the csv module to respect quoting - header_reader = csv.reader(io.StringIO(lines[0])) - headers = next(header_reader) - headers = [h.strip() for h in headers] - - rows_data: List[Dict[str, str]] = [] - for line_num, line in enumerate(lines[1:], start=2): - # Skip empty lines - if not line.strip(): - continue - try: - # Parse each line independently; avoids multiline parse explosions - line_reader = csv.reader(io.StringIO(line)) - fields = next(line_reader) - fields = [f.strip() for f in fields] - - # If clearly malformed (too few fields), skip - if len(fields) < max(1, len(headers) // 2): - continue - - # Pad or truncate to header length - while len(fields) < len(headers): - fields.append("") - fields = fields[:len(headers)] - - row_dict = dict(zip(headers, fields)) - rows_data.append(row_dict) - except Exception: - # Skip malformed row - continue - - return rows_data, headers - - -def parse_csv_with_fallback(text: str) -> Tuple[List[Dict[str, str]], List[str]]: - """Try csv.DictReader first; on failure, fall back to robust parser.""" - try: - reader = csv.DictReader(io.StringIO(text), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) - headers_local = reader.fieldnames or [] - rows_local: List[Dict[str, str]] = [] - for r in reader: - rows_local.append(r) - return rows_local, headers_local - except Exception: - return parse_csv_robust(text) - - -def _normalize_label(label: str) -> str: - """Normalize a header/field label for fuzzy comparison.""" - if not label: - return "" - # Lowercase, replace separators with space, remove non-alphanumerics, expand common short forms - lowered = label.strip().lower() - # Replace separators - lowered = re.sub(r"[\s\-]+", "_", lowered) - # Remove non-word characters except underscore - lowered = re.sub(r"[^a-z0-9_]", "", lowered) - # Expand a few common abbreviations - replacements = { - "num": "number", - "no": "number", - "amt": "amount", - "addr": "address", - "st": "state", - "dob": "dateofbirth", - "ss": "ssnumber", - } - tokens = [replacements.get(t, t) for t in lowered.split("_") if t] - return "".join(tokens) - - -def _get_model_columns(model_class) -> Tuple[Dict[str, Any], List[str]]: - """Return model columns mapping name->Column and list of primary key column names.""" - columns = {} - pk_names = [] - for col in model_class.__table__.columns: - if col.name in {"created_at", "updated_at"}: - continue - columns[col.name] = col - if col.primary_key: - pk_names.append(col.name) - return columns, pk_names - - -def _build_dynamic_mapping(headers: List[str], model_class, file_type: str) -> Dict[str, Any]: - """Create a mapping from CSV headers to model fields using synonyms and fuzzy similarity. 
- - Returns a dict with keys: mapping (csv_header->db_field), suggestions, unmapped_headers, mapped_headers - """ - model_columns, _ = _get_model_columns(model_class) - model_field_names = list(model_columns.keys()) - - # Start with legacy mapping hints when available - legacy_map = FIELD_MAPPINGS.get(file_type, {}) or {} - - mapping: Dict[str, Optional[str]] = {} - suggestions: Dict[str, List[Tuple[str, float]]] = {} - used_db_fields: set[str] = set() - - # 1) Exact legacy header key usage - for header in headers: - if header in legacy_map and legacy_map[header] is not None: - candidate = legacy_map[header] - if candidate in model_field_names and candidate not in used_db_fields: - mapping[header] = candidate - used_db_fields.add(candidate) - - # 2) Direct exact match against model fields (case-insensitive and normalized) - normalized_model = {name: _normalize_label(name) for name in model_field_names} - normalized_to_model = {v: k for k, v in normalized_model.items()} - - for header in headers: - if header in mapping: - continue - normalized_header = _normalize_label(header) - if normalized_header in normalized_to_model: - candidate = normalized_to_model[normalized_header] - if candidate not in used_db_fields: - mapping[header] = candidate - used_db_fields.add(candidate) - - # 3) Fuzzy best-match based on normalized strings - for header in headers: - if header in mapping: - continue - normalized_header = _normalize_label(header) - best_candidate = None - best_score = 0.0 - candidate_list: List[Tuple[str, float]] = [] - for model_field in model_field_names: - if model_field in used_db_fields: - continue - nm = normalized_model[model_field] - if not nm or not normalized_header: - score = 0.0 - else: - # Combine ratio and partial containment heuristic - ratio = SequenceMatcher(None, normalized_header, nm).ratio() - containment = 1.0 if (normalized_header in nm or nm in normalized_header) else 0.0 - score = max(ratio, 0.85 if containment else 0.0) - candidate_list.append((model_field, score)) - if score > best_score: - best_score = score - best_candidate = model_field - # Keep top 3 suggestions for UI - suggestions[header] = sorted(candidate_list, key=lambda x: x[1], reverse=True)[:3] - # Apply only if score above threshold - if best_candidate and best_score >= 0.82: - mapping[header] = best_candidate - used_db_fields.add(best_candidate) - - # 4) Any header explicitly mapped to None in legacy map is considered intentionally skipped - for header in headers: - if header not in mapping and header in legacy_map and legacy_map[header] is None: - mapping[header] = None - - mapped_headers = {h: f for h, f in mapping.items() if f is not None} - unmapped_headers = [h for h in headers if h not in mapping or mapping[h] is None] - - return { - "mapping": mapping, - "mapped_headers": mapped_headers, - "unmapped_headers": unmapped_headers, - "suggestions": suggestions, - } - - -def _validate_required_headers(file_type: str, mapped_headers: Dict[str, str]) -> Dict[str, Any]: - """Check that minimal required model fields for a given CSV type are present in mapped headers. - - Returns dict with: required_fields, missing_fields, ok. 
- """ - required_fields = REQUIRED_MODEL_FIELDS.get(file_type, []) - present_fields = set((mapped_headers or {}).values()) - missing_fields = [f for f in required_fields if f not in present_fields] - return { - "required_fields": required_fields, - "missing_fields": missing_fields, - "ok": len(missing_fields) == 0, - } - - -def _is_setup_wide_format(headers: List[str]) -> bool: - """Detect legacy wide-format SETUP.csv with column headers like L_Head1..L_Head10, Appl_Title, Default_Printer.""" - if not headers: - return False - known = {"Appl_Title", "Default_Printer"} - known.update({f"L_Head{i}" for i in range(1, 11)}) - return any(h in known for h in headers) - - -# Mapping from legacy wide-format SETUP.csv headers to canonical SystemSetup.setting_key values -SETUP_WIDE_HEADER_TO_KEY: Dict[str, str] = { - "Appl_Title": "appl_title", - "Default_Printer": "default_printer", - **{f"L_Head{i}": f"l_head{i}" for i in range(1, 11)}, -} - - -def _import_setup_wide(rows: List[Dict[str, str]], db: Session, replace_existing: bool) -> Tuple[int, List[Dict[str, Any]]]: - """Import legacy wide-format SETUP.csv as key/value pairs into SystemSetup. - - Each header maps to a setting key; values are taken from the row(s). If multiple rows exist, - subsequent non-empty values will overwrite earlier ones for the same key. - Returns (imported_count, errors). - """ - if replace_existing: - try: - db.query(SystemSetup).delete() - db.commit() - except Exception: - db.rollback() - # Proceed with upserts without clearing if deletion fails - pass - - imported_count = 0 - errors: List[Dict[str, Any]] = [] - for row_index, row in enumerate(rows, start=2): - if not isinstance(row, dict): - continue - for header, key in SETUP_WIDE_HEADER_TO_KEY.items(): - try: - if header not in row: - continue - value = row.get(header) - if value in (None, ""): - continue - existing = db.query(SystemSetup).filter(SystemSetup.setting_key == key).first() - if existing: - existing.setting_value = str(value) - else: - db.add(SystemSetup(setting_key=key, setting_value=str(value), description=f"Imported from SETUP.{header}")) - imported_count += 1 - if imported_count % 100 == 0: - db.commit() - except Exception as e: - db.rollback() - errors.append({"row": row_index, "field": header, "error": str(e)}) - try: - db.commit() - except Exception as e: - db.rollback() - errors.append({"row": None, "field": "commit", "error": str(e)}) - return imported_count, errors - - -def _get_required_fields(model_class) -> List[str]: - """Infer required (non-nullable) fields for a model to avoid DB errors. - - Excludes primary keys (which might be autoincrement or provided) and timestamp mixins. 
- """ - required = [] - for col in model_class.__table__.columns: - if col.name in {"created_at", "updated_at"}: - continue - if col.primary_key: - # If PK is a string or composite, we cannot assume optional; handle separately - continue - try: - is_required = not getattr(col, "nullable", True) - except Exception: - is_required = False - if is_required: - required.append(col.name) - return required - - -def convert_value(value: str, field_name: str) -> Any: - """Convert string value to appropriate type based on field name""" - if not value or value.strip() == "" or value.strip().lower() in ["null", "none", "n/a"]: - return None - - value = value.strip() - - # Date fields - if any(word in field_name.lower() for word in [ - "date", "dob", "birth", "opened", "closed", "judgment", "valuation", "married", "vests_on", "service", "approved", "filed", "agreement" - ]): - parsed_date = parse_date(value) - return parsed_date - - # Boolean fields - if any(word in field_name.lower() for word in [ - "active", "default_printer", "billed", "transferable", "send", - # PrinterSetup legacy toggles - "phone_book", "rolodex_info", "envelope", "file_cabinet", "accounts", "statements", "calendar" - ]): - if value.lower() in ["true", "1", "yes", "y", "on", "active"]: - return True - elif value.lower() in ["false", "0", "no", "n", "off", "inactive"]: - return False - else: - return None - - # Numeric fields (float) - if any(word in field_name.lower() for word in [ - "rate", "hour", "bal", "fee", "amount", "owing", "transfer", "valu", - "accrued", "vested", "cola", "tax", "percent", "benefit_amount", "mortality", - "value" - ]) or field_name.lower().startswith(("na_", "le_")): - try: - # Remove currency symbols and commas - cleaned_value = value.replace("$", "").replace(",", "").replace("%", "") - return float(cleaned_value) - except ValueError: - return 0.0 - - # Normalize debit_credit textual variants - if field_name.lower() == "debit_credit": - normalized = value.strip().upper() - if normalized in ["D", "DEBIT"]: - return "D" - if normalized in ["C", "CREDIT"]: - return "C" - return normalized[:1] if normalized else None - - # Integer fields - if any(word in field_name.lower() for word in [ - "item_no", "age", "start_age", "version", "line_number", "sort_order", "empl_num", "month", "number" - ]): - try: - return int(float(value)) # Handle cases like "1.0" - except ValueError: - # For employee numbers, return None to skip the record rather than 0 - if "empl_num" in field_name.lower(): - return None - return 0 - - # String fields - limit length to prevent database errors - if len(value) > 500: # Reasonable limit for most string fields - return value[:500] - - return value - - -def validate_foreign_keys(model_data: dict, model_class, db: Session) -> list[str]: - """Validate foreign key relationships before inserting data""" - errors = [] - - # Check Phone -> Rolodex relationship - if model_class == Phone and "rolodex_id" in model_data: - rolodex_id = model_data["rolodex_id"] - if rolodex_id and not db.query(Rolodex).filter(Rolodex.id == rolodex_id).first(): - errors.append(f"Rolodex ID '{rolodex_id}' not found") - - # Check File -> Rolodex relationship - if model_class == File and "id" in model_data: - rolodex_id = model_data["id"] - if rolodex_id and not db.query(Rolodex).filter(Rolodex.id == rolodex_id).first(): - errors.append(f"Owner Rolodex ID '{rolodex_id}' not found") - # Check File -> Footer relationship (default footer on file) - if model_class == File and "footer_code" in model_data: - footer = 
model_data.get("footer_code") - if footer: - exists = db.query(Footer).filter(Footer.footer_code == footer).first() - if not exists: - errors.append(f"Footer code '{footer}' not found for File") - - # Check FileStatus -> Footer (default footer exists) - if model_class == FileStatus and "footer_code" in model_data: - footer = model_data.get("footer_code") - if footer: - exists = db.query(Footer).filter(Footer.footer_code == footer).first() - if not exists: - errors.append(f"Footer code '{footer}' not found for FileStatus") - - # Check TransactionType -> Footer (default footer exists) - if model_class == TransactionType and "footer_code" in model_data: - footer = model_data.get("footer_code") - if footer: - exists = db.query(Footer).filter(Footer.footer_code == footer).first() - if not exists: - errors.append(f"Footer code '{footer}' not found for TransactionType") - - # Check Ledger -> TransactionType/TransactionCode cross references - if model_class == Ledger: - # Validate t_type exists - if "t_type" in model_data: - t_type_value = model_data.get("t_type") - if t_type_value and not db.query(TransactionType).filter(TransactionType.t_type == t_type_value).first(): - errors.append(f"Transaction type '{t_type_value}' not found") - # Validate t_code exists and matches t_type if both provided - if "t_code" in model_data: - t_code_value = model_data.get("t_code") - if t_code_value: - code_row = db.query(TransactionCode).filter(TransactionCode.t_code == t_code_value).first() - if not code_row: - errors.append(f"Transaction code '{t_code_value}' not found") - else: - ledger_t_type = model_data.get("t_type") - if ledger_t_type and getattr(code_row, "t_type", None) and code_row.t_type != ledger_t_type: - errors.append( - f"Transaction code '{t_code_value}' t_type '{code_row.t_type}' does not match ledger t_type '{ledger_t_type}'" - ) - - # Check Payment -> File and Rolodex relationships - if model_class == Payment: - if "file_no" in model_data: - file_no_value = model_data.get("file_no") - if file_no_value and not db.query(File).filter(File.file_no == file_no_value).first(): - errors.append(f"File number '{file_no_value}' not found for Payment") - if "client_id" in model_data: - client_id_value = model_data.get("client_id") - if client_id_value and not db.query(Rolodex).filter(Rolodex.id == client_id_value).first(): - errors.append(f"Client ID '{client_id_value}' not found for Payment") - - # Check QDRO -> PlanInfo (plan_id exists) - if model_class == QDRO and "plan_id" in model_data: - plan_id = model_data.get("plan_id") - if plan_id: - exists = db.query(PlanInfo).filter(PlanInfo.plan_id == plan_id).first() - if not exists: - errors.append(f"Plan ID '{plan_id}' not found for QDRO") - - # Add more foreign key validations as needed - return errors - - -@router.get("/available-files") -async def get_available_csv_files(current_user: User = Depends(get_current_user)): - """Get list of available CSV files for import""" - return { - "available_files": list(CSV_MODEL_MAPPING.keys()), - "descriptions": { - "ROLODEX.csv": "Customer/contact information", - "ROLEX_V.csv": "Customer/contact information (alias)", - "PHONE.csv": "Phone numbers linked to customers", - "FILES.csv": "Client files and cases", - "FILES_R.csv": "Client files and cases (alias)", - "FILES_V.csv": "Client files and cases (alias)", - "LEDGER.csv": "Financial transactions per file", - "QDROS.csv": "Legal documents and court orders", - "PENSIONS.csv": "Pension calculation data", - "SCHEDULE.csv": "Vesting schedules for pensions", - 
"MARRIAGE.csv": "Marriage history data", - "DEATH.csv": "Death benefit calculations", - "SEPARATE.csv": "Separation agreements", - "EMPLOYEE.csv": "Staff and employee information", - "STATES.csv": "US States lookup table", - "FILETYPE.csv": "File type categories", - "FILESTAT.csv": "File status codes", - "FOOTERS.csv": "Document footers and signatures", - "DEPOSITS.csv": "Daily bank deposit summaries", - "FILENOTS.csv": "File notes and case memos", - "FVARLKUP.csv": "Form template variables", - "RVARLKUP.csv": "Report template variables", - "PAYMENTS.csv": "Individual payments within deposits", - "TRNSACTN.csv": "Transaction details (maps to Ledger)", - "INX_LKUP.csv": "Form keywords lookup", - "PLANINFO.csv": "Pension plan information", - "RESULTS.csv": "Pension computed results", - "LIFETABL.csv": "Life expectancy table by age, sex, and race (rich typed)", - "NUMBERAL.csv": "Monthly survivor counts by sex and race (rich typed)" - }, - "auto_discovery": True - } - - -@router.get("/template/{file_type}") -async def download_csv_template( - file_type: str, - current_user: User = Depends(get_current_user) -): - """Download a minimal CSV template with required headers and one sample row. - - Supported templates include: {list(CSV_IMPORT_TEMPLATES.keys())} - """ - key = (file_type or "").strip() - if key not in CSV_IMPORT_TEMPLATES: - raise HTTPException(status_code=400, detail=f"Unsupported template type: {file_type}. Choose one of: {list(CSV_IMPORT_TEMPLATES.keys())}") - - content = _generate_csv_template_bytes(key) - - from datetime import datetime as _dt - ts = _dt.now().strftime("%Y%m%d_%H%M%S") - safe_name = key.replace(".csv", "") - filename = f"{safe_name}_template_{ts}.csv" - return StreamingResponse( - iter([content]), - media_type="text/csv", - headers={"Content-Disposition": f"attachment; filename=\"{filename}\""}, - ) - - -@router.get("/templates/bundle") -async def download_csv_templates_bundle( - files: Optional[List[str]] = Query(None, description="Repeat for each CSV template, e.g., files=FILES.csv&files=LEDGER.csv"), - current_user: User = Depends(get_current_user) -): - """Bundle selected CSV templates into a single ZIP. - - Example: GET /api/import/templates/bundle?files=FILES.csv&files=LEDGER.csv - """ - requested = files or [] - if not requested: - raise HTTPException(status_code=400, detail="Specify at least one 'files' query parameter") - - # Normalize and validate - normalized: List[str] = [] - for name in requested: - if not name: - continue - n = name.strip() - if not n.lower().endswith(".csv"): - n = f"{n}.csv" - n = n.upper() - if n in CSV_IMPORT_TEMPLATES: - normalized.append(n) - else: - # Ignore unknowns rather than fail the whole bundle - continue - - # Deduplicate while preserving order - seen = set() - selected = [] - for n in normalized: - if n not in seen: - seen.add(n) - selected.append(n) - - if not selected: - raise HTTPException(status_code=400, detail=f"No supported templates requested. 
Supported: {list(CSV_IMPORT_TEMPLATES.keys())}") - - zip_buffer = io.BytesIO() - with zipfile.ZipFile(zip_buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: - for fname in selected: - try: - content = _generate_csv_template_bytes(fname) - # Friendly name in zip: _template.csv - base = fname.replace(".CSV", "").upper() - arcname = f"{base}_template.csv" - zf.writestr(arcname, content) - except HTTPException: - # Skip unsupported just in case - continue - - zip_buffer.seek(0) - ts = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"csv_templates_{ts}.zip" - return StreamingResponse( - iter([zip_buffer.getvalue()]), - media_type="application/zip", - headers={ - "Content-Disposition": f"attachment; filename=\"{filename}\"" - }, - ) - - -@router.post("/upload/{file_type}") -async def import_csv_data( - file_type: str, - file: UploadFile = UploadFileForm(...), - replace_existing: bool = Form(False), - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Import data from CSV file""" - - # Validate file type - if file_type not in CSV_MODEL_MAPPING: - raise HTTPException( - status_code=400, - detail=f"Unsupported file type: {file_type}. Available types: {list(CSV_MODEL_MAPPING.keys())}" - ) - - # Validate file extension - if not file.filename.endswith('.csv'): - raise HTTPException(status_code=400, detail="File must be a CSV file") - - model_class = CSV_MODEL_MAPPING[file_type] - # Legacy mapping hints used internally by auto-discovery; not used strictly - legacy_hint_map = FIELD_MAPPINGS.get(file_type, {}) - - try: - # Read CSV content - content = await file.read() - - # Try multiple encodings for legacy CSV files - encodings = ENCODINGS - csv_content = None - for encoding in encodings: - try: - csv_content = content.decode(encoding) - break - except UnicodeDecodeError: - continue - - if csv_content is None: - raise HTTPException(status_code=400, detail="Could not decode CSV file. 
Please ensure it's saved in UTF-8, Windows-1252, or ISO-8859-1 encoding.") - - # Note: preprocess_csv helper removed as unused; robust parsing handled below - - # Custom robust parser for problematic legacy CSV files - class MockCSVReader: - def __init__(self, data, fieldnames): - self.data = data - self.fieldnames = fieldnames - self.index = 0 - - def __iter__(self): - return self - - def __next__(self): - if self.index >= len(self.data): - raise StopIteration - row = self.data[self.index] - self.index += 1 - return row - - try: - lines = csv_content.strip().split('\n') - if not lines: - raise ValueError("Empty CSV file") - - # Parse header using proper CSV parsing - header_reader = csv.reader(io.StringIO(lines[0])) - headers = next(header_reader) - headers = [h.strip() for h in headers] - # Debug logging removed in API path; rely on audit/logging if needed - # Build dynamic header mapping for this file/model - mapping_info = _build_dynamic_mapping(headers, model_class, file_type) - - # Parse data rows with proper CSV parsing - rows_data = [] - skipped_rows = 0 - - for line_num, line in enumerate(lines[1:], start=2): - # Skip empty lines - if not line.strip(): - continue - - try: - # Use proper CSV parsing to handle commas within quoted fields - line_reader = csv.reader(io.StringIO(line)) - fields = next(line_reader) - fields = [f.strip() for f in fields] - - # Skip rows that are clearly malformed (too few fields) - if len(fields) < len(headers) // 2: # Less than half the expected fields - skipped_rows += 1 - continue - - # Pad or truncate to match header length - while len(fields) < len(headers): - fields.append('') - fields = fields[:len(headers)] - - row_dict = dict(zip(headers, fields)) - rows_data.append(row_dict) - - except Exception as row_error: - import_logger.log_import_error(line_num, str(row_error), dict(zip(headers, fields)) if len(fields) <= len(headers) else None) - skipped_rows += 1 - continue - - csv_reader = MockCSVReader(rows_data, headers) - # Parsing summary suppressed to avoid noisy stdout in API - - except Exception as e: - # Keep error minimal for client; internal logging can capture 'e' - raise HTTPException(status_code=400, detail=f"Could not parse CSV file. The file appears to have serious formatting issues. 
Error: {str(e)}") - - # Special handling: legacy wide-format SETUP.csv (Appl_Title, L_Head1..10, Default_Printer) - if file_type == "SETUP.csv" and _is_setup_wide_format(headers): - imported_count, errors = _import_setup_wide(rows_data, db, replace_existing) - result = { - "file_type": file_type, - "imported_count": imported_count, - "errors": errors[:10], - "total_errors": len(errors), - "auto_mapping": { - "mapped_headers": {}, - "unmapped_headers": headers, - "wide_format": True, - "flexible_saved_rows": 0, - }, - } - if errors: - result["warning"] = f"Import completed with {len(errors)} errors" - return result - - imported_count = 0 - created_count = 0 - updated_count = 0 - errors = [] - flexible_saved = 0 - mapped_headers = mapping_info.get("mapped_headers", {}) - unmapped_headers = mapping_info.get("unmapped_headers", []) - # Special handling: assign line numbers per form for FORM_LST.csv - form_lst_line_counters: Dict[str, int] = {} - - # If replace_existing is True, delete all existing records and related flexible extras - if replace_existing: - db.query(model_class).delete() - db.query(FlexibleImport).filter( - FlexibleImport.file_type == file_type, - FlexibleImport.target_table == model_class.__tablename__, - ).delete() - db.commit() - - for row_num, row in enumerate(csv_reader, start=2): # Start at 2 for header row - try: - # Convert CSV row to model data - model_data: Dict[str, Any] = {} - # Apply discovered mapping - for csv_field, db_field in mapped_headers.items(): - if csv_field in row and db_field is not None: - converted_value = convert_value(row[csv_field], db_field) - if converted_value is not None: - model_data[db_field] = converted_value - - # Inject sequential line_number for FORM_LST rows grouped by form_id - if file_type == "FORM_LST.csv": - form_id_value = model_data.get("form_id") - if form_id_value: - current = form_lst_line_counters.get(str(form_id_value), 0) + 1 - form_lst_line_counters[str(form_id_value)] = current - # Only set if not provided - if "line_number" not in model_data: - model_data["line_number"] = current - - # Skip empty rows - if not any(model_data.values()): - continue - - # Fallback: if required non-nullable fields are missing, store row as flexible only - required_fields = _get_required_fields(model_class) - missing_required = [f for f in required_fields if model_data.get(f) in (None, "")] - if missing_required: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data={ - "mapped": model_data, - "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")}, - "missing_required": missing_required, - }, - ) - ) - flexible_saved += 1 - # Do not attempt to insert into strict model; continue to next row - continue - - # Special validation for models with required fields - if model_class == Phone: - if 'phone' not in model_data or not model_data['phone']: - continue # Skip phone records without a phone number - - if model_class == Rolodex: - if 'last' not in model_data or not model_data['last']: - continue # Skip rolodex records without a last name/company name - - if model_class == Ledger: - # Skip ledger records without required fields - if 'empl_num' not in model_data or not model_data['empl_num']: - continue # Skip ledger records without employee number - if 'file_no' not in model_data or not model_data['file_no']: - continue # Skip ledger records without file number - - # Create or update model instance - instance = 
None - # Upsert behavior for printers - if model_class == PrinterSetup: - # Determine primary key field name - _, pk_names = _get_model_columns(model_class) - pk_field_name_local = pk_names[0] if len(pk_names) == 1 else None - pk_value_local = model_data.get(pk_field_name_local) if pk_field_name_local else None - if pk_field_name_local and pk_value_local: - existing = db.query(model_class).filter(getattr(model_class, pk_field_name_local) == pk_value_local).first() - if existing: - # Update mutable fields - for k, v in model_data.items(): - if k != pk_field_name_local: - setattr(existing, k, v) - instance = existing - updated_count += 1 - else: - instance = model_class(**model_data) - db.add(instance) - created_count += 1 - else: - # Fallback to insert if PK missing - instance = model_class(**model_data) - db.add(instance) - created_count += 1 - db.flush() - # Enforce single default - try: - if bool(model_data.get("default_printer")): - db.query(model_class).filter(getattr(model_class, pk_field_name_local) != getattr(instance, pk_field_name_local)).update({model_class.default_printer: False}) - except Exception: - pass - else: - # FK validation for known relationships - fk_errors = validate_foreign_keys(model_data, model_class, db) - if fk_errors: - for msg in fk_errors: - errors.append({"row": row_num, "error": msg}) - # Persist as flexible for traceability - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data={ - "mapped": model_data, - "fk_errors": fk_errors, - }, - ) - ) - flexible_saved += 1 - continue - instance = model_class(**model_data) - db.add(instance) - db.flush() # Ensure PK is available - - # Capture PK details for flexible storage linkage (single-column PKs only) - _, pk_names = _get_model_columns(model_class) - pk_field_name = pk_names[0] if len(pk_names) == 1 else None - pk_value = None - if pk_field_name: - try: - pk_value = getattr(instance, pk_field_name) - except Exception: - pk_value = None - - # Save unmapped fields into flexible storage (privacy-first, per-row JSON) - extra_data = {} - for csv_field in unmapped_headers: - if csv_field in row and row[csv_field] not in (None, ""): - extra_data[csv_field] = row[csv_field] - if extra_data: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=pk_field_name, - primary_key_value=str(pk_value) if pk_value is not None else None, - extra_data=extra_data, - ) - ) - flexible_saved += 1 - imported_count += 1 - - # Commit every 100 records to avoid memory issues - if imported_count % 100 == 0: - db.commit() - - except Exception as e: - # Rollback the transaction for this record - db.rollback() - # As a robustness measure, persist row in flexible storage instead of counting as error - try: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data={ - "mapped": model_data, - "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")}, - "error": str(e), - }, - ) - ) - flexible_saved += 1 - except Exception as flex_e: - errors.append({ - "row": row_num, - "error": f"{str(e)} | Flexible save failed: {str(flex_e)}", - "data": row, - }) - continue - - # Final commit - db.commit() - - result = { - "file_type": file_type, - "imported_count": imported_count, - "errors": errors[:10], # Limit errors to first 10 - "total_errors": len(errors), - 
"auto_mapping": { - "mapped_headers": mapped_headers, - "unmapped_headers": unmapped_headers, - "flexible_saved_rows": flexible_saved, - }, - "validation": { - "fk_errors": len([e for e in errors if isinstance(e, dict) and 'error' in e and 'not found' in str(e['error']).lower()]) - } - } - # Include create/update breakdown for printers - if file_type == "PRINTERS.csv": - result["created_count"] = created_count - result["updated_count"] = updated_count - - if errors: - result["warning"] = f"Import completed with {len(errors)} errors" - - return result - - except Exception as e: - # Suppress stdout debug prints in API layer - db.rollback() - raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}") - - -@router.get("/status") -async def get_import_status(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): - """Get current import status and record counts""" - - status = {} - - for file_type, model_class in CSV_MODEL_MAPPING.items(): - try: - count = db.query(model_class).count() - status[file_type] = { - "table_name": model_class.__tablename__, - "record_count": count - } - except Exception as e: - status[file_type] = { - "table_name": model_class.__tablename__, - "record_count": 0, - "error": str(e) - } - - return status - - -@router.delete("/clear/{file_type}") -async def clear_table_data( - file_type: str, - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Clear all data from a specific table""" - - if file_type not in CSV_MODEL_MAPPING: - raise HTTPException(status_code=400, detail=f"Unknown file type: {file_type}") - - model_class = CSV_MODEL_MAPPING[file_type] - - try: - deleted_count = db.query(model_class).count() - db.query(model_class).delete() - # Also clear any flexible rows linked to this target table and file type - db.query(FlexibleImport).filter( - FlexibleImport.file_type == file_type, - FlexibleImport.target_table == model_class.__tablename__, - ).delete() - db.commit() - - return { - "file_type": file_type, - "table_name": model_class.__tablename__, - "deleted_count": deleted_count - } - - except Exception as e: - db.rollback() - raise HTTPException(status_code=500, detail=f"Clear operation failed: {str(e)}") - - -@router.post("/validate/{file_type}") -async def validate_csv_file( - file_type: str, - file: UploadFile = UploadFileForm(...), - current_user: User = Depends(get_current_user) -): - """Validate CSV file structure without importing""" - - if file_type not in CSV_MODEL_MAPPING: - raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_type}") - - if not file.filename.endswith('.csv'): - raise HTTPException(status_code=400, detail="File must be a CSV file") - - # Use auto-discovery mapping for validation - - try: - content = await file.read() - - # Try multiple encodings for legacy CSV files - encodings = ENCODINGS - csv_content = None - for encoding in encodings: - try: - csv_content = content.decode(encoding) - break - except UnicodeDecodeError: - continue - - if csv_content is None: - raise HTTPException(status_code=400, detail="Could not decode CSV file. 
Please ensure it's saved in UTF-8, Windows-1252, or ISO-8859-1 encoding.") - - rows_list, csv_headers = parse_csv_with_fallback(csv_content) - model_class = CSV_MODEL_MAPPING[file_type] - mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type) - mapped_headers = mapping_info["mapped_headers"] - unmapped_headers = mapping_info["unmapped_headers"] - # Accept legacy wide-format SETUP.csv as valid (Appl_Title, L_Head1..10, Default_Printer) - wide_ok = (file_type == "SETUP.csv" and _is_setup_wide_format(csv_headers)) - - # Sample data validation - sample_rows = [] - errors = [] - - for row_num, row in enumerate(rows_list, start=2): - if row_num > 12: # Only check first 10 data rows - break - - sample_rows.append(row) - - # Check for data type issues on mapped fields - for csv_field, db_field in mapped_headers.items(): - if csv_field in row and row[csv_field]: - try: - convert_value(row[csv_field], db_field) - except Exception as e: - errors.append({ - "row": row_num, - "field": csv_field, - "value": row[csv_field], - "error": str(e) - }) - - return { - "file_type": file_type, - # Consider valid if we can map at least one column; we don't require exact header match - "valid": (len(mapped_headers) > 0 or wide_ok) and len(errors) == 0, - "headers": { - "found": csv_headers, - "mapped": mapped_headers, - "unmapped": unmapped_headers, - }, - "sample_data": sample_rows, - "validation_errors": errors[:5], # First 5 errors only - "total_errors": len(errors), - "auto_mapping": { - "suggestions": mapping_info["suggestions"], - "wide_format": wide_ok, - }, - } - - except Exception as e: - # Suppress stdout debug prints in API layer - raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}") - - -@router.get("/progress/{import_id}") -async def get_import_progress( - import_id: str, - current_user: User = Depends(get_current_user) -): - """Get import progress status (placeholder for future implementation)""" - # This would be used for long-running imports with background tasks - return { - "import_id": import_id, - "status": "not_implemented", - "message": "Real-time progress tracking not yet implemented" - } - - -@router.get("/current-batch") -async def get_current_batch( - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Return the most recent running batch import for the current user, if any.""" - try: - row = ( - db.query(ImportAudit) - .filter(ImportAudit.status == "running") - .filter(ImportAudit.initiated_by_user_id == getattr(current_user, "id", None)) - .order_by(ImportAudit.started_at.desc()) - .first() - ) - if not row: - return {"running": False} - return { - "running": True, - "audit_id": row.id, - "started_at": row.started_at.isoformat() if row.started_at else None, - "total_files": row.total_files, - "message": row.message, - } - except Exception as e: - raise HTTPException(status_code=500, detail=f"Failed to get current batch: {str(e)}") - - -@router.get("/batch-progress/{audit_id}") -async def get_batch_progress( - audit_id: int, - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Return real-time progress for a batch import using audit tables as the source of truth.""" - audit = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first() - if not audit: - raise HTTPException(status_code=404, detail="Batch not found") - - # Authorization: allow only the initiating user or admins to view progress - try: - from app.utils.enhanced_auth import is_admin_user - is_admin = 
is_admin_user(current_user) - except Exception: - is_admin = False - if not is_admin and getattr(current_user, "id", None) != getattr(audit, "initiated_by_user_id", None): - raise HTTPException(status_code=403, detail="Not authorized to view this batch progress") - - # Aggregate per-file results to compute progress - processed_files = db.query(ImportAuditFile).filter(ImportAuditFile.audit_id == audit.id).count() - successful_files = db.query(ImportAuditFile).filter( - ImportAuditFile.audit_id == audit.id, - ImportAuditFile.status.in_(["success", "completed_with_errors", "skipped"]) - ).count() - failed_files = db.query(ImportAuditFile).filter( - ImportAuditFile.audit_id == audit.id, - ImportAuditFile.status == "failed" - ).count() - - total_files = audit.total_files or 0 - percent_complete: float = 0.0 - if total_files > 0: - try: - percent_complete = round((processed_files / total_files) * 100, 1) - except Exception: - percent_complete = 0.0 - - data = { - "audit_id": audit.id, - "status": audit.status, - "total_files": total_files, - "processed_files": processed_files, - "successful_files": successful_files, - "failed_files": failed_files, - "started_at": audit.started_at.isoformat() if audit.started_at else None, - "finished_at": audit.finished_at.isoformat() if audit.finished_at else None, - "percent": percent_complete, - "message": audit.message, - } - - # Include a brief summary of last processed file if desired (best-effort) - try: - last_file = ( - db.query(ImportAuditFile) - .filter(ImportAuditFile.audit_id == audit.id) - .order_by(ImportAuditFile.id.desc()) - .first() - ) - if last_file: - data["last_file"] = { - "file_type": last_file.file_type, - "status": last_file.status, - "imported_count": last_file.imported_count, - "errors": last_file.errors, - "message": last_file.message, - "created_at": last_file.created_at.isoformat() if last_file.created_at else None, - } - except Exception: - pass - - return data - - -@router.post("/batch-validate") -async def batch_validate_csv_files( - files: List[UploadFile] = UploadFileForm(...), - current_user: User = Depends(get_current_user) -): - """Validate multiple CSV files without importing""" - - if len(files) > 25: - raise HTTPException(status_code=400, detail="Maximum 25 files allowed per batch") - - validation_results = [] - - for file in files: - file_type = file.filename - - if file_type not in CSV_MODEL_MAPPING: - validation_results.append({ - "file_type": file_type, - "valid": False, - "error": f"Unsupported file type: {file_type}" - }) - continue - - if not file.filename.endswith('.csv'): - validation_results.append({ - "file_type": file_type, - "valid": False, - "error": "File must be a CSV file" - }) - continue - - model_class = CSV_MODEL_MAPPING.get(file_type) - - try: - content = await file.read() - - # Try multiple encodings for legacy CSV files (include BOM-friendly utf-8-sig) - encodings = ENCODINGS - csv_content = None - for encoding in encodings: - try: - csv_content = content.decode(encoding) - break - except UnicodeDecodeError: - continue - - if csv_content is None: - validation_results.append({ - "file_type": file_type, - "valid": False, - "error": "Could not decode CSV file encoding" - }) - continue - - # Handle CSV parsing issues with legacy files - rows_list, csv_headers = parse_csv_with_fallback(csv_content) - - # Check headers and build dynamic mapping - mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type) - mapped_headers = mapping_info["mapped_headers"] - unmapped_headers = 
mapping_info["unmapped_headers"] - header_validation = _validate_required_headers(file_type, mapped_headers) - header_validation = _validate_required_headers(file_type, mapped_headers) - header_validation = _validate_required_headers(file_type, mapped_headers) - header_validation = _validate_required_headers(file_type, mapped_headers) - - # Sample data validation - sample_rows = [] - errors = [] - - for row_num, row in enumerate(rows_list, start=2): - if row_num > 12: # Only check first 10 data rows - break - - sample_rows.append(row) - - # Check for data type issues - for csv_field, db_field in mapped_headers.items(): - if csv_field in row and row[csv_field]: - try: - convert_value(row[csv_field], db_field) - except Exception as e: - errors.append({ - "row": row_num, - "field": csv_field, - "value": row[csv_field], - "error": str(e) - }) - - # Consider valid if we can map at least one column; for SETUP.csv also accept recognized wide-format headers - wide_ok = (file_type == "SETUP.csv" and _is_setup_wide_format(csv_headers)) - validation_results.append({ - "file_type": file_type, - "valid": ((len(mapped_headers) > 0 or wide_ok) and len(errors) == 0 and header_validation.get("ok", True)), - "headers": { - "found": csv_headers, - "mapped": mapped_headers, - "unmapped": unmapped_headers - }, - "header_validation": header_validation, - "sample_data": sample_rows[:5], # Limit sample data for batch operation - "validation_errors": errors[:5], # First 5 errors only - "total_errors": len(errors), - "auto_mapping": { - "suggestions": mapping_info["suggestions"], - "wide_format": wide_ok, - }, - }) - - # Reset file pointer for potential future use - await file.seek(0) - - except Exception as e: - validation_results.append({ - "file_type": file_type, - "valid": False, - "error": f"Validation failed: {str(e)}" - }) - - # Summary statistics - total_files = len(validation_results) - valid_files = len([r for r in validation_results if r["valid"]]) - invalid_files = total_files - valid_files - - return { - "batch_validation_results": validation_results, - "summary": { - "total_files": total_files, - "valid_files": valid_files, - "invalid_files": invalid_files, - "all_valid": invalid_files == 0 - } - } - - -@router.post("/batch-upload") -async def batch_import_csv_files( - files: List[UploadFile] = UploadFileForm(...), - replace_existing: bool = Form(False), - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Import multiple CSV files in optimal order""" - - if len(files) > 25: - raise HTTPException(status_code=400, detail="Maximum 25 files allowed per batch") - - # Define optimal import order based on dependencies - import_order = IMPORT_ORDER - - # Sort uploaded files by optimal import order - file_map = {f.filename: f for f in files} - ordered_files = [] - - for file_type in import_order: - if file_type in file_map: - ordered_files.append((file_type, file_map[file_type])) - del file_map[file_type] - - # Add any remaining files not in the predefined order - for filename, file in file_map.items(): - ordered_files.append((filename, file)) - - results = [] - total_imported = 0 - total_errors = 0 - - # Create import audit row (running) - audit_row = ImportAudit( - status="running", - total_files=len(files), - successful_files=0, - failed_files=0, - total_imported=0, - total_errors=0, - initiated_by_user_id=getattr(current_user, "id", None), - initiated_by_username=getattr(current_user, "username", None), - message="Batch import started", - ) - db.add(audit_row) - 
db.commit() - db.refresh(audit_row) - # Broadcast initial snapshot - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - - # Directory to persist uploaded files for this audit (for reruns) - audit_dir = Path(settings.upload_dir).joinpath("import_audits", str(audit_row.id)) - try: - audit_dir.mkdir(parents=True, exist_ok=True) - except Exception: - pass - - for file_type, file in ordered_files: - if file_type not in CSV_MODEL_MAPPING: - # Fallback flexible-only import for unknown file structures - try: - # Use async file operations for better performance - from app.services.async_file_operations import async_file_ops - - # Stream save to disk for potential reruns and processing - saved_path = None - try: - relative_path = f"import_audits/{audit_row.id}/{file_type}" - saved_file_path, file_size, checksum = await async_file_ops.stream_upload_file( - file, relative_path - ) - saved_path = str(async_file_ops.base_upload_dir / relative_path) - - # Stream read for processing - content = b"" - async for chunk in async_file_ops.stream_read_file(relative_path): - content += chunk - - except Exception as e: - # Fallback to traditional method - await file.seek(0) - content = await file.read() - try: - file_path = audit_dir.joinpath(file_type) - with open(file_path, "wb") as fh: - fh.write(content) - saved_path = str(file_path) - except Exception: - saved_path = None - encodings = ENCODINGS - csv_content = None - for encoding in encodings: - try: - csv_content = content.decode(encoding) - break - except UnicodeDecodeError: - continue - if csv_content is None: - results.append({ - "file_type": file_type, - "status": "failed", - "message": "Could not decode CSV file encoding" - }) - continue - rows_list, headers = parse_csv_with_fallback(csv_content) - flexible_count = 0 - for row in rows_list: - # Save entire row as flexible JSON - db.add( - FlexibleImport( - file_type=file_type, - target_table=None, - primary_key_field=None, - primary_key_value=None, - extra_data=make_json_safe({k: v for k, v in (row or {}).items() if v not in (None, "")}), - ) - ) - flexible_count += 1 - if flexible_count % 200 == 0: - db.commit() - db.commit() - total_imported += flexible_count - # Persist per-file result row - results.append({ - "file_type": file_type, - "status": "success", - "imported_count": flexible_count, - "errors": 0, - "message": f"Stored {flexible_count} rows as flexible data (no known model)", - "auto_mapping": { - "mapped_headers": {}, - "unmapped_headers": list(headers), - "flexible_saved_rows": flexible_count, - }, - }) - try: - db.add(ImportAuditFile( - audit_id=audit_row.id, - file_type=file_type, - status="success", - imported_count=flexible_count, - errors=0, - message=f"Stored {flexible_count} rows as flexible data", - details={"saved_path": saved_path} if saved_path else {} - )) - db.commit() - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - continue - except Exception as e: - db.rollback() - results.append({ - "file_type": file_type, - "status": "failed", - "message": f"Flexible import failed: {str(e)}" - }) - try: - db.add(ImportAuditFile( - audit_id=audit_row.id, - file_type=file_type, - status="failed", - imported_count=0, - errors=1, - message=f"Flexible import failed: {str(e)}", - details={} - )) - db.commit() - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - continue - - try: - # Reset file pointer - await 
file.seek(0) - - # Import this file using auto-discovery mapping - model_class = CSV_MODEL_MAPPING[file_type] - - content = await file.read() - # Save original upload to disk for potential reruns - saved_path = None - try: - file_path = audit_dir.joinpath(file_type) - with open(file_path, "wb") as fh: - fh.write(content) - saved_path = str(file_path) - except Exception: - saved_path = None - - # Try multiple encodings for legacy CSV files - encodings = ENCODINGS - csv_content = None - for encoding in encodings: - try: - csv_content = content.decode(encoding) - break - except UnicodeDecodeError: - continue - - if csv_content is None: - results.append({ - "file_type": file_type, - "status": "failed", - "message": "Could not decode CSV file encoding" - }) - try: - db.add(ImportAuditFile( - audit_id=audit_row.id, - file_type=file_type, - status="failed", - imported_count=0, - errors=1, - message="Could not decode CSV file encoding", - details={"saved_path": saved_path} if saved_path else {} - )) - db.commit() - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - continue - - # Handle CSV parsing issues with legacy files - rows_list, csv_headers = parse_csv_with_fallback(csv_content) - mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type) - mapped_headers = mapping_info["mapped_headers"] - unmapped_headers = mapping_info["unmapped_headers"] - header_validation = _validate_required_headers(file_type, mapped_headers) - - # Special handling: legacy wide-format SETUP.csv (Appl_Title, L_Head1..10, Default_Printer) - if file_type == "SETUP.csv" and _is_setup_wide_format(csv_headers): - imported_count, errors = _import_setup_wide(rows_list, db, replace_existing) - total_imported += imported_count - total_errors += len(errors) - results.append({ - "file_type": file_type, - "status": "success" if not errors else "completed_with_errors", - "imported_count": imported_count, - "errors": len(errors), - "message": f"Imported {imported_count} settings" + (f" with {len(errors)} errors" if errors else ""), - "auto_mapping": { - "mapped_headers": {}, - "unmapped_headers": csv_headers, - "wide_format": True, - "flexible_saved_rows": 0, - }, - }) - try: - db.add(ImportAuditFile( - audit_id=audit_row.id, - file_type=file_type, - status="success" if not errors else "completed_with_errors", - imported_count=imported_count, - errors=len(errors), - message=f"Imported {imported_count} settings" + (f" with {len(errors)} errors" if errors else ""), - details={ - "mapped_headers": [], - "unmapped_count": len(csv_headers), - "flexible_saved_rows": 0, - "wide_format": True, - "header_validation": header_validation, - } - )) - db.commit() - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - continue - - imported_count = 0 - errors = [] - flexible_saved = 0 - fk_error_summary: Dict[str, int] = {} - # Special handling: assign line numbers per form for FORM_LST.csv - form_lst_line_counters: Dict[str, int] = {} - - # If replace_existing is True and this is the first file of this type - if replace_existing: - db.query(model_class).delete() - db.query(FlexibleImport).filter( - FlexibleImport.file_type == file_type, - FlexibleImport.target_table == model_class.__tablename__, - ).delete() - db.commit() - - for row_num, row in enumerate(rows_list, start=2): - try: - model_data = {} - for csv_field, db_field in mapped_headers.items(): - if csv_field in row and db_field is 
not None: - converted_value = convert_value(row[csv_field], db_field) - if converted_value is not None: - model_data[db_field] = converted_value - - # Inject sequential line_number for FORM_LST rows grouped by form_id - if file_type == "FORM_LST.csv": - form_id_value = model_data.get("form_id") - if form_id_value: - current = form_lst_line_counters.get(str(form_id_value), 0) + 1 - form_lst_line_counters[str(form_id_value)] = current - if "line_number" not in model_data: - model_data["line_number"] = current - - if not any(model_data.values()): - continue - - # Fallback: if required non-nullable fields are missing, store row as flexible only - required_fields = _get_required_fields(model_class) - missing_required = [f for f in required_fields if model_data.get(f) in (None, "")] - if missing_required: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data=make_json_safe({ - "mapped": model_data, - "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")}, - "missing_required": missing_required, - }), - ) - ) - flexible_saved += 1 - continue - - # Special validation for models with required fields - if model_class == Phone: - if 'phone' not in model_data or not model_data['phone']: - continue # Skip phone records without a phone number - - if model_class == Rolodex: - if 'last' not in model_data or not model_data['last']: - continue # Skip rolodex records without a last name/company name - - if model_class == Ledger: - # Skip ledger records without required fields - if 'empl_num' not in model_data or not model_data['empl_num']: - continue # Skip ledger records without employee number - if 'file_no' not in model_data or not model_data['file_no']: - continue # Skip ledger records without file number - - # FK validation for known relationships - fk_errors = validate_foreign_keys(model_data, model_class, db) - if fk_errors: - for msg in fk_errors: - errors.append({"row": row_num, "error": msg}) - fk_error_summary[msg] = fk_error_summary.get(msg, 0) + 1 - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data=make_json_safe({ - "mapped": model_data, - "fk_errors": fk_errors, - }), - ) - ) - flexible_saved += 1 - continue - instance = model_class(**model_data) - db.add(instance) - db.flush() - - # Link flexible extras - _, pk_names = _get_model_columns(model_class) - pk_field_name = pk_names[0] if len(pk_names) == 1 else None - pk_value = None - if pk_field_name: - try: - pk_value = getattr(instance, pk_field_name) - except Exception: - pk_value = None - extra_data = {} - for csv_field in unmapped_headers: - if csv_field in row and row[csv_field] not in (None, ""): - extra_data[csv_field] = row[csv_field] - if extra_data: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=pk_field_name, - primary_key_value=str(pk_value) if pk_value is not None else None, - extra_data=make_json_safe(extra_data), - ) - ) - flexible_saved += 1 - imported_count += 1 - - if imported_count % 100 == 0: - db.commit() - - except Exception as e: - # Rollback the transaction for this record - db.rollback() - # Persist row in flexible storage instead of counting as error only - try: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - 
extra_data=make_json_safe({ - "mapped": model_data, - "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")}, - "error": str(e), - }), - ) - ) - flexible_saved += 1 - except Exception as flex_e: - errors.append({ - "row": row_num, - "error": f"{str(e)} | Flexible save failed: {str(flex_e)}", - }) - continue - - db.commit() - - total_imported += imported_count - total_errors += len(errors) - - results.append({ - "file_type": file_type, - "status": "success" if (len(errors) == 0 and header_validation.get("ok", True)) else "completed_with_errors", - "imported_count": imported_count, - "errors": len(errors), - "message": f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""), - "header_validation": header_validation, - "validation": { - "fk_errors_total": sum(fk_error_summary.values()), - "fk_error_summary": fk_error_summary, - }, - "auto_mapping": { - "mapped_headers": mapped_headers, - "unmapped_headers": unmapped_headers, - "flexible_saved_rows": flexible_saved, - }, - }) - try: - db.add(ImportAuditFile( - audit_id=audit_row.id, - file_type=file_type, - status="success" if (len(errors) == 0 and header_validation.get("ok", True)) else "completed_with_errors", - imported_count=imported_count, - errors=len(errors), - message=f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""), - details={ - "mapped_headers": list(mapped_headers.keys()), - "unmapped_count": len(unmapped_headers), - "flexible_saved_rows": flexible_saved, - "fk_errors_total": sum(fk_error_summary.values()), - "fk_error_summary": fk_error_summary, - "header_validation": header_validation, - **({"saved_path": saved_path} if saved_path else {}), - } - )) - db.commit() - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - - except Exception as e: - db.rollback() - results.append({ - "file_type": file_type, - "status": "failed", - "message": f"Import failed: {str(e)}" - }) - try: - db.add(ImportAuditFile( - audit_id=audit_row.id, - file_type=file_type, - status="failed", - imported_count=0, - errors=1, - message=f"Import failed: {str(e)}", - details={"saved_path": saved_path} if saved_path else {} - )) - db.commit() - try: - await _broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - - summary = { - "total_files": len(files), - "successful_files": len([r for r in results if r["status"] in ["success", "completed_with_errors"]]), - "failed_files": len([r for r in results if r["status"] == "failed"]), - "total_imported": total_imported, - "total_errors": total_errors - } - - # Finalize audit row - try: - audit_row.successful_files = summary["successful_files"] - audit_row.failed_files = summary["failed_files"] - audit_row.total_imported = summary["total_imported"] - audit_row.total_errors = summary["total_errors"] - audit_row.status = "success" if summary["failed_files"] == 0 and summary["total_errors"] == 0 else ( - "completed_with_errors" if summary["successful_files"] > 0 else "failed" - ) - audit_row.message = f"Batch import completed: {audit_row.successful_files}/{audit_row.total_files} files" - audit_row.finished_at = datetime.now(timezone.utc) - audit_row.details = { - "files": [ - {"file_type": r.get("file_type"), "status": r.get("status"), "imported_count": r.get("imported_count", 0), "errors": r.get("errors", 0)} - for r in results - ] - } - db.add(audit_row) - db.commit() - try: - await 
_broadcast_import_progress(db, audit_row.id) - except Exception: - pass - except Exception: - db.rollback() - - return { - "batch_results": results, - "summary": summary - } - - -@router.websocket("/batch-progress/ws/{audit_id}") -async def ws_import_batch_progress(websocket: WebSocket, audit_id: int): - """WebSocket: subscribe to real-time updates for an import audit using the pool.""" - # Authenticate first (without accepting) to enforce authorization before subscribing - user = await websocket_manager.authenticate_websocket(websocket) - if not user: - # Authentication failure handled here to avoid accepting unauthorized connections - try: - await websocket.close(code=4401, reason="Authentication failed") - except Exception: - pass - return - - # Authorization: only initiating user or admins may subscribe to this audit stream - db = SessionLocal() - try: - audit = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first() - if not audit: - try: - await websocket.close(code=4404, reason="Batch not found") - except Exception: - pass - return - - is_admin = bool(getattr(user, "is_admin", False)) - if not is_admin and getattr(user, "id", None) != getattr(audit, "initiated_by_user_id", None): - import_logger.warning( - "Unauthorized WS subscription attempt for import batch", - audit_id=audit_id, - user_id=getattr(user, "id", None), - username=getattr(user, "username", None), - ) - try: - await websocket.close(code=4403, reason="Not authorized to subscribe to this batch") - except Exception: - pass - return - finally: - db.close() - - topic = f"import_batch_progress_{audit_id}" - - async def handle_ws_message(connection_id: str, message: WebSocketMessage): - # No-op for now; reserved for future client messages - import_logger.debug( - "Import WS msg", - connection_id=connection_id, - audit_id=audit_id, - type=message.type, - ) - - connection_id = await websocket_manager.handle_connection( - websocket=websocket, - topics={topic}, - require_auth=True, - metadata={"audit_id": audit_id, "endpoint": "import_batch_progress"}, - message_handler=handle_ws_message, - ) - - if connection_id: - db = SessionLocal() - try: - audit = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first() - payload = _aggregate_batch_progress(db, audit) if audit else None - if payload: - initial_message = WebSocketMessage( - type="progress", - topic=topic, - data=payload, - ) - await websocket_manager.pool._send_to_connection(connection_id, initial_message) - finally: - db.close() - - -@router.get("/recent-batches") -async def recent_batch_imports( - limit: int = Query(5, ge=1, le=50), - offset: int = Query(0, ge=0), - status: Optional[str] = Query(None, description="Filter by status: running|success|completed_with_errors|failed"), - start: Optional[str] = Query(None, description="ISO datetime start for started_at filter"), - end: Optional[str] = Query(None, description="ISO datetime end for started_at filter"), - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Return recent batch import audit rows (most recent first) with optional filters and pagination.""" - q = db.query(ImportAudit) - if status and status.lower() != "all": - q = q.filter(ImportAudit.status == status) - # Date range filters on started_at - try: - if start: - start_dt = datetime.fromisoformat(start) - q = q.filter(ImportAudit.started_at >= start_dt) - except Exception: - pass - try: - if end: - end_dt = datetime.fromisoformat(end) - q = q.filter(ImportAudit.started_at <= end_dt) - except Exception: - 
pass - total = q.count() - rows = ( - q.order_by(ImportAudit.started_at.desc()) - .offset(offset) - .limit(limit) - .all() - ) - def _row(r: ImportAudit): - return { - "id": r.id, - "started_at": r.started_at.isoformat() if r.started_at else None, - "finished_at": r.finished_at.isoformat() if r.finished_at else None, - "status": r.status, - "total_files": r.total_files, - "successful_files": r.successful_files, - "failed_files": r.failed_files, - "total_imported": r.total_imported, - "total_errors": r.total_errors, - "initiated_by": r.initiated_by_username, - "message": r.message, - } - return {"recent": [_row(r) for r in rows], "total": total, "limit": limit, "offset": offset} - - -@router.get("/recent-batches/{audit_id}") -async def get_batch_details( - audit_id: int, - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Return a specific audit entry with per-file details.""" - audit = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first() - if not audit: - raise HTTPException(status_code=404, detail="Audit entry not found") - files = ( - db.query(ImportAuditFile) - .filter(ImportAuditFile.audit_id == audit.id) - .order_by(ImportAuditFile.id.asc()) - .all() - ) - def _row(r: ImportAudit): - return { - "id": r.id, - "started_at": r.started_at.isoformat() if r.started_at else None, - "finished_at": r.finished_at.isoformat() if r.finished_at else None, - "status": r.status, - "total_files": r.total_files, - "successful_files": r.successful_files, - "failed_files": r.failed_files, - "total_imported": r.total_imported, - "total_errors": r.total_errors, - "initiated_by": r.initiated_by_username, - "message": r.message, - "details": r.details or {}, - } - def _file(f: ImportAuditFile): - return { - "id": f.id, - "file_type": f.file_type, - "status": f.status, - "imported_count": f.imported_count, - "errors": f.errors, - "message": f.message, - "details": f.details or {}, - "created_at": f.created_at.isoformat() if f.created_at else None, - } - return {"audit": _row(audit), "files": [_file(f) for f in files]} - - -@router.post("/recent-batches/{audit_id}/rerun-failed") -async def rerun_failed_files( - audit_id: int, - replace_existing: bool = Form(False), - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user) -): - """Re-run only failed files for a given audit. Creates a new audit entry for the rerun.""" - prior = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first() - if not prior: - raise HTTPException(status_code=404, detail="Audit entry not found") - failed_files: List[ImportAuditFile] = ( - db.query(ImportAuditFile) - .filter(ImportAuditFile.audit_id == audit_id, ImportAuditFile.status == "failed") - .all() - ) - if not failed_files: - raise HTTPException(status_code=400, detail="No failed files to rerun for this audit") - - # Build list of (file_type, path) that exist - items: List[Tuple[str, str]] = [] - for f in failed_files: - saved_path = None - try: - saved_path = (f.details or {}).get("saved_path") - except Exception: - saved_path = None - if saved_path and os.path.exists(saved_path): - items.append((f.file_type, saved_path)) - if not items: - raise HTTPException(status_code=400, detail="No saved files available to rerun. 
Upload again.") - - # Import order for sorting - import_order = IMPORT_ORDER - order_index = {name: i for i, name in enumerate(import_order)} - items.sort(key=lambda x: order_index.get(x[0], len(import_order) + 1)) - - # Create new audit row for rerun - rerun_audit = ImportAudit( - status="running", - total_files=len(items), - successful_files=0, - failed_files=0, - total_imported=0, - total_errors=0, - initiated_by_user_id=getattr(current_user, "id", None), - initiated_by_username=getattr(current_user, "username", None), - message=f"Rerun failed files for audit #{audit_id}", - details={"rerun_of": audit_id}, - ) - db.add(rerun_audit) - db.commit() - db.refresh(rerun_audit) - - # Directory to persist rerun files - rerun_dir = Path(settings.upload_dir).joinpath("import_audits", str(rerun_audit.id)) - try: - rerun_dir.mkdir(parents=True, exist_ok=True) - except Exception: - pass - - results: List[Dict[str, Any]] = [] - total_imported = 0 - total_errors = 0 - - for file_type, path in items: - try: - with open(path, "rb") as fh: - content = fh.read() - # Save a copy under the rerun audit - saved_path = None - try: - file_path = rerun_dir.joinpath(file_type) - with open(file_path, "wb") as out: - out.write(content) - saved_path = str(file_path) - except Exception: - saved_path = None - - if file_type not in CSV_MODEL_MAPPING: - # Flexible-only path - encodings = ENCODINGS - csv_content = None - for enc in encodings: - try: - csv_content = content.decode(enc) - break - except UnicodeDecodeError: - continue - if csv_content is None: - results.append({"file_type": file_type, "status": "failed", "message": "Could not decode CSV file encoding"}) - try: - db.add(ImportAuditFile( - audit_id=rerun_audit.id, - file_type=file_type, - status="failed", - imported_count=0, - errors=1, - message="Could not decode CSV file encoding", - details={"saved_path": saved_path} if saved_path else {} - )) - db.commit() - except Exception: - db.rollback() - continue - - rows_list, _headers = parse_csv_with_fallback(csv_content) - flexible_count = 0 - for row in rows_list: - db.add( - FlexibleImport( - file_type=file_type, - target_table=None, - primary_key_field=None, - primary_key_value=None, - extra_data=make_json_safe({k: v for k, v in (row or {}).items() if v not in (None, "")}), - ) - ) - flexible_count += 1 - if flexible_count % 200 == 0: - db.commit() - db.commit() - total_imported += flexible_count - results.append({ - "file_type": file_type, - "status": "success", - "imported_count": flexible_count, - "errors": 0, - "message": f"Stored {flexible_count} rows as flexible data (no known model)", - }) - try: - db.add(ImportAuditFile( - audit_id=rerun_audit.id, - file_type=file_type, - status="success", - imported_count=flexible_count, - errors=0, - message=f"Stored {flexible_count} rows as flexible data", - details={"saved_path": saved_path} if saved_path else {} - )) - db.commit() - except Exception: - db.rollback() - continue - - # Known model path - model_class = CSV_MODEL_MAPPING[file_type] - encodings = ENCODINGS - csv_content = None - for enc in encodings: - try: - csv_content = content.decode(enc) - break - except UnicodeDecodeError: - continue - if csv_content is None: - results.append({"file_type": file_type, "status": "failed", "message": "Could not decode CSV file encoding"}) - try: - db.add(ImportAuditFile( - audit_id=rerun_audit.id, - file_type=file_type, - status="failed", - imported_count=0, - errors=1, - message="Could not decode CSV file encoding", - details={"saved_path": saved_path} if 
saved_path else {} - )) - db.commit() - except Exception: - db.rollback() - continue - - csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL) - csv_headers = csv_reader.fieldnames or [] - mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type) - mapped_headers = mapping_info["mapped_headers"] - unmapped_headers = mapping_info["unmapped_headers"] - header_validation = _validate_required_headers(file_type, mapped_headers) - imported_count = 0 - errors: List[Dict[str, Any]] = [] - # Special handling: assign line numbers per form for FORM_LST.csv - form_lst_line_counters: Dict[str, int] = {} - - if replace_existing: - db.query(model_class).delete() - db.query(FlexibleImport).filter( - FlexibleImport.file_type == file_type, - FlexibleImport.target_table == model_class.__tablename__, - ).delete() - db.commit() - - for row_num, row in enumerate(csv_reader, start=2): - try: - model_data: Dict[str, Any] = {} - for csv_field, db_field in mapped_headers.items(): - if csv_field in row and db_field is not None: - converted_value = convert_value(row[csv_field], db_field) - if converted_value is not None: - model_data[db_field] = converted_value - # Inject sequential line_number for FORM_LST rows grouped by form_id - if file_type == "FORM_LST.csv": - form_id_value = model_data.get("form_id") - if form_id_value: - current = form_lst_line_counters.get(str(form_id_value), 0) + 1 - form_lst_line_counters[str(form_id_value)] = current - if "line_number" not in model_data: - model_data["line_number"] = current - if not any(model_data.values()): - continue - required_fields = _get_required_fields(model_class) - missing_required = [f for f in required_fields if model_data.get(f) in (None, "")] - if missing_required: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data={ - "mapped": model_data, - "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")}, - "missing_required": missing_required, - }, - ) - ) - continue - - if model_class == Phone and (not model_data.get('phone')): - continue - if model_class == Rolodex and (not model_data.get('last')): - continue - if model_class == Ledger and (not model_data.get('empl_num') or not model_data.get('file_no')): - continue - - instance = model_class(**model_data) - db.add(instance) - db.flush() - - _, pk_names = _get_model_columns(model_class) - pk_field_name = pk_names[0] if len(pk_names) == 1 else None - pk_value = None - if pk_field_name: - try: - pk_value = getattr(instance, pk_field_name) - except Exception: - pk_value = None - extra_data = {} - for csv_field in unmapped_headers: - if csv_field in row and row[csv_field] not in (None, ""): - extra_data[csv_field] = row[csv_field] - if extra_data: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=pk_field_name, - primary_key_value=str(pk_value) if pk_value is not None else None, - extra_data=extra_data, - ) - ) - imported_count += 1 - if imported_count % 100 == 0: - db.commit() - except Exception as e: - db.rollback() - try: - db.add( - FlexibleImport( - file_type=file_type, - target_table=model_class.__tablename__, - primary_key_field=None, - primary_key_value=None, - extra_data={ - "mapped": model_data, - "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")}, - "error": str(e), - }, - ) - ) - except Exception: - 
errors.append({"row": row_num, "error": str(e)}) - continue - - db.commit() - total_imported += imported_count - total_errors += len(errors) - results.append({ - "file_type": file_type, - "status": "success" if (len(errors) == 0 and header_validation.get("ok", True)) else "completed_with_errors", - "imported_count": imported_count, - "errors": len(errors), - "message": f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""), - "header_validation": header_validation, - }) - try: - db.add(ImportAuditFile( - audit_id=rerun_audit.id, - file_type=file_type, - status="success" if (len(errors) == 0 and header_validation.get("ok", True)) else "completed_with_errors", - imported_count=imported_count, - errors=len(errors), - message=f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""), - details={**({"saved_path": saved_path} if saved_path else {}), "header_validation": header_validation} - )) - db.commit() - except Exception: - db.rollback() - - except Exception as e: - db.rollback() - results.append({"file_type": file_type, "status": "failed", "message": f"Import failed: {str(e)}"}) - try: - db.add(ImportAuditFile( - audit_id=rerun_audit.id, - file_type=file_type, - status="failed", - imported_count=0, - errors=1, - message=f"Import failed: {str(e)}", - details={} - )) - db.commit() - except Exception: - db.rollback() - - # Finalize rerun audit - summary = { - "total_files": len(items), - "successful_files": len([r for r in results if r["status"] in ["success", "completed_with_errors"]]), - "failed_files": len([r for r in results if r["status"] == "failed"]), - "total_imported": total_imported, - "total_errors": total_errors, - } - try: - rerun_audit.successful_files = summary["successful_files"] - rerun_audit.failed_files = summary["failed_files"] - rerun_audit.total_imported = summary["total_imported"] - rerun_audit.total_errors = summary["total_errors"] - rerun_audit.status = "success" if summary["failed_files"] == 0 and summary["total_errors"] == 0 else ( - "completed_with_errors" if summary["successful_files"] > 0 else "failed" - ) - rerun_audit.message = f"Rerun completed: {rerun_audit.successful_files}/{rerun_audit.total_files} files" - rerun_audit.finished_at = datetime.now(timezone.utc) - rerun_audit.details = {"rerun_of": audit_id} - db.add(rerun_audit) - db.commit() - except Exception: - db.rollback() - - return {"batch_results": results, "summary": summary, "rerun_audit_id": rerun_audit.id} - -@router.post("/upload-flexible") -async def upload_flexible_only( - file: UploadFile = UploadFileForm(...), - replace_existing: bool = Form(False), - db: Session = Depends(get_db), - current_user: User = Depends(get_current_user), -): - """Flexible-only single-file upload. - - Accepts any CSV and stores each row as a `FlexibleImport` record with `target_table=None`. 
- """ - # Ensure CSV - if not file.filename or not file.filename.lower().endswith(".csv"): - raise HTTPException(status_code=400, detail="File must be a CSV file") - - file_type = file.filename - - try: - # Optionally clear prior flexible rows for this file_type - if replace_existing: - db.query(FlexibleImport).filter( - FlexibleImport.file_type == file_type, - FlexibleImport.target_table == None, # noqa: E711 - ).delete() - db.commit() - - content = await file.read() - encodings = ENCODINGS - csv_content = None - for encoding in encodings: - try: - csv_content = content.decode(encoding) - break - except UnicodeDecodeError: - continue - if csv_content is None: - raise HTTPException(status_code=400, detail="Could not decode CSV file encoding") - - reader = csv.DictReader(io.StringIO(csv_content), delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL) - headers = reader.fieldnames or [] - - imported_count = 0 - for row in reader: - payload = {k: v for k, v in (row or {}).items() if v not in (None, "")} - db.add( - FlexibleImport( - file_type=file_type, - target_table=None, - primary_key_field=None, - primary_key_value=None, - extra_data=payload, - ) - ) - imported_count += 1 - if imported_count % 200 == 0: - db.commit() - - db.commit() - - return { - "file_type": file_type, - "imported_count": imported_count, - "errors": [], - "total_errors": 0, - "auto_mapping": { - "mapped_headers": {}, - "unmapped_headers": list(headers), - "flexible_saved_rows": imported_count, - }, - "message": f"Stored {imported_count} rows as flexible data (no known model)", - } - except HTTPException: - raise - except Exception as e: - db.rollback() - raise HTTPException(status_code=500, detail=f"Flexible upload failed: {str(e)}") \ No newline at end of file diff --git a/app/auth/security.py b/app/auth/security.py index 1f2b045..c8f7c50 100644 --- a/app/auth/security.py +++ b/app/auth/security.py @@ -20,6 +20,7 @@ pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # JWT Security security = HTTPBearer() +optional_security = HTTPBearer(auto_error=False) def verify_password(plain_password: str, hashed_password: str) -> bool: @@ -190,6 +191,31 @@ def get_current_user( return user +def get_optional_current_user( + credentials: Optional[HTTPAuthorizationCredentials] = Depends(optional_security), + db: Session = Depends(get_db) +) -> Optional[User]: + """Get current authenticated user, but allow None if not authenticated""" + if not credentials: + return None + + try: + token = credentials.credentials + username = verify_token(token) + + if username is None: + return None + + user = db.query(User).filter(User.username == username).first() + if user is None or not user.is_active: + return None + + return user + + except Exception: + return None + + def get_admin_user(current_user: User = Depends(get_current_user)) -> User: """Require admin privileges""" if not current_user.is_admin: diff --git a/app/database/base.py b/app/database/base.py index 168ea16..0f5b6a3 100644 --- a/app/database/base.py +++ b/app/database/base.py @@ -9,7 +9,15 @@ from app.config import settings engine = create_engine( settings.database_url, - connect_args={"check_same_thread": False} if "sqlite" in settings.database_url else {} + connect_args={ + "check_same_thread": False, + # SQLite performance optimizations for bulk imports + "timeout": 30, + } if "sqlite" in settings.database_url else {}, + # Performance settings for bulk operations + pool_pre_ping=True, + pool_recycle=3600, # Recycle connections after 1 hour + echo=False # Set 
to True for SQL debugging ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) diff --git a/app/import_export/__init__.py b/app/import_export/__init__.py new file mode 100644 index 0000000..841a20f --- /dev/null +++ b/app/import_export/__init__.py @@ -0,0 +1,6 @@ +""" +Import/Export module for Delphi Database System + +This module provides clean, modular CSV import functionality +for all database tables. +""" \ No newline at end of file diff --git a/app/main.py b/app/main.py index c679510..766986e 100644 --- a/app/main.py +++ b/app/main.py @@ -160,8 +160,6 @@ from app.api.documents import router as documents_router from app.api.billing import router as billing_router from app.api.search import router as search_router from app.api.admin import router as admin_router -from app.api.import_data import router as import_router -from app.api.flexible import router as flexible_router from app.api.support import router as support_router from app.api.settings import router as settings_router from app.api.mortality import router as mortality_router @@ -189,10 +187,8 @@ app.include_router(billing_router, prefix="/api/billing", tags=["billing"]) app.include_router(documents_router, prefix="/api/documents", tags=["documents"]) app.include_router(search_router, prefix="/api/search", tags=["search"]) app.include_router(admin_router, prefix="/api/admin", tags=["admin"]) -app.include_router(import_router, prefix="/api/import", tags=["import"]) app.include_router(support_router, prefix="/api/support", tags=["support"]) app.include_router(settings_router, prefix="/api/settings", tags=["settings"]) -app.include_router(flexible_router, prefix="/api") app.include_router(mortality_router, prefix="/api/mortality", tags=["mortality"]) app.include_router(pensions_router, prefix="/api/pensions", tags=["pensions"]) app.include_router(pension_valuation_router, prefix="/api/pensions", tags=["pensions-valuation"]) @@ -288,22 +284,10 @@ async def admin_page(request: Request): ) -@app.get("/import", response_class=HTMLResponse) -async def import_page(request: Request): - """Data import management page (admin only)""" - return templates.TemplateResponse( - "import.html", - {"request": request, "title": "Data Import - " + settings.app_name} - ) -@app.get("/flexible", response_class=HTMLResponse) -async def flexible_page(request: Request): - """Flexible imports admin page (admin only).""" - return templates.TemplateResponse( - "flexible.html", - {"request": request, "title": "Flexible Imports - " + settings.app_name} - ) + + @app.get("/health") diff --git a/app/models/flexible.py b/app/models/flexible.py deleted file mode 100644 index 3bccacf..0000000 --- a/app/models/flexible.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -Flexible storage for unmapped CSV columns during import -""" -from sqlalchemy import Column, Integer, String -from sqlalchemy.types import JSON - -from app.models.base import BaseModel - - -class FlexibleImport(BaseModel): - """Stores per-row extra/unmapped data for any import, without persisting mapping patterns.""" - - __tablename__ = "flexible_imports" - - id = Column(Integer, primary_key=True, autoincrement=True) - - # The CSV filename used by the importer (e.g., "FILES.csv" or arbitrary names in flexible mode) - file_type = Column(String(120), nullable=False, index=True) - - # The SQLAlchemy model table this extra data is associated with (if any) - target_table = Column(String(120), nullable=True, index=True) - - # Optional link to the primary record created in the target table - 
primary_key_field = Column(String(120), nullable=True) - primary_key_value = Column(String(255), nullable=True, index=True) - - # Extra unmapped columns from the CSV row - extra_data = Column(JSON, nullable=False) - - def __repr__(self) -> str: # pragma: no cover - repr utility - return ( - f"" - ) - - diff --git a/static/js/flexible.js b/static/js/flexible.js deleted file mode 100644 index 96edabc..0000000 --- a/static/js/flexible.js +++ /dev/null @@ -1,314 +0,0 @@ -(function() { - const apiBase = '/api/flexible'; - let state = { - fileType: '', - targetTable: '', - q: '', - skip: 0, - limit: 50, - total: 0, - hasKeys: [], - }; - - function q(id) { return document.getElementById(id); } - - function formatPreviewHtml(obj, term) { - // Returns sanitized HTML with clickable keys - try { - const payload = obj && obj.unmapped && typeof obj.unmapped === 'object' ? obj.unmapped : obj; - const keys = Object.keys(payload || {}).slice(0, 5); - const segments = keys.map((k) => { - const safeKey = window.htmlSanitizer.escape(String(k)); - const valueStr = String(payload[k]).slice(0, 60); - const valueHtml = term && term.trim().length > 0 ? highlight(valueStr, term) : window.htmlSanitizer.escape(valueStr); - return `: ${valueHtml}`; - }); - return segments.join(', '); - } catch (_) { return ''; } - } - - function escapeRegExp(str) { - return String(str).replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); - } - - function highlight(text, term) { - if (!term) return window.htmlSanitizer.escape(text); - const pattern = new RegExp(escapeRegExp(term), 'ig'); - const escaped = window.htmlSanitizer.escape(text); - // Replace on the escaped string to avoid breaking HTML - return escaped.replace(pattern, (m) => `${window.htmlSanitizer.escape(m)}`); - } - - async function loadOptions() { - try { - const res = await window.http.wrappedFetch(`${apiBase}/options`); - if (!res.ok) throw await window.http.toError(res, 'Failed to load options'); - const data = await res.json(); - const fileSel = q('filterFileType'); - const tableSel = q('filterTargetTable'); - // Clear existing except first - fileSel.length = 1; tableSel.length = 1; - (data.file_types || []).forEach(v => { - const opt = document.createElement('option'); - opt.value = v; opt.textContent = v; fileSel.appendChild(opt); - }); - (data.target_tables || []).forEach(v => { - const opt = document.createElement('option'); - opt.value = v; opt.textContent = v; tableSel.appendChild(opt); - }); - } catch (e) { - alert(window.http.formatAlert(e, 'Error loading options')); - } - } - - async function loadRows() { - try { - const params = new URLSearchParams(); - if (state.fileType) params.set('file_type', state.fileType); - if (state.targetTable) params.set('target_table', state.targetTable); - if (state.q) params.set('q', state.q); - if (Array.isArray(state.hasKeys)) { - state.hasKeys.forEach((k) => { - if (k && String(k).trim().length > 0) params.append('has_keys', String(k).trim()); - }); - } - params.set('skip', String(state.skip)); - params.set('limit', String(state.limit)); - const res = await window.http.wrappedFetch(`${apiBase}/imports?${params.toString()}`); - if (!res.ok) throw await window.http.toError(res, 'Failed to load flexible imports'); - const data = await res.json(); - state.total = data.total || 0; - renderRows(data.items || []); - renderMeta(); - renderKeyChips(); - } catch (e) { - alert(window.http.formatAlert(e, 'Error loading flexible imports')); - } - } - - function renderRows(items) { - const tbody = q('flexibleRows'); - tbody.innerHTML = ''; - 
items.forEach(item => { - const tr = document.createElement('tr'); - tr.className = 'hover:bg-neutral-50 dark:hover:bg-neutral-700/40 cursor-pointer'; - tr.innerHTML = ` - ${item.id} - ${window.htmlSanitizer.escape(item.file_type || '')} - ${window.htmlSanitizer.escape(item.target_table || '')} - ${window.htmlSanitizer.escape((item.primary_key_field || '') + (item.primary_key_value ? '=' + item.primary_key_value : ''))} - - - - - `; - // Set sanitized highlighted preview - const previewCell = tr.querySelector('.previewCell'); - const previewHtml = formatPreviewHtml(item.extra_data || {}, state.q); - window.setSafeHTML(previewCell, previewHtml); - // Bind click on keys to add filters - previewCell.querySelectorAll('.key-link').forEach((btn) => { - btn.addEventListener('click', (ev) => { - ev.stopPropagation(); - const key = btn.getAttribute('data-key') || ''; - addKeyFilter(key); - }); - }); - // Row click opens modal - tr.addEventListener('click', (ev) => { - // Ignore clicks on the export button inside the row - const target = ev.target.closest('button[data-action="export"]'); - if (target) return; - openDetailModal(item); - }); - // Export button handler - tr.querySelector('button[data-action="export"]').addEventListener('click', (ev) => { - ev.stopPropagation(); - exportSingleRow(item.id); - }); - tbody.appendChild(tr); - }); - } - - function renderMeta() { - const start = state.total === 0 ? 0 : state.skip + 1; - const end = Math.min(state.skip + state.limit, state.total); - q('rowsMeta').textContent = `Showing ${start}-${end} of ${state.total}`; - q('prevPageBtn').disabled = state.skip === 0; - q('nextPageBtn').disabled = state.skip + state.limit >= state.total; - } - - function applyFilters() { - state.fileType = q('filterFileType').value || ''; - state.targetTable = q('filterTargetTable').value || ''; - state.q = (q('quickSearch').value || '').trim(); - state.skip = 0; - loadRows(); - } - - function addKeyFilter(key) { - const k = String(key || '').trim(); - if (!k) return; - if (!Array.isArray(state.hasKeys)) state.hasKeys = []; - if (!state.hasKeys.includes(k)) { - state.hasKeys.push(k); - state.skip = 0; - loadRows(); - } - } - - function removeKeyFilter(key) { - const k = String(key || '').trim(); - if (!k) return; - state.hasKeys = (state.hasKeys || []).filter((x) => x !== k); - state.skip = 0; - loadRows(); - } - - function clearKeyFilters() { - if ((state.hasKeys || []).length === 0) return; - state.hasKeys = []; - state.skip = 0; - loadRows(); - } - - function renderKeyChips() { - const container = q('keyChipsContainer'); - const chipsWrap = q('keyChips'); - const clearBtn = q('clearKeyChips'); - if (!container || !chipsWrap) return; - chipsWrap.innerHTML = ''; - const keys = state.hasKeys || []; - if (keys.length === 0) { - container.classList.add('hidden'); - } else { - container.classList.remove('hidden'); - keys.forEach((k) => { - const btn = document.createElement('button'); - btn.type = 'button'; - btn.className = 'inline-flex items-center gap-1 px-2 py-1 rounded-full text-xs bg-primary-50 text-primary-700 border border-primary-200 hover:bg-primary-100 dark:bg-primary-900/30 dark:text-primary-200 dark:border-primary-800'; - btn.setAttribute('data-chip-key', k); - btn.innerHTML = `${window.htmlSanitizer.escape(k)} `; - btn.addEventListener('click', (ev) => { - ev.stopPropagation(); - removeKeyFilter(k); - }); - chipsWrap.appendChild(btn); - }); - } - if (clearBtn) { - clearBtn.onclick = (ev) => { ev.preventDefault(); clearKeyFilters(); }; - } - } - - async function 
exportCsv() { - try { - const params = new URLSearchParams(); - if (state.fileType) params.set('file_type', state.fileType); - if (state.targetTable) params.set('target_table', state.targetTable); - if (Array.isArray(state.hasKeys)) { - state.hasKeys.forEach((k) => { - if (k && String(k).trim().length > 0) params.append('has_keys', String(k).trim()); - }); - } - const url = `${apiBase}/export?${params.toString()}`; - const res = await window.http.wrappedFetch(url); - if (!res.ok) throw await window.http.toError(res, 'Export failed'); - const blob = await res.blob(); - const a = document.createElement('a'); - const objectUrl = URL.createObjectURL(blob); - a.href = objectUrl; - a.download = 'flexible_unmapped.csv'; - document.body.appendChild(a); - a.click(); - a.remove(); - setTimeout(() => URL.revokeObjectURL(objectUrl), 1000); - } catch (e) { - alert(window.http.formatAlert(e, 'Error exporting CSV')); - } - } - - async function exportSingleRow(rowId) { - try { - const res = await window.http.wrappedFetch(`${apiBase}/export/${rowId}`); - if (!res.ok) throw await window.http.toError(res, 'Export failed'); - const blob = await res.blob(); - const a = document.createElement('a'); - const objectUrl = URL.createObjectURL(blob); - a.href = objectUrl; - a.download = `flexible_row_${rowId}.csv`; - document.body.appendChild(a); - a.click(); - a.remove(); - setTimeout(() => URL.revokeObjectURL(objectUrl), 1000); - } catch (e) { - alert(window.http.formatAlert(e, 'Error exporting row CSV')); - } - } - - function openDetailModal(item) { - // Populate fields - q('detailRowId').textContent = `#${item.id}`; - q('detailFileType').textContent = item.file_type || ''; - q('detailTargetTable').textContent = item.target_table || ''; - q('detailPkField').textContent = item.primary_key_field || ''; - q('detailPkValue').textContent = item.primary_key_value || ''; - try { - const pretty = JSON.stringify(item.extra_data || {}, null, 2); - q('detailJson').textContent = pretty; - } catch (_) { - q('detailJson').textContent = ''; - } - const exportBtn = q('detailExportBtn'); - exportBtn.onclick = () => exportSingleRow(item.id); - openModal('flexibleDetailModal'); - } - - function bindEvents() { - q('applyFiltersBtn').addEventListener('click', applyFilters); - q('exportCsvBtn').addEventListener('click', exportCsv); - const clearBtn = q('clearKeyChips'); - if (clearBtn) clearBtn.addEventListener('click', (ev) => { ev.preventDefault(); clearKeyFilters(); }); - // Quick search with debounce - const searchInput = q('quickSearch'); - let searchTimer = null; - searchInput.addEventListener('input', () => { - const value = searchInput.value || ''; - clearTimeout(searchTimer); - searchTimer = setTimeout(() => { - state.q = value.trim(); - state.skip = 0; - loadRows(); - }, 300); - }); - searchInput.addEventListener('keydown', (ev) => { - if (ev.key === 'Enter') { - ev.preventDefault(); - clearTimeout(searchTimer); - state.q = (searchInput.value || '').trim(); - state.skip = 0; - loadRows(); - } - }); - q('prevPageBtn').addEventListener('click', () => { - state.skip = Math.max(0, state.skip - state.limit); - loadRows(); - }); - q('nextPageBtn').addEventListener('click', () => { - if (state.skip + state.limit < state.total) { - state.skip += state.limit; - loadRows(); - } - }); - } - - document.addEventListener('DOMContentLoaded', () => { - bindEvents(); - loadOptions().then(loadRows); - }); -})(); - - diff --git a/static/js/main.js b/static/js/main.js index 68d8407..0e10f71 100644 --- a/static/js/main.js +++ b/static/js/main.js 
@@ -199,14 +199,25 @@ function initializeBatchProgressUI() {
 async function cancelBatch(batchId) {
   try {
-    const resp = await window.http.wrappedFetch(`/api/billing/statements/batch-progress/${encodeURIComponent(batchId)}`, { method: 'DELETE' });
-    if (!resp.ok) {
-      throw await window.http.toError(resp, 'Failed to cancel batch');
+    if (!confirm(`Are you sure you want to cancel batch ${batchId}?`)) {
+      return;
     }
+
+    // Import functionality removed
+
+    const result = await resp.json();
+    console.log('Import batch cancelled:', result.message);
+    // Let stream update the row; no-op here
+    // The progress will be updated via WebSocket
   } catch (e) {
-    console.warn('Cancel failed', e);
-    try { alert(window.http.formatAlert(e, 'Cancel failed')); } catch (_) {}
+    console.warn('Cancel import batch failed', e);
+    try {
+      const errorMsg = window.http.formatAlert(e, 'Cancel import batch failed');
+      alert(errorMsg);
+    } catch (_) {
+      alert('Failed to cancel import batch');
+    }
   }
 }
@@ -464,14 +475,7 @@ async function checkUserPermissions() {
         const adminDivider = document.getElementById('admin-menu-divider');
         if (adminItem) adminItem.classList.remove('hidden');
         if (adminDivider) adminDivider.classList.remove('hidden');
-        const importDesktop = document.getElementById('nav-import-desktop');
-        const importMobile = document.getElementById('nav-import-mobile');
-        if (importDesktop) importDesktop.classList.remove('hidden');
-        if (importMobile) importMobile.classList.remove('hidden');
-        const flexibleDesktop = document.getElementById('nav-flexible-desktop');
-        const flexibleMobile = document.getElementById('nav-flexible-mobile');
-        if (flexibleDesktop) flexibleDesktop.classList.remove('hidden');
-        if (flexibleMobile) flexibleMobile.classList.remove('hidden');
+        // Import navigation items removed
     }
     const userDropdownName = document.querySelector('#userDropdown button span');
     if (user.full_name && userDropdownName) {
diff --git a/templates/flexible.html b/templates/flexible.html
deleted file mode 100644
index c1dbb78..0000000
--- a/templates/flexible.html
+++ /dev/null
@@ -1,113 +0,0 @@
-{% extends "base.html" %}
-
-{% block content %}
  [flexible.html body deleted by this patch — recoverable content: a "Flexible Imports" page heading; a filter bar with File Type and Target Table selects, a quick-search box, key-filter chips with a clear control, and Apply Filters / Export CSV buttons; a results table with columns ID, File Type, Target Table, PK, Unmapped Preview, Actions and a "Loading..." placeholder row; "Showing X-Y of Z" pagination with previous/next buttons; and a row-detail modal showing the row metadata and pretty-printed extra_data JSON with a per-row export button.]
-{% endblock %}
-
-{% block extra_scripts %}
-
-{% endblock %}
-
diff --git a/templates/import.html b/templates/import.html
deleted file mode 100644
index 51f769f..0000000
--- a/templates/import.html
+++ /dev/null
@@ -1,1849 +0,0 @@
-{% extends "base.html" %}
-
-{% block title %}Data Import - Delphi Database{% endblock %}
-
-{% block content %}
  [import.html body deleted by this patch — recoverable content: a "Data Import" page (title "Data Import - Delphi Database") with a Current Database Status panel ("Loading import status..."), an Upload CSV Files section, a Recent Batch Uploads panel ("Loading recent batches..."), and a Data Management section offering Clear Table Data ("Remove all records from a specific table (cannot be undone)") and Quick Actions.]
-{% endblock %}
\ No newline at end of file
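For reference while the new app/import_export package is filled in, below is a minimal sketch (not part of this patch) of the capture pattern the deleted flexible-import code implemented: per CSV row, any columns that did not map onto the target model were stored as JSON alongside the source file type, target table, and the row's primary-key field and value. Only the column names (file_type, target_table, primary_key_field, primary_key_value, extra_data with a nested "unmapped" dict) come from the deleted FlexibleImport model; the helper name capture_unmapped_row and its known_fields/pk_field parameters are illustrative assumptions, and the snippet assumes a SQLAlchemy session is available.

    from typing import Any, Dict, Iterable
    from sqlalchemy.orm import Session


    def capture_unmapped_row(
        db: Session,
        flexible_model,                 # the (now removed) FlexibleImport model class
        file_type: str,                 # source CSV name, e.g. "FILES.csv"
        target_table: str,              # table the mapped columns were written to
        row: Dict[str, Any],            # one parsed CSV row (header -> value)
        known_fields: Iterable[str],    # CSV headers that mapped onto model columns
        pk_field: str,                  # CSV column used as the row identifier
    ):
        """Store any CSV columns that did not map onto the target model."""
        unmapped = {k: v for k, v in row.items() if k not in set(known_fields)}
        if not unmapped:
            return None
        record = flexible_model(
            file_type=file_type,
            target_table=target_table,
            primary_key_field=pk_field,
            primary_key_value=str(row.get(pk_field, "")),
            extra_data={"unmapped": unmapped},  # nested payload, as the old export path expected
        )
        db.add(record)
        return record

Reusing this shape in the replacement importer would keep any previously captured rows queryable and exportable without a data migration.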