# delphi-database/app/api/import_data.py
"""
Data import API endpoints for CSV file uploads
"""
import csv
import io
from datetime import datetime, date
from typing import List, Any, Optional
# fastapi.File is aliased so it does not shadow the app's own File model below
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File as UploadFileForm, Form
from sqlalchemy.orm import Session
from app.database.base import get_db
from app.auth.security import get_current_user
from app.models.user import User
from app.models.rolodex import Rolodex, Phone
from app.models.files import File
from app.models.ledger import Ledger
from app.models.qdro import QDRO
from app.models.pensions import Pension, PensionSchedule, MarriageHistory, DeathBenefit, SeparationAgreement, LifeTable, NumberTable
from app.models.lookups import Employee, FileType, FileStatus, TransactionType, TransactionCode, State, GroupLookup, Footer, PlanInfo, FormIndex, FormList, PrinterSetup, SystemSetup
from app.models.additional import Payment, Deposit, FileNote, FormVariable, ReportVariable
router = APIRouter(tags=["import"])
# CSV to Model mapping
CSV_MODEL_MAPPING = {
"ROLODEX.csv": Rolodex,
"PHONE.csv": Phone,
"FILES.csv": File,
"LEDGER.csv": Ledger,
"QDROS.csv": QDRO,
"PENSIONS.csv": Pension,
"SCHEDULE.csv": PensionSchedule,
"MARRIAGE.csv": MarriageHistory,
"DEATH.csv": DeathBenefit,
"SEPARATE.csv": SeparationAgreement,
"LIFETABL.csv": LifeTable,
"NUMBERAL.csv": NumberTable,
"EMPLOYEE.csv": Employee,
"FILETYPE.csv": FileType,
"FILESTAT.csv": FileStatus,
"TRNSTYPE.csv": TransactionType,
"TRNSLKUP.csv": TransactionCode,
"STATES.csv": State,
"GRUPLKUP.csv": GroupLookup,
"FOOTERS.csv": Footer,
"PLANINFO.csv": PlanInfo,
"FORM_INX.csv": FormIndex,
"FORM_LST.csv": FormList,
"PRINTERS.csv": PrinterSetup,
"SETUP.csv": SystemSetup,
# Additional models for complete legacy coverage
"DEPOSITS.csv": Deposit,
"FILENOTS.csv": FileNote,
"FVARLKUP.csv": FormVariable,
"RVARLKUP.csv": ReportVariable,
"PAYMENTS.csv": Payment,
"TRNSACTN.csv": Ledger # Maps to existing Ledger model (same structure)
}
# Field mappings for CSV columns to database fields
FIELD_MAPPINGS = {
"ROLODEX.csv": {
"Id": "id",
"Prefix": "prefix",
"First": "first",
"Middle": "middle",
"Last": "last",
"Suffix": "suffix",
"Title": "title",
"A1": "a1",
"A2": "a2",
"A3": "a3",
"City": "city",
"Abrev": "abrev",
"St": None, # Full state name - skip this field as model only has abrev
"Zip": "zip",
"Email": "email",
"DOB": "dob",
"SS#": "ss_number",
"Legal_Status": "legal_status",
"Group": "group",
"Memo": "memo"
},
"PHONE.csv": {
"Id": "rolodex_id",
"Phone": "phone",
"Location": "location"
},
"FILES.csv": {
"File_No": "file_no",
"Id": "id",
"File_Type": "file_type",
"Regarding": "regarding",
"Opened": "opened",
"Closed": "closed",
"Empl_Num": "empl_num",
"Rate_Per_Hour": "rate_per_hour",
"Status": "status",
"Footer_Code": "footer_code",
"Opposing": "opposing",
"Hours": "hours",
"Hours_P": "hours_p",
"Trust_Bal": "trust_bal",
"Trust_Bal_P": "trust_bal_p",
"Hourly_Fees": "hourly_fees",
"Hourly_Fees_P": "hourly_fees_p",
"Flat_Fees": "flat_fees",
"Flat_Fees_P": "flat_fees_p",
"Disbursements": "disbursements",
"Disbursements_P": "disbursements_p",
"Credit_Bal": "credit_bal",
"Credit_Bal_P": "credit_bal_p",
"Total_Charges": "total_charges",
"Total_Charges_P": "total_charges_p",
"Amount_Owing": "amount_owing",
"Amount_Owing_P": "amount_owing_p",
"Transferable": "transferable",
"Memo": "memo"
},
"LEDGER.csv": {
"File_No": "file_no",
"Date": "date",
"Item_No": "item_no",
"Empl_Num": "empl_num",
"T_Code": "t_code",
"T_Type": "t_type",
"T_Type_L": "t_type_l",
"Quantity": "quantity",
"Rate": "rate",
"Amount": "amount",
"Billed": "billed",
"Note": "note"
},
"QDROS.csv": {
"File_No": "file_no",
"Version": "version",
"Plan_Id": "plan_id",
"^1": "field1",
"^2": "field2",
"^Part": "part",
"^AltP": "altp",
"^Pet": "pet",
"^Res": "res",
"Case_Type": "case_type",
"Case_Code": "case_code",
"Section": "section",
"Case_Number": "case_number",
"Judgment_Date": "judgment_date",
"Valuation_Date": "valuation_date",
"Married_On": "married_on",
"Percent_Awarded": "percent_awarded",
"Ven_City": "ven_city",
"Ven_Cnty": "ven_cnty",
"Ven_St": "ven_st",
"Draft_Out": "draft_out",
"Draft_Apr": "draft_apr",
"Final_Out": "final_out",
"Judge": "judge",
"Form_Name": "form_name"
},
"PENSIONS.csv": {
"File_No": "file_no",
"Version": "version",
"Plan_Id": "plan_id",
"Plan_Name": "plan_name",
"Title": "title",
"First": "first",
"Last": "last",
"Birth": "birth",
"Race": "race",
"Sex": "sex",
"Info": "info",
"Valu": "valu",
"Accrued": "accrued",
"Vested_Per": "vested_per",
"Start_Age": "start_age",
"COLA": "cola",
"Max_COLA": "max_cola",
"Withdrawal": "withdrawal",
"Pre_DR": "pre_dr",
"Post_DR": "post_dr",
"Tax_Rate": "tax_rate"
},
"EMPLOYEE.csv": {
"Empl_Num": "empl_num",
"Rate_Per_Hour": "rate_per_hour"
# "Empl_Id": not a field in Employee model, using empl_num as identifier
# Model has additional fields (first_name, last_name, title, etc.) not in CSV
},
"STATES.csv": {
"Abrev": "abbreviation",
"St": "name"
},
"GRUPLKUP.csv": {
"Code": "group_code",
"Description": "description"
# "Title": field not present in model, skipping
},
"TRNSLKUP.csv": {
"T_Code": "t_code",
"T_Type": "t_type",
# "T_Type_L": not a field in TransactionCode model
"Amount": "default_rate",
"Description": "description"
},
"TRNSTYPE.csv": {
"T_Type": "t_type",
"T_Type_L": "description"
# "Header": maps to debit_credit but needs data transformation
# "Footer": doesn't align with active boolean field
# These fields may need custom handling or model updates
},
"FILETYPE.csv": {
"File_Type": "type_code",
"Description": "description",
"Default_Rate": "default_rate"
},
"FILESTAT.csv": {
"Status_Code": "status_code",
"Description": "description",
"Sort_Order": "sort_order"
},
"FOOTERS.csv": {
"Footer_Code": "footer_code",
"Content": "content",
"Description": "description"
},
"PLANINFO.csv": {
"Plan_Id": "plan_id",
"Plan_Name": "plan_name",
"Plan_Type": "plan_type",
"Sponsor": "sponsor",
"Administrator": "administrator",
"Address1": "address1",
"Address2": "address2",
"City": "city",
"State": "state",
"Zip_Code": "zip_code",
"Phone": "phone",
"Notes": "notes"
},
"FORM_INX.csv": {
"Form_Id": "form_id",
"Form_Name": "form_name",
"Category": "category"
},
"FORM_LST.csv": {
"Form_Id": "form_id",
"Line_Number": "line_number",
"Content": "content"
},
"PRINTERS.csv": {
"Printer_Name": "printer_name",
"Description": "description",
"Driver": "driver",
"Port": "port",
"Default_Printer": "default_printer"
},
"SETUP.csv": {
"Setting_Key": "setting_key",
"Setting_Value": "setting_value",
"Description": "description",
"Setting_Type": "setting_type"
},
"SCHEDULE.csv": {
"File_No": "file_no",
"Version": "version",
"Vests_On": "vests_on",
"Vests_At": "vests_at"
},
"MARRIAGE.csv": {
"File_No": "file_no",
"Version": "version",
"Marriage_Date": "marriage_date",
"Separation_Date": "separation_date",
"Divorce_Date": "divorce_date"
},
"DEATH.csv": {
"File_No": "file_no",
"Version": "version",
"Benefit_Type": "benefit_type",
"Benefit_Amount": "benefit_amount",
"Beneficiary": "beneficiary"
},
"SEPARATE.csv": {
"File_No": "file_no",
"Version": "version",
"Agreement_Date": "agreement_date",
"Terms": "terms"
},
"LIFETABL.csv": {
"Age": "age",
"Male_Mortality": "male_mortality",
"Female_Mortality": "female_mortality"
},
"NUMBERAL.csv": {
"Table_Name": "table_name",
"Age": "age",
"Value": "value"
},
# Additional CSV file mappings
"DEPOSITS.csv": {
"Deposit_Date": "deposit_date",
"Total": "total"
},
"FILENOTS.csv": {
"File_No": "file_no",
"Memo_Date": "memo_date",
"Memo_Note": "memo_note"
},
"FVARLKUP.csv": {
"Identifier": "identifier",
"Query": "query",
"Response": "response"
},
"RVARLKUP.csv": {
"Identifier": "identifier",
"Query": "query"
},
"PAYMENTS.csv": {
"Deposit_Date": "deposit_date",
"File_No": "file_no",
"Id": "client_id",
"Regarding": "regarding",
"Amount": "amount",
"Note": "note"
},
"TRNSACTN.csv": {
# Maps to Ledger model - same structure as LEDGER.csv
"File_No": "file_no",
"Date": "date",
"Item_No": "item_no",
"Empl_Num": "empl_num",
"T_Code": "t_code",
"T_Type": "t_type",
"T_Type_L": "t_type_l",
"Quantity": "quantity",
"Rate": "rate",
"Amount": "amount",
"Billed": "billed",
"Note": "note"
}
}
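
# Example of how a mapping is applied (values are illustrative, not real data):
# a ROLODEX.csv row {"Id": "R001", "Last": "Smith", "St": "Illinois"} becomes
# {"id": "R001", "last": "Smith"}; "St" is dropped because it is mapped to
# None above, and empty cells are dropped later by convert_value().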

def parse_date(date_str: str) -> Optional[date]:
    """Parse a date string in several common formats; return None if none match"""
if not date_str or date_str.strip() == "":
return None
date_formats = [
"%Y-%m-%d",
"%m/%d/%Y",
"%d/%m/%Y",
"%m-%d-%Y",
"%d-%m-%Y",
"%Y/%m/%d"
]
for fmt in date_formats:
try:
return datetime.strptime(date_str.strip(), fmt).date()
except ValueError:
continue
return None
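
# Behavior sketch for parse_date (illustrative values): "2020-12-31" and
# "12/31/2020" both yield date(2020, 12, 31). Ambiguous strings resolve to the
# first matching format, so "01/02/2020" parses as January 2 because %m/%d/%Y
# is tried before %d/%m/%Y; "not a date" yields None.
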
def convert_value(value: str, field_name: str) -> Any:
"""Convert string value to appropriate type based on field name"""
if not value or value.strip() == "" or value.strip().lower() in ["null", "none", "n/a"]:
return None
value = value.strip()
# Date fields
if any(word in field_name.lower() for word in ["date", "dob", "birth", "opened", "closed", "judgment", "valuation", "married", "vests_on"]):
parsed_date = parse_date(value)
return parsed_date
# Boolean fields
if any(word in field_name.lower() for word in ["active", "default_printer", "billed", "transferable"]):
if value.lower() in ["true", "1", "yes", "y", "on", "active"]:
return True
elif value.lower() in ["false", "0", "no", "n", "off", "inactive"]:
return False
else:
return None
    # Numeric fields (float). The substring heuristic is broad, so SETUP.csv's
    # "Setting_Value" (free text) is explicitly excluded from the "value" match.
    if field_name.lower() != "setting_value" and any(word in field_name.lower() for word in ["rate", "hour", "bal", "fee", "amount", "owing", "transfer", "valu", "accrued", "vested", "cola", "tax", "percent", "benefit_amount", "mortality", "value"]):
        try:
            # Strip currency symbols, thousands separators, and percent signs
            cleaned_value = value.replace("$", "").replace(",", "").replace("%", "")
            return float(cleaned_value)
        except ValueError:
            return 0.0
# Integer fields
if any(word in field_name.lower() for word in ["item_no", "age", "start_age", "version", "line_number", "sort_order"]):
try:
return int(float(value)) # Handle cases like "1.0"
except ValueError:
return 0
# String fields - limit length to prevent database errors
if len(value) > 500: # Reasonable limit for most string fields
return value[:500]
return value
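
# Dispatch examples for convert_value (illustrative): ("$1,234.50", "Amount")
# -> 1234.5 (numeric); ("1.0", "Item_No") -> 1 (integer); ("Y", "Billed") ->
# True (boolean); ("N/A", "Note") -> None (null sentinel). The checks run in
# order, so a field like "Valuation_Date" is handled as a date before the
# numeric rules are consulted.
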
def validate_foreign_keys(model_data: dict, model_class, db: Session) -> List[str]:
    """Validate foreign key relationships before inserting data"""
errors = []
# Check Phone -> Rolodex relationship
if model_class == Phone and "rolodex_id" in model_data:
rolodex_id = model_data["rolodex_id"]
if rolodex_id and not db.query(Rolodex).filter(Rolodex.id == rolodex_id).first():
errors.append(f"Rolodex ID '{rolodex_id}' not found")
# Check File -> Rolodex relationship
if model_class == File and "id" in model_data:
rolodex_id = model_data["id"]
if rolodex_id and not db.query(Rolodex).filter(Rolodex.id == rolodex_id).first():
errors.append(f"Owner Rolodex ID '{rolodex_id}' not found")
# Add more foreign key validations as needed
return errors
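
# Note: validate_foreign_keys is not currently wired into the endpoints below.
# A caller could run it per row before db.add(), along these lines (sketch):
#   fk_errors = validate_foreign_keys(model_data, model_class, db)
#   if fk_errors:
#       errors.append({"row": row_num, "error": "; ".join(fk_errors)})
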
@router.get("/available-files")
async def get_available_csv_files(current_user: User = Depends(get_current_user)):
"""Get list of available CSV files for import"""
return {
"available_files": list(CSV_MODEL_MAPPING.keys()),
"descriptions": {
"ROLODEX.csv": "Customer/contact information",
"PHONE.csv": "Phone numbers linked to customers",
"FILES.csv": "Client files and cases",
"LEDGER.csv": "Financial transactions per file",
"QDROS.csv": "Legal documents and court orders",
"PENSIONS.csv": "Pension calculation data",
"EMPLOYEE.csv": "Staff and employee information",
"STATES.csv": "US States lookup table",
"FILETYPE.csv": "File type categories",
"FILESTAT.csv": "File status codes",
"DEPOSITS.csv": "Daily bank deposit summaries",
"FILENOTS.csv": "File notes and case memos",
"FVARLKUP.csv": "Form template variables",
"RVARLKUP.csv": "Report template variables",
"PAYMENTS.csv": "Individual payments within deposits",
"TRNSACTN.csv": "Transaction details (maps to Ledger)"
}
}
@router.post("/upload/{file_type}")
async def import_csv_data(
file_type: str,
file: UploadFile = UploadFileForm(...),
replace_existing: bool = Form(False),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Import data from CSV file"""
# Validate file type
if file_type not in CSV_MODEL_MAPPING:
raise HTTPException(
status_code=400,
detail=f"Unsupported file type: {file_type}. Available types: {list(CSV_MODEL_MAPPING.keys())}"
)
    # Validate file extension (case-insensitively; legacy exports often use .CSV)
    if not file.filename.lower().endswith('.csv'):
        raise HTTPException(status_code=400, detail="File must be a CSV file")
model_class = CSV_MODEL_MAPPING[file_type]
field_mapping = FIELD_MAPPINGS.get(file_type, {})
try:
# Read CSV content
content = await file.read()
        # Try multiple encodings for legacy CSV files; utf-8-sig strips a BOM if
        # present (cp1252 is omitted because it is the same codec as windows-1252)
        encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1']
csv_content = None
for encoding in encodings:
try:
csv_content = content.decode(encoding)
break
except UnicodeDecodeError:
continue
if csv_content is None:
raise HTTPException(status_code=400, detail="Could not decode CSV file. Please ensure it's saved in UTF-8, Windows-1252, or ISO-8859-1 encoding.")
# Preprocess CSV content to fix common legacy issues
def preprocess_csv(content):
lines = content.split('\n')
cleaned_lines = []
i = 0
while i < len(lines):
line = lines[i]
                if i == 0:  # Header line establishes the expected comma count
                    cleaned_lines.append(line)
                    expected_comma_count = line.count(',')
                    i += 1
                    continue
                # A line with fewer commas than the header may be the tail of a
                # field that contained an embedded newline
                if line.count(',') < expected_comma_count:
# This might be a continuation of the previous line
# Try to merge with previous line
if cleaned_lines:
cleaned_lines[-1] += " " + line.replace('\n', ' ').replace('\r', ' ')
else:
cleaned_lines.append(line)
else:
cleaned_lines.append(line)
i += 1
return '\n'.join(cleaned_lines)
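
        # Note: preprocess_csv is defined but never called; the custom parser
        # below copes with malformed rows directly by padding, truncating, or
        # skipping them.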
# Custom robust parser for problematic legacy CSV files
class MockCSVReader:
def __init__(self, data, fieldnames):
self.data = data
self.fieldnames = fieldnames
self.index = 0
def __iter__(self):
return self
def __next__(self):
if self.index >= len(self.data):
raise StopIteration
row = self.data[self.index]
self.index += 1
return row
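
        # MockCSVReader simply replays the pre-parsed row dicts, mimicking the
        # iteration interface of csv.DictReader so the import loop below can
        # consume either one.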
try:
lines = csv_content.strip().split('\n')
if not lines:
raise ValueError("Empty CSV file")
# Parse header using proper CSV parsing
header_reader = csv.reader(io.StringIO(lines[0]))
headers = next(header_reader)
headers = [h.strip() for h in headers]
print(f"DEBUG: Found {len(headers)} headers: {headers}")
# Parse data rows with proper CSV parsing
rows_data = []
skipped_rows = 0
for line_num, line in enumerate(lines[1:], start=2):
# Skip empty lines
if not line.strip():
continue
try:
# Use proper CSV parsing to handle commas within quoted fields
line_reader = csv.reader(io.StringIO(line))
fields = next(line_reader)
fields = [f.strip() for f in fields]
# Skip rows that are clearly malformed (too few fields)
if len(fields) < len(headers) // 2: # Less than half the expected fields
skipped_rows += 1
continue
# Pad or truncate to match header length
while len(fields) < len(headers):
fields.append('')
fields = fields[:len(headers)]
row_dict = dict(zip(headers, fields))
rows_data.append(row_dict)
except Exception as row_error:
print(f"Skipping malformed row {line_num}: {row_error}")
skipped_rows += 1
continue
csv_reader = MockCSVReader(rows_data, headers)
print(f"SUCCESS: Parsed {len(rows_data)} rows (skipped {skipped_rows} malformed rows)")
except Exception as e:
print(f"Custom parsing failed: {e}")
raise HTTPException(status_code=400, detail=f"Could not parse CSV file. The file appears to have serious formatting issues. Error: {str(e)}")
imported_count = 0
errors = []
        # If replace_existing is True, delete all existing records first.
        # Note that this delete is committed immediately, so a failure later in
        # the import leaves the table empty rather than rolled back.
        if replace_existing:
            db.query(model_class).delete()
            db.commit()
        # start=2 accounts for the header row; numbering can drift slightly
        # where malformed rows were dropped during parsing
        for row_num, row in enumerate(csv_reader, start=2):
try:
# Convert CSV row to model data
model_data = {}
for csv_field, db_field in field_mapping.items():
if csv_field in row and db_field is not None: # Skip fields mapped to None
converted_value = convert_value(row[csv_field], csv_field)
if converted_value is not None:
model_data[db_field] = converted_value
                # Skip rows that produced no usable values (None values are
                # never added, so an empty dict means an empty row)
                if not model_data:
                    continue
# Special validation for models with required fields
if model_class == Phone:
if 'phone' not in model_data or not model_data['phone']:
continue # Skip phone records without a phone number
if model_class == Rolodex:
if 'last' not in model_data or not model_data['last']:
continue # Skip rolodex records without a last name/company name
# Create model instance
instance = model_class(**model_data)
db.add(instance)
imported_count += 1
                # Commit in batches of 100 to keep transactions small
                if imported_count % 100 == 0:
                    db.commit()
except Exception as e:
errors.append({
"row": row_num,
"error": str(e),
"data": row
})
continue
# Final commit
db.commit()
result = {
"file_type": file_type,
"imported_count": imported_count,
"errors": errors[:10], # Limit errors to first 10
"total_errors": len(errors)
}
if errors:
result["warning"] = f"Import completed with {len(errors)} errors"
return result
except Exception as e:
print(f"IMPORT ERROR DEBUG: {type(e).__name__}: {str(e)}")
import traceback
print(f"TRACEBACK: {traceback.format_exc()}")
db.rollback()
raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}")
@router.get("/status")
async def get_import_status(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
"""Get current import status and record counts"""
status = {}
for file_type, model_class in CSV_MODEL_MAPPING.items():
try:
count = db.query(model_class).count()
status[file_type] = {
"table_name": model_class.__tablename__,
"record_count": count
}
except Exception as e:
status[file_type] = {
"table_name": model_class.__tablename__,
"record_count": 0,
"error": str(e)
}
return status
@router.delete("/clear/{file_type}")
async def clear_table_data(
file_type: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Clear all data from a specific table"""
if file_type not in CSV_MODEL_MAPPING:
raise HTTPException(status_code=400, detail=f"Unknown file type: {file_type}")
model_class = CSV_MODEL_MAPPING[file_type]
try:
deleted_count = db.query(model_class).count()
db.query(model_class).delete()
db.commit()
return {
"file_type": file_type,
"table_name": model_class.__tablename__,
"deleted_count": deleted_count
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Clear operation failed: {str(e)}")
@router.post("/validate/{file_type}")
async def validate_csv_file(
file_type: str,
file: UploadFile = UploadFileForm(...),
current_user: User = Depends(get_current_user)
):
"""Validate CSV file structure without importing"""
if file_type not in CSV_MODEL_MAPPING:
raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_type}")
    if not file.filename.lower().endswith('.csv'):
        raise HTTPException(status_code=400, detail="File must be a CSV file")
field_mapping = FIELD_MAPPINGS.get(file_type, {})
try:
content = await file.read()
        # Try multiple encodings for legacy CSV files (utf-8-sig strips a BOM)
        encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1']
csv_content = None
for encoding in encodings:
try:
csv_content = content.decode(encoding)
break
except UnicodeDecodeError:
continue
if csv_content is None:
raise HTTPException(status_code=400, detail="Could not decode CSV file. Please ensure it's saved in UTF-8, Windows-1252, or ISO-8859-1 encoding.")
# Handle CSV parsing issues with legacy files
csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        # Check headers (fieldnames is None for an empty file)
        csv_headers = csv_reader.fieldnames or []
expected_headers = list(field_mapping.keys())
missing_headers = [h for h in expected_headers if h not in csv_headers]
extra_headers = [h for h in csv_headers if h not in expected_headers]
# Sample data validation
sample_rows = []
errors = []
for row_num, row in enumerate(csv_reader, start=2):
            if row_num > 11:  # Check only the first 10 data rows (data starts at row 2)
break
sample_rows.append(row)
# Check for data type issues
for csv_field, db_field in field_mapping.items():
if csv_field in row and row[csv_field]:
try:
convert_value(row[csv_field], csv_field)
except Exception as e:
errors.append({
"row": row_num,
"field": csv_field,
"value": row[csv_field],
"error": str(e)
})
return {
"file_type": file_type,
"valid": len(missing_headers) == 0 and len(errors) == 0,
"headers": {
"found": csv_headers,
"expected": expected_headers,
"missing": missing_headers,
"extra": extra_headers
},
"sample_data": sample_rows,
"validation_errors": errors[:5], # First 5 errors only
"total_errors": len(errors)
}
except Exception as e:
print(f"VALIDATION ERROR DEBUG: {type(e).__name__}: {str(e)}")
import traceback
print(f"VALIDATION TRACEBACK: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")
@router.get("/progress/{import_id}")
async def get_import_progress(
import_id: str,
current_user: User = Depends(get_current_user)
):
"""Get import progress status (placeholder for future implementation)"""
# This would be used for long-running imports with background tasks
return {
"import_id": import_id,
"status": "not_implemented",
"message": "Real-time progress tracking not yet implemented"
}
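
# A possible shape for future progress tracking (an assumption, not part of the
# current implementation): the upload endpoint could register a counter keyed by
# import_id and update it inside its import loop, e.g.
#   IMPORT_PROGRESS[import_id] = {"status": "running", "rows_done": imported_count}
# A multi-worker deployment would want Redis or a database table rather than a
# module-level dict, which survives neither restarts nor cross-process access.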
@router.post("/batch-upload")
async def batch_import_csv_files(
files: List[UploadFile] = UploadFileForm(...),
replace_existing: bool = Form(False),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Import multiple CSV files in optimal order"""
if len(files) > 20:
raise HTTPException(status_code=400, detail="Maximum 20 files allowed per batch")
# Define optimal import order based on dependencies
import_order = [
"STATES.csv", "GRUPLKUP.csv", "EMPLOYEE.csv", "FILETYPE.csv", "FILESTAT.csv",
"TRNSTYPE.csv", "TRNSLKUP.csv", "FOOTERS.csv", "SETUP.csv", "PRINTERS.csv",
"ROLODEX.csv", "PHONE.csv", "FILES.csv", "LEDGER.csv", "TRNSACTN.csv",
"QDROS.csv", "PENSIONS.csv", "PLANINFO.csv", "PAYMENTS.csv", "DEPOSITS.csv",
"FILENOTS.csv", "FORM_INX.csv", "FORM_LST.csv", "FVARLKUP.csv", "RVARLKUP.csv"
]
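    # Rationale for the ordering: lookup tables come first, ROLODEX.csv precedes
    # PHONE.csv and FILES.csv (both reference Rolodex ids), and FILES.csv
    # precedes the ledger, QDRO, and pension data keyed by file number.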
# Sort uploaded files by optimal import order
file_map = {f.filename: f for f in files}
ordered_files = []
for file_type in import_order:
if file_type in file_map:
ordered_files.append((file_type, file_map[file_type]))
del file_map[file_type]
# Add any remaining files not in the predefined order
for filename, file in file_map.items():
ordered_files.append((filename, file))
results = []
total_imported = 0
total_errors = 0
for file_type, file in ordered_files:
if file_type not in CSV_MODEL_MAPPING:
results.append({
"file_type": file_type,
"status": "skipped",
"message": f"Unsupported file type: {file_type}"
})
continue
try:
# Reset file pointer
await file.seek(0)
# Import this file using simplified logic
model_class = CSV_MODEL_MAPPING[file_type]
field_mapping = FIELD_MAPPINGS.get(file_type, {})
content = await file.read()
            # Try multiple encodings for legacy CSV files (utf-8-sig strips a BOM)
            encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1']
csv_content = None
for encoding in encodings:
try:
csv_content = content.decode(encoding)
break
except UnicodeDecodeError:
continue
if csv_content is None:
results.append({
"file_type": file_type,
"status": "failed",
"message": "Could not decode CSV file encoding"
})
continue
# Handle CSV parsing issues with legacy files
csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
imported_count = 0
errors = []
            # If replace_existing is True, clear this model's table before loading.
            # Caveat: LEDGER.csv and TRNSACTN.csv share the Ledger model, so a
            # batch containing both will delete the first file's rows here.
            if replace_existing:
                db.query(model_class).delete()
                db.commit()
for row_num, row in enumerate(csv_reader, start=2):
try:
model_data = {}
for csv_field, db_field in field_mapping.items():
if csv_field in row and db_field is not None: # Skip fields mapped to None
converted_value = convert_value(row[csv_field], csv_field)
if converted_value is not None:
model_data[db_field] = converted_value
                    if not model_data:
                        continue
instance = model_class(**model_data)
db.add(instance)
imported_count += 1
if imported_count % 100 == 0:
db.commit()
except Exception as e:
errors.append({
"row": row_num,
"error": str(e)
})
continue
db.commit()
total_imported += imported_count
total_errors += len(errors)
results.append({
"file_type": file_type,
"status": "success" if len(errors) == 0 else "completed_with_errors",
"imported_count": imported_count,
"errors": len(errors),
"message": f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else "")
})
except Exception as e:
db.rollback()
results.append({
"file_type": file_type,
"status": "failed",
"message": f"Import failed: {str(e)}"
})
return {
"batch_results": results,
"summary": {
"total_files": len(files),
"successful_files": len([r for r in results if r["status"] in ["success", "completed_with_errors"]]),
"failed_files": len([r for r in results if r["status"] == "failed"]),
"total_imported": total_imported,
"total_errors": total_errors
}
}