From 5111079149f4e6850cd12b64a7844a83b179c142 Mon Sep 17 00:00:00 2001
From: HotSwapp <47397945+HotSwapp@users.noreply.github.com>
Date: Wed, 13 Aug 2025 18:53:35 -0500
Subject: [PATCH] coming together

---
 app/api/customers.py                   |  258 +-
 app/api/documents.py                   |   47 +-
 app/api/financial.py                   |   88 +-
 app/api/flexible.py                    |  281 +
 app/api/import_data.py                 | 1406 ++++-
 app/api/search.py                      |   62 +-
 app/api/search_highlight.py            |  141 +
 app/api/support.py                     |   55 +-
 app/main.py                            |   17 +-
 app/models/__init__.py                 |    7 +-
 app/models/audit.py                    |   55 +-
 app/models/flexible.py                 |   37 +
 package-lock.json                      | 6539 ++++++++++++++++++++++++
 package.json                           |    6 +-
 static/js/__tests__/alerts.ui.test.js  |  133 +
 static/js/__tests__/highlight.test.js  |   79 +
 static/js/__tests__/upload.ui.test.js  |  123 +
 static/js/customers-modern.js          |    5 +-
 static/js/customers-tailwind.js        |  213 +-
 static/js/flexible.js                  |  314 ++
 static/js/highlight.js                 |   95 +
 static/js/keyboard-shortcuts.js        |    4 +
 static/js/main.js                      |   29 +-
 static/js/sanitizer.js                 |   18 +-
 static/js/upload-helper.js             |   46 +
 templates/admin.html                   |   64 +-
 templates/base.html                    |   37 +-
 templates/customers.html               |  568 +-
 templates/dashboard.html               |   92 +-
 templates/documents.html               |  583 ++-
 templates/files.html                   |  169 +-
 templates/financial.html               |  418 +-
 templates/flexible.html                |  113 +
 templates/import.html                  |  682 ++-
 templates/search.html                  |   35 +-
 test_customers.py                      |  261 +-
 tests/conftest.py                      |   11 +
 tests/helpers.py                       |   25 +
 tests/test_admin_api.py                |  101 +
 tests/test_customers_edge_cases.py     |  168 +
 tests/test_document_upload_envelope.py |  130 +
 tests/test_documents_api.py            |  155 +
 tests/test_files_api.py                |  119 +
 tests/test_financial_api.py            |  263 +
 tests/test_flexible_batch_import.py    |   79 +
 tests/test_flexible_import.py          |  103 +
 tests/test_import_api.py               |  132 +
 tests/test_search_api.py               |  286 ++
 tests/test_search_highlight_utils.py   |  186 +
 tests/test_settings_api.py             |   85 +
 tests/test_support_api.py              |  122 +
 51 files changed, 14457 insertions(+), 588 deletions(-)
 create mode 100644 app/api/flexible.py
 create mode 100644 app/api/search_highlight.py
 create mode 100644 app/models/flexible.py
 create mode 100644 static/js/__tests__/alerts.ui.test.js
 create mode 100644 static/js/__tests__/highlight.test.js
 create mode 100644 static/js/__tests__/upload.ui.test.js
 create mode 100644 static/js/flexible.js
 create mode 100644 static/js/highlight.js
 create mode 100644 static/js/upload-helper.js
 create mode 100644 templates/flexible.html
 create mode 100644 tests/conftest.py
 create mode 100644 tests/helpers.py
 create mode 100644 tests/test_admin_api.py
 create mode 100644 tests/test_customers_edge_cases.py
 create mode 100644 tests/test_document_upload_envelope.py
 create mode 100644 tests/test_documents_api.py
 create mode 100644 tests/test_files_api.py
 create mode 100644 tests/test_financial_api.py
 create mode 100644 tests/test_flexible_batch_import.py
 create mode 100644 tests/test_flexible_import.py
 create mode 100644 tests/test_import_api.py
 create mode 100644 tests/test_search_api.py
 create mode 100644 tests/test_search_highlight_utils.py
 create mode 100644 tests/test_settings_api.py
 create mode 100644 tests/test_support_api.py

diff --git a/app/api/customers.py b/app/api/customers.py
index 4a2cc24..fd3439f 100644
--- a/app/api/customers.py
+++ b/app/api/customers.py
@@ -1,10 +1,13 @@
 """
 Customer (Rolodex) API endpoints
 """
-from typing import List, Optional
+from typing import List, Optional, Union
 from fastapi import APIRouter, Depends, HTTPException, status, Query
 from sqlalchemy.orm import Session, joinedload
-from sqlalchemy import or_, func
+from sqlalchemy import or_, and_, func, asc, desc
+from fastapi.responses import StreamingResponse
+import csv
+import io
 
 from app.database.base import get_db
 from app.models.rolodex import Rolodex, Phone
@@ -169,36 +172,263 @@ async def get_customer_stats(
     }
 
 
-@router.get("/", response_model=List[CustomerResponse])
+class PaginatedCustomersResponse(BaseModel):
+    items: List[CustomerResponse]
+    total: int
+
+
+@router.get("/", response_model=Union[List[CustomerResponse], PaginatedCustomersResponse])
 async def list_customers(
     skip: int = Query(0, ge=0),
     limit: int = Query(50, ge=1, le=200),
     search: Optional[str] = Query(None),
+    group: Optional[str] = Query(None, description="Filter by customer group (exact match)"),
+    state: Optional[str] = Query(None, description="Filter by state abbreviation (exact match)"),
+    groups: Optional[List[str]] = Query(None, description="Filter by multiple groups (repeat param)"),
+    states: Optional[List[str]] = Query(None, description="Filter by multiple states (repeat param)"),
+    sort_by: Optional[str] = Query("id", description="Sort field: id, name, city, email"),
+    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
+    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
     db: Session = Depends(get_db),
     current_user: User = Depends(get_current_user)
 ):
     """List customers with pagination and search"""
     try:
-        query = db.query(Rolodex).options(joinedload(Rolodex.phone_numbers))
+        base_query = db.query(Rolodex)
 
         if search:
-            query = query.filter(
-                or_(
-                    Rolodex.id.contains(search),
-                    Rolodex.last.contains(search),
-                    Rolodex.first.contains(search),
-                    Rolodex.city.contains(search),
-                    Rolodex.email.contains(search)
-                )
+            s = (search or "").strip()
+            s_lower = s.lower()
+            tokens = [t for t in s_lower.split() if t]
+            # Basic contains search on several fields (case-insensitive)
+            contains_any = or_(
+                func.lower(Rolodex.id).contains(s_lower),
+                func.lower(Rolodex.last).contains(s_lower),
+                func.lower(Rolodex.first).contains(s_lower),
+                func.lower(Rolodex.middle).contains(s_lower),
+                func.lower(Rolodex.city).contains(s_lower),
+                func.lower(Rolodex.email).contains(s_lower),
             )
-
-        customers = query.offset(skip).limit(limit).all()
+            # Multi-token name support: every token must match either first, middle, or last
+            name_tokens = [
+                or_(
+                    func.lower(Rolodex.first).contains(tok),
+                    func.lower(Rolodex.middle).contains(tok),
+                    func.lower(Rolodex.last).contains(tok),
+                )
+                for tok in tokens
+            ]
+            combined = contains_any if not name_tokens else or_(contains_any, and_(*name_tokens))
+            # Comma pattern: "Last, First"
+            last_first_filter = None
+            if "," in s_lower:
+                last_part, first_part = [p.strip() for p in s_lower.split(",", 1)]
+                if last_part and first_part:
+                    last_first_filter = and_(
+                        func.lower(Rolodex.last).contains(last_part),
+                        func.lower(Rolodex.first).contains(first_part),
+                    )
+                elif last_part:
+                    last_first_filter = func.lower(Rolodex.last).contains(last_part)
+            final_filter = or_(combined, last_first_filter) if last_first_filter is not None else combined
+            base_query = base_query.filter(final_filter)
+
+        # Apply group/state filters (support single and multi-select)
+        effective_groups = [g for g in (groups or []) if g] or ([group] if group else [])
+        if effective_groups:
+            base_query = base_query.filter(Rolodex.group.in_(effective_groups))
+        effective_states = [s for s in (states or []) if s] or ([state] if state else [])
+        if effective_states:
+            base_query = base_query.filter(Rolodex.abrev.in_(effective_states))
+
+        # Apply sorting (whitelisted fields only)
+        normalized_sort_by = (sort_by or "id").lower()
+        normalized_sort_dir = (sort_dir or "asc").lower()
+        is_desc = normalized_sort_dir == "desc"
+
+        order_columns = []
+        if normalized_sort_by == "id":
+            order_columns = [Rolodex.id]
+        elif normalized_sort_by == "name":
+            # Sort by last, then first
+            order_columns = [Rolodex.last, Rolodex.first]
+        elif normalized_sort_by == "city":
+            # Sort by city, then state abbreviation
+            order_columns = [Rolodex.city, Rolodex.abrev]
+        elif normalized_sort_by == "email":
+            order_columns = [Rolodex.email]
+        else:
+            # Fallback to id to avoid arbitrary column injection
+            order_columns = [Rolodex.id]
+
+        # Case-insensitive ordering where applicable, preserving None ordering default
+        ordered = []
+        for col in order_columns:
+            # Use lower() for string-like cols; SQLAlchemy will handle non-string safely enough for SQLite/Postgres
+            expr = func.lower(col) if col.type.python_type in (str,) else col  # type: ignore[attr-defined]
+            ordered.append(desc(expr) if is_desc else asc(expr))
+
+        if ordered:
+            base_query = base_query.order_by(*ordered)
+
+        customers = base_query.options(joinedload(Rolodex.phone_numbers)).offset(skip).limit(limit).all()
+        if include_total:
+            total = base_query.count()
+            return {"items": customers, "total": total}
         return customers
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"Error loading customers: {str(e)}")
 
 
+@router.get("/export")
+async def export_customers(
+    # Optional pagination for exporting only current page; omit to export all
+    skip: Optional[int] = Query(None, ge=0),
+    limit: Optional[int] = Query(None, ge=1, le=1000000),
+    search: Optional[str] = Query(None),
+    group: Optional[str] = Query(None, description="Filter by customer group (exact match)"),
+    state: Optional[str] = Query(None, description="Filter by state abbreviation (exact match)"),
+    groups: Optional[List[str]] = Query(None, description="Filter by multiple groups (repeat param)"),
+    states: Optional[List[str]] = Query(None, description="Filter by multiple states (repeat param)"),
+    sort_by: Optional[str] = Query("id", description="Sort field: id, name, city, email"),
+    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
+    fields: Optional[List[str]] = Query(None, description="CSV fields to include: id,name,group,city,state,phone,email"),
+    export_all: bool = Query(False, description="When true, ignore skip/limit and export all matches"),
+    db: Session = Depends(get_db),
+    current_user: User = Depends(get_current_user)
+):
+    """Export customers as CSV respecting search, filters, and sorting.
+    If skip/limit provided, exports that slice; otherwise exports all matches.
+ """ + try: + base_query = db.query(Rolodex) + + if search: + s = (search or "").strip() + s_lower = s.lower() + tokens = [t for t in s_lower.split() if t] + contains_any = or_( + func.lower(Rolodex.id).contains(s_lower), + func.lower(Rolodex.last).contains(s_lower), + func.lower(Rolodex.first).contains(s_lower), + func.lower(Rolodex.middle).contains(s_lower), + func.lower(Rolodex.city).contains(s_lower), + func.lower(Rolodex.email).contains(s_lower), + ) + name_tokens = [ + or_( + func.lower(Rolodex.first).contains(tok), + func.lower(Rolodex.middle).contains(tok), + func.lower(Rolodex.last).contains(tok), + ) + for tok in tokens + ] + combined = contains_any if not name_tokens else or_(contains_any, and_(*name_tokens)) + last_first_filter = None + if "," in s_lower: + last_part, first_part = [p.strip() for p in s_lower.split(",", 1)] + if last_part and first_part: + last_first_filter = and_( + func.lower(Rolodex.last).contains(last_part), + func.lower(Rolodex.first).contains(first_part), + ) + elif last_part: + last_first_filter = func.lower(Rolodex.last).contains(last_part) + final_filter = or_(combined, last_first_filter) if last_first_filter is not None else combined + base_query = base_query.filter(final_filter) + + effective_groups = [g for g in (groups or []) if g] or ([group] if group else []) + if effective_groups: + base_query = base_query.filter(Rolodex.group.in_(effective_groups)) + effective_states = [s for s in (states or []) if s] or ([state] if state else []) + if effective_states: + base_query = base_query.filter(Rolodex.abrev.in_(effective_states)) + + normalized_sort_by = (sort_by or "id").lower() + normalized_sort_dir = (sort_dir or "asc").lower() + is_desc = normalized_sort_dir == "desc" + + order_columns = [] + if normalized_sort_by == "id": + order_columns = [Rolodex.id] + elif normalized_sort_by == "name": + order_columns = [Rolodex.last, Rolodex.first] + elif normalized_sort_by == "city": + order_columns = [Rolodex.city, Rolodex.abrev] + elif normalized_sort_by == "email": + order_columns = [Rolodex.email] + else: + order_columns = [Rolodex.id] + + ordered = [] + for col in order_columns: + try: + expr = func.lower(col) if col.type.python_type in (str,) else col # type: ignore[attr-defined] + except Exception: + expr = col + ordered.append(desc(expr) if is_desc else asc(expr)) + if ordered: + base_query = base_query.order_by(*ordered) + + if not export_all: + if skip is not None: + base_query = base_query.offset(skip) + if limit is not None: + base_query = base_query.limit(limit) + + customers = base_query.options(joinedload(Rolodex.phone_numbers)).all() + + # Prepare CSV + output = io.StringIO() + writer = csv.writer(output) + allowed_fields_in_order = ["id", "name", "group", "city", "state", "phone", "email"] + header_names = { + "id": "Customer ID", + "name": "Name", + "group": "Group", + "city": "City", + "state": "State", + "phone": "Primary Phone", + "email": "Email", + } + requested = [f.lower() for f in (fields or []) if isinstance(f, str)] + selected_fields = [f for f in allowed_fields_in_order if f in requested] if requested else allowed_fields_in_order + if not selected_fields: + selected_fields = allowed_fields_in_order + writer.writerow([header_names[f] for f in selected_fields]) + for c in customers: + full_name = f"{(c.first or '').strip()} {(c.last or '').strip()}".strip() + primary_phone = "" + try: + if c.phone_numbers: + primary_phone = c.phone_numbers[0].phone or "" + except Exception: + primary_phone = "" + row_map = { + "id": c.id, + "name": 
full_name, + "group": c.group or "", + "city": c.city or "", + "state": c.abrev or "", + "phone": primary_phone, + "email": c.email or "", + } + writer.writerow([row_map[f] for f in selected_fields]) + + output.seek(0) + filename = "customers_export.csv" + return StreamingResponse( + output, + media_type="text/csv", + headers={ + "Content-Disposition": f"attachment; filename={filename}" + }, + ) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error exporting customers: {str(e)}") + + @router.get("/{customer_id}", response_model=CustomerResponse) async def get_customer( customer_id: str, diff --git a/app/api/documents.py b/app/api/documents.py index 481ac9f..69e7e51 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -118,10 +118,18 @@ async def create_qdro( current_user: User = Depends(get_current_user) ): """Create new QDRO""" - qdro = QDRO(**qdro_data.model_dump()) + # Only accept fields that exist on the model and exclude None values + allowed_fields = {c.name for c in QDRO.__table__.columns} + payload = { + k: v + for k, v in qdro_data.model_dump(exclude_unset=True).items() + if v is not None and k in allowed_fields + } + qdro = QDRO(**payload) - if not qdro.created_date: - qdro.created_date = date.today() + # Backfill created_date if model supports it; otherwise rely on created_at + if hasattr(qdro, "created_date") and not getattr(qdro, "created_date"): + setattr(qdro, "created_date", date.today()) db.add(qdro) db.commit() @@ -172,9 +180,11 @@ async def update_qdro( detail="QDRO not found" ) - # Update fields + # Update fields present on the model only + allowed_fields = {c.name for c in QDRO.__table__.columns} for field, value in qdro_data.model_dump(exclude_unset=True).items(): - setattr(qdro, field, value) + if field in allowed_fields: + setattr(qdro, field, value) db.commit() db.refresh(qdro) @@ -525,23 +535,33 @@ async def generate_document( document_id = str(uuid.uuid4()) file_name = f"{template.form_name}_{file_obj.file_no}_{date.today().isoformat()}" + exports_dir = "/app/exports" + try: + os.makedirs(exports_dir, exist_ok=True) + except Exception: + try: + os.makedirs("exports", exist_ok=True) + exports_dir = "exports" + except Exception: + exports_dir = "." + if request.output_format.upper() == "PDF": - file_path = f"/app/exports/{document_id}.pdf" + file_path = f"{exports_dir}/{document_id}.pdf" file_name += ".pdf" # Here you would implement PDF generation # For now, create a simple text file - with open(f"/app/exports/{document_id}.txt", "w") as f: + with open(f"{exports_dir}/{document_id}.txt", "w") as f: f.write(merged_content) - file_path = f"/app/exports/{document_id}.txt" + file_path = f"{exports_dir}/{document_id}.txt" elif request.output_format.upper() == "DOCX": - file_path = f"/app/exports/{document_id}.docx" + file_path = f"{exports_dir}/{document_id}.docx" file_name += ".docx" # Implement DOCX generation - with open(f"/app/exports/{document_id}.txt", "w") as f: + with open(f"{exports_dir}/{document_id}.txt", "w") as f: f.write(merged_content) - file_path = f"/app/exports/{document_id}.txt" + file_path = f"{exports_dir}/{document_id}.txt" else: # HTML - file_path = f"/app/exports/{document_id}.html" + file_path = f"{exports_dir}/{document_id}.html" file_name += ".html" html_content = f"
{merged_content}"
with open(file_path, "w") as f:
@@ -768,6 +788,9 @@ async def upload_document(
max_size = 10 * 1024 * 1024 # 10MB
content = await file.read()
+ # Treat zero-byte payloads as no file uploaded to provide a clearer client error
+ if len(content) == 0:
+ raise HTTPException(status_code=400, detail="No file uploaded")
if len(content) > max_size:
raise HTTPException(status_code=400, detail="File too large")
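A minimal test-style sketch of exercising the new empty-upload guard; the app entry point (app.main:app), the /api/documents/upload route, and the omitted auth overrides are assumptions, not taken from this patch.

# Hypothetical sketch; route path, app import, and auth handling are assumptions.
from fastapi.testclient import TestClient

from app.main import app  # assumed application entry point

client = TestClient(app)

def test_zero_byte_upload_rejected():
    # A zero-byte payload should now yield the explicit 400 rather than a confusing downstream error
    response = client.post(
        "/api/documents/upload",  # assumed route; auth dependency overrides omitted for brevity
        files={"file": ("empty.pdf", b"", "application/pdf")},
    )
    assert response.status_code == 400
    assert response.json()["detail"] == "No file uploaded"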
diff --git a/app/api/financial.py b/app/api/financial.py
index 1860e03..54299ac 100644
--- a/app/api/financial.py
+++ b/app/api/financial.py
@@ -294,33 +294,82 @@ async def _update_file_balances(file_obj: File, db: Session):
async def get_recent_time_entries(
days: int = Query(7, ge=1, le=30),
employee: Optional[str] = Query(None),
- skip: int = Query(0, ge=0),
+ status: Optional[str] = Query(None, description="billed|unbilled"),
+ q: Optional[str] = Query(None, description="text search across description, file, employee, matter, client name"),
+ page: int = Query(1, ge=1),
limit: int = Query(50, ge=1, le=200),
+ sort_by: str = Query("date"),
+ sort_dir: str = Query("desc"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
- """Get recent time entries across all files"""
+ """Get recent time entries across all files with server-side sorting and pagination"""
cutoff_date = date.today() - timedelta(days=days)
-
- query = db.query(Ledger)\
- .options(joinedload(Ledger.file).joinedload(File.owner))\
+
+ # Base query with joins for sorting/searching by client/matter
+ base_query = db.query(Ledger) \
+ .join(File, Ledger.file_no == File.file_no) \
+ .outerjoin(Rolodex, File.id == Rolodex.id) \
+ .options(joinedload(Ledger.file).joinedload(File.owner)) \
.filter(and_(
Ledger.date >= cutoff_date,
- Ledger.t_type == "2" # Time entries
- ))\
- .order_by(desc(Ledger.date))
-
+ Ledger.t_type == "2"
+ ))
+
if employee:
- query = query.filter(Ledger.empl_num == employee)
-
- entries = query.offset(skip).limit(limit).all()
-
+ base_query = base_query.filter(Ledger.empl_num == employee)
+
+ # Status/billed filtering
+ if status:
+ status_l = str(status).strip().lower()
+ if status_l in ("billed", "unbilled"):
+ billed_value = "Y" if status_l == "billed" else "N"
+ base_query = base_query.filter(Ledger.billed == billed_value)
+
+ # Text search across multiple fields
+ if q:
+ query_text = f"%{q.strip()}%"
+ base_query = base_query.filter(
+ or_(
+ Ledger.note.ilike(query_text),
+ Ledger.file_no.ilike(query_text),
+ Ledger.empl_num.ilike(query_text),
+ File.regarding.ilike(query_text),
+ Rolodex.first.ilike(query_text),
+ Rolodex.last.ilike(query_text)
+ )
+ )
+
+ # Sorting mapping (supported columns)
+ sort_map = {
+ "date": Ledger.date,
+ "file_no": Ledger.file_no,
+ "client_name": Rolodex.last, # best-effort: sort by client last name
+ "empl_num": Ledger.empl_num,
+ "quantity": Ledger.quantity,
+ "hours": Ledger.quantity, # alias
+ "rate": Ledger.rate,
+ "amount": Ledger.amount,
+ "billed": Ledger.billed,
+ "description": Ledger.note,
+ }
+ sort_column = sort_map.get(sort_by.lower(), Ledger.date)
+ direction = desc if str(sort_dir).lower() == "desc" else asc
+
+ # Total count for pagination (distinct on Ledger.id to avoid join-induced dupes)
+ total_count = base_query.with_entities(func.count(func.distinct(Ledger.id))).scalar()
+
+ # Apply sorting and pagination
+ offset = (page - 1) * limit
+ page_query = base_query.order_by(direction(sort_column)).offset(offset).limit(limit)
+ entries = page_query.all()
+
# Format results with file and client information
results = []
for entry in entries:
file_obj = entry.file
client = file_obj.owner if file_obj else None
-
+
results.append({
"id": entry.id,
"date": entry.date.isoformat(),
@@ -334,8 +383,15 @@ async def get_recent_time_entries(
"description": entry.note,
"billed": entry.billed == "Y"
})
-
- return {"entries": results, "total_entries": len(results)}
+
+ return {
+ "entries": results,
+ "total_count": total_count,
+ "page": page,
+ "limit": limit,
+ "sort_by": sort_by,
+ "sort_dir": sort_dir,
+ }
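For reference, a hedged consumer sketch of the new paginated envelope; the URL and bearer-token handling are assumptions, since the route decorator sits outside this hunk.

# Hypothetical client sketch; the path and auth scheme are assumptions.
import requests

def fetch_unbilled_entries(base_url: str, token: str, page: int = 1):
    response = requests.get(
        f"{base_url}/api/financial/time-entries/recent",  # assumed path
        params={
            "days": 14,
            "status": "unbilled",
            "q": "deposition",
            "page": page,
            "limit": 50,
            "sort_by": "amount",
            "sort_dir": "desc",
        },
        headers={"Authorization": f"Bearer {token}"},
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    # total_count covers every match, not just this page, so the UI can size its pager
    return payload["entries"], payload["total_count"]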
@router.post("/time-entry/quick")
diff --git a/app/api/flexible.py b/app/api/flexible.py
new file mode 100644
index 0000000..d13e327
--- /dev/null
+++ b/app/api/flexible.py
@@ -0,0 +1,281 @@
+"""
+Flexible Imports admin API: list, filter, and export unmapped rows captured during CSV imports.
+"""
+from typing import Optional, Dict, Any, List
+from datetime import datetime
+import csv
+import io
+
+from fastapi import APIRouter, Depends, Query, HTTPException
+from fastapi.responses import StreamingResponse
+from sqlalchemy.orm import Session
+from sqlalchemy import func, or_, cast, String
+
+from app.database.base import get_db
+from app.auth.security import get_admin_user
+from app.models.flexible import FlexibleImport
+
+
+router = APIRouter(prefix="/flexible", tags=["flexible"])
+
+
+@router.get("/imports")
+async def list_flexible_imports(
+ file_type: Optional[str] = Query(None, description="Filter by CSV file type (e.g., FILES.csv)"),
+ target_table: Optional[str] = Query(None, description="Filter by target model table name"),
+ q: Optional[str] = Query(None, description="Quick text search across file type, target table, and unmapped data"),
+ has_keys: Optional[List[str]] = Query(
+ None,
+ description="Filter rows where extra_data (or its 'unmapped' payload) contains these keys. Repeat param for multiple keys.",
+ ),
+ skip: int = Query(0, ge=0),
+ limit: int = Query(50, ge=1, le=500),
+ db: Session = Depends(get_db),
+ current_user=Depends(get_admin_user),
+):
+ """List flexible import rows with optional filtering, quick search, and pagination."""
+ query = db.query(FlexibleImport)
+ if file_type:
+ query = query.filter(FlexibleImport.file_type == file_type)
+ if target_table:
+ query = query.filter(FlexibleImport.target_table == target_table)
+ if q:
+ pattern = f"%{q.strip()}%"
+ # Search across file_type, target_table, and serialized JSON extra_data
+ query = query.filter(
+ or_(
+ FlexibleImport.file_type.ilike(pattern),
+ FlexibleImport.target_table.ilike(pattern),
+ cast(FlexibleImport.extra_data, String).ilike(pattern),
+ )
+ )
+
+ # Filter by key presence inside JSON payload by string matching of the serialized JSON
+ # This is DB-agnostic and works across SQLite/Postgres, though not as precise as JSON operators.
+ if has_keys:
+ for k in [k for k in has_keys if k is not None and str(k).strip() != ""]:
+ key = str(k).strip()
+ # Look for the JSON key token followed by a colon, e.g. "key":
+ query = query.filter(cast(FlexibleImport.extra_data, String).ilike(f'%"{key}":%'))
+
+ total = query.count()
+ items = (
+ query.order_by(FlexibleImport.id.desc())
+ .offset(skip)
+ .limit(limit)
+ .all()
+ )
+
+ def serialize(item: FlexibleImport) -> Dict[str, Any]:
+ return {
+ "id": item.id,
+ "file_type": item.file_type,
+ "target_table": item.target_table,
+ "primary_key_field": item.primary_key_field,
+ "primary_key_value": item.primary_key_value,
+ "extra_data": item.extra_data,
+ }
+
+ return {
+ "total": total,
+ "skip": skip,
+ "limit": limit,
+ "items": [serialize(i) for i in items],
+ }
+
+
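The has_keys filter above matches a quoted key token inside the serialized JSON rather than using JSON operators; a standalone sketch of that semantics (illustrative only, not part of the API):

# Mirrors the ILIKE '%"key":%' pattern applied to the serialized extra_data column.
import json

def json_contains_key(serialized: str, key: str) -> bool:
    # Case-insensitive, like ILIKE; a value that happens to embed '"key":' would false-positive
    return f'"{key}":'.lower() in serialized.lower()

row = {"unmapped": {"Old_Fax": "555-0100", "Region": "NW"}}
serialized = json.dumps(row)
assert json_contains_key(serialized, "Old_Fax")
assert not json_contains_key(serialized, "Fax")  # the whole quoted key must match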
+@router.get("/options")
+async def flexible_options(
+ db: Session = Depends(get_db),
+ current_user=Depends(get_admin_user),
+):
+ """Return distinct file types and target tables for filter dropdowns."""
+ file_types: List[str] = [
+ ft for (ft,) in db.query(func.distinct(FlexibleImport.file_type)).order_by(FlexibleImport.file_type.asc()).all()
+ if ft is not None
+ ]
+ target_tables: List[str] = [
+ tt for (tt,) in db.query(func.distinct(FlexibleImport.target_table)).order_by(FlexibleImport.target_table.asc()).all()
+ if tt is not None and tt != ""
+ ]
+ return {"file_types": file_types, "target_tables": target_tables}
+
+
+@router.get("/export")
+async def export_unmapped_csv(
+ file_type: Optional[str] = Query(None, description="Filter by CSV file type (e.g., FILES.csv)"),
+ target_table: Optional[str] = Query(None, description="Filter by target model table name"),
+ has_keys: Optional[List[str]] = Query(
+ None,
+ description="Filter rows where extra_data (or its 'unmapped' payload) contains these keys. Repeat param for multiple keys.",
+ ),
+ db: Session = Depends(get_db),
+ current_user=Depends(get_admin_user),
+):
+ """Export unmapped rows as CSV for review. Includes basic metadata columns and unmapped fields.
+
+ If FlexibleImport.extra_data contains a nested 'unmapped' dict, those keys are exported.
+ Otherwise, all keys of extra_data are exported.
+ """
+ query = db.query(FlexibleImport)
+ if file_type:
+ query = query.filter(FlexibleImport.file_type == file_type)
+ if target_table:
+ query = query.filter(FlexibleImport.target_table == target_table)
+ if has_keys:
+ for k in [k for k in has_keys if k is not None and str(k).strip() != ""]:
+ key = str(k).strip()
+ query = query.filter(cast(FlexibleImport.extra_data, String).ilike(f'%"{key}":%'))
+
+ rows: List[FlexibleImport] = query.order_by(FlexibleImport.id.asc()).all()
+ if not rows:
+ raise HTTPException(status_code=404, detail="No matching flexible imports to export")
+
+ # Determine union of unmapped keys across all rows
+ unmapped_keys: List[str] = []
+ key_set = set()
+ for r in rows:
+ data = r.extra_data or {}
+ payload = data.get("unmapped") if isinstance(data, dict) and isinstance(data.get("unmapped"), dict) else data
+ if isinstance(payload, dict):
+ for k in payload.keys():
+ if k not in key_set:
+ key_set.add(k)
+ unmapped_keys.append(k)
+
+ # Prepare CSV
+ meta_headers = [
+ "id",
+ "file_type",
+ "target_table",
+ "primary_key_field",
+ "primary_key_value",
+ ]
+ fieldnames = meta_headers + unmapped_keys
+
+ output = io.StringIO()
+ writer = csv.DictWriter(output, fieldnames=fieldnames)
+ writer.writeheader()
+
+ for r in rows:
+ row_out: Dict[str, Any] = {
+ "id": r.id,
+ "file_type": r.file_type,
+ "target_table": r.target_table or "",
+ "primary_key_field": r.primary_key_field or "",
+ "primary_key_value": r.primary_key_value or "",
+ }
+ data = r.extra_data or {}
+ payload = data.get("unmapped") if isinstance(data, dict) and isinstance(data.get("unmapped"), dict) else data
+ if isinstance(payload, dict):
+ for k in unmapped_keys:
+ v = payload.get(k)
+ # Normalize lists/dicts to JSON strings for CSV safety
+ if isinstance(v, (dict, list)):
+ try:
+ import json as _json
+ row_out[k] = _json.dumps(v, ensure_ascii=False)
+ except Exception:
+ row_out[k] = str(v)
+ else:
+ row_out[k] = v if v is not None else ""
+ writer.writerow(row_out)
+
+ output.seek(0)
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename_parts = ["flexible_unmapped"]
+ if file_type:
+ filename_parts.append(file_type.replace("/", "-").replace(" ", "_"))
+ if target_table:
+ filename_parts.append(target_table.replace("/", "-").replace(" ", "_"))
+ filename = "_".join(filename_parts) + f"_{timestamp}.csv"
+
+ return StreamingResponse(
+ iter([output.getvalue()]),
+ media_type="text/csv",
+ headers={
+ "Content-Disposition": f"attachment; filename=\"{filename}\"",
+ },
+ )
+
+
+@router.get("/export/{row_id}")
+async def export_single_row_csv(
+ row_id: int,
+ db: Session = Depends(get_db),
+ current_user=Depends(get_admin_user),
+):
+ """Export a single flexible import row as CSV.
+
+ Includes metadata columns plus keys from the row's unmapped payload.
+ If FlexibleImport.extra_data contains a nested 'unmapped' dict, those keys are exported;
+ otherwise, all keys of extra_data are exported.
+ """
+ row: Optional[FlexibleImport] = (
+ db.query(FlexibleImport).filter(FlexibleImport.id == row_id).first()
+ )
+ if not row:
+ raise HTTPException(status_code=404, detail="Flexible import row not found")
+
+ data = row.extra_data or {}
+ payload = (
+ data.get("unmapped")
+ if isinstance(data, dict) and isinstance(data.get("unmapped"), dict)
+ else data
+ )
+
+ unmapped_keys: List[str] = []
+ if isinstance(payload, dict):
+ for k in payload.keys():
+ unmapped_keys.append(k)
+
+ meta_headers = [
+ "id",
+ "file_type",
+ "target_table",
+ "primary_key_field",
+ "primary_key_value",
+ ]
+ fieldnames = meta_headers + unmapped_keys
+
+ output = io.StringIO()
+ writer = csv.DictWriter(output, fieldnames=fieldnames)
+ writer.writeheader()
+
+ row_out: Dict[str, Any] = {
+ "id": row.id,
+ "file_type": row.file_type,
+ "target_table": row.target_table or "",
+ "primary_key_field": row.primary_key_field or "",
+ "primary_key_value": row.primary_key_value or "",
+ }
+ if isinstance(payload, dict):
+ for k in unmapped_keys:
+ v = payload.get(k)
+ if isinstance(v, (dict, list)):
+ try:
+ import json as _json
+ row_out[k] = _json.dumps(v, ensure_ascii=False)
+ except Exception:
+ row_out[k] = str(v)
+ else:
+ row_out[k] = v if v is not None else ""
+
+ writer.writerow(row_out)
+ output.seek(0)
+
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ filename = (
+ f"flexible_row_{row.id}_{row.file_type.replace('/', '-').replace(' ', '_')}_{timestamp}.csv"
+ if row.file_type
+ else f"flexible_row_{row.id}_{timestamp}.csv"
+ )
+
+ return StreamingResponse(
+ iter([output.getvalue()]),
+ media_type="text/csv",
+ headers={
+ "Content-Disposition": f"attachment; filename=\"{filename}\"",
+ },
+ )
+
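A hedged admin-side sketch of pulling one of these CSV exports; the /api mount prefix, the app import path, and the skipped get_admin_user override are assumptions.

# Hypothetical sketch; mount prefix and auth handling are assumptions.
from fastapi.testclient import TestClient

from app.main import app  # assumed application entry point

client = TestClient(app)

def download_unmapped_for_table(target_table: str) -> bytes:
    # Streams the unmapped-column CSV captured for a single target table
    response = client.get(
        "/api/flexible/export",  # router prefix "/flexible" plus an assumed "/api" mount
        params={"target_table": target_table, "has_keys": ["Old_Fax"]},
    )
    response.raise_for_status()
    assert response.headers["content-type"].startswith("text/csv")
    return response.content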
diff --git a/app/api/import_data.py b/app/api/import_data.py
index 600c096..11f0127 100644
--- a/app/api/import_data.py
+++ b/app/api/import_data.py
@@ -1,11 +1,16 @@
"""
-Data import API endpoints for CSV file uploads
+Data import API endpoints for CSV file uploads with auto-discovery mapping.
"""
import csv
import io
-from datetime import datetime
-from typing import List, Dict, Any, Optional
-from fastapi import APIRouter, Depends, HTTPException, UploadFile, File as UploadFileForm, Form
+import re
+import os
+from pathlib import Path
+from difflib import SequenceMatcher
+from datetime import datetime, date
+from decimal import Decimal
+from typing import List, Dict, Any, Optional, Tuple
+from fastapi import APIRouter, Depends, HTTPException, UploadFile, File as UploadFileForm, Form, Query
from sqlalchemy.orm import Session
from app.database.base import get_db
from app.auth.security import get_current_user
@@ -17,6 +22,9 @@ from app.models.qdro import QDRO
from app.models.pensions import Pension, PensionSchedule, MarriageHistory, DeathBenefit, SeparationAgreement, LifeTable, NumberTable
from app.models.lookups import Employee, FileType, FileStatus, TransactionType, TransactionCode, State, GroupLookup, Footer, PlanInfo, FormIndex, FormList, PrinterSetup, SystemSetup
from app.models.additional import Payment, Deposit, FileNote, FormVariable, ReportVariable
+from app.models.flexible import FlexibleImport
+from app.models.audit import ImportAudit, ImportAuditFile
+from app.config import settings
router = APIRouter(tags=["import"])
@@ -24,8 +32,11 @@ router = APIRouter(tags=["import"])
# CSV to Model mapping
CSV_MODEL_MAPPING = {
"ROLODEX.csv": Rolodex,
+ "ROLEX_V.csv": Rolodex, # Legacy/view alias
"PHONE.csv": Phone,
"FILES.csv": File,
+ "FILES_R.csv": File, # Legacy/report alias
+ "FILES_V.csv": File, # Legacy/view alias
"LEDGER.csv": Ledger,
"QDROS.csv": QDRO,
"PENSIONS.csv": Pension,
@@ -44,6 +55,8 @@ CSV_MODEL_MAPPING = {
"GRUPLKUP.csv": GroupLookup,
"FOOTERS.csv": Footer,
"PLANINFO.csv": PlanInfo,
+ # Legacy alternate names from export directories
+ "SCHEDULE.csv": PensionSchedule,
"FORM_INX.csv": FormIndex,
"FORM_LST.csv": FormList,
"PRINTERS.csv": PrinterSetup,
@@ -58,6 +71,7 @@ CSV_MODEL_MAPPING = {
}
# Field mappings for CSV columns to database fields
+# Legacy header synonyms used as hints only (not required). Auto-discovery will work without exact matches.
FIELD_MAPPINGS = {
"ROLODEX.csv": {
"Id": "id",
@@ -221,9 +235,9 @@ FIELD_MAPPINGS = {
"Sort_Order": "sort_order"
},
"FOOTERS.csv": {
- "Footer_Code": "footer_code",
- "Content": "content",
- "Description": "description"
+ "F_Code": "footer_code",
+ "F_Footer": "content"
+ # Description is optional - not required for footers
},
"PLANINFO.csv": {
"Plan_Id": "plan_id",
@@ -367,6 +381,224 @@ def parse_date(date_str: str) -> Optional[datetime]:
+def make_json_safe(value: Any) -> Any:
+ """Recursively convert values to JSON-serializable types.
+
+ - date/datetime -> ISO string
+ - Decimal -> float
+ - dict/list -> recurse
+ """
+ if isinstance(value, (datetime, date)):
+ return value.isoformat()
+ if isinstance(value, Decimal):
+ try:
+ return float(value)
+ except Exception:
+ return str(value)
+ if isinstance(value, dict):
+ return {k: make_json_safe(v) for k, v in value.items()}
+ if isinstance(value, list):
+ return [make_json_safe(v) for v in value]
+ return value
+
+
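A quick illustration of what make_json_safe produces for the value types it targets (the import path mirrors this module; purely illustrative, output shown in the comment):

from datetime import date
from decimal import Decimal

from app.api.import_data import make_json_safe  # this module's path

sample = {"opened": date(2025, 8, 13), "balance": Decimal("1250.75"), "tags": [Decimal("1")]}
print(make_json_safe(sample))
# {'opened': '2025-08-13', 'balance': 1250.75, 'tags': [1.0]}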
+def parse_csv_robust(csv_content: str) -> Tuple[List[Dict[str, str]], List[str]]:
+ """Parse CSV text robustly by handling broken newlines in unquoted fields.
+
+ Returns tuple of (rows_as_dicts, headers)
+ """
+ lines = (csv_content or "").strip().split('\n')
+ if not lines or (len(lines) == 1 and not lines[0].strip()):
+ return [], []
+
+ # Parse headers using the csv module to respect quoting
+ header_reader = csv.reader(io.StringIO(lines[0]))
+ headers = next(header_reader)
+ headers = [h.strip() for h in headers]
+
+ rows_data: List[Dict[str, str]] = []
+ for line_num, line in enumerate(lines[1:], start=2):
+ # Skip empty lines
+ if not line.strip():
+ continue
+ try:
+ # Parse each line independently; avoids multiline parse explosions
+ line_reader = csv.reader(io.StringIO(line))
+ fields = next(line_reader)
+ fields = [f.strip() for f in fields]
+
+ # If clearly malformed (too few fields), skip
+ if len(fields) < max(1, len(headers) // 2):
+ continue
+
+ # Pad or truncate to header length
+ while len(fields) < len(headers):
+ fields.append("")
+ fields = fields[:len(headers)]
+
+ row_dict = dict(zip(headers, fields))
+ rows_data.append(row_dict)
+ except Exception:
+ # Skip malformed row
+ continue
+
+ return rows_data, headers
+
+
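A small demonstration of the tolerant parser on a legacy-style fragment: short stray lines are dropped and over-long rows are truncated to the header width (illustrative; import path mirrors this module).

from app.api.import_data import parse_csv_robust  # this module's path

csv_text = (
    "Id,Last,First,City,Email\n"
    '"R100",Smith,Ann,Reno,ann@example.com\n'
    "stray\n"
    '"R101",Jones,Bob,Elko,bob@example.com,EXTRA'
)
rows, headers = parse_csv_robust(csv_text)
# headers -> ['Id', 'Last', 'First', 'City', 'Email']
# "stray" has fewer than half the expected fields and is skipped; EXTRA is truncated away
# rows[0]['Id'] -> 'R100', rows[1]['Email'] -> 'bob@example.com'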
+def parse_csv_with_fallback(text: str) -> Tuple[List[Dict[str, str]], List[str]]:
+ """Try csv.DictReader first; on failure, fall back to robust parser."""
+ try:
+ reader = csv.DictReader(io.StringIO(text), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ headers_local = reader.fieldnames or []
+ rows_local: List[Dict[str, str]] = []
+ for r in reader:
+ rows_local.append(r)
+ return rows_local, headers_local
+ except Exception:
+ return parse_csv_robust(text)
+
+
+def _normalize_label(label: str) -> str:
+ """Normalize a header/field label for fuzzy comparison."""
+ if not label:
+ return ""
+    # Lowercase, replace separators with underscores, remove non-alphanumerics, expand common short forms
+ lowered = label.strip().lower()
+ # Replace separators
+ lowered = re.sub(r"[\s\-]+", "_", lowered)
+ # Remove non-word characters except underscore
+ lowered = re.sub(r"[^a-z0-9_]", "", lowered)
+ # Expand a few common abbreviations
+ replacements = {
+ "num": "number",
+ "no": "number",
+ "amt": "amount",
+ "addr": "address",
+ "st": "state",
+ "dob": "dateofbirth",
+ "ss": "ssnumber",
+ }
+ tokens = [replacements.get(t, t) for t in lowered.split("_") if t]
+ return "".join(tokens)
+
+
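How the normalizer behaves for a few legacy-style labels, so the fuzzy matcher below compares like with like (illustrative; import path mirrors this module):

from app.api.import_data import _normalize_label  # this module's path

assert _normalize_label("Empl_No") == "emplnumber"
assert _normalize_label("Empl Num") == "emplnumber"   # abbreviations collapse to one token
assert _normalize_label("File-No.") == "filenumber"
assert _normalize_label("Amt_Due") == "amountdue"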
+def _get_model_columns(model_class) -> Tuple[Dict[str, Any], List[str]]:
+ """Return model columns mapping name->Column and list of primary key column names."""
+ columns = {}
+ pk_names = []
+ for col in model_class.__table__.columns:
+ if col.name in {"created_at", "updated_at"}:
+ continue
+ columns[col.name] = col
+ if col.primary_key:
+ pk_names.append(col.name)
+ return columns, pk_names
+
+
+def _build_dynamic_mapping(headers: List[str], model_class, file_type: str) -> Dict[str, Any]:
+ """Create a mapping from CSV headers to model fields using synonyms and fuzzy similarity.
+
+ Returns a dict with keys: mapping (csv_header->db_field), suggestions, unmapped_headers, mapped_headers
+ """
+ model_columns, _ = _get_model_columns(model_class)
+ model_field_names = list(model_columns.keys())
+
+ # Start with legacy mapping hints when available
+ legacy_map = FIELD_MAPPINGS.get(file_type, {}) or {}
+
+ mapping: Dict[str, Optional[str]] = {}
+ suggestions: Dict[str, List[Tuple[str, float]]] = {}
+ used_db_fields: set[str] = set()
+
+ # 1) Exact legacy header key usage
+ for header in headers:
+ if header in legacy_map and legacy_map[header] is not None:
+ candidate = legacy_map[header]
+ if candidate in model_field_names and candidate not in used_db_fields:
+ mapping[header] = candidate
+ used_db_fields.add(candidate)
+
+ # 2) Direct exact match against model fields (case-insensitive and normalized)
+ normalized_model = {name: _normalize_label(name) for name in model_field_names}
+ normalized_to_model = {v: k for k, v in normalized_model.items()}
+
+ for header in headers:
+ if header in mapping:
+ continue
+ normalized_header = _normalize_label(header)
+ if normalized_header in normalized_to_model:
+ candidate = normalized_to_model[normalized_header]
+ if candidate not in used_db_fields:
+ mapping[header] = candidate
+ used_db_fields.add(candidate)
+
+ # 3) Fuzzy best-match based on normalized strings
+ for header in headers:
+ if header in mapping:
+ continue
+ normalized_header = _normalize_label(header)
+ best_candidate = None
+ best_score = 0.0
+ candidate_list: List[Tuple[str, float]] = []
+ for model_field in model_field_names:
+ if model_field in used_db_fields:
+ continue
+ nm = normalized_model[model_field]
+ if not nm or not normalized_header:
+ score = 0.0
+ else:
+ # Combine ratio and partial containment heuristic
+ ratio = SequenceMatcher(None, normalized_header, nm).ratio()
+ containment = 1.0 if (normalized_header in nm or nm in normalized_header) else 0.0
+ score = max(ratio, 0.85 if containment else 0.0)
+ candidate_list.append((model_field, score))
+ if score > best_score:
+ best_score = score
+ best_candidate = model_field
+ # Keep top 3 suggestions for UI
+ suggestions[header] = sorted(candidate_list, key=lambda x: x[1], reverse=True)[:3]
+ # Apply only if score above threshold
+ if best_candidate and best_score >= 0.82:
+ mapping[header] = best_candidate
+ used_db_fields.add(best_candidate)
+
+ # 4) Any header explicitly mapped to None in legacy map is considered intentionally skipped
+ for header in headers:
+ if header not in mapping and header in legacy_map and legacy_map[header] is None:
+ mapping[header] = None
+
+ mapped_headers = {h: f for h, f in mapping.items() if f is not None}
+ unmapped_headers = [h for h in headers if h not in mapping or mapping[h] is None]
+
+ return {
+ "mapping": mapping,
+ "mapped_headers": mapped_headers,
+ "unmapped_headers": unmapped_headers,
+ "suggestions": suggestions,
+ }
+
+
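A sketch of inspecting the discovered mapping for an uploaded header row; the headers here are hypothetical and only the shape of the returned dict is relied on, not specific match results.

from app.api.import_data import CSV_MODEL_MAPPING, _build_dynamic_mapping  # this module's path

headers = ["Id", "Last", "First", "Old_Fax"]  # hypothetical legacy headers
info = _build_dynamic_mapping(headers, CSV_MODEL_MAPPING["ROLODEX.csv"], "ROLODEX.csv")

# info["mapping"]           csv header -> db field, or None when intentionally skipped
# info["mapped_headers"]    only the headers that resolved to a model column
# info["unmapped_headers"]  headers destined for FlexibleImport.extra_data
# info["suggestions"]       top-3 (field, score) candidates per header, for a review UI
for header, candidates in info["suggestions"].items():
    print(header, candidates)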
+def _get_required_fields(model_class) -> List[str]:
+ """Infer required (non-nullable) fields for a model to avoid DB errors.
+
+ Excludes primary keys (which might be autoincrement or provided) and timestamp mixins.
+ """
+ required = []
+ for col in model_class.__table__.columns:
+ if col.name in {"created_at", "updated_at"}:
+ continue
+ if col.primary_key:
+ # If PK is a string or composite, we cannot assume optional; handle separately
+ continue
+ try:
+ is_required = not getattr(col, "nullable", True)
+ except Exception:
+ is_required = False
+ if is_required:
+ required.append(col.name)
+ return required
+
+
def convert_value(value: str, field_name: str) -> Any:
"""Convert string value to appropriate type based on field name"""
if not value or value.strip() == "" or value.strip().lower() in ["null", "none", "n/a"]:
@@ -398,10 +630,13 @@ def convert_value(value: str, field_name: str) -> Any:
return 0.0
# Integer fields
- if any(word in field_name.lower() for word in ["item_no", "age", "start_age", "version", "line_number", "sort_order"]):
+ if any(word in field_name.lower() for word in ["item_no", "age", "start_age", "version", "line_number", "sort_order", "empl_num"]):
try:
return int(float(value)) # Handle cases like "1.0"
except ValueError:
+ # For employee numbers, return None to skip the record rather than 0
+ if "empl_num" in field_name.lower():
+ return None
return 0
# String fields - limit length to prevent database errors
@@ -447,13 +682,15 @@ async def get_available_csv_files(current_user: User = Depends(get_current_user)
"STATES.csv": "US States lookup table",
"FILETYPE.csv": "File type categories",
"FILESTAT.csv": "File status codes",
+ "FOOTERS.csv": "Document footers and signatures",
"DEPOSITS.csv": "Daily bank deposit summaries",
"FILENOTS.csv": "File notes and case memos",
"FVARLKUP.csv": "Form template variables",
"RVARLKUP.csv": "Report template variables",
"PAYMENTS.csv": "Individual payments within deposits",
"TRNSACTN.csv": "Transaction details (maps to Ledger)"
- }
+ },
+ "auto_discovery": True
}
@@ -479,7 +716,8 @@ async def import_csv_data(
raise HTTPException(status_code=400, detail="File must be a CSV file")
model_class = CSV_MODEL_MAPPING[file_type]
- field_mapping = FIELD_MAPPINGS.get(file_type, {})
+ # Legacy mapping hints used internally by auto-discovery; not used strictly
+    # Legacy mapping hints feed the auto-discovery mapper; exact header matches are not required
try:
# Read CSV content
@@ -554,6 +792,8 @@ async def import_csv_data(
headers = next(header_reader)
headers = [h.strip() for h in headers]
print(f"DEBUG: Found {len(headers)} headers: {headers}")
+ # Build dynamic header mapping for this file/model
+ mapping_info = _build_dynamic_mapping(headers, model_class, file_type)
# Parse data rows with proper CSV parsing
rows_data = []
@@ -597,26 +837,54 @@ async def import_csv_data(
imported_count = 0
errors = []
+ flexible_saved = 0
+ mapped_headers = mapping_info.get("mapped_headers", {})
+ unmapped_headers = mapping_info.get("unmapped_headers", [])
- # If replace_existing is True, delete all existing records
+ # If replace_existing is True, delete all existing records and related flexible extras
if replace_existing:
db.query(model_class).delete()
+ db.query(FlexibleImport).filter(
+ FlexibleImport.file_type == file_type,
+ FlexibleImport.target_table == model_class.__tablename__,
+ ).delete()
db.commit()
for row_num, row in enumerate(csv_reader, start=2): # Start at 2 for header row
try:
# Convert CSV row to model data
- model_data = {}
-
- for csv_field, db_field in field_mapping.items():
- if csv_field in row and db_field is not None: # Skip fields mapped to None
- converted_value = convert_value(row[csv_field], csv_field)
+ model_data: Dict[str, Any] = {}
+ # Apply discovered mapping
+ for csv_field, db_field in mapped_headers.items():
+ if csv_field in row and db_field is not None:
+ converted_value = convert_value(row[csv_field], db_field)
if converted_value is not None:
model_data[db_field] = converted_value
# Skip empty rows
if not any(model_data.values()):
continue
+
+ # Fallback: if required non-nullable fields are missing, store row as flexible only
+ required_fields = _get_required_fields(model_class)
+ missing_required = [f for f in required_fields if model_data.get(f) in (None, "")]
+ if missing_required:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data={
+ "mapped": model_data,
+ "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")},
+ "missing_required": missing_required,
+ },
+ )
+ )
+ flexible_saved += 1
+ # Do not attempt to insert into strict model; continue to next row
+ continue
# Special validation for models with required fields
if model_class == Phone:
@@ -627,9 +895,44 @@ async def import_csv_data(
if 'last' not in model_data or not model_data['last']:
continue # Skip rolodex records without a last name/company name
+ if model_class == Ledger:
+ # Skip ledger records without required fields
+ if 'empl_num' not in model_data or not model_data['empl_num']:
+ continue # Skip ledger records without employee number
+ if 'file_no' not in model_data or not model_data['file_no']:
+ continue # Skip ledger records without file number
+
# Create model instance
instance = model_class(**model_data)
db.add(instance)
+ db.flush() # Ensure PK is available
+
+ # Capture PK details for flexible storage linkage (single-column PKs only)
+ _, pk_names = _get_model_columns(model_class)
+ pk_field_name = pk_names[0] if len(pk_names) == 1 else None
+ pk_value = None
+ if pk_field_name:
+ try:
+ pk_value = getattr(instance, pk_field_name)
+ except Exception:
+ pk_value = None
+
+ # Save unmapped fields into flexible storage (privacy-first, per-row JSON)
+ extra_data = {}
+ for csv_field in unmapped_headers:
+ if csv_field in row and row[csv_field] not in (None, ""):
+ extra_data[csv_field] = row[csv_field]
+ if extra_data:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=pk_field_name,
+ primary_key_value=str(pk_value) if pk_value is not None else None,
+ extra_data=extra_data,
+ )
+ )
+ flexible_saved += 1
imported_count += 1
# Commit every 100 records to avoid memory issues
@@ -637,11 +940,30 @@ async def import_csv_data(
db.commit()
except Exception as e:
- errors.append({
- "row": row_num,
- "error": str(e),
- "data": row
- })
+ # Rollback the transaction for this record
+ db.rollback()
+ # As a robustness measure, persist row in flexible storage instead of counting as error
+ try:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data={
+ "mapped": model_data,
+ "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")},
+ "error": str(e),
+ },
+ )
+ )
+ flexible_saved += 1
+ except Exception as flex_e:
+ errors.append({
+ "row": row_num,
+ "error": f"{str(e)} | Flexible save failed: {str(flex_e)}",
+ "data": row,
+ })
continue
# Final commit
@@ -651,7 +973,12 @@ async def import_csv_data(
"file_type": file_type,
"imported_count": imported_count,
"errors": errors[:10], # Limit errors to first 10
- "total_errors": len(errors)
+ "total_errors": len(errors),
+ "auto_mapping": {
+ "mapped_headers": mapped_headers,
+ "unmapped_headers": unmapped_headers,
+ "flexible_saved_rows": flexible_saved,
+ },
}
if errors:
@@ -706,6 +1033,11 @@ async def clear_table_data(
try:
deleted_count = db.query(model_class).count()
db.query(model_class).delete()
+ # Also clear any flexible rows linked to this target table and file type
+ db.query(FlexibleImport).filter(
+ FlexibleImport.file_type == file_type,
+ FlexibleImport.target_table == model_class.__tablename__,
+ ).delete()
db.commit()
return {
@@ -733,7 +1065,7 @@ async def validate_csv_file(
if not file.filename.endswith('.csv'):
raise HTTPException(status_code=400, detail="File must be a CSV file")
- field_mapping = FIELD_MAPPINGS.get(file_type, {})
+ # Use auto-discovery mapping for validation
try:
content = await file.read()
@@ -751,31 +1083,39 @@ async def validate_csv_file(
if csv_content is None:
raise HTTPException(status_code=400, detail="Could not decode CSV file. Please ensure it's saved in UTF-8, Windows-1252, or ISO-8859-1 encoding.")
- # Handle CSV parsing issues with legacy files
- csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
-
- # Check headers
- csv_headers = csv_reader.fieldnames
- expected_headers = list(field_mapping.keys())
-
- missing_headers = [h for h in expected_headers if h not in csv_headers]
- extra_headers = [h for h in csv_headers if h not in expected_headers]
+ # Parse CSV with fallback to robust line-by-line parsing
+ def parse_csv_with_fallback(text: str) -> Tuple[List[Dict[str, str]], List[str]]:
+ try:
+ reader = csv.DictReader(io.StringIO(text), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ headers_local = reader.fieldnames or []
+ rows_local = []
+ for r in reader:
+ rows_local.append(r)
+ return rows_local, headers_local
+ except Exception:
+ return parse_csv_robust(text)
+
+ rows_list, csv_headers = parse_csv_with_fallback(csv_content)
+ model_class = CSV_MODEL_MAPPING[file_type]
+ mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type)
+ mapped_headers = mapping_info["mapped_headers"]
+ unmapped_headers = mapping_info["unmapped_headers"]
# Sample data validation
sample_rows = []
errors = []
- for row_num, row in enumerate(csv_reader, start=2):
+ for row_num, row in enumerate(rows_list, start=2):
if row_num > 12: # Only check first 10 data rows
break
sample_rows.append(row)
- # Check for data type issues
- for csv_field, db_field in field_mapping.items():
+ # Check for data type issues on mapped fields
+ for csv_field, db_field in mapped_headers.items():
if csv_field in row and row[csv_field]:
try:
- convert_value(row[csv_field], csv_field)
+ convert_value(row[csv_field], db_field)
except Exception as e:
errors.append({
"row": row_num,
@@ -786,16 +1126,19 @@ async def validate_csv_file(
return {
"file_type": file_type,
- "valid": len(missing_headers) == 0 and len(errors) == 0,
+ # Consider valid if we can map at least one column; we don't require exact header match
+ "valid": len(mapped_headers) > 0 and len(errors) == 0,
"headers": {
"found": csv_headers,
- "expected": expected_headers,
- "missing": missing_headers,
- "extra": extra_headers
+ "mapped": mapped_headers,
+ "unmapped": unmapped_headers,
},
"sample_data": sample_rows,
"validation_errors": errors[:5], # First 5 errors only
- "total_errors": len(errors)
+ "total_errors": len(errors),
+ "auto_mapping": {
+ "suggestions": mapping_info["suggestions"],
+ },
}
except Exception as e:
@@ -819,6 +1162,133 @@ async def get_import_progress(
}
+@router.post("/batch-validate")
+async def batch_validate_csv_files(
+ files: List[UploadFile] = UploadFileForm(...),
+ current_user: User = Depends(get_current_user)
+):
+ """Validate multiple CSV files without importing"""
+
+ if len(files) > 25:
+ raise HTTPException(status_code=400, detail="Maximum 25 files allowed per batch")
+
+ validation_results = []
+
+ for file in files:
+ file_type = file.filename
+
+ if file_type not in CSV_MODEL_MAPPING:
+ validation_results.append({
+ "file_type": file_type,
+ "valid": False,
+ "error": f"Unsupported file type: {file_type}"
+ })
+ continue
+
+ if not file.filename.endswith('.csv'):
+ validation_results.append({
+ "file_type": file_type,
+ "valid": False,
+ "error": "File must be a CSV file"
+ })
+ continue
+
+ model_class = CSV_MODEL_MAPPING.get(file_type)
+
+ try:
+ content = await file.read()
+
+ # Try multiple encodings for legacy CSV files (include BOM-friendly utf-8-sig)
+ encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1', 'cp1252']
+ csv_content = None
+ for encoding in encodings:
+ try:
+ csv_content = content.decode(encoding)
+ break
+ except UnicodeDecodeError:
+ continue
+
+ if csv_content is None:
+ validation_results.append({
+ "file_type": file_type,
+ "valid": False,
+ "error": "Could not decode CSV file encoding"
+ })
+ continue
+
+ # Handle CSV parsing issues with legacy files
+ rows_list, csv_headers = parse_csv_with_fallback(csv_content)
+
+ # Check headers and build dynamic mapping
+ mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type)
+ mapped_headers = mapping_info["mapped_headers"]
+ unmapped_headers = mapping_info["unmapped_headers"]
+
+ # Sample data validation
+ sample_rows = []
+ errors = []
+
+ for row_num, row in enumerate(rows_list, start=2):
+ if row_num > 12: # Only check first 10 data rows
+ break
+
+ sample_rows.append(row)
+
+ # Check for data type issues
+ for csv_field, db_field in mapped_headers.items():
+ if csv_field in row and row[csv_field]:
+ try:
+ convert_value(row[csv_field], db_field)
+ except Exception as e:
+ errors.append({
+ "row": row_num,
+ "field": csv_field,
+ "value": row[csv_field],
+ "error": str(e)
+ })
+
+ validation_results.append({
+ "file_type": file_type,
+ "valid": len(mapped_headers) > 0 and len(errors) == 0,
+ "headers": {
+ "found": csv_headers,
+ "mapped": mapped_headers,
+ "unmapped": unmapped_headers
+ },
+ "sample_data": sample_rows[:5], # Limit sample data for batch operation
+ "validation_errors": errors[:5], # First 5 errors only
+ "total_errors": len(errors),
+ "auto_mapping": {
+ "suggestions": mapping_info["suggestions"],
+ },
+ })
+
+ # Reset file pointer for potential future use
+ await file.seek(0)
+
+ except Exception as e:
+ validation_results.append({
+ "file_type": file_type,
+ "valid": False,
+ "error": f"Validation failed: {str(e)}"
+ })
+
+ # Summary statistics
+ total_files = len(validation_results)
+ valid_files = len([r for r in validation_results if r["valid"]])
+ invalid_files = total_files - valid_files
+
+ return {
+ "batch_validation_results": validation_results,
+ "summary": {
+ "total_files": total_files,
+ "valid_files": valid_files,
+ "invalid_files": invalid_files,
+ "all_valid": invalid_files == 0
+ }
+ }
+
+
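A hedged script-side sketch of driving the batch validator; the /api/import mount prefix, the app import path, and the omitted auth overrides are assumptions.

from pathlib import Path

from fastapi.testclient import TestClient

from app.main import app  # assumed application entry point

client = TestClient(app)

def validate_folder(folder: str) -> bool:
    paths = sorted(Path(folder).glob("*.csv"))[:25]  # the endpoint caps a batch at 25 files
    files = [("files", (p.name, p.read_bytes(), "text/csv")) for p in paths]
    response = client.post("/api/import/batch-validate", files=files)  # assumed path
    response.raise_for_status()
    summary = response.json()["summary"]
    print(f"{summary['valid_files']}/{summary['total_files']} files ready to import")
    return summary["all_valid"]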
@router.post("/batch-upload")
async def batch_import_csv_files(
files: List[UploadFile] = UploadFileForm(...),
@@ -828,8 +1298,8 @@ async def batch_import_csv_files(
):
"""Import multiple CSV files in optimal order"""
- if len(files) > 20:
- raise HTTPException(status_code=400, detail="Maximum 20 files allowed per batch")
+ if len(files) > 25:
+ raise HTTPException(status_code=400, detail="Maximum 25 files allowed per batch")
# Define optimal import order based on dependencies
import_order = [
@@ -856,25 +1326,144 @@ async def batch_import_csv_files(
results = []
total_imported = 0
total_errors = 0
+
+ # Create import audit row (running)
+ audit_row = ImportAudit(
+ status="running",
+ total_files=len(files),
+ successful_files=0,
+ failed_files=0,
+ total_imported=0,
+ total_errors=0,
+ initiated_by_user_id=getattr(current_user, "id", None),
+ initiated_by_username=getattr(current_user, "username", None),
+ message="Batch import started",
+ )
+ db.add(audit_row)
+ db.commit()
+ db.refresh(audit_row)
+ # Directory to persist uploaded files for this audit (for reruns)
+ audit_dir = Path(settings.upload_dir).joinpath("import_audits", str(audit_row.id))
+ try:
+ audit_dir.mkdir(parents=True, exist_ok=True)
+ except Exception:
+ pass
+
for file_type, file in ordered_files:
if file_type not in CSV_MODEL_MAPPING:
- results.append({
- "file_type": file_type,
- "status": "skipped",
- "message": f"Unsupported file type: {file_type}"
- })
- continue
+ # Fallback flexible-only import for unknown file structures
+ try:
+ await file.seek(0)
+ content = await file.read()
+ # Save original upload to disk for potential reruns
+ saved_path = None
+ try:
+ file_path = audit_dir.joinpath(file_type)
+ with open(file_path, "wb") as fh:
+ fh.write(content)
+ saved_path = str(file_path)
+ except Exception:
+ saved_path = None
+ encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1', 'cp1252']
+ csv_content = None
+ for encoding in encodings:
+ try:
+ csv_content = content.decode(encoding)
+ break
+ except UnicodeDecodeError:
+ continue
+ if csv_content is None:
+ results.append({
+ "file_type": file_type,
+ "status": "failed",
+ "message": "Could not decode CSV file encoding"
+ })
+ continue
+ rows_list, headers = parse_csv_with_fallback(csv_content)
+ flexible_count = 0
+ for row in rows_list:
+ # Save entire row as flexible JSON
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=None,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data=make_json_safe({k: v for k, v in (row or {}).items() if v not in (None, "")}),
+ )
+ )
+ flexible_count += 1
+ if flexible_count % 200 == 0:
+ db.commit()
+ db.commit()
+ total_imported += flexible_count
+ # Persist per-file result row
+ results.append({
+ "file_type": file_type,
+ "status": "success",
+ "imported_count": flexible_count,
+ "errors": 0,
+ "message": f"Stored {flexible_count} rows as flexible data (no known model)",
+ "auto_mapping": {
+ "mapped_headers": {},
+ "unmapped_headers": list(headers),
+ "flexible_saved_rows": flexible_count,
+ },
+ })
+ try:
+ db.add(ImportAuditFile(
+ audit_id=audit_row.id,
+ file_type=file_type,
+ status="success",
+ imported_count=flexible_count,
+ errors=0,
+ message=f"Stored {flexible_count} rows as flexible data",
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+ continue
+ except Exception as e:
+ db.rollback()
+ results.append({
+ "file_type": file_type,
+ "status": "failed",
+ "message": f"Flexible import failed: {str(e)}"
+ })
+ try:
+ db.add(ImportAuditFile(
+ audit_id=audit_row.id,
+ file_type=file_type,
+ status="failed",
+ imported_count=0,
+ errors=1,
+ message=f"Flexible import failed: {str(e)}",
+ details={}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+ continue
try:
# Reset file pointer
await file.seek(0)
- # Import this file using simplified logic
+ # Import this file using auto-discovery mapping
model_class = CSV_MODEL_MAPPING[file_type]
- field_mapping = FIELD_MAPPINGS.get(file_type, {})
content = await file.read()
+ # Save original upload to disk for potential reruns
+ saved_path = None
+ try:
+ file_path = audit_dir.joinpath(file_type)
+ with open(file_path, "wb") as fh:
+ fh.write(content)
+ saved_path = str(file_path)
+ except Exception:
+ saved_path = None
# Try multiple encodings for legacy CSV files
encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1', 'cp1252']
@@ -892,43 +1481,145 @@ async def batch_import_csv_files(
"status": "failed",
"message": "Could not decode CSV file encoding"
})
+ try:
+ db.add(ImportAuditFile(
+ audit_id=audit_row.id,
+ file_type=file_type,
+ status="failed",
+ imported_count=0,
+ errors=1,
+ message="Could not decode CSV file encoding",
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
continue
# Handle CSV parsing issues with legacy files
- csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ rows_list, csv_headers = parse_csv_with_fallback(csv_content)
+ mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type)
+ mapped_headers = mapping_info["mapped_headers"]
+ unmapped_headers = mapping_info["unmapped_headers"]
imported_count = 0
errors = []
+ flexible_saved = 0
# If replace_existing is True and this is the first file of this type
if replace_existing:
db.query(model_class).delete()
+ db.query(FlexibleImport).filter(
+ FlexibleImport.file_type == file_type,
+ FlexibleImport.target_table == model_class.__tablename__,
+ ).delete()
db.commit()
- for row_num, row in enumerate(csv_reader, start=2):
+ for row_num, row in enumerate(rows_list, start=2):
try:
model_data = {}
- for csv_field, db_field in field_mapping.items():
- if csv_field in row and db_field is not None: # Skip fields mapped to None
- converted_value = convert_value(row[csv_field], csv_field)
+ for csv_field, db_field in mapped_headers.items():
+ if csv_field in row and db_field is not None:
+ converted_value = convert_value(row[csv_field], db_field)
if converted_value is not None:
model_data[db_field] = converted_value
if not any(model_data.values()):
continue
+ # Fallback: if required non-nullable fields are missing, store row as flexible only
+ required_fields = _get_required_fields(model_class)
+ missing_required = [f for f in required_fields if model_data.get(f) in (None, "")]
+ if missing_required:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data=make_json_safe({
+ "mapped": model_data,
+ "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")},
+ "missing_required": missing_required,
+ }),
+ )
+ )
+ flexible_saved += 1
+ continue
+
+ # Special validation for models with required fields
+ if model_class == Phone:
+ if 'phone' not in model_data or not model_data['phone']:
+ continue # Skip phone records without a phone number
+
+ if model_class == Rolodex:
+ if 'last' not in model_data or not model_data['last']:
+ continue # Skip rolodex records without a last name/company name
+
+ if model_class == Ledger:
+ # Skip ledger records without required fields
+ if 'empl_num' not in model_data or not model_data['empl_num']:
+ continue # Skip ledger records without employee number
+ if 'file_no' not in model_data or not model_data['file_no']:
+ continue # Skip ledger records without file number
+
instance = model_class(**model_data)
db.add(instance)
+ db.flush()
+
+ # Link flexible extras
+ _, pk_names = _get_model_columns(model_class)
+ pk_field_name = pk_names[0] if len(pk_names) == 1 else None
+ pk_value = None
+ if pk_field_name:
+ try:
+ pk_value = getattr(instance, pk_field_name)
+ except Exception:
+ pk_value = None
+ extra_data = {}
+ for csv_field in unmapped_headers:
+ if csv_field in row and row[csv_field] not in (None, ""):
+ extra_data[csv_field] = row[csv_field]
+ if extra_data:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=pk_field_name,
+ primary_key_value=str(pk_value) if pk_value is not None else None,
+ extra_data=make_json_safe(extra_data),
+ )
+ )
+ flexible_saved += 1
imported_count += 1
if imported_count % 100 == 0:
db.commit()
except Exception as e:
- errors.append({
- "row": row_num,
- "error": str(e)
- })
+ # Rollback the transaction for this record
+ db.rollback()
+ # Persist row in flexible storage instead of counting as error only
+ try:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data=make_json_safe({
+ "mapped": model_data,
+ "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")},
+ "error": str(e),
+ }),
+ )
+ )
+ flexible_saved += 1
+ except Exception as flex_e:
+ errors.append({
+ "row": row_num,
+ "error": f"{str(e)} | Flexible save failed: {str(flex_e)}",
+ })
continue
db.commit()
@@ -941,8 +1632,31 @@ async def batch_import_csv_files(
"status": "success" if len(errors) == 0 else "completed_with_errors",
"imported_count": imported_count,
"errors": len(errors),
- "message": f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else "")
+ "message": f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""),
+ "auto_mapping": {
+ "mapped_headers": mapped_headers,
+ "unmapped_headers": unmapped_headers,
+ "flexible_saved_rows": flexible_saved,
+ },
})
+ try:
+ db.add(ImportAuditFile(
+ audit_id=audit_row.id,
+ file_type=file_type,
+ status="success" if len(errors) == 0 else "completed_with_errors",
+ imported_count=imported_count,
+ errors=len(errors),
+ message=f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""),
+ details={
+ "mapped_headers": list(mapped_headers.keys()),
+ "unmapped_count": len(unmapped_headers),
+ "flexible_saved_rows": flexible_saved,
+ **({"saved_path": saved_path} if saved_path else {}),
+ }
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
except Exception as e:
db.rollback()
@@ -951,14 +1665,570 @@ async def batch_import_csv_files(
"status": "failed",
"message": f"Import failed: {str(e)}"
})
+ try:
+ db.add(ImportAuditFile(
+ audit_id=audit_row.id,
+ file_type=file_type,
+ status="failed",
+ imported_count=0,
+ errors=1,
+ message=f"Import failed: {str(e)}",
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+ summary = {
+ "total_files": len(files),
+ "successful_files": len([r for r in results if r["status"] in ["success", "completed_with_errors"]]),
+ "failed_files": len([r for r in results if r["status"] == "failed"]),
+ "total_imported": total_imported,
+ "total_errors": total_errors
+ }
+
+ # Finalize audit row
+ try:
+ audit_row.successful_files = summary["successful_files"]
+ audit_row.failed_files = summary["failed_files"]
+ audit_row.total_imported = summary["total_imported"]
+ audit_row.total_errors = summary["total_errors"]
+ audit_row.status = "success" if summary["failed_files"] == 0 and summary["total_errors"] == 0 else (
+ "completed_with_errors" if summary["successful_files"] > 0 else "failed"
+ )
+ audit_row.message = f"Batch import completed: {audit_row.successful_files}/{audit_row.total_files} files"
+ audit_row.finished_at = datetime.utcnow()
+ audit_row.details = {
+ "files": [
+ {"file_type": r.get("file_type"), "status": r.get("status"), "imported_count": r.get("imported_count", 0), "errors": r.get("errors", 0)}
+ for r in results
+ ]
+ }
+ db.add(audit_row)
+ db.commit()
+ except Exception:
+ db.rollback()
+
return {
"batch_results": results,
- "summary": {
- "total_files": len(files),
- "successful_files": len([r for r in results if r["status"] in ["success", "completed_with_errors"]]),
- "failed_files": len([r for r in results if r["status"] == "failed"]),
- "total_imported": total_imported,
- "total_errors": total_errors
+ "summary": summary
+ }
+
+
+@router.get("/recent-batches")
+async def recent_batch_imports(
+ limit: int = Query(5, ge=1, le=50),
+ offset: int = Query(0, ge=0),
+ status: Optional[str] = Query(None, description="Filter by status: running|success|completed_with_errors|failed"),
+ start: Optional[str] = Query(None, description="ISO datetime start for started_at filter"),
+ end: Optional[str] = Query(None, description="ISO datetime end for started_at filter"),
+ db: Session = Depends(get_db),
+ current_user: User = Depends(get_current_user)
+):
+ """Return recent batch import audit rows (most recent first) with optional filters and pagination."""
+ q = db.query(ImportAudit)
+ if status and status.lower() != "all":
+ q = q.filter(ImportAudit.status == status)
+ # Date range filters on started_at
+ try:
+ if start:
+ start_dt = datetime.fromisoformat(start)
+ q = q.filter(ImportAudit.started_at >= start_dt)
+ except Exception:
+ pass
+ try:
+ if end:
+ end_dt = datetime.fromisoformat(end)
+ q = q.filter(ImportAudit.started_at <= end_dt)
+ except Exception:
+ pass
+ total = q.count()
+ rows = (
+ q.order_by(ImportAudit.started_at.desc())
+ .offset(offset)
+ .limit(limit)
+ .all()
+ )
+ def _row(r: ImportAudit):
+ return {
+ "id": r.id,
+ "started_at": r.started_at.isoformat() if r.started_at else None,
+ "finished_at": r.finished_at.isoformat() if r.finished_at else None,
+ "status": r.status,
+ "total_files": r.total_files,
+ "successful_files": r.successful_files,
+ "failed_files": r.failed_files,
+ "total_imported": r.total_imported,
+ "total_errors": r.total_errors,
+ "initiated_by": r.initiated_by_username,
+ "message": r.message,
}
- }
\ No newline at end of file
+ return {"recent": [_row(r) for r in rows], "total": total, "limit": limit, "offset": offset}
+
+
+@router.get("/recent-batches/{audit_id}")
+async def get_batch_details(
+ audit_id: int,
+ db: Session = Depends(get_db),
+ current_user: User = Depends(get_current_user)
+):
+ """Return a specific audit entry with per-file details."""
+ audit = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first()
+ if not audit:
+ raise HTTPException(status_code=404, detail="Audit entry not found")
+ files = (
+ db.query(ImportAuditFile)
+ .filter(ImportAuditFile.audit_id == audit.id)
+ .order_by(ImportAuditFile.id.asc())
+ .all()
+ )
+ def _row(r: ImportAudit):
+ return {
+ "id": r.id,
+ "started_at": r.started_at.isoformat() if r.started_at else None,
+ "finished_at": r.finished_at.isoformat() if r.finished_at else None,
+ "status": r.status,
+ "total_files": r.total_files,
+ "successful_files": r.successful_files,
+ "failed_files": r.failed_files,
+ "total_imported": r.total_imported,
+ "total_errors": r.total_errors,
+ "initiated_by": r.initiated_by_username,
+ "message": r.message,
+ "details": r.details or {},
+ }
+ def _file(f: ImportAuditFile):
+ return {
+ "id": f.id,
+ "file_type": f.file_type,
+ "status": f.status,
+ "imported_count": f.imported_count,
+ "errors": f.errors,
+ "message": f.message,
+ "details": f.details or {},
+ "created_at": f.created_at.isoformat() if f.created_at else None,
+ }
+ return {"audit": _row(audit), "files": [_file(f) for f in files]}
+
+
+@router.post("/recent-batches/{audit_id}/rerun-failed")
+async def rerun_failed_files(
+ audit_id: int,
+ replace_existing: bool = Form(False),
+ db: Session = Depends(get_db),
+ current_user: User = Depends(get_current_user)
+):
+ """Re-run only failed files for a given audit. Creates a new audit entry for the rerun."""
+ prior = db.query(ImportAudit).filter(ImportAudit.id == audit_id).first()
+ if not prior:
+ raise HTTPException(status_code=404, detail="Audit entry not found")
+ failed_files: List[ImportAuditFile] = (
+ db.query(ImportAuditFile)
+ .filter(ImportAuditFile.audit_id == audit_id, ImportAuditFile.status == "failed")
+ .all()
+ )
+ if not failed_files:
+ raise HTTPException(status_code=400, detail="No failed files to rerun for this audit")
+
+ # Build list of (file_type, path) that exist
+ items: List[Tuple[str, str]] = []
+ for f in failed_files:
+ saved_path = None
+ try:
+ saved_path = (f.details or {}).get("saved_path")
+ except Exception:
+ saved_path = None
+ if saved_path and os.path.exists(saved_path):
+ items.append((f.file_type, saved_path))
+ if not items:
+ raise HTTPException(status_code=400, detail="No saved files available to rerun. Upload again.")
+
+ # Import order for sorting
+ import_order = [
+ "STATES.csv", "GRUPLKUP.csv", "EMPLOYEE.csv", "FILETYPE.csv", "FILESTAT.csv",
+ "TRNSTYPE.csv", "TRNSLKUP.csv", "FOOTERS.csv", "SETUP.csv", "PRINTERS.csv",
+ "ROLODEX.csv", "PHONE.csv", "FILES.csv", "LEDGER.csv", "TRNSACTN.csv",
+ "QDROS.csv", "PENSIONS.csv", "PLANINFO.csv", "PAYMENTS.csv", "DEPOSITS.csv",
+ "FILENOTS.csv", "FORM_INX.csv", "FORM_LST.csv", "FVARLKUP.csv", "RVARLKUP.csv"
+ ]
+ order_index = {name: i for i, name in enumerate(import_order)}
+ items.sort(key=lambda x: order_index.get(x[0], len(import_order) + 1))
+
+ # Create new audit row for rerun
+ rerun_audit = ImportAudit(
+ status="running",
+ total_files=len(items),
+ successful_files=0,
+ failed_files=0,
+ total_imported=0,
+ total_errors=0,
+ initiated_by_user_id=getattr(current_user, "id", None),
+ initiated_by_username=getattr(current_user, "username", None),
+ message=f"Rerun failed files for audit #{audit_id}",
+ details={"rerun_of": audit_id},
+ )
+ db.add(rerun_audit)
+ db.commit()
+ db.refresh(rerun_audit)
+
+ # Directory to persist rerun files
+ rerun_dir = Path(settings.upload_dir).joinpath("import_audits", str(rerun_audit.id))
+ try:
+ rerun_dir.mkdir(parents=True, exist_ok=True)
+ except Exception:
+ pass
+
+ results: List[Dict[str, Any]] = []
+ total_imported = 0
+ total_errors = 0
+
+ for file_type, path in items:
+ try:
+ with open(path, "rb") as fh:
+ content = fh.read()
+ # Save a copy under the rerun audit
+ saved_path = None
+ try:
+ file_path = rerun_dir.joinpath(file_type)
+ with open(file_path, "wb") as out:
+ out.write(content)
+ saved_path = str(file_path)
+ except Exception:
+ saved_path = None
+
+ if file_type not in CSV_MODEL_MAPPING:
+ # Flexible-only path
+ encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1', 'cp1252']
+ csv_content = None
+ for enc in encodings:
+ try:
+ csv_content = content.decode(enc)
+ break
+ except UnicodeDecodeError:
+ continue
+ if csv_content is None:
+ results.append({"file_type": file_type, "status": "failed", "message": "Could not decode CSV file encoding"})
+ try:
+ db.add(ImportAuditFile(
+ audit_id=rerun_audit.id,
+ file_type=file_type,
+ status="failed",
+ imported_count=0,
+ errors=1,
+ message="Could not decode CSV file encoding",
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+ continue
+
+ rows_list, _headers = parse_csv_with_fallback(csv_content)
+ flexible_count = 0
+ for row in rows_list:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=None,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data=make_json_safe({k: v for k, v in (row or {}).items() if v not in (None, "")}),
+ )
+ )
+ flexible_count += 1
+ if flexible_count % 200 == 0:
+ db.commit()
+ db.commit()
+ total_imported += flexible_count
+ results.append({
+ "file_type": file_type,
+ "status": "success",
+ "imported_count": flexible_count,
+ "errors": 0,
+ "message": f"Stored {flexible_count} rows as flexible data (no known model)",
+ })
+ try:
+ db.add(ImportAuditFile(
+ audit_id=rerun_audit.id,
+ file_type=file_type,
+ status="success",
+ imported_count=flexible_count,
+ errors=0,
+ message=f"Stored {flexible_count} rows as flexible data",
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+ continue
+
+ # Known model path
+ model_class = CSV_MODEL_MAPPING[file_type]
+ encodings = ['utf-8-sig', 'utf-8', 'windows-1252', 'iso-8859-1', 'cp1252']
+ csv_content = None
+ for enc in encodings:
+ try:
+ csv_content = content.decode(enc)
+ break
+ except UnicodeDecodeError:
+ continue
+ if csv_content is None:
+ results.append({"file_type": file_type, "status": "failed", "message": "Could not decode CSV file encoding"})
+ try:
+ db.add(ImportAuditFile(
+ audit_id=rerun_audit.id,
+ file_type=file_type,
+ status="failed",
+ imported_count=0,
+ errors=1,
+ message="Could not decode CSV file encoding",
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+ continue
+
+ csv_reader = csv.DictReader(io.StringIO(csv_content), delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ csv_headers = csv_reader.fieldnames or []
+ mapping_info = _build_dynamic_mapping(csv_headers, model_class, file_type)
+ mapped_headers = mapping_info["mapped_headers"]
+ unmapped_headers = mapping_info["unmapped_headers"]
+ imported_count = 0
+ errors: List[Dict[str, Any]] = []
+
+ if replace_existing:
+ db.query(model_class).delete()
+ db.query(FlexibleImport).filter(
+ FlexibleImport.file_type == file_type,
+ FlexibleImport.target_table == model_class.__tablename__,
+ ).delete()
+ db.commit()
+
+ for row_num, row in enumerate(csv_reader, start=2):
+ try:
+ model_data: Dict[str, Any] = {}
+ for csv_field, db_field in mapped_headers.items():
+ if csv_field in row and db_field is not None:
+ converted_value = convert_value(row[csv_field], db_field)
+ if converted_value is not None:
+ model_data[db_field] = converted_value
+ if not any(model_data.values()):
+ continue
+ required_fields = _get_required_fields(model_class)
+ missing_required = [f for f in required_fields if model_data.get(f) in (None, "")]
+ if missing_required:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data={
+ "mapped": model_data,
+ "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")},
+ "missing_required": missing_required,
+ },
+ )
+ )
+ continue
+
+ if model_class == Phone and (not model_data.get('phone')):
+ continue
+ if model_class == Rolodex and (not model_data.get('last')):
+ continue
+ if model_class == Ledger and (not model_data.get('empl_num') or not model_data.get('file_no')):
+ continue
+
+ instance = model_class(**model_data)
+ db.add(instance)
+ db.flush()
+
+ _, pk_names = _get_model_columns(model_class)
+ pk_field_name = pk_names[0] if len(pk_names) == 1 else None
+ pk_value = None
+ if pk_field_name:
+ try:
+ pk_value = getattr(instance, pk_field_name)
+ except Exception:
+ pk_value = None
+ extra_data = {}
+ for csv_field in unmapped_headers:
+ if csv_field in row and row[csv_field] not in (None, ""):
+ extra_data[csv_field] = row[csv_field]
+ if extra_data:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=pk_field_name,
+ primary_key_value=str(pk_value) if pk_value is not None else None,
+ extra_data=extra_data,
+ )
+ )
+ imported_count += 1
+ if imported_count % 100 == 0:
+ db.commit()
+ except Exception as e:
+ db.rollback()
+ try:
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=model_class.__tablename__,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data={
+ "mapped": model_data,
+ "unmapped": {h: row.get(h) for h in unmapped_headers if row.get(h) not in (None, "")},
+ "error": str(e),
+ },
+ )
+ )
+ except Exception:
+ errors.append({"row": row_num, "error": str(e)})
+ continue
+
+ db.commit()
+ total_imported += imported_count
+ total_errors += len(errors)
+ results.append({
+ "file_type": file_type,
+ "status": "success" if len(errors) == 0 else "completed_with_errors",
+ "imported_count": imported_count,
+ "errors": len(errors),
+ "message": f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""),
+ })
+ try:
+ db.add(ImportAuditFile(
+ audit_id=rerun_audit.id,
+ file_type=file_type,
+ status="success" if len(errors) == 0 else "completed_with_errors",
+ imported_count=imported_count,
+ errors=len(errors),
+ message=f"Imported {imported_count} records" + (f" with {len(errors)} errors" if errors else ""),
+ details={"saved_path": saved_path} if saved_path else {}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+
+ except Exception as e:
+ db.rollback()
+ results.append({"file_type": file_type, "status": "failed", "message": f"Import failed: {str(e)}"})
+ try:
+ db.add(ImportAuditFile(
+ audit_id=rerun_audit.id,
+ file_type=file_type,
+ status="failed",
+ imported_count=0,
+ errors=1,
+ message=f"Import failed: {str(e)}",
+ details={}
+ ))
+ db.commit()
+ except Exception:
+ db.rollback()
+
+ # Finalize rerun audit
+ summary = {
+ "total_files": len(items),
+ "successful_files": len([r for r in results if r["status"] in ["success", "completed_with_errors"]]),
+ "failed_files": len([r for r in results if r["status"] == "failed"]),
+ "total_imported": total_imported,
+ "total_errors": total_errors,
+ }
+ try:
+ rerun_audit.successful_files = summary["successful_files"]
+ rerun_audit.failed_files = summary["failed_files"]
+ rerun_audit.total_imported = summary["total_imported"]
+ rerun_audit.total_errors = summary["total_errors"]
+ rerun_audit.status = "success" if summary["failed_files"] == 0 and summary["total_errors"] == 0 else (
+ "completed_with_errors" if summary["successful_files"] > 0 else "failed"
+ )
+ rerun_audit.message = f"Rerun completed: {rerun_audit.successful_files}/{rerun_audit.total_files} files"
+ rerun_audit.finished_at = datetime.utcnow()
+ rerun_audit.details = {"rerun_of": audit_id}
+ db.add(rerun_audit)
+ db.commit()
+ except Exception:
+ db.rollback()
+
+ return {"batch_results": results, "summary": summary, "rerun_audit_id": rerun_audit.id}
+
+@router.post("/upload-flexible")
+async def upload_flexible_only(
+ file: UploadFile = UploadFileForm(...),
+ replace_existing: bool = Form(False),
+ db: Session = Depends(get_db),
+ current_user: User = Depends(get_current_user),
+):
+ """Flexible-only single-file upload.
+
+ Accepts any CSV and stores each row as a `FlexibleImport` record with `target_table=None`.
+ """
+ # Ensure CSV
+ if not file.filename or not file.filename.lower().endswith(".csv"):
+ raise HTTPException(status_code=400, detail="File must be a CSV file")
+
+ file_type = file.filename
+
+ try:
+ # Optionally clear prior flexible rows for this file_type
+ if replace_existing:
+ db.query(FlexibleImport).filter(
+ FlexibleImport.file_type == file_type,
+ FlexibleImport.target_table == None, # noqa: E711
+ ).delete()
+ db.commit()
+
+ content = await file.read()
+ encodings = ["utf-8-sig", "utf-8", "windows-1252", "iso-8859-1", "cp1252"]
+ csv_content = None
+ for encoding in encodings:
+ try:
+ csv_content = content.decode(encoding)
+ break
+ except UnicodeDecodeError:
+ continue
+ if csv_content is None:
+ raise HTTPException(status_code=400, detail="Could not decode CSV file encoding")
+
+ reader = csv.DictReader(io.StringIO(csv_content), delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
+ headers = reader.fieldnames or []
+
+ imported_count = 0
+ for row in reader:
+ payload = {k: v for k, v in (row or {}).items() if v not in (None, "")}
+ db.add(
+ FlexibleImport(
+ file_type=file_type,
+ target_table=None,
+ primary_key_field=None,
+ primary_key_value=None,
+ extra_data=payload,
+ )
+ )
+ imported_count += 1
+ if imported_count % 200 == 0:
+ db.commit()
+
+ db.commit()
+
+ return {
+ "file_type": file_type,
+ "imported_count": imported_count,
+ "errors": [],
+ "total_errors": 0,
+ "auto_mapping": {
+ "mapped_headers": {},
+ "unmapped_headers": list(headers),
+ "flexible_saved_rows": imported_count,
+ },
+ "message": f"Stored {imported_count} rows as flexible data (no known model)",
+ }
+ except HTTPException:
+ raise
+ except Exception as e:
+ db.rollback()
+ raise HTTPException(status_code=500, detail=f"Flexible upload failed: {str(e)}")
\ No newline at end of file
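
For quick reference, a minimal client sketch for exercising the import endpoints added above. The /api/import prefix matches how main.py mounts this router later in the patch; the base URL, the bearer-token header, and the sample file name are assumptions, not part of the patch.

# Sketch only: drive the new import endpoints from a script.
import requests

BASE = "http://localhost:8000"                       # assumed dev host
HEADERS = {"Authorization": "Bearer <access-token>"}  # assumed auth scheme

# Store an arbitrary CSV as flexible rows only (no model mapping attempted).
with open("UNKNOWN_EXPORT.csv", "rb") as fh:          # hypothetical file
    resp = requests.post(
        f"{BASE}/api/import/upload-flexible",
        headers=HEADERS,
        files={"file": ("UNKNOWN_EXPORT.csv", fh, "text/csv")},
        data={"replace_existing": "true"},
    )
resp.raise_for_status()
print(resp.json()["message"])

# List recent batch audits, then rerun only the failed files of the newest one.
recent = requests.get(
    f"{BASE}/api/import/recent-batches",
    headers=HEADERS,
    params={"limit": 5, "status": "completed_with_errors"},
).json()
if recent["recent"]:
    audit_id = recent["recent"][0]["id"]
    rerun = requests.post(
        f"{BASE}/api/import/recent-batches/{audit_id}/rerun-failed",
        headers=HEADERS,
        data={"replace_existing": "false"},
    )
    print(rerun.json().get("summary"))
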
diff --git a/app/api/search.py b/app/api/search.py
index 2ba372b..9a67b60 100644
--- a/app/api/search.py
+++ b/app/api/search.py
@@ -1,7 +1,7 @@
"""
Advanced Search API endpoints - Comprehensive search across all data types
"""
-from typing import List, Optional, Union, Dict, Any
+from typing import List, Optional, Union, Dict, Any, Tuple
from fastapi import APIRouter, Depends, HTTPException, status, Query, Body
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_, func, desc, asc, text, case, cast, String, DateTime, Date, Numeric
@@ -11,6 +11,14 @@ import json
import re
from app.database.base import get_db
+from app.api.search_highlight import (
+ build_query_tokens,
+ highlight_text,
+ create_customer_highlight,
+ create_file_highlight,
+ create_ledger_highlight,
+ create_qdro_highlight,
+)
from app.models.rolodex import Rolodex, Phone
from app.models.files import File
from app.models.ledger import Ledger
@@ -1059,60 +1067,16 @@ def _calculate_document_relevance(doc: FormIndex, query: str) -> float:
# Highlight functions
def _create_customer_highlight(customer: Rolodex, query: str) -> str:
- """Create highlight snippet for customer"""
- if not query:
- return ""
-
- full_name = f"{customer.first or ''} {customer.last}".strip()
- if query.lower() in full_name.lower():
- return f"Name: {full_name}"
-
- if customer.email and query.lower() in customer.email.lower():
- return f"Email: {customer.email}"
-
- if customer.city and query.lower() in customer.city.lower():
- return f"City: {customer.city}"
-
- return ""
+ return create_customer_highlight(customer, query)
def _create_file_highlight(file_obj: File, query: str) -> str:
- """Create highlight snippet for file"""
- if not query:
- return ""
-
- if file_obj.regarding and query.lower() in file_obj.regarding.lower():
- return f"Matter: {file_obj.regarding}"
-
- if file_obj.file_type and query.lower() in file_obj.file_type.lower():
- return f"Type: {file_obj.file_type}"
-
- return ""
+ return create_file_highlight(file_obj, query)
def _create_ledger_highlight(ledger: Ledger, query: str) -> str:
- """Create highlight snippet for ledger"""
- if not query:
- return ""
-
- if ledger.note and query.lower() in ledger.note.lower():
- return f"Note: {ledger.note[:100]}..."
-
- return ""
+ return create_ledger_highlight(ledger, query)
def _create_qdro_highlight(qdro: QDRO, query: str) -> str:
- """Create highlight snippet for QDRO"""
- if not query:
- return ""
-
- if qdro.form_name and query.lower() in qdro.form_name.lower():
- return f"Form: {qdro.form_name}"
-
- if qdro.pet and query.lower() in qdro.pet.lower():
- return f"Petitioner: {qdro.pet}"
-
- if qdro.case_number and query.lower() in qdro.case_number.lower():
- return f"Case: {qdro.case_number}"
-
- return ""
\ No newline at end of file
+ return create_qdro_highlight(qdro, query)
\ No newline at end of file
diff --git a/app/api/search_highlight.py b/app/api/search_highlight.py
new file mode 100644
index 0000000..3263e08
--- /dev/null
+++ b/app/api/search_highlight.py
@@ -0,0 +1,141 @@
+"""
+Server-side highlight utilities for search results.
+
+These functions generate HTML snippets with <mark> tags around matched tokens,
+preserving the original casing of the source text. The output is intended to be
+sanitized on the client before insertion into the DOM.
+"""
+from typing import List, Tuple, Any
+import re
+
+
+def build_query_tokens(query: str) -> List[str]:
+ """Split query into alphanumeric tokens, trimming punctuation and deduping while preserving order."""
+ if not query:
+ return []
+ raw_parts = re.sub(r"[,_;:]+", " ", str(query or "").strip()).split()
+ cleaned: List[str] = []
+ seen = set()
+ for part in raw_parts:
+ token = re.sub(r"^[^A-Za-z0-9]+|[^A-Za-z0-9]+$", "", part)
+ lowered = token.lower()
+ if token and lowered not in seen:
+ cleaned.append(token)
+ seen.add(lowered)
+ return cleaned
+
+
+def _merge_ranges(ranges: List[Tuple[int, int]]) -> List[Tuple[int, int]]:
+ if not ranges:
+ return []
+ ranges.sort(key=lambda x: (x[0], x[1]))
+ merged: List[Tuple[int, int]] = []
+ cur_s, cur_e = ranges[0]
+ for s, e in ranges[1:]:
+ if s <= cur_e:
+ cur_e = max(cur_e, e)
+ else:
+ merged.append((cur_s, cur_e))
+ cur_s, cur_e = s, e
+ merged.append((cur_s, cur_e))
+ return merged
+
+
+def highlight_text(value: str, tokens: List[str]) -> str:
+    """Return `value` with case-insensitive matches of `tokens` wrapped in <mark> tags, preserving original casing."""
+ if value is None:
+ return ""
+ source = str(value)
+ if not source or not tokens:
+ return source
+ haystack = source.lower()
+ ranges: List[Tuple[int, int]] = []
+ for t in tokens:
+ needle = str(t or "").lower()
+ if not needle:
+ continue
+ start = 0
+ last_possible = max(0, len(haystack) - len(needle))
+ while start <= last_possible and len(needle) > 0:
+ idx = haystack.find(needle, start)
+ if idx == -1:
+ break
+ ranges.append((idx, idx + len(needle)))
+ start = idx + 1
+ if not ranges:
+ return source
+ parts: List[str] = []
+ merged = _merge_ranges(ranges)
+ pos = 0
+ for s, e in merged:
+ if pos < s:
+ parts.append(source[pos:s])
+        parts.append("<mark>" + source[s:e] + "</mark>")
+ pos = e
+ if pos < len(source):
+ parts.append(source[pos:])
+ return "".join(parts)
+
+
+def create_customer_highlight(customer: Any, query: str) -> str:
+ if not query:
+ return ""
+ tokens = build_query_tokens(query)
+ full_name = f"{getattr(customer, 'first', '') or ''} {getattr(customer, 'last', '')}".strip()
+ email = getattr(customer, 'email', None)
+ city = getattr(customer, 'city', None)
+ ql = query.lower()
+
+ if full_name and ql in full_name.lower():
+ return f"Name: {highlight_text(full_name, tokens)}"
+ if email and ql in str(email).lower():
+ return f"Email: {highlight_text(str(email), tokens)}"
+ if city and ql in str(city).lower():
+ return f"City: {highlight_text(str(city), tokens)}"
+ return ""
+
+
+def create_file_highlight(file_obj: Any, query: str) -> str:
+ if not query:
+ return ""
+ tokens = build_query_tokens(query)
+ regarding = getattr(file_obj, 'regarding', None)
+ file_type = getattr(file_obj, 'file_type', None)
+ ql = query.lower()
+ if regarding and ql in str(regarding).lower():
+ return f"Matter: {highlight_text(str(regarding), tokens)}"
+ if file_type and ql in str(file_type).lower():
+ return f"Type: {highlight_text(str(file_type), tokens)}"
+ return ""
+
+
+def create_ledger_highlight(ledger: Any, query: str) -> str:
+ if not query:
+ return ""
+ tokens = build_query_tokens(query)
+ note = getattr(ledger, 'note', None)
+ if note and query.lower() in str(note).lower():
+ text = str(note) or ""
+ preview = text[:160]
+ suffix = "..." if len(text) > 160 else ""
+ return f"Note: {highlight_text(preview, tokens)}{suffix}"
+ return ""
+
+
+def create_qdro_highlight(qdro: Any, query: str) -> str:
+ if not query:
+ return ""
+ tokens = build_query_tokens(query)
+ form_name = getattr(qdro, 'form_name', None)
+ pet = getattr(qdro, 'pet', None)
+ case_number = getattr(qdro, 'case_number', None)
+ ql = query.lower()
+ if form_name and ql in str(form_name).lower():
+ return f"Form: {highlight_text(str(form_name), tokens)}"
+ if pet and ql in str(pet).lower():
+ return f"Petitioner: {highlight_text(str(pet), tokens)}"
+ if case_number and ql in str(case_number).lower():
+ return f"Case: {highlight_text(str(case_number), tokens)}"
+ return ""
+
+
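
A small usage sketch of the helpers above (SimpleNamespace stands in for an ORM row; the expected strings assume the <mark> wrapping shown in highlight_text):

from types import SimpleNamespace

from app.api.search_highlight import (
    build_query_tokens,
    highlight_text,
    create_customer_highlight,
)

tokens = build_query_tokens("Smith, john JOHN")
# -> ["Smith", "john"]  (punctuation trimmed, case-insensitive dedupe, order preserved)

highlight_text("John Smithson", tokens)
# -> "<mark>John</mark> <mark>Smith</mark>son"  (ranges merged, original casing kept)

customer = SimpleNamespace(first="John", last="Smithson", email=None, city="Columbus")
create_customer_highlight(customer, "smith")
# -> "Name: John <mark>Smith</mark>son"
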
diff --git a/app/api/support.py b/app/api/support.py
index 0175bfe..ae23d12 100644
--- a/app/api/support.py
+++ b/app/api/support.py
@@ -9,8 +9,9 @@ from datetime import datetime
import secrets
from app.database.base import get_db
-from app.models import User, SupportTicket, TicketResponse, TicketStatus, TicketPriority, TicketCategory
+from app.models import User, SupportTicket, TicketResponse as TicketResponseModel, TicketStatus, TicketPriority, TicketCategory
from app.auth.security import get_current_user, get_admin_user
+from app.services.audit import audit_service
router = APIRouter()
@@ -46,7 +47,7 @@ class ResponseCreate(BaseModel):
is_internal: bool = False
-class TicketResponse(BaseModel):
+class TicketResponseOut(BaseModel):
"""Ticket response model"""
id: int
ticket_id: int
@@ -80,7 +81,7 @@ class TicketDetail(BaseModel):
assigned_to: Optional[int]
assigned_admin_name: Optional[str]
submitter_name: Optional[str]
- responses: List[TicketResponse] = []
+ responses: List[TicketResponseOut] = []
class Config:
from_attributes = True
@@ -135,6 +136,20 @@ async def create_support_ticket(
db.commit()
db.refresh(new_ticket)
+ # Audit logging (non-blocking)
+ try:
+ audit_service.log_action(
+ db=db,
+ action="CREATE",
+ resource_type="SUPPORT_TICKET",
+ user=current_user,
+ resource_id=str(new_ticket.id),
+ details={"ticket_number": new_ticket.ticket_number},
+ request=request,
+ )
+ except Exception:
+ pass
+
return {
"message": "Support ticket created successfully",
"ticket_number": new_ticket.ticket_number,
@@ -225,7 +240,7 @@ async def get_ticket(
ticket = db.query(SupportTicket).options(
joinedload(SupportTicket.submitter),
joinedload(SupportTicket.assigned_admin),
- joinedload(SupportTicket.responses).joinedload(TicketResponse.author)
+ joinedload(SupportTicket.responses).joinedload(TicketResponseModel.author)
).filter(SupportTicket.id == ticket_id).first()
if not ticket:
@@ -303,8 +318,19 @@ async def update_ticket(
ticket.updated_at = datetime.utcnow()
db.commit()
- # Log the update (audit logging can be added later)
- # TODO: Add audit logging for ticket updates
+ # Audit logging (non-blocking)
+ try:
+ audit_service.log_action(
+ db=db,
+ action="UPDATE",
+ resource_type="SUPPORT_TICKET",
+ user=current_user,
+ resource_id=str(ticket_id),
+ details={"changes": changes} if changes else None,
+ request=request,
+ )
+ except Exception:
+ pass
return {"message": "Ticket updated successfully"}
@@ -327,7 +353,7 @@ async def add_response(
)
# Create response
- response = TicketResponse(
+ response = TicketResponseModel(
ticket_id=ticket_id,
message=response_data.message,
is_internal=response_data.is_internal,
@@ -343,8 +369,19 @@ async def add_response(
db.commit()
db.refresh(response)
- # Log the response (audit logging can be added later)
- # TODO: Add audit logging for ticket responses
+ # Audit logging (non-blocking)
+ try:
+ audit_service.log_action(
+ db=db,
+ action="ADD_RESPONSE",
+ resource_type="SUPPORT_TICKET",
+ user=current_user,
+ resource_id=str(ticket_id),
+ details={"response_id": response.id, "is_internal": response_data.is_internal},
+ request=request,
+ )
+ except Exception:
+ pass
return {"message": "Response added successfully", "response_id": response.id}
diff --git a/app/main.py b/app/main.py
index 6054f5a..8f11c95 100644
--- a/app/main.py
+++ b/app/main.py
@@ -68,6 +68,7 @@ from app.api.documents import router as documents_router
from app.api.search import router as search_router
from app.api.admin import router as admin_router
from app.api.import_data import router as import_router
+from app.api.flexible import router as flexible_router
from app.api.support import router as support_router
from app.api.settings import router as settings_router
@@ -82,14 +83,15 @@ app.include_router(admin_router, prefix="/api/admin", tags=["admin"])
app.include_router(import_router, prefix="/api/import", tags=["import"])
app.include_router(support_router, prefix="/api/support", tags=["support"])
app.include_router(settings_router, prefix="/api/settings", tags=["settings"])
+app.include_router(flexible_router, prefix="/api")
@app.get("/", response_class=HTMLResponse)
async def root(request: Request):
- """Main application - redirect to login"""
+ """Dashboard as the main landing page. Client-side JS handles auth redirect."""
return templates.TemplateResponse(
- "login.html",
- {"request": request, "title": "Login - " + settings.app_name}
+ "dashboard.html",
+ {"request": request, "title": "Dashboard - " + settings.app_name}
)
@@ -167,6 +169,15 @@ async def import_page(request: Request):
)
+@app.get("/flexible", response_class=HTMLResponse)
+async def flexible_page(request: Request):
+ """Flexible imports admin page (admin only)."""
+ return templates.TemplateResponse(
+ "flexible.html",
+ {"request": request, "title": "Flexible Imports - " + settings.app_name}
+ )
+
+
@app.get("/health")
async def health_check():
"""Health check endpoint"""
diff --git a/app/models/__init__.py b/app/models/__init__.py
index a02cfe0..33f5b20 100644
--- a/app/models/__init__.py
+++ b/app/models/__init__.py
@@ -7,9 +7,10 @@ from .rolodex import Rolodex, Phone
from .files import File
from .ledger import Ledger
from .qdro import QDRO
-from .audit import AuditLog, LoginAttempt
+from .audit import AuditLog, LoginAttempt, ImportAudit, ImportAuditFile
from .auth import RefreshToken
from .additional import Deposit, Payment, FileNote, FormVariable, ReportVariable, Document
+from .flexible import FlexibleImport
from .support import SupportTicket, TicketResponse, TicketStatus, TicketPriority, TicketCategory
from .pensions import (
Pension, PensionSchedule, MarriageHistory, DeathBenefit,
@@ -23,8 +24,8 @@ from .lookups import (
__all__ = [
"BaseModel", "User", "Rolodex", "Phone", "File", "Ledger", "QDRO",
- "AuditLog", "LoginAttempt", "RefreshToken",
- "Deposit", "Payment", "FileNote", "FormVariable", "ReportVariable", "Document",
+ "AuditLog", "LoginAttempt", "ImportAudit", "ImportAuditFile", "RefreshToken",
+ "Deposit", "Payment", "FileNote", "FormVariable", "ReportVariable", "Document", "FlexibleImport",
"SupportTicket", "TicketResponse", "TicketStatus", "TicketPriority", "TicketCategory",
"Pension", "PensionSchedule", "MarriageHistory", "DeathBenefit",
"SeparationAgreement", "LifeTable", "NumberTable",
diff --git a/app/models/audit.py b/app/models/audit.py
index baa066e..b824db9 100644
--- a/app/models/audit.py
+++ b/app/models/audit.py
@@ -46,4 +46,57 @@ class LoginAttempt(BaseModel):
failure_reason = Column(String(200), nullable=True) # Reason for failure
def __repr__(self):
-        return f"...(removed __repr__ line truncated in extraction)"
+        [added lines lost in extraction: this hunk defines the ImportAudit and ImportAuditFile models referenced by app/models/__init__.py and app/api/import_data.py]
[the file diffs that followed were also damaged; surviving fragments show "Select one or more groups" / "Select one or more states" multi-select filters, restyled customer-table headers, and search-result cells rendered through highlightText()/escapeHtml(). The patch resumes below inside templates/customers.html.]
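
Because the hunk above lost the added model definitions, here is a minimal sketch of what ImportAudit and ImportAuditFile plausibly look like, reconstructed from the fields that app/api/import_data.py reads and writes; the table names, column types and lengths, and the base-class import are assumptions, not the original code.

# Hypothetical reconstruction -- field names mirror app/api/import_data.py usage;
# types, lengths, table names, and the BaseModel import are assumptions.
from datetime import datetime

from sqlalchemy import Column, DateTime, ForeignKey, Integer, JSON, String

from .base import BaseModel  # assumed to supply id / created_at like the other models


class ImportAudit(BaseModel):
    """One row per batch import run: status, counts, who started it, summary details."""
    __tablename__ = "import_audits"  # assumed

    status = Column(String(50), default="running", nullable=False)
    total_files = Column(Integer, default=0)
    successful_files = Column(Integer, default=0)
    failed_files = Column(Integer, default=0)
    total_imported = Column(Integer, default=0)
    total_errors = Column(Integer, default=0)
    initiated_by_user_id = Column(Integer, nullable=True)
    initiated_by_username = Column(String(100), nullable=True)
    message = Column(String(500), nullable=True)
    details = Column(JSON, nullable=True)
    started_at = Column(DateTime, default=datetime.utcnow)
    finished_at = Column(DateTime, nullable=True)


class ImportAuditFile(BaseModel):
    """Per-file outcome attached to an ImportAudit row."""
    __tablename__ = "import_audit_files"  # assumed

    audit_id = Column(Integer, ForeignKey("import_audits.id"), nullable=False, index=True)
    file_type = Column(String(255), nullable=False)
    status = Column(String(50), nullable=False)
    imported_count = Column(Integer, default=0)
    errors = Column(Integer, default=0)
    message = Column(String(500), nullable=True)
    details = Column(JSON, nullable=True)
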
@@ -517,6 +865,15 @@ async function loadGroups() {
option.textContent = g.group;
select.appendChild(option);
});
+ // Apply saved selections
+ try {
+ const savedStr = localStorage.getItem('customers.filterGroups');
+ const savedLegacy = localStorage.getItem('customers.filterGroup') || '';
+ const saved = savedStr ? JSON.parse(savedStr) : (savedLegacy ? [savedLegacy] : []);
+ if (Array.isArray(saved)) {
+ Array.from(select.options).forEach(o => { o.selected = saved.includes(o.value); });
+ }
+ } catch (_) {}
}
} catch (error) {
console.error('Error loading groups:', error);
@@ -536,12 +893,69 @@ async function loadStates() {
option.textContent = s.state;
select.appendChild(option);
});
+ // Apply saved selections
+ try {
+ const savedStr = localStorage.getItem('customers.filterStates');
+ const savedLegacy = localStorage.getItem('customers.filterState') || '';
+ const saved = savedStr ? JSON.parse(savedStr) : (savedLegacy ? [savedLegacy] : []);
+ if (Array.isArray(saved)) {
+ Array.from(select.options).forEach(o => { o.selected = saved.includes(o.value); });
+ }
+ } catch (_) {}
}
} catch (error) {
console.error('Error loading states:', error);
}
}
+function renderActiveFilterChips() {
+ const container = document.getElementById('activeFilterChips');
+ const clearBtn = document.getElementById('clearAllFiltersBtn');
+ if (!container) return;
+ const groups = Array.isArray(window.currentGroupFilters) ? window.currentGroupFilters : [];
+ const states = Array.isArray(window.currentStateFilters) ? window.currentStateFilters : [];
+ const chips = [];
+ groups.forEach(g => chips.push({ type: 'group', label: g }));
+ states.forEach(s => chips.push({ type: 'state', label: s }));
+ if (chips.length === 0) {
+ container.innerHTML = '';
+ if (clearBtn) clearBtn.classList.add('hidden');
+ return;
+ }
+    // [chip markup reconstructed: the wrapper attributes and remove button are inferred from the
+    //  removal handler below; the original styling classes were lost in extraction]
+    container.innerHTML = chips.map((c, idx) => `
+        <span data-type="${c.type}" data-value="${c.label}">
+            ${c.type === 'group' ? 'Group' : 'State'}: ${c.label}
+            <button type="button" class="chip-remove">&times;</button>
+        </span>
+    `).join('');
+ if (clearBtn) clearBtn.classList.remove('hidden');
+ // Wire remove events
+ Array.from(container.querySelectorAll('.chip-remove')).forEach(btn => {
+ btn.addEventListener('click', (e) => {
+ const chip = e.currentTarget.closest('span');
+ if (!chip) return;
+ const type = chip.getAttribute('data-type');
+ const value = chip.getAttribute('data-value');
+ if (type === 'group') {
+ window.currentGroupFilters = (window.currentGroupFilters || []).filter(v => v !== value);
+ try { localStorage.setItem('customers.filterGroups', JSON.stringify(window.currentGroupFilters)); } catch (_) {}
+ const sel = document.getElementById('groupFilter');
+ if (sel) Array.from(sel.options).forEach(o => { if (o.value === value) o.selected = false; });
+ } else if (type === 'state') {
+ window.currentStateFilters = (window.currentStateFilters || []).filter(v => v !== value);
+ try { localStorage.setItem('customers.filterStates', JSON.stringify(window.currentStateFilters)); } catch (_) {}
+ const sel = document.getElementById('stateFilter');
+ if (sel) Array.from(sel.options).forEach(o => { if (o.value === value) o.selected = false; });
+ }
+ currentPage = 0;
+ renderActiveFilterChips();
+ loadCustomers(currentPage, currentSearch);
+ });
+ });
+}
+
async function showStats() {
try {
const response = await window.http.wrappedFetch('/api/customers/stats');
@@ -597,4 +1011,112 @@ function displayStats(stats) {
// Functions are now implemented in the external customers-tailwind.js file
+
+
{% endblock %}
\ No newline at end of file
diff --git a/templates/dashboard.html b/templates/dashboard.html
index 925af1c..88b5183 100644
--- a/templates/dashboard.html
+++ b/templates/dashboard.html
@@ -121,6 +121,11 @@
Global Search
Ctrl+F
+
@@ -130,14 +135,22 @@
-                Recent Activity
+                Recent Activity & Imports
+
@@ -227,6 +240,81 @@ function globalSearch() {
// Load data on page load
document.addEventListener('DOMContentLoaded', function() {
loadDashboardData(); // Uncomment when authentication is implemented
+ loadRecentImports();
+ loadRecentActivity();
});
+async function loadRecentActivity() {
+ // Placeholder: existing system would populate; if an endpoint exists, hook it here.
+}
+
+async function loadRecentImports() {
+ try {
+ const [statusResp, recentResp] = await Promise.all([
+ window.http.wrappedFetch('/api/import/status'),
+ window.http.wrappedFetch('/api/import/recent-batches?limit=5')
+ ]);
+ if (!statusResp.ok) return;
+ const status = await statusResp.json();
+ const recent = recentResp && recentResp.ok ? (await recentResp.json()).recent || [] : [];
+ const entries = Object.entries(status || {});
+ const total = entries.reduce((sum, [, v]) => sum + (v && v.record_count ? v.record_count : 0), 0);
+ const top = entries
+ .filter(([, v]) => (v && v.record_count) > 0)
+ .slice(0, 6)
+ .map(([k, v]) => ({ name: k, count: v.record_count, table: v.table_name }));
+
+ const container = document.getElementById('recent-imports');
+ if (!container) return;
+ if (entries.length === 0 && recent.length === 0) {
+            container.innerHTML = '...';  // [empty-state markup lost in extraction; message text: "No import status available."]
+            return;
+        }
+        const items = top.map(({ name, count }) => `...`).join('');        // [markup lost; each entry renders ${name} with ${Number(count).toLocaleString()}]
+        const recentRows = (recent || []).map(r => `...`).join('');        // [markup lost; each row shows r.status, started/finished timestamps, ${r.successful_files}/${r.total_files}, and total imported]
[remaining hunks damaged in extraction: the "Recent Imports" card markup (loading placeholders "Loading recent imports..." / "Loading recent activity...", a batch table with Status / Started / Finished / Files / Imported columns and a "No recent batch uploads" empty row) and a later template's restyled file-list header row (File #, Client, Matter, Type, Status, Attorney, Opened, Balance, Actions)]