""" Data import API endpoints for CSV file uploads """ import csv import io from datetime import datetime from typing import List, Dict, Any, Optional from fastapi import APIRouter, Depends, HTTPException, UploadFile, File as UploadFileForm, Form from sqlalchemy.orm import Session from app.database.base import get_db from app.auth.security import get_current_user from app.models.user import User from app.models import * router = APIRouter(prefix="/api/import", tags=["import"]) # CSV to Model mapping CSV_MODEL_MAPPING = { "ROLODEX.csv": Rolodex, "PHONE.csv": Phone, "FILES.csv": File, "LEDGER.csv": Ledger, "QDROS.csv": QDRO, "PENSIONS.csv": Pension, "SCHEDULE.csv": PensionSchedule, "MARRIAGE.csv": MarriageHistory, "DEATH.csv": DeathBenefit, "SEPARATE.csv": SeparationAgreement, "LIFETABL.csv": LifeTable, "NUMBERAL.csv": NumberTable, "EMPLOYEE.csv": Employee, "FILETYPE.csv": FileType, "FILESTAT.csv": FileStatus, "TRNSTYPE.csv": TransactionType, "TRNSLKUP.csv": TransactionCode, "STATES.csv": State, "GRUPLKUP.csv": GroupLookup, "FOOTERS.csv": Footer, "PLANINFO.csv": PlanInfo, "FORM_INX.csv": FormIndex, "FORM_LST.csv": FormList, "PRINTERS.csv": PrinterSetup, "SETUP.csv": SystemSetup, # Additional models for complete legacy coverage "DEPOSITS.csv": Deposit, "FILENOTS.csv": FileNote, "FVARLKUP.csv": FormVariable, "RVARLKUP.csv": ReportVariable, "PAYMENTS.csv": Payment, "TRNSACTN.csv": Ledger # Maps to existing Ledger model (same structure) } # Field mappings for CSV columns to database fields FIELD_MAPPINGS = { "ROLODEX.csv": { "Id": "id", "Prefix": "prefix", "First": "first", "Middle": "middle", "Last": "last", "Suffix": "suffix", "Title": "title", "A1": "a1", "A2": "a2", "A3": "a3", "City": "city", "Abrev": "abrev", # "St": "st", # Full state name - not mapped (model only has abrev) "Zip": "zip", "Email": "email", "DOB": "dob", "SS#": "ss_number", "Legal_Status": "legal_status", "Group": "group", "Memo": "memo" }, "PHONE.csv": { "Id": "rolodex_id", "Phone": "phone", "Location": "location" }, "FILES.csv": { "File_No": "file_no", "Id": "id", "File_Type": "file_type", "Regarding": "regarding", "Opened": "opened", "Closed": "closed", "Empl_Num": "empl_num", "Rate_Per_Hour": "rate_per_hour", "Status": "status", "Footer_Code": "footer_code", "Opposing": "opposing", "Hours": "hours", "Hours_P": "hours_p", "Trust_Bal": "trust_bal", "Trust_Bal_P": "trust_bal_p", "Hourly_Fees": "hourly_fees", "Hourly_Fees_P": "hourly_fees_p", "Flat_Fees": "flat_fees", "Flat_Fees_P": "flat_fees_p", "Disbursements": "disbursements", "Disbursements_P": "disbursements_p", "Credit_Bal": "credit_bal", "Credit_Bal_P": "credit_bal_p", "Total_Charges": "total_charges", "Total_Charges_P": "total_charges_p", "Amount_Owing": "amount_owing", "Amount_Owing_P": "amount_owing_p", "Transferable": "transferable", "Memo": "memo" }, "LEDGER.csv": { "File_No": "file_no", "Date": "date", "Item_No": "item_no", "Empl_Num": "empl_num", "T_Code": "t_code", "T_Type": "t_type", "T_Type_L": "t_type_l", "Quantity": "quantity", "Rate": "rate", "Amount": "amount", "Billed": "billed", "Note": "note" }, "QDROS.csv": { "File_No": "file_no", "Version": "version", "Plan_Id": "plan_id", "^1": "field1", "^2": "field2", "^Part": "part", "^AltP": "altp", "^Pet": "pet", "^Res": "res", "Case_Type": "case_type", "Case_Code": "case_code", "Section": "section", "Case_Number": "case_number", "Judgment_Date": "judgment_date", "Valuation_Date": "valuation_date", "Married_On": "married_on", "Percent_Awarded": "percent_awarded", "Ven_City": "ven_city", "Ven_Cnty": 
"ven_cnty", "Ven_St": "ven_st", "Draft_Out": "draft_out", "Draft_Apr": "draft_apr", "Final_Out": "final_out", "Judge": "judge", "Form_Name": "form_name" }, "PENSIONS.csv": { "File_No": "file_no", "Version": "version", "Plan_Id": "plan_id", "Plan_Name": "plan_name", "Title": "title", "First": "first", "Last": "last", "Birth": "birth", "Race": "race", "Sex": "sex", "Info": "info", "Valu": "valu", "Accrued": "accrued", "Vested_Per": "vested_per", "Start_Age": "start_age", "COLA": "cola", "Max_COLA": "max_cola", "Withdrawal": "withdrawal", "Pre_DR": "pre_dr", "Post_DR": "post_dr", "Tax_Rate": "tax_rate" }, "EMPLOYEE.csv": { "Empl_Num": "empl_num", "Rate_Per_Hour": "rate_per_hour" # "Empl_Id": not a field in Employee model, using empl_num as identifier # Model has additional fields (first_name, last_name, title, etc.) not in CSV }, "STATES.csv": { "Abrev": "abbreviation", "St": "name" }, "GRUPLKUP.csv": { "Code": "group_code", "Description": "description" # "Title": field not present in model, skipping }, "TRNSLKUP.csv": { "T_Code": "t_code", "T_Type": "t_type", # "T_Type_L": not a field in TransactionCode model "Amount": "default_rate", "Description": "description" }, "TRNSTYPE.csv": { "T_Type": "t_type", "T_Type_L": "description" # "Header": maps to debit_credit but needs data transformation # "Footer": doesn't align with active boolean field # These fields may need custom handling or model updates }, "FILETYPE.csv": { "File_Type": "type_code", "Description": "description", "Default_Rate": "default_rate" }, "FILESTAT.csv": { "Status_Code": "status_code", "Description": "description", "Sort_Order": "sort_order" }, "FOOTERS.csv": { "Footer_Code": "footer_code", "Content": "content", "Description": "description" }, "PLANINFO.csv": { "Plan_Id": "plan_id", "Plan_Name": "plan_name", "Plan_Type": "plan_type", "Sponsor": "sponsor", "Administrator": "administrator", "Address1": "address1", "Address2": "address2", "City": "city", "State": "state", "Zip_Code": "zip_code", "Phone": "phone", "Notes": "notes" }, "FORM_INX.csv": { "Form_Id": "form_id", "Form_Name": "form_name", "Category": "category" }, "FORM_LST.csv": { "Form_Id": "form_id", "Line_Number": "line_number", "Content": "content" }, "PRINTERS.csv": { "Printer_Name": "printer_name", "Description": "description", "Driver": "driver", "Port": "port", "Default_Printer": "default_printer" }, "SETUP.csv": { "Setting_Key": "setting_key", "Setting_Value": "setting_value", "Description": "description", "Setting_Type": "setting_type" }, "SCHEDULE.csv": { "File_No": "file_no", "Version": "version", "Vests_On": "vests_on", "Vests_At": "vests_at" }, "MARRIAGE.csv": { "File_No": "file_no", "Version": "version", "Marriage_Date": "marriage_date", "Separation_Date": "separation_date", "Divorce_Date": "divorce_date" }, "DEATH.csv": { "File_No": "file_no", "Version": "version", "Benefit_Type": "benefit_type", "Benefit_Amount": "benefit_amount", "Beneficiary": "beneficiary" }, "SEPARATE.csv": { "File_No": "file_no", "Version": "version", "Agreement_Date": "agreement_date", "Terms": "terms" }, "LIFETABL.csv": { "Age": "age", "Male_Mortality": "male_mortality", "Female_Mortality": "female_mortality" }, "NUMBERAL.csv": { "Table_Name": "table_name", "Age": "age", "Value": "value" }, # Additional CSV file mappings "DEPOSITS.csv": { "Deposit_Date": "deposit_date", "Total": "total" }, "FILENOTS.csv": { "File_No": "file_no", "Memo_Date": "memo_date", "Memo_Note": "memo_note" }, "FVARLKUP.csv": { "Identifier": "identifier", "Query": "query", "Response": "response" }, 
"RVARLKUP.csv": { "Identifier": "identifier", "Query": "query" }, "PAYMENTS.csv": { "Deposit_Date": "deposit_date", "File_No": "file_no", "Id": "client_id", "Regarding": "regarding", "Amount": "amount", "Note": "note" }, "TRNSACTN.csv": { # Maps to Ledger model - same structure as LEDGER.csv "File_No": "file_no", "Date": "date", "Item_No": "item_no", "Empl_Num": "empl_num", "T_Code": "t_code", "T_Type": "t_type", "T_Type_L": "t_type_l", "Quantity": "quantity", "Rate": "rate", "Amount": "amount", "Billed": "billed", "Note": "note" } } def parse_date(date_str: str) -> Optional[datetime]: """Parse date string in various formats""" if not date_str or date_str.strip() == "": return None date_formats = [ "%Y-%m-%d", "%m/%d/%Y", "%d/%m/%Y", "%m-%d-%Y", "%d-%m-%Y", "%Y/%m/%d" ] for fmt in date_formats: try: return datetime.strptime(date_str.strip(), fmt).date() except ValueError: continue return None def convert_value(value: str, field_name: str) -> Any: """Convert string value to appropriate type based on field name""" if not value or value.strip() == "" or value.strip().lower() in ["null", "none", "n/a"]: return None value = value.strip() # Date fields if any(word in field_name.lower() for word in ["date", "dob", "birth", "opened", "closed", "judgment", "valuation", "married", "vests_on"]): parsed_date = parse_date(value) return parsed_date # Boolean fields if any(word in field_name.lower() for word in ["active", "default_printer", "billed", "transferable"]): if value.lower() in ["true", "1", "yes", "y", "on", "active"]: return True elif value.lower() in ["false", "0", "no", "n", "off", "inactive"]: return False else: return None # Numeric fields (float) if any(word in field_name.lower() for word in ["rate", "hour", "bal", "fee", "amount", "owing", "transfer", "valu", "accrued", "vested", "cola", "tax", "percent", "benefit_amount", "mortality", "value"]): try: # Remove currency symbols and commas cleaned_value = value.replace("$", "").replace(",", "").replace("%", "") return float(cleaned_value) except ValueError: return 0.0 # Integer fields if any(word in field_name.lower() for word in ["item_no", "age", "start_age", "version", "line_number", "sort_order"]): try: return int(float(value)) # Handle cases like "1.0" except ValueError: return 0 # String fields - limit length to prevent database errors if len(value) > 500: # Reasonable limit for most string fields return value[:500] return value def validate_foreign_keys(model_data: dict, model_class, db: Session) -> list[str]: """Validate foreign key relationships before inserting data""" errors = [] # Check Phone -> Rolodex relationship if model_class == Phone and "rolodex_id" in model_data: rolodex_id = model_data["rolodex_id"] if rolodex_id and not db.query(Rolodex).filter(Rolodex.id == rolodex_id).first(): errors.append(f"Rolodex ID '{rolodex_id}' not found") # Check File -> Rolodex relationship if model_class == File and "id" in model_data: rolodex_id = model_data["id"] if rolodex_id and not db.query(Rolodex).filter(Rolodex.id == rolodex_id).first(): errors.append(f"Owner Rolodex ID '{rolodex_id}' not found") # Add more foreign key validations as needed return errors @router.get("/available-files") async def get_available_csv_files(current_user: User = Depends(get_current_user)): """Get list of available CSV files for import""" return { "available_files": list(CSV_MODEL_MAPPING.keys()), "descriptions": { "ROLODEX.csv": "Customer/contact information", "PHONE.csv": "Phone numbers linked to customers", "FILES.csv": "Client files and cases", 
"LEDGER.csv": "Financial transactions per file", "QDROS.csv": "Legal documents and court orders", "PENSIONS.csv": "Pension calculation data", "EMPLOYEE.csv": "Staff and employee information", "STATES.csv": "US States lookup table", "FILETYPE.csv": "File type categories", "FILESTAT.csv": "File status codes", "DEPOSITS.csv": "Daily bank deposit summaries", "FILENOTS.csv": "File notes and case memos", "FVARLKUP.csv": "Form template variables", "RVARLKUP.csv": "Report template variables", "PAYMENTS.csv": "Individual payments within deposits", "TRNSACTN.csv": "Transaction details (maps to Ledger)" } } @router.post("/upload/{file_type}") async def import_csv_data( file_type: str, file: UploadFile = UploadFileForm(...), replace_existing: bool = Form(False), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Import data from CSV file""" # Validate file type if file_type not in CSV_MODEL_MAPPING: raise HTTPException( status_code=400, detail=f"Unsupported file type: {file_type}. Available types: {list(CSV_MODEL_MAPPING.keys())}" ) # Validate file extension if not file.filename.endswith('.csv'): raise HTTPException(status_code=400, detail="File must be a CSV file") model_class = CSV_MODEL_MAPPING[file_type] field_mapping = FIELD_MAPPINGS.get(file_type, {}) try: # Read CSV content content = await file.read() csv_content = content.decode('utf-8') csv_reader = csv.DictReader(io.StringIO(csv_content)) imported_count = 0 errors = [] # If replace_existing is True, delete all existing records if replace_existing: db.query(model_class).delete() db.commit() for row_num, row in enumerate(csv_reader, start=2): # Start at 2 for header row try: # Convert CSV row to model data model_data = {} for csv_field, db_field in field_mapping.items(): if csv_field in row: converted_value = convert_value(row[csv_field], csv_field) if converted_value is not None: model_data[db_field] = converted_value # Skip empty rows if not any(model_data.values()): continue # Create model instance instance = model_class(**model_data) db.add(instance) imported_count += 1 # Commit every 100 records to avoid memory issues if imported_count % 100 == 0: db.commit() except Exception as e: errors.append({ "row": row_num, "error": str(e), "data": row }) continue # Final commit db.commit() result = { "file_type": file_type, "imported_count": imported_count, "errors": errors[:10], # Limit errors to first 10 "total_errors": len(errors) } if errors: result["warning"] = f"Import completed with {len(errors)} errors" return result except Exception as e: db.rollback() raise HTTPException(status_code=500, detail=f"Import failed: {str(e)}") @router.get("/status") async def get_import_status(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)): """Get current import status and record counts""" status = {} for file_type, model_class in CSV_MODEL_MAPPING.items(): try: count = db.query(model_class).count() status[file_type] = { "table_name": model_class.__tablename__, "record_count": count } except Exception as e: status[file_type] = { "table_name": model_class.__tablename__, "record_count": 0, "error": str(e) } return status @router.delete("/clear/{file_type}") async def clear_table_data( file_type: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Clear all data from a specific table""" if file_type not in CSV_MODEL_MAPPING: raise HTTPException(status_code=400, detail=f"Unknown file type: {file_type}") model_class = CSV_MODEL_MAPPING[file_type] try: deleted_count 
@router.get("/status")
async def get_import_status(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get the current import status and record counts."""
    status = {}
    for file_type, model_class in CSV_MODEL_MAPPING.items():
        try:
            count = db.query(model_class).count()
            status[file_type] = {
                "table_name": model_class.__tablename__,
                "record_count": count,
            }
        except Exception as e:
            status[file_type] = {
                "table_name": model_class.__tablename__,
                "record_count": 0,
                "error": str(e),
            }
    return status


@router.delete("/clear/{file_type}")
async def clear_table_data(
    file_type: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Clear all data from a specific table."""
    if file_type not in CSV_MODEL_MAPPING:
        raise HTTPException(status_code=400, detail=f"Unknown file type: {file_type}")

    model_class = CSV_MODEL_MAPPING[file_type]
    try:
        deleted_count = db.query(model_class).count()
        db.query(model_class).delete()
        db.commit()
        return {
            "file_type": file_type,
            "table_name": model_class.__tablename__,
            "deleted_count": deleted_count,
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Clear operation failed: {str(e)}")


@router.post("/validate/{file_type}")
async def validate_csv_file(
    file_type: str,
    file: UploadFile = UploadFileForm(...),
    current_user: User = Depends(get_current_user),
):
    """Validate a CSV file's structure without importing it."""
    if file_type not in CSV_MODEL_MAPPING:
        raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_type}")
    if not file.filename.endswith(".csv"):
        raise HTTPException(status_code=400, detail="File must be a CSV file")

    field_mapping = FIELD_MAPPINGS.get(file_type, {})

    try:
        content = await file.read()
        csv_content = content.decode("utf-8")
        csv_reader = csv.DictReader(io.StringIO(csv_content))

        # Check headers (fieldnames is None for an empty file)
        csv_headers = csv_reader.fieldnames or []
        expected_headers = list(field_mapping.keys())
        missing_headers = [h for h in expected_headers if h not in csv_headers]
        extra_headers = [h for h in csv_headers if h not in expected_headers]

        # Sample data validation
        sample_rows = []
        errors = []
        for row_num, row in enumerate(csv_reader, start=2):
            if row_num > 11:  # Only check the first 10 data rows (rows 2-11)
                break
            sample_rows.append(row)

            # Check for data type issues
            for csv_field, db_field in field_mapping.items():
                if csv_field in row and row[csv_field]:
                    try:
                        convert_value(row[csv_field], csv_field)
                    except Exception as e:
                        errors.append({
                            "row": row_num,
                            "field": csv_field,
                            "value": row[csv_field],
                            "error": str(e),
                        })

        return {
            "file_type": file_type,
            "valid": len(missing_headers) == 0 and len(errors) == 0,
            "headers": {
                "found": csv_headers,
                "expected": expected_headers,
                "missing": missing_headers,
                "extra": extra_headers,
            },
            "sample_data": sample_rows,
            "validation_errors": errors[:5],  # First 5 errors only
            "total_errors": len(errors),
        }

    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Validation failed: {str(e)}")
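# Wiring sketch (illustrative): the router carries its own "/api/import"
# prefix, so it presumably only needs to be registered on the application.
# The module path "app.api.import_data" is an assumption for the example.
#
#   from fastapi import FastAPI
#   from app.api.import_data import router as import_router
#
#   app = FastAPI()
#   app.include_router(import_router)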