Files
delphi-database-v2/app/main.py
HotSwapp 9b2ce0d28f Final enhancement: Increase encoding detection read size to 50KB
- Increased read size from 20KB to 50KB in both main.py and import_legacy.py
- This ensures problematic bytes at position 3738+ are caught during encoding detection
- Provides maximum robustness for legacy CSV files with deeply embedded encoding issues
- Maintains all previous improvements including fallback mechanisms
2025-10-13 21:44:17 -05:00

3969 lines
134 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
FastAPI application entry point for Delphi Database.
This module initializes the FastAPI application, sets up database connections,
and provides the main application instance.
"""
import os
import time
import csv
import json
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional, List, Dict, Any
from io import StringIO
from fastapi import FastAPI, Depends, Request, Query, HTTPException, UploadFile, File, Form
from fastapi.responses import RedirectResponse, Response, JSONResponse
from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_, func as sa_func, select
from dotenv import load_dotenv
from starlette.middleware.base import BaseHTTPMiddleware
import structlog
from structlog import contextvars as structlog_contextvars
from .database import create_tables, get_db, get_database_url
from .models import User, Case, Client, Phone, Transaction, Document, Payment, ImportLog, Qdros, LegacyFile
from .auth import authenticate_user, get_current_user_from_session
from .reporting import (
build_phone_book_pdf,
build_payments_detailed_pdf,
build_envelope_pdf,
build_phone_book_address_pdf,
build_rolodex_info_pdf,
)
from .logging_config import setup_logging
from .schemas import (
ClientOut,
PhoneOut,
CaseOut,
TransactionOut,
Pagination,
RolodexListResponse,
FilesListResponse,
LedgerListResponse,
)
from . import import_legacy
from . import sync_legacy_to_modern
# Load environment variables
load_dotenv()
# Get SECRET_KEY from environment variables
SECRET_KEY = os.getenv("SECRET_KEY")
if not SECRET_KEY:
raise ValueError("SECRET_KEY environment variable must be set")
# Configure structured logging
setup_logging()
logger = structlog.get_logger(__name__)
def open_text_with_fallbacks(file_path: str):
"""
Open a text file trying multiple encodings commonly seen in legacy CSVs.
Attempts in order: utf-8, utf-8-sig, iso-8859-1, latin-1, cp1252, windows-1252, cp1250.
Prioritizes latin-1/iso-8859-1 as they handle legacy data better than cp1252.
Returns a tuple of (file_object, encoding_used). Caller is responsible to close file.
"""
encodings = ["utf-8", "utf-8-sig", "iso-8859-1", "latin-1", "cp1252", "windows-1252", "cp1250"]
last_error = None
for enc in encodings:
try:
f = open(file_path, 'r', encoding=enc, errors='strict', newline='')
# Read more than 1KB to catch encoding issues deeper in the file
# Many legacy CSVs have issues beyond the first few rows
_ = f.read(51200) # Read 50KB to test (increased from 20KB)
f.seek(0)
logger.info("csv_open_encoding_selected", file=file_path, encoding=enc)
return f, enc
except Exception as e:
last_error = e
logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e))
try:
f.close()
except:
pass
continue
# If strict mode fails, try with error replacement for robustness
logger.warning("strict_encoding_failed", file=file_path, trying_with_replace=True)
try:
# Try UTF-8 with error replacement first (most common case)
f = open(file_path, 'r', encoding='utf-8', errors='replace', newline='')
_ = f.read(51200) # Read 50KB to catch encoding issues deeper in the file (increased from 20KB)
f.seek(0)
logger.info("csv_open_encoding_with_replace", file=file_path, encoding="utf-8-replace")
return f, "utf-8-replace"
except Exception as e:
logger.warning("utf8_replace_failed", file=file_path, error=str(e))
# Final fallback: use latin-1 with replace (handles any byte sequence)
try:
f = open(file_path, 'r', encoding='latin-1', errors='replace', newline='')
_ = f.read(51200) # Read 50KB to catch encoding issues deeper in the file (increased from 20KB)
f.seek(0)
logger.info("csv_open_encoding_fallback", file=file_path, encoding="latin-1-replace")
return f, "latin-1-replace"
except Exception as e:
last_error = e
error_msg = f"Unable to open file '{file_path}' with any supported encodings"
if last_error:
error_msg += f". Last error: {str(last_error)}"
raise RuntimeError(error_msg)
# Configure Jinja2 templates
templates = Jinja2Templates(directory="app/templates")
class AuthMiddleware(BaseHTTPMiddleware):
"""
Simple session-based authentication middleware.
Redirects unauthenticated users to /login for protected routes.
"""
def __init__(self, app, exempt_paths: list[str] | None = None):
super().__init__(app)
self.exempt_paths = exempt_paths or []
async def dispatch(self, request, call_next):
path = request.url.path
# Allow exempt paths and static assets
if (
path in self.exempt_paths
or path.startswith("/static")
or path.startswith("/favicon")
):
return await call_next(request)
# Enforce authentication for other paths
if not request.session.get("user_id"):
# Return JSON 401 for API routes, redirect for HTML routes
if path.startswith("/api/"):
return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
return RedirectResponse(url="/login", status_code=302)
return await call_next(request)
class RequestIdMiddleware(BaseHTTPMiddleware):
"""
Middleware that assigns a request_id and binds request context for logging.
Adds: request_id, http.method, http.path, user.id to the structlog context.
Emits a JSON access log with status_code and duration_ms after response.
"""
async def dispatch(self, request: Request, call_next):
start_time = time.perf_counter()
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
method = request.method
path = request.url.path
# user id from session if available (SessionMiddleware runs first)
user_id = request.session.get("user_id") if hasattr(request, "session") else None
structlog_contextvars.bind_contextvars(
request_id=request_id,
**{"http.method": method, "http.path": path, "user.id": user_id},
)
try:
response = await call_next(request)
status_code = response.status_code
except Exception as exc: # noqa: BLE001 - we re-raise after logging
status_code = 500
duration_ms = int((time.perf_counter() - start_time) * 1000)
logger.error(
"request",
status_code=status_code,
duration_ms=duration_ms,
exc_info=True,
)
structlog_contextvars.unbind_contextvars("request_id", "http.method", "http.path", "user.id")
raise
# Ensure response header has request id
try:
response.headers["X-Request-ID"] = request_id
except Exception:
pass
duration_ms = int((time.perf_counter() - start_time) * 1000)
logger.info(
"request",
status_code=status_code,
duration_ms=duration_ms,
)
structlog_contextvars.unbind_contextvars("request_id", "http.method", "http.path", "user.id")
return response
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Lifespan context manager for FastAPI application.
Handles startup and shutdown events:
- Creates database tables on startup
- Logs database connection info
"""
# Startup
logger.info("app_start")
# Create database tables
create_tables()
logger.info("db_tables_verified")
# Log database connection info
db_url = get_database_url()
logger.info("db_connected", database_url=db_url)
yield
# Shutdown
logger.info("app_shutdown")
# Create FastAPI application with lifespan management
app = FastAPI(
title="Delphi Database",
description="Legal case management database application",
version="1.0.0",
lifespan=lifespan
)
# Add CORS middleware for cross-origin requests
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, specify allowed origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Register request logging and authentication middleware with exempt paths
EXEMPT_PATHS = ["/", "/health", "/login", "/logout"]
app.add_middleware(RequestIdMiddleware)
app.add_middleware(AuthMiddleware, exempt_paths=EXEMPT_PATHS)
# Add SessionMiddleware for session management (must be added LAST so it runs FIRST)
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
# Mount static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")
# Canonical list of valid legacy import types used across the admin features
VALID_IMPORT_TYPES: List[str] = [
# Reference tables
'trnstype', 'trnslkup', 'footers', 'filestat', 'employee',
'gruplkup', 'filetype', 'fvarlkup', 'rvarlkup', 'states', 'printers', 'setup',
# Core data tables
'rolodex', 'phone', 'rolex_v', 'files', 'files_r', 'files_v',
'filenots', 'ledger', 'deposits', 'payments',
# Specialized tables
'planinfo', 'qdros', 'pensions', 'pension_marriage',
'pension_death', 'pension_schedule', 'pension_separate', 'pension_results',
]
# Centralized import order for auto-import after upload
# Reference tables first, then core tables, then specialized tables
IMPORT_ORDER: List[str] = [
# Reference tables - import these first
'trnstype', 'trnslkup', 'footers', 'filestat', 'employee', 'gruplkup', 'filetype', 'fvarlkup', 'rvarlkup', 'states', 'printers', 'setup',
# Core tables - import after reference tables
'rolodex', 'phone', 'rolex_v', 'files', 'files_r', 'files_v', 'filenots', 'ledger', 'deposits', 'payments',
# Specialized tables - import last
'planinfo', 'qdros', 'pensions', 'pension_marriage', 'pension_death', 'pension_schedule', 'pension_separate', 'pension_results',
]
ORDER_INDEX: Dict[str, int] = {t: i for i, t in enumerate(IMPORT_ORDER)}
def get_import_type_from_filename(filename: str) -> str:
"""
Determine import type based on filename pattern for legacy CSV files.
Supports both legacy CSV naming (e.g., FILES.csv, LEDGER.csv) and
model class naming (e.g., LegacyFile.csv, Ledger.csv).
Args:
filename: Name of the uploaded CSV file
Returns:
Import type string matching the import function keys
"""
filename_upper = filename.upper()
# Strip extension and normalize
base = filename_upper.rsplit('.', 1)[0]
# Reference tables
if 'TRNSTYPE' in base:
return 'trnstype'
if 'TRNSLKUP' in base:
return 'trnslkup'
if 'FOOTER' in base:
return 'footers'
if 'FILESTAT' in base:
return 'filestat'
if 'EMPLOYEE' in base:
return 'employee'
if 'GRUPLKUP' in base or 'GROUPLKUP' in base or base == 'GROUPLKUP':
return 'gruplkup'
if 'FILETYPE' in base:
return 'filetype'
if 'FVARLKUP' in base:
return 'fvarlkup'
if 'RVARLKUP' in base:
return 'rvarlkup'
if 'STATES' in base or base == 'STATES':
return 'states'
if 'PRINTERS' in base or base == 'PRINTERS':
return 'printers'
if 'SETUP' in base or base == 'SETUP':
return 'setup'
# Core data tables - check most specific patterns first
# Check for ROLEX_V and ROLEXV before ROLEX
if 'ROLEX_V' in base or 'ROLEXV' in base:
return 'rolex_v'
# Check for FILES_R, FILES_V, FILENOTS before generic FILES
if 'FILES_R' in base or 'FILESR' in base:
return 'files_r'
if 'FILES_V' in base or 'FILESV' in base:
return 'files_v'
if 'FILENOTS' in base or 'FILE_NOTS' in base or base == 'FILENOTS':
return 'filenots'
# Check for model class name "LEGACYFILE" before generic "FILE"
if base == 'LEGACYFILE':
return 'files'
if 'FILES' in base:
return 'files'
# Only match generic "FILE" if it's the exact base name or starts with it
if base == 'FILE' or base.startswith('FILE_'):
return 'files'
# ROLODEX variations
if 'ROLODEX' in base or base == 'ROLEX':
return 'rolodex'
# PHONE variations (including model class name LEGACYPHONE)
if 'PHONE' in base or base == 'LEGACYPHONE':
return 'phone'
# LEDGER
if 'LEDGER' in base:
return 'ledger'
# DEPOSITS
if 'DEPOSITS' in base or 'DEPOSIT' in base:
return 'deposits'
# PAYMENTS (including model class name LEGACYPAYMENT)
if 'PAYMENTS' in base or 'PAYMENT' in base or base == 'LEGACYPAYMENT':
return 'payments'
# Specialized tables
if 'PLANINFO' in base or 'PLAN_INFO' in base:
return 'planinfo'
if 'QDROS' in base or 'QDRO' in base:
return 'qdros'
# Pension sub-tables - check most specific first
if 'MARRIAGE' in base or base == 'PENSIONMARRIAGE':
return 'pension_marriage'
if 'DEATH' in base or base == 'PENSIONDEATH':
return 'pension_death'
if 'SCHEDULE' in base or base == 'PENSIONSCHEDULE':
return 'pension_schedule'
if 'SEPARATE' in base or base == 'PENSIONSEPARATE':
return 'pension_separate'
if 'RESULTS' in base or base == 'PENSIONRESULTS':
return 'pension_results'
# Generic PENSIONS - check last after specific pension tables
if 'PENSIONS' in base or base == 'PENSIONS':
return 'pensions'
if base == 'PENSION':
return 'pensions'
raise ValueError(f"Unknown file type for filename: {filename}")
@app.post("/admin/map-files")
async def admin_map_unknown_files(
request: Request,
db: Session = Depends(get_db)
):
"""
Map selected unknown files to a chosen import type by renaming with the
appropriate prefix used through the system (e.g., files are stored as
"{import_type}_{uuid}.csv").
Expects JSON body: { "target_type": str, "filenames": [str, ...] }
"""
# Auth check
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
try:
payload = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
target_type = (payload.get("target_type") or "").strip()
filenames = payload.get("filenames") or []
if target_type not in VALID_IMPORT_TYPES:
raise HTTPException(status_code=400, detail="Invalid target import type")
if not isinstance(filenames, list) or not filenames:
raise HTTPException(status_code=400, detail="No filenames provided")
import_dir = "data-import"
if not os.path.isdir(import_dir):
raise HTTPException(status_code=400, detail="Import directory not found")
mapped: List[Dict[str, str]] = []
errors: List[Dict[str, str]] = []
for name in filenames:
# Basic traversal protection
if not isinstance(name, str) or any(x in name for x in ["..", "/", "\\"]):
errors.append({"filename": name, "error": "Invalid filename"})
continue
old_path = os.path.join(import_dir, name)
if not os.path.isfile(old_path):
errors.append({"filename": name, "error": "File not found"})
continue
# Determine new name: keep UUID suffix/extension from existing stored filename
try:
_, ext = os.path.splitext(name)
# If name already follows pattern <type>_<uuid>.<ext>, keep suffix
suffix = name
if "_" in name:
parts = name.split("_", 1)
if len(parts) == 2:
suffix = parts[1]
new_name = f"{target_type}_{suffix}"
new_path = os.path.join(import_dir, new_name)
# Avoid overwriting: if exists, add short random suffix
if os.path.exists(new_path):
rand = uuid.uuid4().hex[:6]
base_no_ext, _ = os.path.splitext(new_name)
new_name = f"{base_no_ext}-{rand}{ext}"
new_path = os.path.join(import_dir, new_name)
os.replace(old_path, new_path)
logger.info(
"admin_map_file",
old_filename=name,
new_filename=new_name,
target_type=target_type,
username=user.username,
)
mapped.append({"old": name, "new": new_name})
except Exception as e:
logger.error(
"admin_map_file_error",
filename=name,
target_type=target_type,
error=str(e),
)
errors.append({"filename": name, "error": str(e)})
return {"mapped": mapped, "errors": errors}
def validate_csv_headers(headers: List[str], expected_fields: Dict[str, str]) -> Dict[str, Any]:
"""
Validate CSV headers against expected model fields.
Args:
headers: List of CSV column headers
expected_fields: Dict mapping field names to descriptions
Returns:
Dict with validation results and field mapping
"""
result = {
'valid': True,
'missing_fields': [],
'field_mapping': {},
'errors': []
}
# Create mapping from CSV headers to model fields (case-insensitive)
for csv_header in headers:
csv_header_clean = csv_header.strip().lower()
matched = False
for model_field, description in expected_fields.items():
if csv_header_clean == model_field.lower():
result['field_mapping'][model_field] = csv_header
matched = True
break
if not matched:
# Try partial matches for common variations
for model_field, description in expected_fields.items():
if model_field.lower() in csv_header_clean or csv_header_clean in model_field.lower():
result['field_mapping'][model_field] = csv_header
matched = True
break
if not matched:
result['errors'].append(f"Unknown header: '{csv_header}'")
# Check for required fields (case-insensitive)
required_fields = ['id'] # Most imports need some form of ID
for required in required_fields:
found = False
for mapped_field in result['field_mapping']:
if mapped_field.lower() == required.lower():
found = True
break
if not found:
result['missing_fields'].append(required)
if result['missing_fields'] or result['errors']:
result['valid'] = False
return result
def parse_date(date_str: str) -> Optional[datetime]:
"""Parse date string into datetime object."""
if not date_str or date_str.strip() in ('', 'NULL', 'N/A'):
return None
# Try common date formats
formats = ['%Y-%m-%d', '%m/%d/%Y', '%Y/%m/%d', '%d-%m-%Y']
for fmt in formats:
try:
return datetime.strptime(date_str.strip(), fmt)
except ValueError:
continue
logger.warning("parse_date_failed", value=date_str)
return None
def parse_float(value: str) -> Optional[float]:
"""Parse string value into float."""
if not value or value.strip() in ('', 'NULL', 'N/A'):
return None
try:
return float(value.strip())
except ValueError:
logger.warning("parse_float_failed", value=value)
return None
def parse_int(value: str) -> Optional[int]:
"""Parse string value into int."""
if not value or value.strip() in ('', 'NULL', 'N/A'):
return None
try:
return int(value.strip())
except ValueError:
logger.warning("parse_int_failed", value=value)
return None
def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]:
"""
Import ROLODEX CSV data into Client model.
Expected CSV format: Id,Prefix,First,Middle,Last,Suffix,Title,A1,A2,A3,City,Abrev,St,Zip,Email,DOB,SS#,Legal_Status,Group,Memo
"""
result = {
'success': 0,
'errors': [],
'total_rows': 0,
'memo_imported': 0,
'memo_missing': 0,
'email_imported': 0,
'email_missing': 0,
'skipped_duplicates': 0,
'encoding_used': None,
}
expected_fields = {
'Id': 'Client ID',
'Prefix': 'Name Prefix',
'First': 'First Name',
'Middle': 'Middle Initial',
'Last': 'Last Name',
'Suffix': 'Name Suffix',
'Title': 'Company/Organization',
'A1': 'Address Line 1',
'A2': 'Address Line 2',
'A3': 'Address Line 3',
'City': 'City',
'Abrev': 'State Abbreviation',
'St': 'State',
'Zip': 'ZIP Code',
'Email': 'Email Address',
'DOB': 'Date of Birth',
'SS#': 'Social Security Number',
'Legal_Status': 'Legal Status',
'Group': 'Group',
'Memo': 'Memo/Notes'
}
try:
f, used_encoding = open_text_with_fallbacks(file_path)
result['encoding_used'] = used_encoding
with f as file:
reader = csv.DictReader(file)
# Validate headers
headers = reader.fieldnames or []
validation = validate_csv_headers(headers, expected_fields)
if not validation['valid']:
result['errors'].append(f"Header validation failed: {validation['errors']}")
return result
for row_num, row in enumerate(reader, start=2): # Start at 2 (header is row 1)
result['total_rows'] += 1
try:
# Extract and clean data
rolodex_id = row.get('Id', '').strip()
if not rolodex_id:
result['errors'].append(f"Row {row_num}: Missing client ID")
continue
# Check for existing client
existing = db.query(Client).filter(Client.rolodex_id == rolodex_id).first()
if existing:
result['skipped_duplicates'] += 1
logger.warning(
"rolodex_import_duplicate",
row=row_num,
rolodex_id=rolodex_id,
file=file_path,
)
continue
# Parse DOB (YYYY-MM-DD or MM/DD/YY variants)
dob_raw = row.get('DOB', '').strip()
dob_val = None
for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%m/%d/%y"):
if not dob_raw:
break
try:
dob_val = datetime.strptime(dob_raw, fmt).date()
break
except ValueError:
continue
email_val = row.get('Email', '').strip() or None
memo_val = row.get('Memo', '')
memo_clean = memo_val.strip() if memo_val is not None else ''
memo_val_clean = memo_clean or None
if email_val:
result['email_imported'] += 1
else:
result['email_missing'] += 1
if memo_val_clean:
result['memo_imported'] += 1
else:
result['memo_missing'] += 1
client = Client(
rolodex_id=rolodex_id,
prefix=row.get('Prefix', '').strip() or None,
first_name=row.get('First', '').strip() or None,
middle_name=row.get('Middle', '').strip() or None,
last_name=row.get('Last', '').strip() or None,
suffix=row.get('Suffix', '').strip() or None,
title=row.get('Title', '').strip() or None,
company=row.get('Title', '').strip() or None,
address=row.get('A1', '').strip() or None,
city=row.get('City', '').strip() or None,
state=(row.get('Abrev', '').strip() or row.get('St', '').strip() or None),
zip_code=row.get('Zip', '').strip() or None,
group=row.get('Group', '').strip() or None,
email=email_val,
dob=dob_val,
ssn=row.get('SS#', '').strip() or None,
legal_status=row.get('Legal_Status', '').strip() or None,
memo=memo_val_clean,
)
db.add(client)
result['success'] += 1
logger.info(
"rolodex_import_row",
row=row_num,
rolodex_id=rolodex_id,
email_present=bool(email_val),
memo_present=bool(memo_val_clean),
)
except Exception as e:
result['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
logger.info(
"rolodex_import_complete",
file=file_path,
encoding=used_encoding,
total_rows=result['total_rows'],
success=result['success'],
memo_imported=result['memo_imported'],
memo_missing=result['memo_missing'],
email_imported=result['email_imported'],
email_missing=result['email_missing'],
skipped_duplicates=result['skipped_duplicates'],
errors=len(result['errors']),
)
except Exception as e:
logger.error("rolodex_import_failed", file=file_path, error=str(e))
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
return result
def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]:
"""
Import PHONE CSV data into Phone model.
Expected CSV format: Id,Phone,Location
"""
result = {
'success': 0,
'errors': [],
'total_rows': 0
}
f = None
try:
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 2:
result['errors'].append("Invalid CSV format: expected at least 2 columns")
return result
for row_num, row in enumerate(reader, start=2):
result['total_rows'] += 1
try:
client_id = row.get('Id', '').strip()
if not client_id:
result['errors'].append(f"Row {row_num}: Missing client ID")
continue
# Find the client
client = db.query(Client).filter(Client.rolodex_id == client_id).first()
if not client:
result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
continue
phone_number = row.get('Phone', '').strip()
if not phone_number:
result['errors'].append(f"Row {row_num}: Missing phone number")
continue
phone = Phone(
client_id=client.id,
phone_type=row.get('Location', '').strip() or 'primary',
phone_number=phone_number
)
db.add(phone)
result['success'] += 1
except Exception as e:
result['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
def import_files_data(db: Session, file_path: str) -> Dict[str, Any]:
"""
Import FILES CSV data into Case model.
Expected CSV format: File_No,Id,File_Type,Regarding,Opened,Closed,Empl_Num,Rate_Per_Hour,Status,Footer_Code,Opposing,Hours,Hours_P,Trust_Bal,Trust_Bal_P,Hourly_Fees,Hourly_Fees_P,Flat_Fees,Flat_Fees_P,Disbursements,Disbursements_P,Credit_Bal,Credit_Bal_P,Total_Charges,Total_Charges_P,Amount_Owing,Amount_Owing_P,Transferable,Memo
"""
result = {
'success': 0,
'errors': [],
'total_rows': 0,
'client_linked': 0,
'client_missing': 0,
'encoding_used': None,
'skipped_duplicates': 0,
}
expected_fields = {
'File_No': 'File Number',
'Status': 'Status',
'File_Type': 'File Type',
'Regarding': 'Regarding',
'Opened': 'Opened Date',
'Closed': 'Closed Date',
'Id': 'Client ID',
'Empl_Num': 'Employee Number',
'Rate_Per_Hour': 'Rate Per Hour',
'Footer_Code': 'Footer Code',
'Opposing': 'Opposing Party',
'Hours': 'Hours',
'Hours_P': 'Hours (Previous)',
'Trust_Bal': 'Trust Balance',
'Trust_Bal_P': 'Trust Balance (Previous)',
'Hourly_Fees': 'Hourly Fees',
'Hourly_Fees_P': 'Hourly Fees (Previous)',
'Flat_Fees': 'Flat Fees',
'Flat_Fees_P': 'Flat Fees (Previous)',
'Disbursements': 'Disbursements',
'Disbursements_P': 'Disbursements (Previous)',
'Credit_Bal': 'Credit Balance',
'Credit_Bal_P': 'Credit Balance (Previous)',
'Total_Charges': 'Total Charges',
'Total_Charges_P': 'Total Charges (Previous)',
'Amount_Owing': 'Amount Owing',
'Amount_Owing_P': 'Amount Owing (Previous)',
'Transferable': 'Transferable',
'Memo': 'Memo'
}
f = None
try:
f, used_encoding = open_text_with_fallbacks(file_path)
result['encoding_used'] = used_encoding
reader = csv.DictReader(f)
headers = reader.fieldnames or []
validation = validate_csv_headers(headers, expected_fields)
if not validation['valid']:
result['errors'].append(f"Header validation failed: {validation['errors']}")
return result
for row_num, row in enumerate(reader, start=2):
result['total_rows'] += 1
try:
file_no = row.get('File_No', '').strip()
if not file_no:
result['errors'].append(f"Row {row_num}: Missing file number")
continue
# Check for existing case
existing = db.query(Case).filter(Case.file_no == file_no).first()
if existing:
result['skipped_duplicates'] += 1
logger.warning(
"files_import_duplicate",
row=row_num,
file_no=file_no,
file=file_path,
)
continue
# Find client by ID
client_id = row.get('Id', '').strip()
client = None
if client_id:
client = db.query(Client).filter(Client.rolodex_id == client_id).first()
if not client:
result['client_missing'] += 1
logger.warning(
"files_import_missing_client",
row=row_num,
file_no=file_no,
legacy_client_id=client_id,
)
result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
continue
result['client_linked'] += 1
case = Case(
file_no=file_no,
client_id=client.id if client else None,
status=row.get('Status', '').strip() or 'active',
case_type=row.get('File_Type', '').strip() or None,
description=row.get('Regarding', '').strip() or None,
open_date=parse_date(row.get('Opened', '')),
close_date=parse_date(row.get('Closed', '')),
)
db.add(case)
result['success'] += 1
logger.info(
"files_import_row",
row=row_num,
file_no=file_no,
client_id=client.id if client else None,
status=case.status,
)
except Exception as e:
result['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
logger.info(
"files_import_complete",
file=file_path,
encoding=used_encoding,
total_rows=result['total_rows'],
success=result['success'],
client_linked=result['client_linked'],
client_missing=result['client_missing'],
skipped_duplicates=result['skipped_duplicates'],
errors=len(result['errors']),
)
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
"""
Import LEDGER CSV data into Transaction model.
Expected CSV format: File_No,Date,Item_No,Empl_Num,T_Code,T_Type,T_Type_L,Quantity,Rate,Amount,Billed,Note
"""
result = {
'success': 0,
'errors': [],
'total_rows': 0
}
f = None
try:
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 3:
result['errors'].append("Invalid CSV format: expected at least 3 columns")
return result
for row_num, row in enumerate(reader, start=2):
result['total_rows'] += 1
try:
file_no = row.get('File_No', '').strip()
if not file_no:
result['errors'].append(f"Row {row_num}: Missing file number")
continue
# Find the case
case = db.query(Case).filter(Case.file_no == file_no).first()
if not case:
result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
continue
amount = parse_float(row.get('Amount', '0'))
if amount is None:
result['errors'].append(f"Row {row_num}: Invalid amount")
continue
tx_date = parse_date(row.get('Date', ''))
item_no = parse_int(row.get('Item_No', '') or '')
# ensure unique item_no per date by increment
# temp session-less check via while loop
desired_item_no = item_no if item_no is not None else 1
while True:
exists = (
db.query(Transaction)
.filter(
Transaction.case_id == case.id,
Transaction.transaction_date == tx_date,
Transaction.item_no == desired_item_no,
)
.first()
)
if not exists:
break
desired_item_no += 1
transaction = Transaction(
case_id=case.id,
transaction_date=tx_date,
transaction_type=(row.get('T_Type', '').strip() or None),
t_type_l=(row.get('T_Type_L', '').strip().upper() or None),
amount=amount,
description=(row.get('Note', '').strip() or None),
reference=(row.get('Item_No', '').strip() or None),
item_no=desired_item_no,
employee_number=(row.get('Empl_Num', '').strip() or None),
t_code=(row.get('T_Code', '').strip().upper() or None),
quantity=parse_float(row.get('Quantity', '')),
rate=parse_float(row.get('Rate', '')),
billed=((row.get('Billed', '') or '').strip().upper() or None),
)
db.add(transaction)
result['success'] += 1
except Exception as e:
result['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]:
"""
Import QDROS CSV data into Document model.
Expected CSV format: File_No,Document_Type,Description,File_Name,Date
"""
result = {
'success': 0,
'errors': [],
'total_rows': 0
}
f = None
try:
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 2:
result['errors'].append("Invalid CSV format: expected at least 2 columns")
return result
for row_num, row in enumerate(reader, start=2):
result['total_rows'] += 1
try:
file_no = row.get('File_No', '').strip()
if not file_no:
result['errors'].append(f"Row {row_num}: Missing file number")
continue
# Find the case
case = db.query(Case).filter(Case.file_no == file_no).first()
if not case:
result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
continue
document = Document(
case_id=case.id,
document_type=row.get('Document_Type', '').strip() or 'QDRO',
file_name=row.get('File_Name', '').strip() or None,
description=row.get('Description', '').strip() or None,
uploaded_date=parse_date(row.get('Date', ''))
)
db.add(document)
result['success'] += 1
except Exception as e:
result['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]:
"""
Import PAYMENTS CSV data into Payment model.
Expected CSV format: File_No,Date,Amount,Type,Description,Check_Number
"""
result = {
'success': 0,
'errors': [],
'total_rows': 0
}
f = None
try:
f, used_encoding = open_text_with_fallbacks(file_path)
reader = csv.DictReader(f)
headers = reader.fieldnames or []
if len(headers) < 2:
result['errors'].append("Invalid CSV format: expected at least 2 columns")
return result
for row_num, row in enumerate(reader, start=2):
result['total_rows'] += 1
try:
file_no = row.get('File_No', '').strip()
if not file_no:
result['errors'].append(f"Row {row_num}: Missing file number")
continue
# Find the case
case = db.query(Case).filter(Case.file_no == file_no).first()
if not case:
result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
continue
amount = parse_float(row.get('Amount', '0'))
if amount is None:
result['errors'].append(f"Row {row_num}: Invalid amount")
continue
payment = Payment(
case_id=case.id,
payment_date=parse_date(row.get('Date', '')),
payment_type=row.get('Type', '').strip() or None,
amount=amount,
description=row.get('Description', '').strip() or None,
check_number=row.get('Check_Number', '').strip() or None
)
db.add(payment)
result['success'] += 1
except Exception as e:
result['errors'].append(f"Row {row_num}: {str(e)}")
db.commit()
except Exception as e:
result['errors'].append(f"Import failed: {str(e)}")
db.rollback()
finally:
if f:
f.close()
return result
def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[str, Any]:
"""
Process CSV import based on type using legacy import functions.
Args:
db: Database session
import_type: Type of import
file_path: Path to CSV file
Returns:
Dict with import results
"""
import_functions = {
# Reference tables (import first)
'trnstype': import_legacy.import_trnstype,
'trnslkup': import_legacy.import_trnslkup,
'footers': import_legacy.import_footers,
'filestat': import_legacy.import_filestat,
'employee': import_legacy.import_employee,
'gruplkup': import_legacy.import_gruplkup,
'filetype': import_legacy.import_filetype,
'fvarlkup': import_legacy.import_fvarlkup,
'rvarlkup': import_legacy.import_rvarlkup,
'states': import_legacy.import_states,
'printers': import_legacy.import_printers,
'setup': import_legacy.import_setup,
# Core data tables
'rolodex': import_legacy.import_rolodex,
'phone': import_legacy.import_phone,
'rolex_v': import_legacy.import_rolex_v,
'files': import_legacy.import_files,
'files_r': import_legacy.import_files_r,
'files_v': import_legacy.import_files_v,
'filenots': import_legacy.import_filenots,
'ledger': import_legacy.import_ledger,
'deposits': import_legacy.import_deposits,
'payments': import_legacy.import_payments,
# Specialized tables
'planinfo': import_legacy.import_planinfo,
'qdros': import_legacy.import_qdros,
'pensions': import_legacy.import_pensions,
'pension_marriage': import_legacy.import_pension_marriage,
'pension_death': import_legacy.import_pension_death,
'pension_schedule': import_legacy.import_pension_schedule,
'pension_separate': import_legacy.import_pension_separate,
'pension_results': import_legacy.import_pension_results,
}
import_func = import_functions.get(import_type)
if not import_func:
return {
'success': 0,
'errors': [f"Unknown import type: {import_type}"],
'total_rows': 0
}
return import_func(db, file_path)
# ---------------------------------
# Auto-import helper after upload
# ---------------------------------
def run_auto_import_for_upload(db: Session, uploaded_items: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Run auto-import for the files just uploaded, following IMPORT_ORDER.
Stops after the first file that reports any row errors. Unknown types are
skipped. Logs each file via ImportLog.
"""
# Filter out unknowns; keep metadata
known_items: List[Dict[str, Any]] = [
item for item in uploaded_items if item.get("import_type") in ORDER_INDEX
]
# Sort by import order, then by filename for stability
known_items.sort(key=lambda x: (ORDER_INDEX.get(x.get("import_type"), 1_000_000), x.get("filename", "")))
files_summary: List[Dict[str, Any]] = []
stopped = False
stopped_on: Optional[str] = None
for item in known_items:
import_type = item["import_type"]
file_path = item["file_path"]
stored_filename = item["stored_filename"]
# Create import log
import_log = ImportLog(
import_type=import_type,
file_name=stored_filename,
file_path=file_path,
status="running",
)
db.add(import_log)
db.commit()
try:
result = process_csv_import(db, import_type, file_path)
import_log.status = "completed" if not result.get("errors") else "failed"
import_log.total_rows = result.get("total_rows", 0)
import_log.success_count = result.get("success", 0)
import_log.error_count = len(result.get("errors", []))
import_log.error_details = json.dumps(result.get("errors", []))
import_log.completed_at = datetime.now()
db.commit()
summary_item = {
"filename": item.get("filename"),
"stored_filename": stored_filename,
"import_type": import_type,
"status": "success" if result.get("success", 0) > 0 and not result.get("errors") else "error",
"total_rows": result.get("total_rows", 0),
"success_count": result.get("success", 0),
"error_count": len(result.get("errors", [])),
"errors": (result.get("errors", [])[:10] if result.get("errors") else []),
}
# Add skip details if present (for phone imports and others that track this)
if result.get("skipped_no_phone", 0) > 0 or result.get("skipped_no_id", 0) > 0:
skip_details = []
if result.get("skipped_no_phone", 0) > 0:
skip_details.append(f"{result['skipped_no_phone']} rows without phone number")
if result.get("skipped_no_id", 0) > 0:
skip_details.append(f"{result['skipped_no_id']} rows without ID")
summary_item["skip_info"] = ", ".join(skip_details)
files_summary.append(summary_item)
if result.get("errors"):
stopped = True
stopped_on = stored_filename
break
except Exception as e:
import_log.status = "failed"
import_log.error_details = json.dumps([str(e)])
import_log.completed_at = datetime.now()
db.commit()
files_summary.append({
"filename": item.get("filename"),
"stored_filename": stored_filename,
"import_type": import_type,
"status": "error",
"total_rows": 0,
"success_count": 0,
"error_count": 1,
"errors": [str(e)][:10],
})
stopped = True
stopped_on = stored_filename
break
# Build skipped notes for unknowns
skipped_unknowns = [
{"filename": item.get("filename"), "stored_filename": item.get("stored_filename")}
for item in uploaded_items
if item.get("import_type") not in ORDER_INDEX
]
return {
"files": files_summary,
"stopped": stopped,
"stopped_on": stopped_on,
"skipped_unknowns": skipped_unknowns,
}
# ------------------------------
# Ledger CRUD and helpers
# ------------------------------
def validate_ledger_fields(
*,
transaction_date: Optional[str],
t_code: Optional[str],
employee_number: Optional[str],
quantity: Optional[str],
rate: Optional[str],
amount: Optional[str],
billed: Optional[str],
) -> tuple[list[str], dict[str, Any]]:
"""Validate incoming ledger form fields and return (errors, parsed_values)."""
errors: list[str] = []
parsed: dict[str, Any] = {}
# Date
tx_dt = parse_date(transaction_date or "") if transaction_date is not None else None
if tx_dt is None:
errors.append("Date is required and must be valid")
else:
parsed["transaction_date"] = tx_dt
# T_Code
if t_code is None or not t_code.strip():
errors.append("T_Code is required")
else:
parsed["t_code"] = t_code.strip().upper()
# Employee number
if employee_number is None or not employee_number.strip():
errors.append("Empl_Num is required")
else:
parsed["employee_number"] = employee_number.strip()
# Quantity, Rate, Amount
qty = parse_float(quantity or "") if quantity is not None else None
rt = parse_float(rate or "") if rate is not None else None
amt = parse_float(amount or "") if amount is not None else None
if qty is not None:
parsed["quantity"] = qty
if rt is not None:
parsed["rate"] = rt
# Auto-compute amount if missing but quantity and rate present
if amt is None and qty is not None and rt is not None:
amt = round(qty * rt, 2)
if amt is None:
errors.append("Amount is required or derivable from Quantity × Rate")
else:
parsed["amount"] = amt
# Billed flag
billed_flag = (billed or "").strip().upper() if billed is not None else ""
if billed_flag not in ("Y", "N"):
errors.append("Billed must be 'Y' or 'N'")
else:
parsed["billed"] = billed_flag
return errors, parsed
def next_unique_item_no(db: Session, case_id: int, tx_date: datetime, desired_item_no: Optional[int]) -> int:
"""Ensure (transaction_date, item_no) uniqueness per case by incrementing if needed."""
# Start at provided item_no or at 1 if missing
item_no = int(desired_item_no) if desired_item_no is not None else 1
while True:
exists = (
db.query(Transaction)
.filter(
Transaction.case_id == case_id,
Transaction.transaction_date == tx_date,
Transaction.item_no == item_no,
)
.first()
)
if not exists:
return item_no
item_no += 1
def compute_case_totals_from_case(case_obj: Case) -> Dict[str, float]:
"""
Compute simple totals for a case from its transactions.
Returns billed, unbilled, and total sums. Amounts are treated as positive;
future enhancement could apply sign based on t_type_l.
"""
billed_total = 0.0
unbilled_total = 0.0
overall_total = 0.0
for t in (case_obj.transactions or []):
amt = float(t.amount) if t.amount is not None else 0.0
overall_total += amt
if (t.billed or '').upper() == 'Y':
billed_total += amt
else:
unbilled_total += amt
return {
'billed_total': round(billed_total, 2),
'unbilled_total': round(unbilled_total, 2),
'overall_total': round(overall_total, 2),
}
def compute_case_totals_for_case_id(db: Session, case_id: int) -> Dict[str, float]:
"""
Compute billed, unbilled, and overall totals for a case by ID.
This uses a simple in-Python aggregation over the case's transactions to
avoid SQL portability issues and to keep the logic consistent with
compute_case_totals_from_case.
"""
billed_total = 0.0
unbilled_total = 0.0
overall_total = 0.0
transactions: List[Transaction] = (
db.query(Transaction).filter(Transaction.case_id == case_id).all()
)
for t in transactions:
amt = float(t.amount) if t.amount is not None else 0.0
overall_total += amt
if ((t.billed or '').upper()) == 'Y':
billed_total += amt
else:
unbilled_total += amt
return {
'billed_total': round(billed_total, 2),
'unbilled_total': round(unbilled_total, 2),
'overall_total': round(overall_total, 2),
}
def _ledger_keys_from_tx(tx: Optional["Transaction"]) -> Dict[str, Any]:
"""
Extract identifying keys for a ledger transaction for audit logs.
"""
if tx is None:
return {}
return {
'transaction_id': getattr(tx, 'id', None),
'case_id': getattr(tx, 'case_id', None),
'item_no': getattr(tx, 'item_no', None),
'transaction_date': getattr(tx, 'transaction_date', None),
't_code': getattr(tx, 't_code', None),
't_type_l': getattr(tx, 't_type_l', None),
'employee_number': getattr(tx, 'employee_number', None),
'billed': getattr(tx, 'billed', None),
'amount': getattr(tx, 'amount', None),
}
def _log_ledger_audit(
*,
action: str,
user: "User",
case_id: int,
keys: Dict[str, Any],
pre: Dict[str, float],
post: Dict[str, float],
) -> None:
"""
Emit a structured audit log line for ledger mutations including user, action,
identifiers, and pre/post balances with deltas.
"""
delta = {
'billed_total': round((post.get('billed_total', 0.0) - pre.get('billed_total', 0.0)), 2),
'unbilled_total': round((post.get('unbilled_total', 0.0) - pre.get('unbilled_total', 0.0)), 2),
'overall_total': round((post.get('overall_total', 0.0) - pre.get('overall_total', 0.0)), 2),
}
logger.info(
"ledger_audit",
action=action,
user_id=getattr(user, 'id', None),
user_username=getattr(user, 'username', None),
case_id=case_id,
keys=keys,
pre_balances=pre,
post_balances=post,
delta_balances=delta,
)
@app.post("/case/{case_id}/ledger")
async def ledger_create(
request: Request,
case_id: int,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
# Pre-mutation totals for audit
pre_totals = compute_case_totals_for_case_id(db, case_id)
# Validate
errors, parsed = validate_ledger_fields(
transaction_date=form.get("transaction_date"),
t_code=form.get("t_code"),
employee_number=form.get("employee_number"),
quantity=form.get("quantity"),
rate=form.get("rate"),
amount=form.get("amount"),
billed=form.get("billed"),
)
if errors:
request.session["case_update_errors"] = errors
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Ensure case exists
case_obj = db.query(Case).filter(Case.id == case_id).first()
if not case_obj:
request.session["case_update_errors"] = ["Case not found"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Assign optional fields
t_type = (form.get("transaction_type") or "").strip() or None
t_type_l = (form.get("t_type_l") or "").strip().upper() or None
reference = (form.get("reference") or "").strip() or None
desc = (form.get("description") or "").strip() or None
desired_item_no = parse_int(form.get("item_no") or "")
item_no = next_unique_item_no(db, case_id, parsed["transaction_date"], desired_item_no)
try:
tx = Transaction(
case_id=case_id,
transaction_date=parsed["transaction_date"],
transaction_type=t_type,
t_type_l=t_type_l,
amount=parsed["amount"],
description=desc,
reference=reference,
item_no=item_no,
employee_number=parsed["employee_number"],
t_code=parsed["t_code"],
quantity=parsed.get("quantity"),
rate=parsed.get("rate"),
billed=parsed["billed"],
)
db.add(tx)
db.commit()
# Post-mutation totals and audit log
post_totals = compute_case_totals_for_case_id(db, case_id)
_log_ledger_audit(
action="create",
user=user,
case_id=case_id,
keys=_ledger_keys_from_tx(tx),
pre=pre_totals,
post=post_totals,
)
logger.info("ledger_create", case_id=case_id, transaction_id=tx.id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("ledger_create_failed", case_id=case_id, error=str(e))
request.session["case_update_errors"] = ["Failed to create ledger entry"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/ledger/{tx_id}")
async def ledger_update(
request: Request,
case_id: int,
tx_id: int,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
tx = db.query(Transaction).filter(Transaction.id == tx_id, Transaction.case_id == case_id).first()
if not tx:
request.session["case_update_errors"] = ["Ledger entry not found"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Pre-mutation totals for audit
pre_totals = compute_case_totals_for_case_id(db, case_id)
errors, parsed = validate_ledger_fields(
transaction_date=form.get("transaction_date"),
t_code=form.get("t_code"),
employee_number=form.get("employee_number"),
quantity=form.get("quantity"),
rate=form.get("rate"),
amount=form.get("amount"),
billed=form.get("billed"),
)
if errors:
request.session["case_update_errors"] = errors
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
try:
tx.transaction_date = parsed["transaction_date"]
# Ensure uniqueness of (date, item_no)
desired_item_no = parse_int(form.get("item_no") or "") or tx.item_no
tx.item_no = next_unique_item_no(db, case_id, parsed["transaction_date"], desired_item_no)
tx.t_code = parsed["t_code"]
tx.employee_number = parsed["employee_number"]
tx.quantity = parsed.get("quantity")
tx.rate = parsed.get("rate")
tx.amount = parsed["amount"]
tx.billed = parsed["billed"]
tx.transaction_type = (form.get("transaction_type") or "").strip() or None
tx.t_type_l = (form.get("t_type_l") or "").strip().upper() or None
tx.reference = (form.get("reference") or "").strip() or None
tx.description = (form.get("description") or "").strip() or None
db.commit()
# Post-mutation totals and audit log
post_totals = compute_case_totals_for_case_id(db, case_id)
_log_ledger_audit(
action="update",
user=user,
case_id=case_id,
keys=_ledger_keys_from_tx(tx),
pre=pre_totals,
post=post_totals,
)
logger.info("ledger_update", case_id=case_id, transaction_id=tx.id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("ledger_update_failed", case_id=case_id, tx_id=tx_id, error=str(e))
request.session["case_update_errors"] = ["Failed to update ledger entry"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/ledger/{tx_id}/delete")
async def ledger_delete(
request: Request,
case_id: int,
tx_id: int,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
tx = db.query(Transaction).filter(Transaction.id == tx_id, Transaction.case_id == case_id).first()
if not tx:
request.session["case_update_errors"] = ["Ledger entry not found"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
try:
# Capture pre-mutation totals and keys for audit before deletion
pre_totals = compute_case_totals_for_case_id(db, case_id)
tx_keys = _ledger_keys_from_tx(tx)
db.delete(tx)
db.commit()
# Post-mutation totals and audit log
post_totals = compute_case_totals_for_case_id(db, case_id)
_log_ledger_audit(
action="delete",
user=user,
case_id=case_id,
keys=tx_keys,
pre=pre_totals,
post=post_totals,
)
logger.info("ledger_delete", case_id=case_id, transaction_id=tx_id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("ledger_delete_failed", case_id=case_id, tx_id=tx_id, error=str(e))
request.session["case_update_errors"] = ["Failed to delete ledger entry"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.get("/")
async def root():
"""
Root endpoint - serves login form for web interface.
"""
return RedirectResponse(url="/login", status_code=302)
@app.get("/health")
async def health_check(db: Session = Depends(get_db)):
"""
Health check endpoint that verifies database connectivity.
"""
try:
# Test database connection by querying user count
user_count = db.query(User).count()
return {
"status": "healthy",
"database": "connected",
"users": user_count
}
except Exception as e:
logger.error("health_check_failed", error=str(e))
return {
"status": "unhealthy",
"database": "error",
"error": str(e)
}
@app.get("/login")
async def login_form(request: Request):
"""
Display login form.
If user is already logged in, redirect to dashboard.
"""
# Check if user is already logged in
user = get_current_user_from_session(request.session)
if user:
return RedirectResponse(url="/dashboard", status_code=302)
return templates.TemplateResponse("login.html", {"request": request})
@app.post("/login")
async def login_submit(request: Request, db: Session = Depends(get_db)):
"""
Handle login form submission.
Authenticates user credentials and sets up session.
"""
form = await request.form()
username = form.get("username")
password = form.get("password")
if not username or not password:
error_message = "Username and password are required"
logger.warning("login_failed", username=username, reason="missing_credentials")
return templates.TemplateResponse("login.html", {
"request": request,
"error": error_message
})
# Authenticate user
user = authenticate_user(username, password)
if not user:
error_message = "Invalid username or password"
logger.warning("login_failed", username=username, reason="invalid_credentials")
return templates.TemplateResponse("login.html", {
"request": request,
"error": error_message
})
# Set up user session
request.session["user_id"] = user.id
request.session["user"] = {"id": user.id, "username": user.username}
# Update bound context with authenticated user id
structlog_contextvars.bind_contextvars(**{"user.id": user.id})
logger.info("login_success", username=username, **{"user.id": user.id})
# Redirect to dashboard after successful login
return RedirectResponse(url="/dashboard", status_code=302)
@app.get("/logout")
async def logout(request: Request):
"""
Handle user logout.
Clears user session and redirects to home page.
"""
username = request.session.get("user", {}).get("username", "unknown")
request.session.clear()
logger.info("logout", username=username)
return RedirectResponse(url="/", status_code=302)
@app.get("/dashboard")
async def dashboard(
request: Request,
q: str | None = Query(None, description="Search by file number or client name"),
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
db: Session = Depends(get_db),
):
"""
Dashboard page - lists recent cases with search and pagination.
- Optional query param `q` filters by case file number or client name/company
- `page` and `page_size` control pagination
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Base query: join clients for name/company access
query = db.query(Case).join(Client).order_by(
Case.open_date.desc(),
Case.created_at.desc(),
)
# Apply search filter if provided
if q:
like_term = f"%{q}%"
query = query.filter(
or_(
Case.file_no.ilike(like_term),
Client.first_name.ilike(like_term),
Client.last_name.ilike(like_term),
Client.company.ilike(like_term),
)
)
# Total count for pagination
total: int = query.count()
# Clamp page to valid range when total is known
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
# Pagination window
offset = (page - 1) * page_size
cases = query.offset(offset).limit(page_size).all()
# Page number window for UI (current +/- 2)
start_page = max(1, page - 2)
end_page = min(total_pages, page + 2)
page_numbers = list(range(start_page, end_page + 1))
logger.info(
"dashboard_render",
query=q,
page=page,
page_size=page_size,
total=total,
)
return templates.TemplateResponse(
"dashboard.html",
{
"request": request,
"user": user,
"cases": cases,
"q": q,
"page": page,
"page_size": page_size,
"total": total,
"total_pages": total_pages,
"page_numbers": page_numbers,
"start_index": (offset + 1) if total > 0 else 0,
"end_index": min(offset + len(cases), total),
},
)
@app.post("/admin/upload")
async def admin_upload_files(
request: Request,
files: List[UploadFile] = File(...),
auto_import: bool = Form(True),
db: Session = Depends(get_db)
):
"""
Handle CSV file uploads for admin panel.
Validates uploaded files are CSV format and stores them in data-import directory.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
results = []
errors = []
# Ensure data-import directory exists
import_dir = "data-import"
os.makedirs(import_dir, exist_ok=True)
for file in files:
try:
# Validate file type
if not file.filename.lower().endswith('.csv'):
errors.append(f"File '{file.filename}' is not a CSV file")
continue
# Generate unique filename to avoid conflicts
file_id = str(uuid.uuid4())
file_ext = os.path.splitext(file.filename)[1]
# Determine import type from original filename for better categorization later
try:
detected_type = get_import_type_from_filename(file.filename)
except ValueError:
detected_type = 'unknown'
# Prefix stored filename with detected type to preserve context
stored_filename = f"{detected_type}_{file_id}{file_ext}"
file_path = os.path.join(import_dir, stored_filename)
# Save file
contents = await file.read()
with open(file_path, "wb") as f:
f.write(contents)
# Use detected type (already derived from original name)
import_type = detected_type
results.append({
'filename': file.filename,
'stored_filename': stored_filename,
'import_type': import_type,
'file_path': file_path,
'size': len(contents)
})
except Exception as e:
errors.append(f"Error processing '{file.filename}': {str(e)}")
continue
# Log the upload operation
logger.info(
"admin_upload",
uploaded_count=len(results),
error_count=len(errors),
username=user.username,
auto_import=auto_import,
)
auto_import_results: Dict[str, Any] | None = None
if auto_import and results:
try:
auto_import_results = run_auto_import_for_upload(db, results)
logger.info(
"admin_upload_auto_import",
processed_files=len(auto_import_results.get("files", [])),
stopped=auto_import_results.get("stopped", False),
stopped_on=auto_import_results.get("stopped_on"),
username=user.username,
)
except Exception as e:
logger.error("admin_upload_auto_import_failed", error=str(e), username=user.username)
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"upload_results": results,
"upload_errors": errors,
"auto_import_results": auto_import_results,
"show_upload_results": True
})
@app.post("/admin/import/{data_type}")
async def admin_import_data(
request: Request,
data_type: str,
db: Session = Depends(get_db)
):
"""
Process CSV import for specified data type.
Creates import log entry and processes the import in the background.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Validate data type
if data_type not in VALID_IMPORT_TYPES:
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"error": f"Invalid data type: {data_type}"
})
# Get form data for file selection
form = await request.form()
selected_files = form.getlist("selected_files")
if not selected_files:
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"error": "No files selected for import"
})
import_results = []
total_success = 0
total_errors = 0
for stored_filename in selected_files:
file_path = os.path.join("data-import", stored_filename)
if not os.path.exists(file_path):
import_results.append({
'filename': stored_filename,
'status': 'error',
'message': 'File not found'
})
total_errors += 1
continue
# Create import log entry
import_log = ImportLog(
import_type=data_type,
file_name=stored_filename,
file_path=file_path,
status="running"
)
db.add(import_log)
db.commit()
try:
# Process the import
result = process_csv_import(db, data_type, file_path)
# Update import log
import_log.status = "completed" if not result['errors'] else "failed"
import_log.total_rows = result['total_rows']
import_log.success_count = result['success']
import_log.error_count = len(result['errors'])
import_log.error_details = json.dumps(result['errors'])
import_log.completed_at = datetime.now()
db.commit()
import_results.append({
'filename': stored_filename,
'status': 'success' if result['success'] > 0 else 'error',
'total_rows': result['total_rows'],
'success_count': result['success'],
'error_count': len(result['errors']),
'errors': result['errors'][:10] # Show first 10 errors
})
total_success += result['success']
total_errors += len(result['errors'])
except Exception as e:
# Update import log on error
import_log.status = "failed"
import_log.error_details = json.dumps([str(e)])
import_log.completed_at = datetime.now()
db.commit()
import_results.append({
'filename': stored_filename,
'status': 'error',
'message': str(e)
})
total_errors += 1
# Log the import operation
logger.info(
"admin_import",
import_type=data_type,
success_count=total_success,
error_count=total_errors,
username=user.username,
)
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"import_results": import_results,
"total_success": total_success,
"total_errors": total_errors,
"show_import_results": True
})
@app.post("/admin/sync")
async def admin_sync_data(
request: Request,
db: Session = Depends(get_db)
):
"""
Sync legacy database models to modern application models.
This route triggers the sync process to populate the simplified
modern models (Client, Phone, Case, Transaction, Payment, Document)
from the comprehensive legacy models.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Get form data for confirmation
form = await request.form()
clear_existing = form.get("clear_existing") == "true"
try:
logger.info(
"admin_sync_starting",
clear_existing=clear_existing,
username=user.username
)
# Run all sync functions
results = sync_legacy_to_modern.sync_all(db, clear_existing=clear_existing)
# Calculate totals
total_synced = sum(r['success'] for r in results.values() if r)
total_skipped = sum(r['skipped'] for r in results.values() if r)
total_errors = sum(len(r['errors']) for r in results.values() if r)
logger.info(
"admin_sync_complete",
total_synced=total_synced,
total_skipped=total_skipped,
total_errors=total_errors,
username=user.username
)
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"sync_results": results,
"total_synced": total_synced,
"total_skipped": total_skipped,
"total_sync_errors": total_errors,
"show_sync_results": True
})
except Exception as e:
logger.error("admin_sync_failed", error=str(e), username=user.username)
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"error": f"Sync failed: {str(e)}"
})
@app.delete("/admin/delete-file/{filename}")
async def admin_delete_file(
request: Request,
filename: str,
db: Session = Depends(get_db)
):
"""
Delete an uploaded CSV file from the data-import directory.
Requires authentication and validates that the file exists.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
# Validate filename to prevent directory traversal
if ".." in filename or "/" in filename or "\\" in filename:
raise HTTPException(status_code=400, detail="Invalid filename")
# Construct file path
import_dir = "data-import"
file_path = os.path.join(import_dir, filename)
# Check if file exists
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
# Check if it's actually a file (not a directory)
if not os.path.isfile(file_path):
raise HTTPException(status_code=400, detail="Not a file")
try:
# Delete the file
os.remove(file_path)
# Log the deletion
logger.info(
"admin_delete_file",
filename=filename,
username=user.username,
)
return {"success": True, "message": f"File '{filename}' deleted successfully"}
except Exception as e:
logger.error(
"admin_delete_file_error",
filename=filename,
username=user.username,
error=str(e)
)
raise HTTPException(status_code=500, detail=f"Error deleting file: {str(e)}")
@app.get("/admin")
async def admin_panel(request: Request, db: Session = Depends(get_db)):
"""
Admin panel - requires authentication.
Provides administrative functions like data import and system management.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Get recent import history
recent_imports = db.query(ImportLog).order_by(ImportLog.created_at.desc()).limit(10).all()
# Get available files for import
import_dir = "data-import"
available_files = []
if os.path.exists(import_dir):
for filename in os.listdir(import_dir):
if filename.endswith('.csv'):
file_path = os.path.join(import_dir, filename)
file_size = os.path.getsize(file_path)
try:
import_type = get_import_type_from_filename(filename)
except ValueError:
import_type = 'unknown'
available_files.append({
'filename': filename,
'import_type': import_type,
'size': file_size,
'modified': datetime.fromtimestamp(os.path.getmtime(file_path))
})
# Group files by import type
files_by_type = {}
for file_info in available_files:
import_type = file_info['import_type']
if import_type not in files_by_type:
files_by_type[import_type] = []
files_by_type[import_type].append(file_info)
# Get record counts for all legacy and modern tables
from .models import (
# Legacy tables
Rolodex, LegacyPhone, LegacyFile, FilesR, FilesV, FileNots,
Ledger, Deposits, LegacyPayment, TrnsType, TrnsLkup,
Footers, FileStat, Employee, GroupLkup, FileType,
Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath,
PensionSchedule, PensionSeparate, PensionResults,
RolexV, FVarLkup, RVarLkup, States, Printers, Setup,
# Modern tables
Client, Phone, Case, Transaction, Payment, Document
)
table_counts = {
'reference': {
'TrnsType': db.query(TrnsType).count(),
'TrnsLkup': db.query(TrnsLkup).count(),
'Footers': db.query(Footers).count(),
'FileStat': db.query(FileStat).count(),
'Employee': db.query(Employee).count(),
'GroupLkup': db.query(GroupLkup).count(),
'FileType': db.query(FileType).count(),
'FVarLkup': db.query(FVarLkup).count(),
'RVarLkup': db.query(RVarLkup).count(),
'States': db.query(States).count(),
'Printers': db.query(Printers).count(),
'Setup': db.query(Setup).count(),
},
'core': {
'Rolodex': db.query(Rolodex).count(),
'LegacyPhone': db.query(LegacyPhone).count(),
'RolexV': db.query(RolexV).count(),
'LegacyFile': db.query(LegacyFile).count(),
'FilesR': db.query(FilesR).count(),
'FilesV': db.query(FilesV).count(),
'FileNots': db.query(FileNots).count(),
'Ledger': db.query(Ledger).count(),
'Deposits': db.query(Deposits).count(),
'LegacyPayment': db.query(LegacyPayment).count(),
},
'specialized': {
'PlanInfo': db.query(PlanInfo).count(),
'Qdros': db.query(Qdros).count(),
'Pensions': db.query(Pensions).count(),
'PensionMarriage': db.query(PensionMarriage).count(),
'PensionDeath': db.query(PensionDeath).count(),
'PensionSchedule': db.query(PensionSchedule).count(),
'PensionSeparate': db.query(PensionSeparate).count(),
'PensionResults': db.query(PensionResults).count(),
},
'modern': {
'Client': db.query(Client).count(),
'Phone': db.query(Phone).count(),
'Case': db.query(Case).count(),
'Transaction': db.query(Transaction).count(),
'Payment': db.query(Payment).count(),
'Document': db.query(Document).count(),
}
}
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
"recent_imports": recent_imports,
"available_files": available_files,
"table_counts": table_counts,
"files_by_type": files_by_type,
"valid_import_types": VALID_IMPORT_TYPES
})
@app.get("/case/{case_id}")
async def case_detail(
request: Request,
case_id: int,
saved: bool = Query(False, description="Whether to show success message"),
db: Session = Depends(get_db),
):
"""
Case detail view.
Displays detailed information for a single case and its related client and
associated records (transactions, documents, payments).
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Fetch case with related entities eagerly loaded to avoid lazy-load issues
case_obj = (
db.query(Case)
.options(
joinedload(Case.client),
joinedload(Case.transactions),
joinedload(Case.documents),
joinedload(Case.payments),
)
.filter(Case.id == case_id)
.first()
)
if not case_obj:
logger.warning("case_not_found", case_id=case_id)
# Get any errors from session and clear them
errors = request.session.pop("case_update_errors", None)
return templates.TemplateResponse(
"case.html",
{
"request": request,
"user": user,
"case": None,
"error": "Case not found",
"saved": False,
"errors": errors or [],
},
status_code=404,
)
logger.info("case_detail", case_id=case_obj.id, file_no=case_obj.file_no)
# Determine if QDRO entries exist for this case's file number
has_qdro = db.query(Qdros).filter(Qdros.file_no == case_obj.file_no).count() > 0
# Get any errors from session and clear them
errors = request.session.pop("case_update_errors", None)
# Sort transactions by date then item_no for stable display
sorted_transactions = sorted(
case_obj.transactions or [],
key=lambda t: (
t.transaction_date or datetime.min,
t.item_no or 0,
)
)
case_obj.transactions = sorted_transactions
totals = compute_case_totals_from_case(case_obj)
return templates.TemplateResponse(
"case.html",
{
"request": request,
"user": user,
"case": case_obj,
"saved": saved,
"errors": errors or [],
"totals": totals,
"has_qdro": has_qdro,
},
)
@app.post("/case/{case_id}/update")
async def case_update(
request: Request,
case_id: int,
db: Session = Depends(get_db),
) -> RedirectResponse:
"""
Update case details.
Updates the specified fields on a case and redirects back to the case detail view.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Get form data
form = await request.form()
# Fetch the case
case_obj = db.query(Case).filter(Case.id == case_id).first()
if not case_obj:
logger.warning("case_not_found_update", case_id=case_id)
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Validate and process fields
errors = []
update_data = {}
# Status validation
status = form.get("status")
if status is not None:
if status not in ["active", "closed"]:
errors.append("Status must be 'active' or 'closed'")
else:
update_data["status"] = status
# Case type and description (optional)
case_type = form.get("case_type")
if case_type is not None:
update_data["case_type"] = case_type.strip() if case_type.strip() else None
description = form.get("description")
if description is not None:
update_data["description"] = description.strip() if description.strip() else None
# Date validation and parsing
open_date = form.get("open_date")
if open_date is not None:
if open_date.strip():
try:
update_data["open_date"] = datetime.strptime(open_date.strip(), "%Y-%m-%d")
except ValueError:
errors.append("Open date must be in YYYY-MM-DD format")
else:
update_data["open_date"] = None
close_date = form.get("close_date")
if close_date is not None:
if close_date.strip():
try:
update_data["close_date"] = datetime.strptime(close_date.strip(), "%Y-%m-%d")
except ValueError:
errors.append("Close date must be in YYYY-MM-DD format")
else:
update_data["close_date"] = None
# If there are validation errors, redirect back with errors
if errors:
# Store errors in session for display on the case page
request.session["case_update_errors"] = errors
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Apply updates
try:
changed_fields = {}
for field, value in update_data.items():
old_value = getattr(case_obj, field)
if old_value != value:
changed_fields[field] = {"old": old_value, "new": value}
setattr(case_obj, field, value)
db.commit()
logger.info(
"case_update",
case_id=case_id,
changed_fields=list(update_data.keys()),
changed_details=changed_fields,
)
# Clear any previous errors from session
request.session.pop("case_update_errors", None)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("case_update_failed", case_id=case_id, error=str(e))
# Store error in session for display
request.session["case_update_errors"] = ["Failed to save changes. Please try again."]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/close")
async def case_close(
request: Request,
case_id: int,
db: Session = Depends(get_db),
) -> RedirectResponse:
"""
Close a case.
Sets the case status to 'closed' and sets close_date to current date if not already set.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Fetch the case
case_obj = db.query(Case).filter(Case.id == case_id).first()
if not case_obj:
logger.warning("case_not_found_close", case_id=case_id)
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Update case
try:
case_obj.status = "closed"
# Only set close_date if it's not already set
if not case_obj.close_date:
case_obj.close_date = datetime.now()
db.commit()
logger.info("case_closed", case_id=case_id, close_date=case_obj.close_date.isoformat() if case_obj.close_date else None)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("case_close_failed", case_id=case_id, error=str(e))
# Store error in session for display
request.session["case_update_errors"] = ["Failed to close case. Please try again."]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/reopen")
async def case_reopen(
request: Request,
case_id: int,
db: Session = Depends(get_db),
) -> RedirectResponse:
"""
Reopen a case.
Sets the case status to 'active' and clears the close_date.
"""
# Check authentication
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Fetch the case
case_obj = db.query(Case).filter(Case.id == case_id).first()
if not case_obj:
logger.warning("case_not_found_reopen", case_id=case_id)
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Update case
try:
case_obj.status = "active"
case_obj.close_date = None
db.commit()
logger.info("case_reopened", case_id=case_id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("case_reopen_failed", case_id=case_id, error=str(e))
# Store error in session for display
request.session["case_update_errors"] = ["Failed to reopen case. Please try again."]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.get("/rolodex")
async def rolodex_list(
request: Request,
q: str | None = Query(None, description="Search by name or company"),
phone: str | None = Query(None, description="Search by phone contains"),
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
sort_key: str | None = Query(None, description="Sort column key"),
sort_dir: str | None = Query(None, description="Sort direction (asc/desc)"),
db: Session = Depends(get_db),
):
"""
Rolodex list with simple search and pagination.
Filters clients by name/company and optional phone substring.
"""
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
allowed_sort_keys = {
"name": "Name",
"company": "Company",
"address": "Address",
"city": "City",
"state": "State",
"zip": "ZIP",
"phones": "Phones",
"updated": "Updated",
}
sort_defaults = {
"name": "asc",
"company": "asc",
"address": "asc",
"city": "asc",
"state": "asc",
"zip": "asc",
"phones": "asc",
"updated": "desc",
}
session_sort = request.session.get("rolodex_sort") or {}
if not isinstance(session_sort, dict):
session_sort = {}
raw_sort_key = sort_key.lower().strip() if isinstance(sort_key, str) else None
chosen_sort_key = raw_sort_key or session_sort.get("key") or "name"
if chosen_sort_key not in allowed_sort_keys:
chosen_sort_key = "name"
default_direction = sort_defaults[chosen_sort_key]
raw_sort_dir = sort_dir.lower().strip() if isinstance(sort_dir, str) else None
chosen_sort_dir = raw_sort_dir or session_sort.get("direction") or default_direction
if chosen_sort_dir not in {"asc", "desc"}:
chosen_sort_dir = default_direction
request.session["rolodex_sort"] = {"key": chosen_sort_key, "direction": chosen_sort_dir}
# Eager-load phones to avoid N+1 in template
query = db.query(Client).options(joinedload(Client.phones))
if q:
like = f"%{q}%"
query = query.filter(
or_(
Client.first_name.ilike(like),
Client.last_name.ilike(like),
Client.company.ilike(like),
)
)
if phone:
like_phone = f"%{phone}%"
# Use EXISTS over join to avoid duplicate rows
query = query.filter(Client.phones.any(Phone.phone_number.ilike(like_phone)))
phone_sort_expr = (
select(sa_func.min(Phone.phone_number))
.where(Phone.client_id == Client.id)
.correlate(Client)
.scalar_subquery()
)
updated_sort_expr = sa_func.coalesce(Client.updated_at, Client.created_at)
order_map: dict[str, dict[str, list[Any]]] = {
"name": {
"asc": [
Client.last_name.is_(None),
Client.last_name.asc(),
Client.first_name.is_(None),
Client.first_name.asc(),
],
"desc": [
Client.last_name.is_(None),
Client.last_name.desc(),
Client.first_name.is_(None),
Client.first_name.desc(),
],
},
"company": {
"asc": [Client.company.is_(None), Client.company.asc()],
"desc": [Client.company.is_(None), Client.company.desc()],
},
"address": {
"asc": [Client.address.is_(None), Client.address.asc()],
"desc": [Client.address.is_(None), Client.address.desc()],
},
"city": {
"asc": [Client.city.is_(None), Client.city.asc()],
"desc": [Client.city.is_(None), Client.city.desc()],
},
"state": {
"asc": [Client.state.is_(None), Client.state.asc()],
"desc": [Client.state.is_(None), Client.state.desc()],
},
"zip": {
"asc": [Client.zip_code.is_(None), Client.zip_code.asc()],
"desc": [Client.zip_code.is_(None), Client.zip_code.desc()],
},
"phones": {
"asc": [phone_sort_expr.is_(None), phone_sort_expr.asc()],
"desc": [phone_sort_expr.is_(None), phone_sort_expr.desc()],
},
"updated": {
"asc": [updated_sort_expr.is_(None), updated_sort_expr.asc()],
"desc": [updated_sort_expr.is_(None), updated_sort_expr.desc()],
},
}
query = query.order_by(*order_map[chosen_sort_key][chosen_sort_dir])
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
clients = query.offset(offset).limit(page_size).all()
start_page = max(1, page - 2)
end_page = min(total_pages, page + 2)
page_numbers = list(range(start_page, end_page + 1))
logger.info(
"rolodex_render",
query=q,
phone=phone,
page=page,
page_size=page_size,
total=total,
sort_key=chosen_sort_key,
sort_dir=chosen_sort_dir,
)
return templates.TemplateResponse(
"rolodex.html",
{
"request": request,
"user": user,
"clients": clients,
"q": q,
"phone": phone,
"page": page,
"page_size": page_size,
"total": total,
"total_pages": total_pages,
"page_numbers": page_numbers,
"start_index": (offset + 1) if total > 0 else 0,
"end_index": min(offset + len(clients), total),
"enable_bulk": True,
"sort_key": chosen_sort_key,
"sort_dir": chosen_sort_dir,
"sort_labels": allowed_sort_keys,
},
)
@app.get("/rolodex/new")
async def rolodex_new(
request: Request,
prefill: str | None = Query(None, alias="_prefill"),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
client = None
if prefill:
try:
client_id = int(prefill)
client = db.query(Client).filter(Client.id == client_id).first()
except ValueError:
client = None
return templates.TemplateResponse("rolodex_edit.html", {"request": request, "user": user, "client": client})
@app.get("/rolodex/{client_id}/edit")
async def rolodex_edit(client_id: int, request: Request, db: Session = Depends(get_db)):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
client = db.query(Client).filter(Client.id == client_id).first()
if not client:
raise HTTPException(status_code=404, detail="Client not found")
return templates.TemplateResponse("rolodex_edit.html", {"request": request, "user": user, "client": client})
@app.get("/rolodex/{client_id}")
async def rolodex_view(client_id: int, request: Request, db: Session = Depends(get_db)):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
client = (
db.query(Client)
.options(joinedload(Client.phones), joinedload(Client.cases))
.filter(Client.id == client_id)
.first()
)
if not client:
raise HTTPException(status_code=404, detail="Client not found")
return templates.TemplateResponse("rolodex_view.html", {"request": request, "user": user, "client": client})
@app.post("/rolodex/create")
async def rolodex_create(
request: Request,
prefix: str = Form(None),
first_name: str = Form(None),
middle_name: str = Form(None),
last_name: str = Form(None),
suffix: str = Form(None),
title: str = Form(None),
company: str = Form(None),
address: str = Form(None),
city: str = Form(None),
state: str = Form(None),
zip_code: str = Form(None),
group: str = Form(None),
email: str = Form(None),
dob: str = Form(None),
ssn: str = Form(None),
legal_status: str = Form(None),
memo: str = Form(None),
rolodex_id: str = Form(None),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
# Parse date
dob_dt = None
if dob and dob.strip():
try:
dob_dt = datetime.strptime(dob.strip(), "%Y-%m-%d").date()
except ValueError:
dob_dt = None
client = Client(
prefix=(prefix or "").strip() or None,
first_name=(first_name or "").strip() or None,
middle_name=(middle_name or "").strip() or None,
last_name=(last_name or "").strip() or None,
suffix=(suffix or "").strip() or None,
title=(title or "").strip() or None,
company=(company or "").strip() or None,
address=(address or "").strip() or None,
city=(city or "").strip() or None,
state=(state or "").strip() or None,
zip_code=(zip_code or "").strip() or None,
group=(group or "").strip() or None,
email=(email or "").strip() or None,
dob=dob_dt,
ssn=(ssn or "").strip() or None,
legal_status=(legal_status or "").strip() or None,
memo=(memo or "").strip() or None,
rolodex_id=(rolodex_id or "").strip() or None,
)
db.add(client)
db.commit()
db.refresh(client)
logger.info("rolodex_create", client_id=client.id, rolodex_id=client.rolodex_id)
return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
@app.post("/rolodex/{client_id}/update")
async def rolodex_update(
client_id: int,
request: Request,
prefix: str = Form(None),
first_name: str = Form(None),
middle_name: str = Form(None),
last_name: str = Form(None),
suffix: str = Form(None),
title: str = Form(None),
company: str = Form(None),
address: str = Form(None),
city: str = Form(None),
state: str = Form(None),
zip_code: str = Form(None),
group: str = Form(None),
email: str = Form(None),
dob: str = Form(None),
ssn: str = Form(None),
legal_status: str = Form(None),
memo: str = Form(None),
rolodex_id: str = Form(None),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
client = db.query(Client).filter(Client.id == client_id).first()
if not client:
raise HTTPException(status_code=404, detail="Client not found")
client.prefix = (prefix or "").strip() or None
client.first_name = (first_name or "").strip() or None
client.middle_name = (middle_name or "").strip() or None
client.last_name = (last_name or "").strip() or None
client.suffix = (suffix or "").strip() or None
client.title = (title or "").strip() or None
client.company = (company or "").strip() or None
client.address = (address or "").strip() or None
client.city = (city or "").strip() or None
client.state = (state or "").strip() or None
client.zip_code = (zip_code or "").strip() or None
client.group = (group or "").strip() or None
client.email = (email or "").strip() or None
if dob and dob.strip():
try:
client.dob = datetime.strptime(dob.strip(), "%Y-%m-%d").date()
except ValueError:
pass
client.ssn = (ssn or "").strip() or None
client.legal_status = (legal_status or "").strip() or None
client.memo = (memo or "").strip() or None
client.rolodex_id = (rolodex_id or "").strip() or None
db.commit()
logger.info(
"rolodex_update",
client_id=client.id,
fields={
"first_name": client.first_name,
"last_name": client.last_name,
"company": client.company,
"rolodex_id": client.rolodex_id,
},
)
return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
@app.post("/rolodex/{client_id}/delete")
async def rolodex_delete(client_id: int, request: Request, db: Session = Depends(get_db)):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
client = db.query(Client).filter(Client.id == client_id).first()
if not client:
raise HTTPException(status_code=404, detail="Client not found")
db.delete(client)
db.commit()
logger.info("rolodex_delete", client_id=client_id)
return RedirectResponse(url="/rolodex", status_code=302)
@app.post("/rolodex/{client_id}/phone/add")
async def rolodex_add_phone(
client_id: int,
request: Request,
phone_number: str = Form(...),
phone_type: str = Form(None),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
client = db.query(Client).filter(Client.id == client_id).first()
if not client:
raise HTTPException(status_code=404, detail="Client not found")
phone = Phone(
client_id=client.id,
phone_number=(phone_number or "").strip(),
phone_type=(phone_type or "").strip() or None,
)
db.add(phone)
db.commit()
logger.info("rolodex_phone_add", client_id=client.id, phone_id=phone.id, number=phone.phone_number)
return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
@app.post("/rolodex/{client_id}/phone/{phone_id}/delete")
async def rolodex_delete_phone(client_id: int, phone_id: int, request: Request, db: Session = Depends(get_db)):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
phone = db.query(Phone).filter(Phone.id == phone_id, Phone.client_id == client_id).first()
if not phone:
raise HTTPException(status_code=404, detail="Phone not found")
db.delete(phone)
db.commit()
logger.info("rolodex_phone_delete", client_id=client_id, phone_id=phone_id)
return RedirectResponse(url=f"/rolodex/{client_id}", status_code=302)
@app.get("/payments")
async def payments_search(
request: Request,
from_date: str | None = Query(None, description="YYYY-MM-DD"),
to_date: str | None = Query(None, description="YYYY-MM-DD"),
file_no: str | None = Query(None, description="Case file number"),
rolodex_id: str | None = Query(None, description="Legacy client Id"),
q: str | None = Query(None, description="Description contains"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
query = (
db.query(Payment)
.join(Case, Payment.case_id == Case.id)
.join(Client, Case.client_id == Client.id)
.order_by(Payment.payment_date.desc().nulls_last(), Payment.id.desc())
)
filters = []
if from_date:
try:
dt = datetime.strptime(from_date, "%Y-%m-%d")
filters.append(Payment.payment_date >= dt)
except ValueError:
pass
if to_date:
try:
dt = datetime.strptime(to_date, "%Y-%m-%d")
filters.append(Payment.payment_date <= dt)
except ValueError:
pass
if file_no:
filters.append(Case.file_no.ilike(f"%{file_no}%"))
if rolodex_id:
filters.append(Client.rolodex_id.ilike(f"%{rolodex_id}%"))
if q:
filters.append(Payment.description.ilike(f"%{q}%"))
if filters:
query = query.filter(and_(*filters))
total = query.count()
total_pages = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
payments = query.offset(offset).limit(page_size).all()
# Totals for current result page
page_total_amount = sum(p.amount or 0 for p in payments)
logger.info(
"payments_render",
from_date=from_date,
to_date=to_date,
file_no=file_no,
rolodex_id=rolodex_id,
q=q,
total=total,
)
return templates.TemplateResponse(
"payments.html",
{
"request": request,
"user": user,
"payments": payments,
"from_date": from_date,
"to_date": to_date,
"file_no": file_no,
"rolodex_id": rolodex_id,
"q": q,
"page": page,
"page_size": page_size,
"total": total,
"total_pages": total_pages,
"start_index": (offset + 1) if total > 0 else 0,
"end_index": min(offset + len(payments), total),
"page_total_amount": page_total_amount,
},
)
@app.post("/reports/phone-book")
async def phone_book_report_post(request: Request):
"""Accepts selected client IDs from forms and redirects to GET for rendering."""
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
client_ids = form.getlist("client_ids")
if not client_ids:
return RedirectResponse(url="/rolodex", status_code=302)
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
return RedirectResponse(url=f"/reports/phone-book?{ids_param}", status_code=302)
@app.get("/reports/phone-book")
async def phone_book_report(
request: Request,
client_ids: List[int] | None = Query(None),
q: str | None = Query(None, description="Filter by name/company"),
format: str | None = Query(None, description="csv or pdf for export"),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
query = db.query(Client).options(joinedload(Client.phones))
if client_ids:
query = query.filter(Client.id.in_(client_ids))
elif q:
like = f"%{q}%"
query = query.filter(
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
)
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
if format == "csv":
# Build CSV output
output = StringIO()
writer = csv.writer(output)
writer.writerow(["Last", "First", "Company", "Phone Type", "Phone Number"])
for c in clients:
if c.phones:
for p in c.phones:
writer.writerow([
c.last_name or "",
c.first_name or "",
c.company or "",
p.phone_type or "",
p.phone_number or "",
])
else:
writer.writerow([c.last_name or "", c.first_name or "", c.company or "", "", ""])
csv_bytes = output.getvalue().encode("utf-8")
return Response(
content=csv_bytes,
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=phone_book.csv"},
)
if format == "pdf":
pdf_bytes = build_phone_book_pdf(clients)
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=phone_book.pdf"},
)
logger.info("phone_book_render", count=len(clients))
return templates.TemplateResponse(
"report_phone_book.html",
{"request": request, "user": user, "clients": clients, "q": q, "client_ids": client_ids or []},
)
# ------------------------------
# Reports: Payments - Detailed
# ------------------------------
@app.get("/reports/payments-detailed")
async def payments_detailed_report(
request: Request,
from_date: str | None = Query(None, description="YYYY-MM-DD"),
to_date: str | None = Query(None, description="YYYY-MM-DD"),
file_no: str | None = Query(None, description="Case file number"),
format: str | None = Query(None, description="pdf for PDF output"),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
query = (
db.query(Payment)
.join(Case, Payment.case_id == Case.id)
.join(Client, Case.client_id == Client.id)
)
filters = []
if from_date:
try:
dt = datetime.strptime(from_date, "%Y-%m-%d")
filters.append(Payment.payment_date >= dt)
except ValueError:
pass
if to_date:
try:
dt = datetime.strptime(to_date, "%Y-%m-%d")
filters.append(Payment.payment_date <= dt)
except ValueError:
pass
if file_no:
filters.append(Case.file_no.ilike(f"%{file_no}%"))
if filters:
query = query.filter(and_(*filters))
# For grouping by deposit date, order by date then id
payments = (
query.options(joinedload(Payment.case).joinedload(Case.client))
.order_by(Payment.payment_date.asc().nulls_last(), Payment.id.asc())
.all()
)
if format == "pdf":
pdf_bytes = build_payments_detailed_pdf(payments)
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=payments_detailed.pdf"},
)
# Build preview groups for template: [{date, total, items}]
groups: list[dict[str, Any]] = []
from collections import defaultdict
grouped: dict[str, list[Payment]] = defaultdict(list)
for p in payments:
key = p.payment_date.date().isoformat() if p.payment_date else "(No Date)"
grouped[key].append(p)
overall_total = sum((p.amount or 0.0) for p in payments)
for key in sorted(grouped.keys()):
items = grouped[key]
total_amt = sum((p.amount or 0.0) for p in items)
groups.append({"date": key, "total": total_amt, "items": items})
logger.info(
"payments_detailed_render",
from_date=from_date,
to_date=to_date,
file_no=file_no,
count=len(payments),
)
return templates.TemplateResponse(
"payments_detailed.html",
{
"request": request,
"user": user,
"groups": groups,
"overall_total": overall_total,
"from_date": from_date,
"to_date": to_date,
"file_no": file_no,
},
)
# ------------------------------
# Reports: Phone Book (Address + Phone)
# ------------------------------
@app.post("/reports/phone-book-address")
async def phone_book_address_post(request: Request):
"""Accept selected client IDs from forms and redirect to GET for rendering."""
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
client_ids = form.getlist("client_ids")
if not client_ids:
return RedirectResponse(url="/rolodex", status_code=302)
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
return RedirectResponse(url=f"/reports/phone-book-address?{ids_param}", status_code=302)
@app.get("/reports/phone-book-address")
async def phone_book_address_report(
request: Request,
client_ids: List[int] | None = Query(None),
q: str | None = Query(None, description="Filter by name/company"),
phone: str | None = Query(None, description="Phone contains"),
format: str | None = Query(None, description="csv or pdf for export"),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
query = db.query(Client).options(joinedload(Client.phones))
if client_ids:
query = query.filter(Client.id.in_(client_ids))
else:
if q:
like = f"%{q}%"
query = query.filter(
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
)
if phone:
query = query.filter(Client.phones.any(Phone.phone_number.ilike(f"%{phone}%")))
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
if format == "csv":
# Build CSV output
output = StringIO()
writer = csv.writer(output)
writer.writerow(["Last", "First", "Company", "Address", "City", "State", "ZIP", "Phone Type", "Phone Number"])
for c in clients:
if c.phones:
for p in c.phones:
writer.writerow([
c.last_name or "",
c.first_name or "",
c.company or "",
c.address or "",
c.city or "",
c.state or "",
c.zip_code or "",
p.phone_type or "",
p.phone_number or "",
])
else:
writer.writerow([
c.last_name or "",
c.first_name or "",
c.company or "",
c.address or "",
c.city or "",
c.state or "",
c.zip_code or "",
"",
"",
])
csv_bytes = output.getvalue().encode("utf-8")
return Response(
content=csv_bytes,
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=phone_book_address.csv"},
)
if format == "pdf":
pdf_bytes = build_phone_book_address_pdf(clients)
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=phone_book_address.pdf"},
)
logger.info("phone_book_address_render", count=len(clients))
return templates.TemplateResponse(
"report_phone_book_address.html",
{
"request": request,
"user": user,
"clients": clients,
"q": q,
"phone": phone,
"client_ids": client_ids or [],
},
)
# ------------------------------
# Reports: Envelope (PDF)
# ------------------------------
@app.post("/reports/envelope")
async def envelope_report_post(request: Request):
"""Accept selected client IDs and redirect to GET for PDF download."""
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
client_ids = form.getlist("client_ids")
if not client_ids:
return RedirectResponse(url="/rolodex", status_code=302)
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
return RedirectResponse(url=f"/reports/envelope?{ids_param}&format=pdf", status_code=302)
@app.get("/reports/envelope")
async def envelope_report(
request: Request,
client_ids: List[int] | None = Query(None),
q: str | None = Query(None, description="Filter by name/company"),
phone: str | None = Query(None, description="Phone contains (optional)"),
format: str | None = Query("pdf", description="pdf output only"),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
query = db.query(Client)
if client_ids:
query = query.filter(Client.id.in_(client_ids))
else:
if q:
like = f"%{q}%"
query = query.filter(
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
)
if phone:
# include clients that have a matching phone
query = query.join(Phone, isouter=True).filter(or_(Phone.phone_number.ilike(f"%{phone}%"), Phone.id == None)).distinct() # noqa: E711
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
# Always produce PDF
pdf_bytes = build_envelope_pdf(clients)
logger.info("envelope_pdf", count=len(clients))
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=envelopes.pdf"},
)
# ------------------------------
# Reports: Rolodex Info (PDF)
# ------------------------------
@app.post("/reports/rolodex-info")
async def rolodex_info_post(request: Request):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
client_ids = form.getlist("client_ids")
if not client_ids:
return RedirectResponse(url="/rolodex", status_code=302)
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
return RedirectResponse(url=f"/reports/rolodex-info?{ids_param}&format=pdf", status_code=302)
@app.get("/reports/rolodex-info")
async def rolodex_info_report(
request: Request,
client_ids: List[int] | None = Query(None),
q: str | None = Query(None, description="Filter by name/company"),
format: str | None = Query("pdf", description="pdf output only"),
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
query = db.query(Client).options(joinedload(Client.phones))
if client_ids:
query = query.filter(Client.id.in_(client_ids))
elif q:
like = f"%{q}%"
query = query.filter(
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
)
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
pdf_bytes = build_rolodex_info_pdf(clients)
logger.info("rolodex_info_pdf", count=len(clients))
return Response(
content=pdf_bytes,
media_type="application/pdf",
headers={"Content-Disposition": "attachment; filename=rolodex_info.pdf"},
)
# ------------------------------
# JSON API: list/filter endpoints
# ------------------------------
def _apply_sorting(query, sort_by: str | None, sort_dir: str, allowed_map: dict[str, Any], default_order: list[Any]):
"""Apply validated sorting to a SQLAlchemy query.
Args:
query: Base SQLAlchemy query object
sort_by: Optional requested sort field
sort_dir: 'asc' or 'desc'
allowed_map: Map of allowed sort_by -> SQLAlchemy column or list of columns
default_order: Fallback order_by list when sort_by is not provided
Returns:
(query, applied_sort_by, applied_sort_dir)
"""
if not sort_by:
for col in default_order:
query = query.order_by(col)
return query, None, sort_dir
column_expr = allowed_map.get(sort_by)
if column_expr is None:
raise HTTPException(status_code=400, detail=f"Invalid sort_by: '{sort_by}'. Allowed: {sorted(list(allowed_map.keys()))}")
def _order(expr):
return expr.asc().nulls_last() if sort_dir == "asc" else expr.desc().nulls_last()
if isinstance(column_expr, (list, tuple)):
for expr in column_expr:
query = query.order_by(_order(expr))
else:
query = query.order_by(_order(column_expr))
return query, sort_by, sort_dir
@app.get("/api/rolodex", response_model=RolodexListResponse)
async def api_list_rolodex(
request: Request,
q: str | None = Query(None, description="Search by first/last/company contains"),
phone: str | None = Query(None, description="Phone number contains"),
rolodex_id: str | None = Query(None, description="Legacy Rolodex ID contains"),
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
sort_by: str | None = Query(None, description="Sort field: id, rolodex_id, last_name, first_name, company, created_at"),
sort_dir: str = Query("asc", description="Sort direction: asc or desc"),
db: Session = Depends(get_db),
) -> RolodexListResponse:
"""Return paginated clients with simple filters as JSON."""
user = get_current_user_from_session(request.session)
if not user:
# Middleware ensures JSON 401 for /api/*, keep explicit for clarity
raise HTTPException(status_code=401, detail="Unauthorized")
query = db.query(Client).options(joinedload(Client.phones))
if q:
like = f"%{q}%"
query = query.filter(
or_(
Client.first_name.ilike(like),
Client.last_name.ilike(like),
Client.company.ilike(like),
)
)
if phone:
query = query.filter(Client.phones.any(Phone.phone_number.ilike(f"%{phone}%")))
if rolodex_id:
query = query.filter(Client.rolodex_id.ilike(f"%{rolodex_id}%"))
# Sorting
sort_dir_norm = (sort_dir or "").lower()
if sort_dir_norm not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
allowed_sort = {
"id": Client.id,
"rolodex_id": Client.rolodex_id,
"last_name": Client.last_name,
"first_name": Client.first_name,
"company": Client.company,
"created_at": Client.created_at,
}
default_order = [Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last(), Client.id.asc()]
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
clients = query.offset(offset).limit(page_size).all()
logger.info(
"api_rolodex_list",
query=q,
phone=phone,
rolodex_id=rolodex_id,
page=page,
page_size=page_size,
total=total,
sort_by=applied_sort_by,
sort_dir=applied_sort_dir,
)
items = [ClientOut.model_validate(c) for c in clients]
return RolodexListResponse(
items=items,
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
)
@app.get("/api/files", response_model=FilesListResponse)
async def api_list_files(
request: Request,
q: str | None = Query(None, description="Search file no/description/client name/company"),
status: str | None = Query(None, description="Case status: active or closed"),
case_type: str | None = Query(None, description="Case type contains"),
file_no: str | None = Query(None, description="File number contains"),
client_rolodex_id: str | None = Query(None, description="Legacy client Id contains"),
from_open_date: str | None = Query(None, description="Opened on/after YYYY-MM-DD"),
to_open_date: str | None = Query(None, description="Opened on/before YYYY-MM-DD"),
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
sort_by: str | None = Query(None, description="Sort field: file_no, status, case_type, description, open_date, close_date, created_at, client_last_name, client_first_name, client_company"),
sort_dir: str = Query("desc", description="Sort direction: asc or desc"),
db: Session = Depends(get_db),
) -> FilesListResponse:
"""Return paginated cases with simple filters as JSON."""
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
query = (
db.query(Case)
.join(Client, Case.client_id == Client.id)
.options(joinedload(Case.client))
)
filters = []
if q:
like = f"%{q}%"
filters.append(
or_(
Case.file_no.ilike(like),
Case.description.ilike(like),
Client.first_name.ilike(like),
Client.last_name.ilike(like),
Client.company.ilike(like),
)
)
if status:
filters.append(Case.status.ilike(f"%{status}%"))
if case_type:
filters.append(Case.case_type.ilike(f"%{case_type}%"))
if file_no:
filters.append(Case.file_no.ilike(f"%{file_no}%"))
if client_rolodex_id:
filters.append(Client.rolodex_id.ilike(f"%{client_rolodex_id}%"))
if from_open_date:
try:
dt = datetime.strptime(from_open_date, "%Y-%m-%d")
filters.append(Case.open_date >= dt)
except ValueError:
pass
if to_open_date:
try:
dt = datetime.strptime(to_open_date, "%Y-%m-%d")
filters.append(Case.open_date <= dt)
except ValueError:
pass
if filters:
query = query.filter(and_(*filters))
# Sorting
sort_dir_norm = (sort_dir or "").lower()
if sort_dir_norm not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
allowed_sort = {
"file_no": Case.file_no,
"status": Case.status,
"case_type": Case.case_type,
"description": Case.description,
"open_date": Case.open_date,
"close_date": Case.close_date,
"created_at": Case.created_at,
"client_last_name": Client.last_name,
"client_first_name": Client.first_name,
"client_company": Client.company,
"id": Case.id,
}
default_order = [Case.open_date.desc().nulls_last(), Case.created_at.desc()]
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
cases = query.offset(offset).limit(page_size).all()
logger.info(
"api_files_list",
query=q,
status=status,
case_type=case_type,
file_no=file_no,
client_rolodex_id=client_rolodex_id,
page=page,
page_size=page_size,
total=total,
sort_by=applied_sort_by,
sort_dir=applied_sort_dir,
)
items = [CaseOut.model_validate(c) for c in cases]
return FilesListResponse(
items=items,
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
)
@app.get("/api/ledger", response_model=LedgerListResponse)
async def api_list_ledger(
request: Request,
case_id: int | None = Query(None, description="Filter by case ID"),
file_no: str | None = Query(None, description="Filter by case file number contains"),
from_date: str | None = Query(None, description="On/after YYYY-MM-DD"),
to_date: str | None = Query(None, description="On/before YYYY-MM-DD"),
billed: str | None = Query(None, description="'Y' or 'N'"),
t_code: str | None = Query(None, description="Transaction code contains"),
t_type_l: str | None = Query(None, description="Legacy type flag (e.g., C/D)"),
employee_number: str | None = Query(None, description="Employee number contains"),
q: str | None = Query(None, description="Description contains"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
sort_by: str | None = Query(None, description="Sort field: transaction_date, item_no, id, amount, billed, t_code, t_type_l, employee_number, case_file_no, case_id"),
sort_dir: str = Query("desc", description="Sort direction: asc or desc"),
db: Session = Depends(get_db),
) -> LedgerListResponse:
"""Return paginated ledger (transactions) with simple filters as JSON."""
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
query = (
db.query(Transaction)
.join(Case, Transaction.case_id == Case.id)
.options(joinedload(Transaction.case))
)
filters = []
if case_id is not None:
filters.append(Transaction.case_id == case_id)
if file_no:
filters.append(Case.file_no.ilike(f"%{file_no}%"))
if from_date:
try:
dt = datetime.strptime(from_date, "%Y-%m-%d")
filters.append(Transaction.transaction_date >= dt)
except ValueError:
pass
if to_date:
try:
dt = datetime.strptime(to_date, "%Y-%m-%d")
filters.append(Transaction.transaction_date <= dt)
except ValueError:
pass
if billed in ("Y", "N"):
filters.append(Transaction.billed == billed)
if t_code:
filters.append(Transaction.t_code.ilike(f"%{t_code}%"))
if t_type_l:
filters.append(Transaction.t_type_l.ilike(f"%{t_type_l}%"))
if employee_number:
filters.append(Transaction.employee_number.ilike(f"%{employee_number}%"))
if q:
filters.append(Transaction.description.ilike(f"%{q}%"))
if filters:
query = query.filter(and_(*filters))
# Sorting
sort_dir_norm = (sort_dir or "").lower()
if sort_dir_norm not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
allowed_sort = {
"transaction_date": Transaction.transaction_date,
"item_no": Transaction.item_no,
"id": Transaction.id,
"amount": Transaction.amount,
"billed": Transaction.billed,
"t_code": Transaction.t_code,
"t_type_l": Transaction.t_type_l,
"employee_number": Transaction.employee_number,
"case_file_no": Case.file_no,
"case_id": Transaction.case_id,
}
default_order = [
Transaction.transaction_date.desc().nulls_last(),
Transaction.item_no.asc().nulls_last(),
Transaction.id.desc(),
]
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
txns = query.offset(offset).limit(page_size).all()
logger.info(
"api_ledger_list",
case_id=case_id,
file_no=file_no,
from_date=from_date,
to_date=to_date,
billed=billed,
t_code=t_code,
t_type_l=t_type_l,
employee_number=employee_number,
q=q,
page=page,
page_size=page_size,
total=total,
sort_by=applied_sort_by,
sort_dir=applied_sort_dir,
)
items = [
TransactionOut(
id=t.id,
case_id=t.case_id,
case_file_no=t.case.file_no if t.case else None,
transaction_date=t.transaction_date,
item_no=t.item_no,
amount=t.amount,
billed=t.billed,
t_code=t.t_code,
t_type_l=t.t_type_l,
quantity=t.quantity,
rate=t.rate,
description=t.description,
employee_number=t.employee_number,
)
for t in txns
]
return LedgerListResponse(
items=items,
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
)
# ------------------------------
# QDRO Views
# ------------------------------
@app.get("/qdro/{file_no}")
async def qdro_versions(
request: Request,
file_no: str,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
versions = (
db.query(Qdros)
.filter(Qdros.file_no == file_no)
.order_by(Qdros.version.asc())
.all()
)
return templates.TemplateResponse(
"qdro.html",
{
"request": request,
"user": user,
"file_no": file_no,
"versions": versions,
"qdro": None,
},
)
@app.get("/qdro/{file_no}/{version}")
async def qdro_detail(
request: Request,
file_no: str,
version: str,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
q = db.query(Qdros).filter(Qdros.file_no == file_no, Qdros.version == version).first()
if not q:
return RedirectResponse(url=f"/qdro/{file_no}", status_code=302)
versions = (
db.query(Qdros)
.filter(Qdros.file_no == file_no)
.order_by(Qdros.version.asc())
.all()
)
return templates.TemplateResponse(
"qdro.html",
{
"request": request,
"user": user,
"file_no": file_no,
"versions": versions,
"qdro": q,
},
)