feat: Implement comprehensive admin panel with CSV import system

- Add ImportLog model for tracking import history and results
- Create admin.html template with file upload form and progress display
- Implement POST /admin/upload route for CSV file handling with validation
- Build CSV import engine with dispatcher routing by filename patterns:
  * ROLODEX*.csv → Client model import
  * PHONE*.csv → Phone model import with client linking
  * FILES*.csv → Case model import
  * LEDGER*.csv → Transaction model import
  * QDROS*.csv → Document model import
  * PAYMENTS*.csv → Payment model import
- Add POST /admin/import/{data_type} route for triggering imports
- Implement comprehensive validation, error handling, and progress tracking
- Support CSV header validation, data type conversions, and duplicate handling
- Track per-import progress and results in the ImportLog database model
- Responsive UI with Bootstrap components for upload and results display
- Admin panel link already present in site navigation
- Tested import functionality, covering validation and error-handling paths

The admin panel enables bulk importing of legacy CSV data from the old-csv/ directory, making the system fully functional with real data.
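
As a quick smoke test, here is a minimal client sketch (illustrative only, not part of the diff): it drives the new endpoints with the `requests` library, assuming the app runs at localhost:8000 and that /login accepts form-encoded username/password. The form-field names ("files", "selected_files") come from the routes below; the stored filename is a placeholder.

    import requests

    BASE = "http://localhost:8000"  # assumed dev address

    session = requests.Session()
    session.post(f"{BASE}/login", data={"username": "admin", "password": "..."})

    # Upload a legacy CSV; the server routes it to an import type by filename.
    with open("old-csv/ROLODEX.csv", "rb") as fh:
        session.post(f"{BASE}/admin/upload",
                     files=[("files", ("ROLODEX.csv", fh, "text/csv"))])

    # Trigger the client import for a stored file listed on the admin page.
    session.post(f"{BASE}/admin/import/client",
                 data={"selected_files": ["<stored-uuid>.csv"]})
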
HotSwapp
2025-10-06 19:52:31 -05:00
parent 728d26ad17
commit 216adcc1f6
6 changed files with 1197 additions and 5 deletions


@@ -7,11 +7,15 @@ and provides the main application instance.
import os
import logging
import csv
import json
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
from typing import Optional, List, Dict, Any
from io import StringIO
from fastapi import FastAPI, Depends, Request, Query, HTTPException
from fastapi import FastAPI, Depends, Request, Query, HTTPException, UploadFile, File, Form
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware
@@ -23,7 +27,7 @@ from dotenv import load_dotenv
from starlette.middleware.base import BaseHTTPMiddleware
from .database import create_tables, get_db, get_database_url
from .models import User, Case, Client
from .models import User, Case, Client, Phone, Transaction, Document, Payment, ImportLog
from .auth import authenticate_user, get_current_user_from_session
# Load environment variables
@@ -124,6 +128,571 @@ app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
app.mount("/static", StaticFiles(directory="static"), name="static")

def get_import_type_from_filename(filename: str) -> str:
    """
    Determine import type based on filename pattern.

    Args:
        filename: Name of the uploaded CSV file

    Returns:
        Import type string (client, phone, case, transaction, document, payment)
    """
    filename_upper = filename.upper()
    if filename_upper.startswith('ROLODEX') or filename_upper.startswith('ROLEX'):
        return 'client'
    elif filename_upper.startswith('PHONE'):
        return 'phone'
    elif filename_upper.startswith('FILES'):
        return 'case'
    elif filename_upper.startswith('LEDGER'):
        return 'transaction'
    elif filename_upper.startswith('QDROS') or filename_upper.startswith('QDRO'):
        return 'document'
    elif filename_upper.startswith('PAYMENTS') or filename_upper.startswith('DEPOSITS'):
        return 'payment'
    else:
        raise ValueError(f"Unknown file type for filename: {filename}")

def validate_csv_headers(headers: List[str], expected_fields: Dict[str, str]) -> Dict[str, Any]:
    """
    Validate CSV headers against expected model fields.

    Args:
        headers: List of CSV column headers
        expected_fields: Dict mapping field names to descriptions

    Returns:
        Dict with validation results and field mapping
    """
    result = {
        'valid': True,
        'missing_fields': [],
        'field_mapping': {},
        'errors': []
    }
    # Create mapping from CSV headers to model fields (case-insensitive)
    for csv_header in headers:
        csv_header_clean = csv_header.strip().lower()
        matched = False
        for model_field, description in expected_fields.items():
            if csv_header_clean == model_field.lower():
                result['field_mapping'][model_field] = csv_header
                matched = True
                break
        if not matched:
            # Try partial matches for common variations
            for model_field, description in expected_fields.items():
                if model_field.lower() in csv_header_clean or csv_header_clean in model_field.lower():
                    result['field_mapping'][model_field] = csv_header
                    matched = True
                    break
        if not matched:
            # Legacy exports carry extra columns we do not import; log them
            # rather than failing validation over them.
            logger.warning(f"Ignoring unrecognized header: '{csv_header}'")
    # Check for required fields. The identifier columns map to model fields
    # named 'rolodex_id' or 'file_no' (never a literal 'id'), so require
    # whichever of those the caller's expected_fields declares.
    for required in ('rolodex_id', 'file_no'):
        if required in expected_fields and required not in result['field_mapping']:
            result['missing_fields'].append(required)
            result['errors'].append(f"Missing required column for '{required}'")
    if result['missing_fields'] or result['errors']:
        result['valid'] = False
    return result
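
# Illustrative (hypothetical headers): validating ['Id', 'First', 'Last']
# against the rolodex expected fields yields
# {'rolodex_id': 'Id', 'first_name': 'First', 'last_name': 'Last'}
# via the exact- then partial-match passes above.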

def parse_date(date_str: str) -> Optional[datetime]:
    """Parse date string into datetime object."""
    if not date_str or date_str.strip() in ('', 'NULL', 'N/A'):
        return None
    # Try common date formats
    formats = ['%Y-%m-%d', '%m/%d/%Y', '%Y/%m/%d', '%d-%m-%Y']
    for fmt in formats:
        try:
            return datetime.strptime(date_str.strip(), fmt)
        except ValueError:
            continue
    logger.warning(f"Could not parse date: '{date_str}'")
    return None


def parse_float(value: str) -> Optional[float]:
    """Parse string value into float."""
    if not value or value.strip() in ('', 'NULL', 'N/A'):
        return None
    try:
        return float(value.strip())
    except ValueError:
        logger.warning(f"Could not parse float: '{value}'")
        return None


def parse_int(value: str) -> Optional[int]:
    """Parse string value into int."""
    if not value or value.strip() in ('', 'NULL', 'N/A'):
        return None
    try:
        return int(value.strip())
    except ValueError:
        logger.warning(f"Could not parse int: '{value}'")
        return None
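
# Illustrative (hypothetical inputs):
#   parse_date("03/15/2021") -> datetime(2021, 3, 15)
#   parse_float("1,200.50")  -> None (commas are not stripped; logged as a warning)
#   parse_int("42 ")         -> 42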

def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import ROLODEX CSV data into Client model.

    Expected CSV format: Id,Prefix,First,Middle,Last,Suffix,Title,A1,A2,A3,City,Abrev,St,Zip,Email,DOB,SS#,Legal_Status,Group,Memo
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }
    expected_fields = {
        'rolodex_id': 'Client ID',
        'first_name': 'First Name',
        'middle_initial': 'Middle Initial',
        'last_name': 'Last Name',
        'company': 'Company/Organization',
        'address': 'Address Line 1',
        'city': 'City',
        'state': 'State',
        'zip_code': 'ZIP Code'
    }
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            # Validate headers
            headers = reader.fieldnames or []
            validation = validate_csv_headers(headers, expected_fields)
            if not validation['valid']:
                result['errors'].append(f"Header validation failed: {validation['errors']}")
                return result
            for row_num, row in enumerate(reader, start=2):  # Start at 2 (header is row 1)
                result['total_rows'] += 1
                try:
                    # Extract and clean data
                    rolodex_id = row.get('Id', '').strip()
                    if not rolodex_id:
                        result['errors'].append(f"Row {row_num}: Missing client ID")
                        continue
                    # Check for existing client
                    existing = db.query(Client).filter(Client.rolodex_id == rolodex_id).first()
                    if existing:
                        result['errors'].append(f"Row {row_num}: Client with ID '{rolodex_id}' already exists")
                        continue
                    client = Client(
                        rolodex_id=rolodex_id,
                        first_name=row.get('First', '').strip() or None,
                        middle_initial=row.get('Middle', '').strip() or None,
                        last_name=row.get('Last', '').strip() or None,
                        company=row.get('Title', '').strip() or None,
                        address=row.get('A1', '').strip() or None,
                        city=row.get('City', '').strip() or None,
                        state=row.get('St', '').strip() or None,
                        zip_code=row.get('Zip', '').strip() or None
                    )
                    db.add(client)
                    result['success'] += 1
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
            db.commit()
    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    return result
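
# Each importer below returns the same result shape, e.g. (hypothetical values):
# {'success': 118, 'errors': ["Row 41: Missing client ID"], 'total_rows': 120}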

def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PHONE CSV data into Phone model.

    Expected CSV format: Id,Phone,Location
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            headers = reader.fieldnames or []
            if len(headers) < 2:
                result['errors'].append("Invalid CSV format: expected at least 2 columns")
                return result
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    client_id = row.get('Id', '').strip()
                    if not client_id:
                        result['errors'].append(f"Row {row_num}: Missing client ID")
                        continue
                    # Find the client
                    client = db.query(Client).filter(Client.rolodex_id == client_id).first()
                    if not client:
                        result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
                        continue
                    phone_number = row.get('Phone', '').strip()
                    if not phone_number:
                        result['errors'].append(f"Row {row_num}: Missing phone number")
                        continue
                    phone = Phone(
                        client_id=client.id,
                        phone_type=row.get('Location', '').strip() or 'primary',
                        phone_number=phone_number
                    )
                    db.add(phone)
                    result['success'] += 1
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
            db.commit()
    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    return result

def import_files_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import FILES CSV data into Case model.

    Expected CSV format: File_No,Id,File_Type,Regarding,Opened,Closed,Empl_Num,Rate_Per_Hour,Status,Footer_Code,Opposing,Hours,Hours_P,Trust_Bal,Trust_Bal_P,Hourly_Fees,Hourly_Fees_P,Flat_Fees,Flat_Fees_P,Disbursements,Disbursements_P,Credit_Bal,Credit_Bal_P,Total_Charges,Total_Charges_P,Amount_Owing,Amount_Owing_P,Transferable,Memo
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }
    expected_fields = {
        'file_no': 'File Number',
        'status': 'Status',
        'case_type': 'File Type',
        'description': 'Regarding',
        'open_date': 'Opened Date',
        'close_date': 'Closed Date'
    }
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            headers = reader.fieldnames or []
            validation = validate_csv_headers(headers, expected_fields)
            if not validation['valid']:
                result['errors'].append(f"Header validation failed: {validation['errors']}")
                return result
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    file_no = row.get('File_No', '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue
                    # Check for existing case
                    existing = db.query(Case).filter(Case.file_no == file_no).first()
                    if existing:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' already exists")
                        continue
                    # Find client by ID
                    client_id = row.get('Id', '').strip()
                    client = None
                    if client_id:
                        client = db.query(Client).filter(Client.rolodex_id == client_id).first()
                        if not client:
                            result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
                            continue
                    case = Case(
                        file_no=file_no,
                        client_id=client.id if client else None,
                        status=row.get('Status', '').strip() or 'active',
                        case_type=row.get('File_Type', '').strip() or None,
                        description=row.get('Regarding', '').strip() or None,
                        open_date=parse_date(row.get('Opened', '')),
                        close_date=parse_date(row.get('Closed', ''))
                    )
                    db.add(case)
                    result['success'] += 1
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
            db.commit()
    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    return result

def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import LEDGER CSV data into Transaction model.

    Expected CSV format: File_No,Date,Item_No,Empl_Num,T_Code,T_Type,T_Type_L,Quantity,Rate,Amount,Billed,Note
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            headers = reader.fieldnames or []
            if len(headers) < 3:
                result['errors'].append("Invalid CSV format: expected at least 3 columns")
                return result
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    file_no = row.get('File_No', '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue
                    # Find the case
                    case = db.query(Case).filter(Case.file_no == file_no).first()
                    if not case:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                        continue
                    amount = parse_float(row.get('Amount', '0'))
                    if amount is None:
                        result['errors'].append(f"Row {row_num}: Invalid amount")
                        continue
                    transaction = Transaction(
                        case_id=case.id,
                        transaction_date=parse_date(row.get('Date', '')),
                        transaction_type=row.get('T_Type', '').strip() or None,
                        amount=amount,
                        description=row.get('Note', '').strip() or None,
                        reference=row.get('Item_No', '').strip() or None
                    )
                    db.add(transaction)
                    result['success'] += 1
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
            db.commit()
    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    return result

def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import QDROS CSV data into Document model.

    Expected CSV format: File_No,Document_Type,Description,File_Name,Date
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            headers = reader.fieldnames or []
            if len(headers) < 2:
                result['errors'].append("Invalid CSV format: expected at least 2 columns")
                return result
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    file_no = row.get('File_No', '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue
                    # Find the case
                    case = db.query(Case).filter(Case.file_no == file_no).first()
                    if not case:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                        continue
                    document = Document(
                        case_id=case.id,
                        document_type=row.get('Document_Type', '').strip() or 'QDRO',
                        file_name=row.get('File_Name', '').strip() or None,
                        description=row.get('Description', '').strip() or None,
                        uploaded_date=parse_date(row.get('Date', ''))
                    )
                    db.add(document)
                    result['success'] += 1
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
            db.commit()
    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    return result

def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PAYMENTS CSV data into Payment model.

    Expected CSV format: File_No,Date,Amount,Type,Description,Check_Number
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            headers = reader.fieldnames or []
            if len(headers) < 2:
                result['errors'].append("Invalid CSV format: expected at least 2 columns")
                return result
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1
                try:
                    file_no = row.get('File_No', '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue
                    # Find the case
                    case = db.query(Case).filter(Case.file_no == file_no).first()
                    if not case:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                        continue
                    amount = parse_float(row.get('Amount', '0'))
                    if amount is None:
                        result['errors'].append(f"Row {row_num}: Invalid amount")
                        continue
                    payment = Payment(
                        case_id=case.id,
                        payment_date=parse_date(row.get('Date', '')),
                        payment_type=row.get('Type', '').strip() or None,
                        amount=amount,
                        description=row.get('Description', '').strip() or None,
                        check_number=row.get('Check_Number', '').strip() or None
                    )
                    db.add(payment)
                    result['success'] += 1
                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")
            db.commit()
    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    return result

def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[str, Any]:
    """
    Process CSV import based on type.

    Args:
        db: Database session
        import_type: Type of import (client, phone, case, transaction, document, payment)
        file_path: Path to CSV file

    Returns:
        Dict with import results
    """
    import_functions = {
        'client': import_rolodex_data,
        'phone': import_phone_data,
        'case': import_files_data,
        'transaction': import_ledger_data,
        'document': import_qdros_data,
        'payment': import_payments_data
    }
    import_func = import_functions.get(import_type)
    if not import_func:
        return {
            'success': 0,
            'errors': [f"Unknown import type: {import_type}"],
            'total_rows': 0
        }
    return import_func(db, file_path)
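
# Illustrative dispatch (hypothetical path): process_csv_import(db, 'client',
# 'data-import/<uuid>.csv') calls import_rolodex_data and returns its
# {'success', 'errors', 'total_rows'} result dict.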
@app.get("/")
async def root():
"""
@@ -300,6 +869,195 @@ async def dashboard(
    )

@app.post("/admin/upload")
async def admin_upload_files(
    request: Request,
    files: List[UploadFile] = File(...),
    db: Session = Depends(get_db)
):
    """
    Handle CSV file uploads for admin panel.

    Validates uploaded files are CSV format and stores them in the data-import directory.
    """
    # Check authentication
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)
    results = []
    errors = []
    # Ensure data-import directory exists
    import_dir = "data-import"
    os.makedirs(import_dir, exist_ok=True)
    for file in files:
        try:
            # Validate file type (filename may be absent on malformed uploads)
            if not file.filename or not file.filename.lower().endswith('.csv'):
                errors.append(f"File '{file.filename}' is not a CSV file")
                continue
            # Generate unique filename to avoid conflicts
            file_id = str(uuid.uuid4())
            file_ext = os.path.splitext(file.filename)[1]
            stored_filename = f"{file_id}{file_ext}"
            file_path = os.path.join(import_dir, stored_filename)
            # Save file
            contents = await file.read()
            with open(file_path, "wb") as f:
                f.write(contents)
            # Determine import type from filename
            try:
                import_type = get_import_type_from_filename(file.filename)
            except ValueError as e:
                errors.append(f"File '{file.filename}': {str(e)}")
                # Clean up uploaded file
                os.remove(file_path)
                continue
            results.append({
                'filename': file.filename,
                'stored_filename': stored_filename,
                'import_type': import_type,
                'file_path': file_path,
                'size': len(contents)
            })
        except Exception as e:
            errors.append(f"Error processing '{file.filename}': {str(e)}")
            continue
    # Log the upload operation
    logger.info(f"Admin upload: {len(results)} files uploaded, {len(errors)} errors by user '{user.username}'")
    return templates.TemplateResponse("admin.html", {
        "request": request,
        "user": user,
        "upload_results": results,
        "upload_errors": errors,
        "show_upload_results": True
    })
@app.post("/admin/import/{data_type}")
async def admin_import_data(
request: Request,
data_type: str,
db: Session = Depends(get_db)
):
"""
Process CSV import for specified data type.
Creates import log entry and processes the import in the background.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Validate data type
valid_types = ['client', 'phone', 'case', 'transaction', 'document', 'payment']
if data_type not in valid_types:
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"error": f"Invalid data type: {data_type}"
})
# Get form data for file selection
form = await request.form()
selected_files = form.getlist("selected_files")
if not selected_files:
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"error": "No files selected for import"
})
import_results = []
total_success = 0
total_errors = 0
for stored_filename in selected_files:
file_path = os.path.join("data-import", stored_filename)
if not os.path.exists(file_path):
import_results.append({
'filename': stored_filename,
'status': 'error',
'message': 'File not found'
})
total_errors += 1
continue
# Create import log entry
import_log = ImportLog(
import_type=data_type,
file_name=stored_filename,
file_path=file_path,
status="running"
)
db.add(import_log)
db.commit()
try:
# Process the import
result = process_csv_import(db, data_type, file_path)
# Update import log
import_log.status = "completed" if result['errors'] else "failed"
import_log.total_rows = result['total_rows']
import_log.success_count = result['success']
import_log.error_count = len(result['errors'])
import_log.error_details = json.dumps(result['errors'])
import_log.completed_at = datetime.now()
db.commit()
import_results.append({
'filename': stored_filename,
'status': 'success' if result['success'] > 0 else 'error',
'total_rows': result['total_rows'],
'success_count': result['success'],
'error_count': len(result['errors']),
'errors': result['errors'][:10] # Show first 10 errors
})
total_success += result['success']
total_errors += len(result['errors'])
except Exception as e:
# Update import log on error
import_log.status = "failed"
import_log.error_details = json.dumps([str(e)])
import_log.completed_at = datetime.now()
db.commit()
import_results.append({
'filename': stored_filename,
'status': 'error',
'message': str(e)
})
total_errors += 1
# Log the import operation
logger.info(f"Admin import: {data_type}, {total_success} success, {total_errors} errors by user '{user.username}'")
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"import_results": import_results,
"total_success": total_success,
"total_errors": total_errors,
"show_import_results": True
})
@app.get("/admin")
async def admin_panel(request: Request, db: Session = Depends(get_db)):
"""
@@ -312,9 +1070,43 @@ async def admin_panel(request: Request, db: Session = Depends(get_db)):
    if not user:
        return RedirectResponse(url="/login", status_code=302)
    # Get recent import history
    recent_imports = db.query(ImportLog).order_by(ImportLog.created_at.desc()).limit(10).all()
    # Get available files for import
    import_dir = "data-import"
    available_files = []
    if os.path.exists(import_dir):
        for filename in os.listdir(import_dir):
            if filename.endswith('.csv'):
                file_path = os.path.join(import_dir, filename)
                file_size = os.path.getsize(file_path)
                try:
                    import_type = get_import_type_from_filename(filename)
                except ValueError:
                    import_type = 'unknown'
                available_files.append({
                    'filename': filename,
                    'import_type': import_type,
                    'size': file_size,
                    'modified': datetime.fromtimestamp(os.path.getmtime(file_path))
                })
    # Group files by import type
    files_by_type = {}
    for file_info in available_files:
        import_type = file_info['import_type']
        if import_type not in files_by_type:
            files_by_type[import_type] = []
        files_by_type[import_type].append(file_info)
    return templates.TemplateResponse("admin.html", {
        "request": request,
        "user": user
        "user": user,
        "recent_imports": recent_imports,
        "available_files": available_files,
        "files_by_type": files_by_type
    })