"""
|
||
FastAPI application entry point for Delphi Database.
|
||
|
||
This module initializes the FastAPI application, sets up database connections,
|
||
and provides the main application instance.
|
||
"""
|
||
|
||
import os
|
||
import time
|
||
import csv
|
||
import json
|
||
import uuid
|
||
from contextlib import asynccontextmanager
|
||
from datetime import datetime
|
||
from typing import Optional, List, Dict, Any
|
||
from io import StringIO
|
||
|
||
from fastapi import FastAPI, Depends, Request, Query, HTTPException, UploadFile, File, Form
|
||
from fastapi.responses import RedirectResponse, Response, JSONResponse
|
||
from starlette.middleware.sessions import SessionMiddleware
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.templating import Jinja2Templates
|
||
from sqlalchemy.orm import Session, joinedload
|
||
from sqlalchemy import or_, and_, func as sa_func
|
||
from dotenv import load_dotenv
|
||
from starlette.middleware.base import BaseHTTPMiddleware
|
||
import structlog
|
||
from structlog import contextvars as structlog_contextvars
|
||
|
||
from .database import create_tables, get_db, get_database_url
|
||
from .models import User, Case, Client, Phone, Transaction, Document, Payment, ImportLog
|
||
from .auth import authenticate_user, get_current_user_from_session
|
||
from .reporting import (
|
||
build_phone_book_pdf,
|
||
build_payments_detailed_pdf,
|
||
build_envelope_pdf,
|
||
build_phone_book_address_pdf,
|
||
build_rolodex_info_pdf,
|
||
)
|
||
from .logging_config import setup_logging
|
||
from .schemas import (
|
||
ClientOut,
|
||
PhoneOut,
|
||
CaseOut,
|
||
TransactionOut,
|
||
Pagination,
|
||
RolodexListResponse,
|
||
FilesListResponse,
|
||
LedgerListResponse,
|
||
)
|
||
|
||
# Load environment variables
|
||
load_dotenv()
|
||
|
||
# Get SECRET_KEY from environment variables
|
||
SECRET_KEY = os.getenv("SECRET_KEY")
|
||
if not SECRET_KEY:
|
||
raise ValueError("SECRET_KEY environment variable must be set")
|
||
|
||
# Configure structured logging
|
||
setup_logging()
|
||
logger = structlog.get_logger(__name__)
|
||
def open_text_with_fallbacks(file_path: str):
    """
    Open a text file, trying multiple encodings commonly seen in legacy CSVs.

    Attempts, in order: utf-8, utf-8-sig, cp1252, windows-1252, cp1250,
    iso-8859-1, latin-1.

    Returns a tuple of (file_object, encoding_used). The caller is responsible
    for closing the file.
    """
    # Note: in Python's codec registry, windows-1252 aliases cp1252 and
    # iso-8859-1 aliases latin-1, and latin-1 decodes any byte sequence, so
    # the final entries act as a catch-all rather than strict validation.
    encodings = ["utf-8", "utf-8-sig", "cp1252", "windows-1252", "cp1250", "iso-8859-1", "latin-1"]
    last_error = None
    for enc in encodings:
        try:
            f = open(file_path, 'r', encoding=enc, errors='strict', newline='')
            # Probe the first 1 KiB to surface decoding errors early; errors
            # deeper in the file can still appear during iteration
            _ = f.read(1024)
            f.seek(0)
            logger.info("csv_open_encoding_selected", file=file_path, encoding=enc)
            return f, enc
        except Exception as e:
            last_error = e
            logger.warning("encoding_fallback_failed", file=file_path, encoding=enc, error=str(e))
            continue

    error_msg = f"Unable to open file '{file_path}' with any of the supported encodings: {', '.join(encodings)}"
    if last_error:
        error_msg += f". Last error: {str(last_error)}"
    raise RuntimeError(error_msg)


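# Usage sketch (the path below is hypothetical); since the caller owns the
# handle, pair it with try/finally or a `with` block:
#
#     f, enc = open_text_with_fallbacks("data-import/ROLODEX.CSV")
#     try:
#         rows = list(csv.DictReader(f))
#     finally:
#         f.close()

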
# Configure Jinja2 templates
templates = Jinja2Templates(directory="app/templates")


class AuthMiddleware(BaseHTTPMiddleware):
    """
    Simple session-based authentication middleware.

    Redirects unauthenticated users to /login for protected routes.
    """

    def __init__(self, app, exempt_paths: list[str] | None = None):
        super().__init__(app)
        self.exempt_paths = exempt_paths or []

    async def dispatch(self, request, call_next):
        path = request.url.path

        # Allow exempt paths and static assets
        if (
            path in self.exempt_paths
            or path.startswith("/static")
            or path.startswith("/favicon")
        ):
            return await call_next(request)

        # Enforce authentication for other paths
        if not request.session.get("user_id"):
            # Return JSON 401 for API routes, redirect for HTML routes
            if path.startswith("/api/"):
                return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
            return RedirectResponse(url="/login", status_code=302)

        return await call_next(request)


class RequestIdMiddleware(BaseHTTPMiddleware):
    """
    Middleware that assigns a request_id and binds request context for logging.

    Adds request_id, http.method, http.path, and user.id to the structlog
    context, and emits a JSON access log with status_code and duration_ms
    after the response.
    """

    async def dispatch(self, request: Request, call_next):
        start_time = time.perf_counter()

        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        method = request.method
        path = request.url.path

        # User id from session if available (SessionMiddleware runs first)
        user_id = request.session.get("user_id") if hasattr(request, "session") else None

        structlog_contextvars.bind_contextvars(
            request_id=request_id,
            **{"http.method": method, "http.path": path, "user.id": user_id},
        )

        try:
            response = await call_next(request)
            status_code = response.status_code
        except Exception:  # noqa: BLE001 - we re-raise after logging
            status_code = 500
            duration_ms = int((time.perf_counter() - start_time) * 1000)
            logger.error(
                "request",
                status_code=status_code,
                duration_ms=duration_ms,
                exc_info=True,
            )
            structlog_contextvars.unbind_contextvars("request_id", "http.method", "http.path", "user.id")
            raise

        # Ensure the response header carries the request id
        try:
            response.headers["X-Request-ID"] = request_id
        except Exception:
            pass

        duration_ms = int((time.perf_counter() - start_time) * 1000)
        logger.info(
            "request",
            status_code=status_code,
            duration_ms=duration_ms,
        )

        structlog_contextvars.unbind_contextvars("request_id", "http.method", "http.path", "user.id")
        return response


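# Illustrative behavior (hypothetical host/port): a caller-supplied id is
# reused, otherwise a UUID4 is generated; either way it is echoed back.
#
#     curl -i -H "X-Request-ID: abc-123" http://localhost:8000/health
#     ...
#     X-Request-ID: abc-123

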
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for the FastAPI application.

    Handles startup and shutdown events:
    - Creates database tables on startup
    - Logs database connection info
    """
    # Startup
    logger.info("app_start")

    # Create database tables
    create_tables()
    logger.info("db_tables_verified")

    # Log database connection info
    db_url = get_database_url()
    logger.info("db_connected", database_url=db_url)

    yield

    # Shutdown
    logger.info("app_shutdown")


||
# Create FastAPI application with lifespan management
|
||
app = FastAPI(
|
||
title="Delphi Database",
|
||
description="Legal case management database application",
|
||
version="1.0.0",
|
||
lifespan=lifespan
|
||
)
|
||
|
||
# Add CORS middleware for cross-origin requests
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"], # In production, specify allowed origins
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# Register request logging and authentication middleware with exempt paths
|
||
EXEMPT_PATHS = ["/", "/health", "/login", "/logout"]
|
||
app.add_middleware(RequestIdMiddleware)
|
||
app.add_middleware(AuthMiddleware, exempt_paths=EXEMPT_PATHS)
|
||
|
||
# Add SessionMiddleware for session management (must be added LAST so it runs FIRST)
|
||
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
|
||
|
||
# Mount static files directory
|
||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||
|
||
|
||
def get_import_type_from_filename(filename: str) -> str:
    """
    Determine import type based on filename pattern.

    Args:
        filename: Name of the uploaded CSV file

    Returns:
        Import type string (client, phone, case, transaction, document, payment)

    Raises:
        ValueError: If the filename matches no known pattern
    """
    filename_upper = filename.upper()
    # Strip extension and normalize
    base = filename_upper.rsplit('.', 1)[0]

    # Support files saved with explicit type prefixes (e.g., CLIENT_<uuid>.csv)
    if base.startswith('CLIENT_'):
        return 'client'
    if base.startswith('PHONE_'):
        return 'phone'
    if base.startswith('CASE_'):
        return 'case'
    if base.startswith('TRANSACTION_'):
        return 'transaction'
    if base.startswith('DOCUMENT_'):
        return 'document'
    if base.startswith('PAYMENT_'):
        return 'payment'

    # Legacy/real file name patterns; a substring match also covers names
    # that merely start with the keyword
    if 'ROLODEX' in base or 'ROLEX' in base:
        return 'client'
    if 'PHONE' in base:
        return 'phone'
    if 'FILES' in base or base.startswith('FILE'):
        return 'case'
    if 'LEDGER' in base or 'TRNSACTN' in base:
        return 'transaction'
    if 'QDRO' in base:
        return 'document'
    if 'PAYMENT' in base or 'DEPOSIT' in base:
        return 'payment'

    raise ValueError(f"Unknown file type for filename: {filename}")


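# Examples implied by the rules above:
#
#     get_import_type_from_filename("client_3f2a.csv")   -> 'client'  (prefix)
#     get_import_type_from_filename("ROLODEX.CSV")       -> 'client'  (legacy)
#     get_import_type_from_filename("TRNSACTN2024.csv")  -> 'transaction'
#     get_import_type_from_filename("notes.csv")         -> ValueError

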
def validate_csv_headers(headers: List[str], expected_fields: Dict[str, str]) -> Dict[str, Any]:
    """
    Validate CSV headers against expected model fields.

    Args:
        headers: List of CSV column headers
        expected_fields: Dict mapping field names to descriptions

    Returns:
        Dict with validation results and field mapping
    """
    result = {
        'valid': True,
        'missing_fields': [],
        'field_mapping': {},
        'errors': []
    }

    # Create mapping from CSV headers to model fields (case-insensitive)
    for csv_header in headers:
        csv_header_clean = csv_header.strip().lower()
        matched = False

        for model_field, description in expected_fields.items():
            if csv_header_clean == model_field.lower():
                result['field_mapping'][model_field] = csv_header
                matched = True
                break

        if not matched:
            # Try partial matches for common variations
            for model_field, description in expected_fields.items():
                if model_field.lower() in csv_header_clean or csv_header_clean in model_field.lower():
                    result['field_mapping'][model_field] = csv_header
                    matched = True
                    break

        if not matched:
            result['errors'].append(f"Unknown header: '{csv_header}'")

    # Check for required fields (case-insensitive)
    required_fields = ['id']  # Most imports need some form of ID
    for required in required_fields:
        found = False
        for mapped_field in result['field_mapping']:
            if mapped_field.lower() == required.lower():
                found = True
                break
        if not found:
            result['missing_fields'].append(required)

    if result['missing_fields'] or result['errors']:
        result['valid'] = False

    return result


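# Example outcome (values follow directly from the matching rules above):
#
#     validate_csv_headers(['Id', 'Frst'], {'Id': 'Client ID', 'First': 'First Name'})
#     -> {'valid': False, 'missing_fields': [], 'field_mapping': {'Id': 'Id'},
#         'errors': ["Unknown header: 'Frst'"]}

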
def parse_date(date_str: str) -> Optional[datetime]:
    """Parse date string into datetime object."""
    if not date_str or date_str.strip() in ('', 'NULL', 'N/A'):
        return None

    # Try common date formats
    formats = ['%Y-%m-%d', '%m/%d/%Y', '%Y/%m/%d', '%d-%m-%Y']

    for fmt in formats:
        try:
            return datetime.strptime(date_str.strip(), fmt)
        except ValueError:
            continue

    logger.warning("parse_date_failed", value=date_str)
    return None


def parse_float(value: str) -> Optional[float]:
    """Parse string value into float."""
    if not value or value.strip() in ('', 'NULL', 'N/A'):
        return None

    try:
        return float(value.strip())
    except ValueError:
        logger.warning("parse_float_failed", value=value)
        return None


def parse_int(value: str) -> Optional[int]:
    """Parse string value into int."""
    if not value or value.strip() in ('', 'NULL', 'N/A'):
        return None

    try:
        return int(value.strip())
    except ValueError:
        logger.warning("parse_int_failed", value=value)
        return None


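# Behavior of the parse helpers, as implied above (unparseable input logs a
# warning and yields None rather than raising):
#
#     parse_date("03/15/2024")  -> datetime(2024, 3, 15)
#     parse_float(" 12.50 ")    -> 12.5
#     parse_int("N/A")          -> None

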
def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import ROLODEX CSV data into Client model.

    Expected CSV format: Id,Prefix,First,Middle,Last,Suffix,Title,A1,A2,A3,City,Abrev,St,Zip,Email,DOB,SS#,Legal_Status,Group,Memo
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }

    expected_fields = {
        'Id': 'Client ID',
        'Prefix': 'Name Prefix',
        'First': 'First Name',
        'Middle': 'Middle Initial',
        'Last': 'Last Name',
        'Suffix': 'Name Suffix',
        'Title': 'Company/Organization',
        'A1': 'Address Line 1',
        'A2': 'Address Line 2',
        'A3': 'Address Line 3',
        'City': 'City',
        'Abrev': 'State Abbreviation',
        'St': 'State',
        'Zip': 'ZIP Code',
        'Email': 'Email Address',
        'DOB': 'Date of Birth',
        'SS#': 'Social Security Number',
        'Legal_Status': 'Legal Status',
        'Group': 'Group',
        'Memo': 'Memo/Notes'
    }

    try:
        f, used_encoding = open_text_with_fallbacks(file_path)
        with f as file:
            reader = csv.DictReader(file)

            # Validate headers
            headers = reader.fieldnames or []
            validation = validate_csv_headers(headers, expected_fields)

            if not validation['valid']:
                result['errors'].append(f"Header validation failed: {validation['errors']}")
                return result

            for row_num, row in enumerate(reader, start=2):  # Start at 2 (header is row 1)
                result['total_rows'] += 1

                try:
                    # Extract and clean data
                    rolodex_id = row.get('Id', '').strip()
                    if not rolodex_id:
                        result['errors'].append(f"Row {row_num}: Missing client ID")
                        continue

                    # Check for existing client
                    existing = db.query(Client).filter(Client.rolodex_id == rolodex_id).first()
                    if existing:
                        result['errors'].append(f"Row {row_num}: Client with ID '{rolodex_id}' already exists")
                        continue

                    client = Client(
                        rolodex_id=rolodex_id,
                        first_name=row.get('First', '').strip() or None,
                        middle_initial=row.get('Middle', '').strip() or None,
                        last_name=row.get('Last', '').strip() or None,
                        company=row.get('Title', '').strip() or None,
                        address=row.get('A1', '').strip() or None,
                        city=row.get('City', '').strip() or None,
                        state=row.get('St', '').strip() or None,
                        zip_code=row.get('Zip', '').strip() or None
                    )

                    db.add(client)
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            db.commit()

    except Exception as e:
        logger.error("rolodex_import_failed", file=file_path, error=str(e))
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result


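# A minimal ROLODEX CSV accepted by the importer (only the columns the Client
# model actually consumes are populated; the values are made up):
#
#     Id,Prefix,First,Middle,Last,Suffix,Title,A1,A2,A3,City,Abrev,St,Zip,Email,DOB,SS#,Legal_Status,Group,Memo
#     R0001,,Jane,Q,Doe,,,123 Main St,,,Springfield,,IL,62701,,,,,,

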
def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PHONE CSV data into Phone model.

    Expected CSV format: Id,Phone,Location
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }

    f = None
    try:
        f, used_encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        headers = reader.fieldnames or []
        if len(headers) < 2:
            result['errors'].append("Invalid CSV format: expected at least 2 columns")
            return result

        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                client_id = row.get('Id', '').strip()
                if not client_id:
                    result['errors'].append(f"Row {row_num}: Missing client ID")
                    continue

                # Find the client
                client = db.query(Client).filter(Client.rolodex_id == client_id).first()
                if not client:
                    result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
                    continue

                phone_number = row.get('Phone', '').strip()
                if not phone_number:
                    result['errors'].append(f"Row {row_num}: Missing phone number")
                    continue

                phone = Phone(
                    client_id=client.id,
                    phone_type=row.get('Location', '').strip() or 'primary',
                    phone_number=phone_number
                )

                db.add(phone)
                result['success'] += 1

            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")

        db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    finally:
        if f:
            f.close()

    return result


def import_files_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import FILES CSV data into Case model.

    Expected CSV format: File_No,Id,File_Type,Regarding,Opened,Closed,Empl_Num,Rate_Per_Hour,Status,Footer_Code,Opposing,Hours,Hours_P,Trust_Bal,Trust_Bal_P,Hourly_Fees,Hourly_Fees_P,Flat_Fees,Flat_Fees_P,Disbursements,Disbursements_P,Credit_Bal,Credit_Bal_P,Total_Charges,Total_Charges_P,Amount_Owing,Amount_Owing_P,Transferable,Memo
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }

    expected_fields = {
        'File_No': 'File Number',
        'Status': 'Status',
        'File_Type': 'File Type',
        'Regarding': 'Regarding',
        'Opened': 'Opened Date',
        'Closed': 'Closed Date',
        'Id': 'Client ID',
        'Empl_Num': 'Employee Number',
        'Rate_Per_Hour': 'Rate Per Hour',
        'Footer_Code': 'Footer Code',
        'Opposing': 'Opposing Party',
        'Hours': 'Hours',
        'Hours_P': 'Hours (Previous)',
        'Trust_Bal': 'Trust Balance',
        'Trust_Bal_P': 'Trust Balance (Previous)',
        'Hourly_Fees': 'Hourly Fees',
        'Hourly_Fees_P': 'Hourly Fees (Previous)',
        'Flat_Fees': 'Flat Fees',
        'Flat_Fees_P': 'Flat Fees (Previous)',
        'Disbursements': 'Disbursements',
        'Disbursements_P': 'Disbursements (Previous)',
        'Credit_Bal': 'Credit Balance',
        'Credit_Bal_P': 'Credit Balance (Previous)',
        'Total_Charges': 'Total Charges',
        'Total_Charges_P': 'Total Charges (Previous)',
        'Amount_Owing': 'Amount Owing',
        'Amount_Owing_P': 'Amount Owing (Previous)',
        'Transferable': 'Transferable',
        'Memo': 'Memo'
    }

    f = None
    try:
        f, used_encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        headers = reader.fieldnames or []
        validation = validate_csv_headers(headers, expected_fields)

        if not validation['valid']:
            result['errors'].append(f"Header validation failed: {validation['errors']}")
            return result

        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = row.get('File_No', '').strip()
                if not file_no:
                    result['errors'].append(f"Row {row_num}: Missing file number")
                    continue

                # Check for existing case
                existing = db.query(Case).filter(Case.file_no == file_no).first()
                if existing:
                    result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' already exists")
                    continue

                # Find client by ID
                client_id = row.get('Id', '').strip()
                client = None
                if client_id:
                    client = db.query(Client).filter(Client.rolodex_id == client_id).first()
                    if not client:
                        result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
                        continue

                case = Case(
                    file_no=file_no,
                    client_id=client.id if client else None,
                    status=row.get('Status', '').strip() or 'active',
                    case_type=row.get('File_Type', '').strip() or None,
                    description=row.get('Regarding', '').strip() or None,
                    open_date=parse_date(row.get('Opened', '')),
                    close_date=parse_date(row.get('Closed', ''))
                )

                db.add(case)
                result['success'] += 1

            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")

        db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    finally:
        if f:
            f.close()

    return result


def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import LEDGER CSV data into Transaction model.

    Expected CSV format: File_No,Date,Item_No,Empl_Num,T_Code,T_Type,T_Type_L,Quantity,Rate,Amount,Billed,Note
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }

    f = None
    try:
        f, used_encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        headers = reader.fieldnames or []
        if len(headers) < 3:
            result['errors'].append("Invalid CSV format: expected at least 3 columns")
            return result

        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = row.get('File_No', '').strip()
                if not file_no:
                    result['errors'].append(f"Row {row_num}: Missing file number")
                    continue

                # Find the case
                case = db.query(Case).filter(Case.file_no == file_no).first()
                if not case:
                    result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                    continue

                amount = parse_float(row.get('Amount', '0'))
                if amount is None:
                    result['errors'].append(f"Row {row_num}: Invalid amount")
                    continue

                tx_date = parse_date(row.get('Date', ''))
                item_no = parse_int(row.get('Item_No', '') or '')
                # Keep (transaction_date, item_no) unique per case, reusing
                # the increment-until-free helper defined later in this module
                desired_item_no = next_unique_item_no(db, case.id, tx_date, item_no)

                transaction = Transaction(
                    case_id=case.id,
                    transaction_date=tx_date,
                    transaction_type=(row.get('T_Type', '').strip() or None),
                    t_type_l=(row.get('T_Type_L', '').strip().upper() or None),
                    amount=amount,
                    description=(row.get('Note', '').strip() or None),
                    reference=(row.get('Item_No', '').strip() or None),
                    item_no=desired_item_no,
                    employee_number=(row.get('Empl_Num', '').strip() or None),
                    t_code=(row.get('T_Code', '').strip().upper() or None),
                    quantity=parse_float(row.get('Quantity', '')),
                    rate=parse_float(row.get('Rate', '')),
                    billed=((row.get('Billed', '') or '').strip().upper() or None),
                )

                db.add(transaction)
                result['success'] += 1

            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")

        db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    finally:
        if f:
            f.close()

    return result


def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import QDROS CSV data into Document model.

    Expected CSV format: File_No,Document_Type,Description,File_Name,Date
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }

    f = None
    try:
        f, used_encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        headers = reader.fieldnames or []
        if len(headers) < 2:
            result['errors'].append("Invalid CSV format: expected at least 2 columns")
            return result

        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = row.get('File_No', '').strip()
                if not file_no:
                    result['errors'].append(f"Row {row_num}: Missing file number")
                    continue

                # Find the case
                case = db.query(Case).filter(Case.file_no == file_no).first()
                if not case:
                    result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                    continue

                document = Document(
                    case_id=case.id,
                    document_type=row.get('Document_Type', '').strip() or 'QDRO',
                    file_name=row.get('File_Name', '').strip() or None,
                    description=row.get('Description', '').strip() or None,
                    uploaded_date=parse_date(row.get('Date', ''))
                )

                db.add(document)
                result['success'] += 1

            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")

        db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    finally:
        if f:
            f.close()

    return result


def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PAYMENTS CSV data into Payment model.

    Expected CSV format: File_No,Date,Amount,Type,Description,Check_Number
    """
    result = {
        'success': 0,
        'errors': [],
        'total_rows': 0
    }

    f = None
    try:
        f, used_encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)

        headers = reader.fieldnames or []
        if len(headers) < 2:
            result['errors'].append("Invalid CSV format: expected at least 2 columns")
            return result

        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1

            try:
                file_no = row.get('File_No', '').strip()
                if not file_no:
                    result['errors'].append(f"Row {row_num}: Missing file number")
                    continue

                # Find the case
                case = db.query(Case).filter(Case.file_no == file_no).first()
                if not case:
                    result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                    continue

                amount = parse_float(row.get('Amount', '0'))
                if amount is None:
                    result['errors'].append(f"Row {row_num}: Invalid amount")
                    continue

                payment = Payment(
                    case_id=case.id,
                    payment_date=parse_date(row.get('Date', '')),
                    payment_type=row.get('Type', '').strip() or None,
                    amount=amount,
                    description=row.get('Description', '').strip() or None,
                    check_number=row.get('Check_Number', '').strip() or None
                )

                db.add(payment)
                result['success'] += 1

            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")

        db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()
    finally:
        if f:
            f.close()

    return result


def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[str, Any]:
    """
    Process CSV import based on type.

    Args:
        db: Database session
        import_type: Type of import (client, phone, case, transaction, document, payment)
        file_path: Path to CSV file

    Returns:
        Dict with import results
    """
    import_functions = {
        'client': import_rolodex_data,
        'phone': import_phone_data,
        'case': import_files_data,
        'transaction': import_ledger_data,
        'document': import_qdros_data,
        'payment': import_payments_data
    }

    import_func = import_functions.get(import_type)
    if not import_func:
        return {
            'success': 0,
            'errors': [f"Unknown import type: {import_type}"],
            'total_rows': 0
        }

    return import_func(db, file_path)


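# Dispatch sketch (hypothetical path, illustrative counts): every import type
# registered above goes through this one entry point.
#
#     summary = process_csv_import(db, 'client', 'data-import/ROLODEX.CSV')
#     # -> {'success': 123, 'errors': [...], 'total_rows': 125}

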
# ------------------------------
# Ledger CRUD and helpers
# ------------------------------

def validate_ledger_fields(
    *,
    transaction_date: Optional[str],
    t_code: Optional[str],
    employee_number: Optional[str],
    quantity: Optional[str],
    rate: Optional[str],
    amount: Optional[str],
    billed: Optional[str],
) -> tuple[list[str], dict[str, Any]]:
    """Validate incoming ledger form fields and return (errors, parsed_values)."""
    errors: list[str] = []
    parsed: dict[str, Any] = {}

    # Date
    tx_dt = parse_date(transaction_date or "") if transaction_date is not None else None
    if tx_dt is None:
        errors.append("Date is required and must be valid")
    else:
        parsed["transaction_date"] = tx_dt

    # T_Code
    if t_code is None or not t_code.strip():
        errors.append("T_Code is required")
    else:
        parsed["t_code"] = t_code.strip().upper()

    # Employee number
    if employee_number is None or not employee_number.strip():
        errors.append("Empl_Num is required")
    else:
        parsed["employee_number"] = employee_number.strip()

    # Quantity, Rate, Amount
    qty = parse_float(quantity or "") if quantity is not None else None
    rt = parse_float(rate or "") if rate is not None else None
    amt = parse_float(amount or "") if amount is not None else None

    if qty is not None:
        parsed["quantity"] = qty
    if rt is not None:
        parsed["rate"] = rt

    # Auto-compute amount if missing but quantity and rate present
    if amt is None and qty is not None and rt is not None:
        amt = round(qty * rt, 2)
    if amt is None:
        errors.append("Amount is required or derivable from Quantity × Rate")
    else:
        parsed["amount"] = amt

    # Billed flag
    billed_flag = (billed or "").strip().upper() if billed is not None else ""
    if billed_flag not in ("Y", "N"):
        errors.append("Billed must be 'Y' or 'N'")
    else:
        parsed["billed"] = billed_flag

    return errors, parsed


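# The amount fallback above means a form that omits Amount still validates
# when Quantity and Rate are present, e.g.:
#
#     errors, parsed = validate_ledger_fields(
#         transaction_date="2024-03-15", t_code="hr", employee_number="7",
#         quantity="2.5", rate="100", amount=None, billed="y",
#     )
#     # errors == [], parsed["amount"] == 250.0, parsed["t_code"] == "HR"

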
def next_unique_item_no(db: Session, case_id: int, tx_date: datetime, desired_item_no: Optional[int]) -> int:
    """Ensure (transaction_date, item_no) uniqueness per case by incrementing if needed."""
    # Start at the provided item_no, or at 1 if missing
    item_no = int(desired_item_no) if desired_item_no is not None else 1
    while True:
        exists = (
            db.query(Transaction)
            .filter(
                Transaction.case_id == case_id,
                Transaction.transaction_date == tx_date,
                Transaction.item_no == item_no,
            )
            .first()
        )
        if not exists:
            return item_no
        item_no += 1


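# Example: if item_no 3 is already taken for the case on the given date, a
# request for 3 comes back as 4 (assuming 4 is free); None probes from 1.

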
def compute_case_totals_from_case(case_obj: Case) -> Dict[str, float]:
    """
    Compute simple totals for a case from its transactions.

    Returns billed, unbilled, and total sums. Amounts are treated as positive;
    a future enhancement could apply a sign based on t_type_l.
    """
    billed_total = 0.0
    unbilled_total = 0.0
    overall_total = 0.0

    for t in (case_obj.transactions or []):
        amt = float(t.amount) if t.amount is not None else 0.0
        overall_total += amt
        if (t.billed or '').upper() == 'Y':
            billed_total += amt
        else:
            unbilled_total += amt

    return {
        'billed_total': round(billed_total, 2),
        'unbilled_total': round(unbilled_total, 2),
        'overall_total': round(overall_total, 2),
    }


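# Worked example: transactions of 100.00 (billed='Y') and 50.00 (billed='N')
# yield {'billed_total': 100.0, 'unbilled_total': 50.0, 'overall_total': 150.0}.

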
def compute_case_totals_for_case_id(db: Session, case_id: int) -> Dict[str, float]:
    """
    Compute billed, unbilled, and overall totals for a case by ID.

    This uses a simple in-Python aggregation over the case's transactions to
    avoid SQL portability issues and to keep the logic consistent with
    compute_case_totals_from_case.
    """
    billed_total = 0.0
    unbilled_total = 0.0
    overall_total = 0.0

    transactions: List[Transaction] = (
        db.query(Transaction).filter(Transaction.case_id == case_id).all()
    )
    for t in transactions:
        amt = float(t.amount) if t.amount is not None else 0.0
        overall_total += amt
        if (t.billed or '').upper() == 'Y':
            billed_total += amt
        else:
            unbilled_total += amt

    return {
        'billed_total': round(billed_total, 2),
        'unbilled_total': round(unbilled_total, 2),
        'overall_total': round(overall_total, 2),
    }


def _ledger_keys_from_tx(tx: Optional["Transaction"]) -> Dict[str, Any]:
    """
    Extract identifying keys for a ledger transaction for audit logs.
    """
    if tx is None:
        return {}
    return {
        'transaction_id': getattr(tx, 'id', None),
        'case_id': getattr(tx, 'case_id', None),
        'item_no': getattr(tx, 'item_no', None),
        'transaction_date': getattr(tx, 'transaction_date', None),
        't_code': getattr(tx, 't_code', None),
        't_type_l': getattr(tx, 't_type_l', None),
        'employee_number': getattr(tx, 'employee_number', None),
        'billed': getattr(tx, 'billed', None),
        'amount': getattr(tx, 'amount', None),
    }


def _log_ledger_audit(
    *,
    action: str,
    user: "User",
    case_id: int,
    keys: Dict[str, Any],
    pre: Dict[str, float],
    post: Dict[str, float],
) -> None:
    """
    Emit a structured audit log line for ledger mutations including user, action,
    identifiers, and pre/post balances with deltas.
    """
    delta = {
        'billed_total': round(post.get('billed_total', 0.0) - pre.get('billed_total', 0.0), 2),
        'unbilled_total': round(post.get('unbilled_total', 0.0) - pre.get('unbilled_total', 0.0), 2),
        'overall_total': round(post.get('overall_total', 0.0) - pre.get('overall_total', 0.0), 2),
    }

    logger.info(
        "ledger_audit",
        action=action,
        user_id=getattr(user, 'id', None),
        user_username=getattr(user, 'username', None),
        case_id=case_id,
        keys=keys,
        pre_balances=pre,
        post_balances=post,
        delta_balances=delta,
    )


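# An illustrative audit line (the exact rendering depends on logging_config,
# so treat the shape below as a sketch, not a contract):
#
#     {"event": "ledger_audit", "action": "create", "user_id": 1,
#      "case_id": 42, "keys": {...}, "pre_balances": {...},
#      "post_balances": {...},
#      "delta_balances": {"billed_total": 0.0, "unbilled_total": 250.0,
#                         "overall_total": 250.0}}

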
@app.post("/case/{case_id}/ledger")
|
||
async def ledger_create(
|
||
request: Request,
|
||
case_id: int,
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
form = await request.form()
|
||
|
||
# Pre-mutation totals for audit
|
||
pre_totals = compute_case_totals_for_case_id(db, case_id)
|
||
|
||
# Validate
|
||
errors, parsed = validate_ledger_fields(
|
||
transaction_date=form.get("transaction_date"),
|
||
t_code=form.get("t_code"),
|
||
employee_number=form.get("employee_number"),
|
||
quantity=form.get("quantity"),
|
||
rate=form.get("rate"),
|
||
amount=form.get("amount"),
|
||
billed=form.get("billed"),
|
||
)
|
||
|
||
if errors:
|
||
request.session["case_update_errors"] = errors
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Ensure case exists
|
||
case_obj = db.query(Case).filter(Case.id == case_id).first()
|
||
if not case_obj:
|
||
request.session["case_update_errors"] = ["Case not found"]
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Assign optional fields
|
||
t_type = (form.get("transaction_type") or "").strip() or None
|
||
t_type_l = (form.get("t_type_l") or "").strip().upper() or None
|
||
reference = (form.get("reference") or "").strip() or None
|
||
desc = (form.get("description") or "").strip() or None
|
||
|
||
desired_item_no = parse_int(form.get("item_no") or "")
|
||
item_no = next_unique_item_no(db, case_id, parsed["transaction_date"], desired_item_no)
|
||
|
||
try:
|
||
tx = Transaction(
|
||
case_id=case_id,
|
||
transaction_date=parsed["transaction_date"],
|
||
transaction_type=t_type,
|
||
t_type_l=t_type_l,
|
||
amount=parsed["amount"],
|
||
description=desc,
|
||
reference=reference,
|
||
item_no=item_no,
|
||
employee_number=parsed["employee_number"],
|
||
t_code=parsed["t_code"],
|
||
quantity=parsed.get("quantity"),
|
||
rate=parsed.get("rate"),
|
||
billed=parsed["billed"],
|
||
)
|
||
db.add(tx)
|
||
db.commit()
|
||
# Post-mutation totals and audit log
|
||
post_totals = compute_case_totals_for_case_id(db, case_id)
|
||
_log_ledger_audit(
|
||
action="create",
|
||
user=user,
|
||
case_id=case_id,
|
||
keys=_ledger_keys_from_tx(tx),
|
||
pre=pre_totals,
|
||
post=post_totals,
|
||
)
|
||
logger.info("ledger_create", case_id=case_id, transaction_id=tx.id)
|
||
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error("ledger_create_failed", case_id=case_id, error=str(e))
|
||
request.session["case_update_errors"] = ["Failed to create ledger entry"]
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
|
||
@app.post("/case/{case_id}/ledger/{tx_id}")
|
||
async def ledger_update(
|
||
request: Request,
|
||
case_id: int,
|
||
tx_id: int,
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
form = await request.form()
|
||
|
||
tx = db.query(Transaction).filter(Transaction.id == tx_id, Transaction.case_id == case_id).first()
|
||
if not tx:
|
||
request.session["case_update_errors"] = ["Ledger entry not found"]
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Pre-mutation totals for audit
|
||
pre_totals = compute_case_totals_for_case_id(db, case_id)
|
||
|
||
errors, parsed = validate_ledger_fields(
|
||
transaction_date=form.get("transaction_date"),
|
||
t_code=form.get("t_code"),
|
||
employee_number=form.get("employee_number"),
|
||
quantity=form.get("quantity"),
|
||
rate=form.get("rate"),
|
||
amount=form.get("amount"),
|
||
billed=form.get("billed"),
|
||
)
|
||
if errors:
|
||
request.session["case_update_errors"] = errors
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
try:
|
||
tx.transaction_date = parsed["transaction_date"]
|
||
# Ensure uniqueness of (date, item_no)
|
||
desired_item_no = parse_int(form.get("item_no") or "") or tx.item_no
|
||
tx.item_no = next_unique_item_no(db, case_id, parsed["transaction_date"], desired_item_no)
|
||
|
||
tx.t_code = parsed["t_code"]
|
||
tx.employee_number = parsed["employee_number"]
|
||
tx.quantity = parsed.get("quantity")
|
||
tx.rate = parsed.get("rate")
|
||
tx.amount = parsed["amount"]
|
||
tx.billed = parsed["billed"]
|
||
tx.transaction_type = (form.get("transaction_type") or "").strip() or None
|
||
tx.t_type_l = (form.get("t_type_l") or "").strip().upper() or None
|
||
tx.reference = (form.get("reference") or "").strip() or None
|
||
tx.description = (form.get("description") or "").strip() or None
|
||
|
||
db.commit()
|
||
# Post-mutation totals and audit log
|
||
post_totals = compute_case_totals_for_case_id(db, case_id)
|
||
_log_ledger_audit(
|
||
action="update",
|
||
user=user,
|
||
case_id=case_id,
|
||
keys=_ledger_keys_from_tx(tx),
|
||
pre=pre_totals,
|
||
post=post_totals,
|
||
)
|
||
logger.info("ledger_update", case_id=case_id, transaction_id=tx.id)
|
||
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error("ledger_update_failed", case_id=case_id, tx_id=tx_id, error=str(e))
|
||
request.session["case_update_errors"] = ["Failed to update ledger entry"]
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
|
||
@app.post("/case/{case_id}/ledger/{tx_id}/delete")
|
||
async def ledger_delete(
|
||
request: Request,
|
||
case_id: int,
|
||
tx_id: int,
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
tx = db.query(Transaction).filter(Transaction.id == tx_id, Transaction.case_id == case_id).first()
|
||
if not tx:
|
||
request.session["case_update_errors"] = ["Ledger entry not found"]
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
try:
|
||
# Capture pre-mutation totals and keys for audit before deletion
|
||
pre_totals = compute_case_totals_for_case_id(db, case_id)
|
||
tx_keys = _ledger_keys_from_tx(tx)
|
||
db.delete(tx)
|
||
db.commit()
|
||
# Post-mutation totals and audit log
|
||
post_totals = compute_case_totals_for_case_id(db, case_id)
|
||
_log_ledger_audit(
|
||
action="delete",
|
||
user=user,
|
||
case_id=case_id,
|
||
keys=tx_keys,
|
||
pre=pre_totals,
|
||
post=post_totals,
|
||
)
|
||
logger.info("ledger_delete", case_id=case_id, transaction_id=tx_id)
|
||
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error("ledger_delete_failed", case_id=case_id, tx_id=tx_id, error=str(e))
|
||
request.session["case_update_errors"] = ["Failed to delete ledger entry"]
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
|
||
@app.get("/")
|
||
async def root():
|
||
"""
|
||
Root endpoint - serves login form for web interface.
|
||
"""
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
|
||
@app.get("/health")
|
||
async def health_check(db: Session = Depends(get_db)):
|
||
"""
|
||
Health check endpoint that verifies database connectivity.
|
||
"""
|
||
try:
|
||
# Test database connection by querying user count
|
||
user_count = db.query(User).count()
|
||
return {
|
||
"status": "healthy",
|
||
"database": "connected",
|
||
"users": user_count
|
||
}
|
||
except Exception as e:
|
||
logger.error("health_check_failed", error=str(e))
|
||
return {
|
||
"status": "unhealthy",
|
||
"database": "error",
|
||
"error": str(e)
|
||
}
|
||
|
||
|
||
@app.get("/login")
|
||
async def login_form(request: Request):
|
||
"""
|
||
Display login form.
|
||
|
||
If user is already logged in, redirect to dashboard.
|
||
"""
|
||
# Check if user is already logged in
|
||
user = get_current_user_from_session(request.session)
|
||
if user:
|
||
return RedirectResponse(url="/dashboard", status_code=302)
|
||
|
||
return templates.TemplateResponse("login.html", {"request": request})
|
||
|
||
|
||
@app.post("/login")
|
||
async def login_submit(request: Request, db: Session = Depends(get_db)):
|
||
"""
|
||
Handle login form submission.
|
||
|
||
Authenticates user credentials and sets up session.
|
||
"""
|
||
form = await request.form()
|
||
username = form.get("username")
|
||
password = form.get("password")
|
||
|
||
if not username or not password:
|
||
error_message = "Username and password are required"
|
||
logger.warning("login_failed", username=username, reason="missing_credentials")
|
||
return templates.TemplateResponse("login.html", {
|
||
"request": request,
|
||
"error": error_message
|
||
})
|
||
|
||
# Authenticate user
|
||
user = authenticate_user(username, password)
|
||
if not user:
|
||
error_message = "Invalid username or password"
|
||
logger.warning("login_failed", username=username, reason="invalid_credentials")
|
||
return templates.TemplateResponse("login.html", {
|
||
"request": request,
|
||
"error": error_message
|
||
})
|
||
|
||
# Set up user session
|
||
request.session["user_id"] = user.id
|
||
request.session["user"] = {"id": user.id, "username": user.username}
|
||
|
||
# Update bound context with authenticated user id
|
||
structlog_contextvars.bind_contextvars(**{"user.id": user.id})
|
||
logger.info("login_success", username=username, **{"user.id": user.id})
|
||
|
||
# Redirect to dashboard after successful login
|
||
return RedirectResponse(url="/dashboard", status_code=302)
|
||
|
||
|
||
@app.get("/logout")
|
||
async def logout(request: Request):
|
||
"""
|
||
Handle user logout.
|
||
|
||
Clears user session and redirects to home page.
|
||
"""
|
||
username = request.session.get("user", {}).get("username", "unknown")
|
||
request.session.clear()
|
||
logger.info("logout", username=username)
|
||
|
||
return RedirectResponse(url="/", status_code=302)
|
||
|
||
|
||
@app.get("/dashboard")
|
||
async def dashboard(
|
||
request: Request,
|
||
q: str | None = Query(None, description="Search by file number or client name"),
|
||
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
|
||
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
"""
|
||
Dashboard page - lists recent cases with search and pagination.
|
||
|
||
- Optional query param `q` filters by case file number or client name/company
|
||
- `page` and `page_size` control pagination
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Base query: join clients for name/company access
|
||
query = db.query(Case).join(Client).order_by(
|
||
Case.open_date.desc(),
|
||
Case.created_at.desc(),
|
||
)
|
||
|
||
# Apply search filter if provided
|
||
if q:
|
||
like_term = f"%{q}%"
|
||
query = query.filter(
|
||
or_(
|
||
Case.file_no.ilike(like_term),
|
||
Client.first_name.ilike(like_term),
|
||
Client.last_name.ilike(like_term),
|
||
Client.company.ilike(like_term),
|
||
)
|
||
)
|
||
|
||
# Total count for pagination
|
||
total: int = query.count()
|
||
|
||
# Clamp page to valid range when total is known
|
||
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
|
||
if page > total_pages:
|
||
page = total_pages
|
||
|
||
# Pagination window
|
||
offset = (page - 1) * page_size
|
||
cases = query.offset(offset).limit(page_size).all()
|
||
|
||
# Page number window for UI (current +/- 2)
|
||
start_page = max(1, page - 2)
|
||
end_page = min(total_pages, page + 2)
|
||
page_numbers = list(range(start_page, end_page + 1))
|
||
|
||
logger.info(
|
||
"dashboard_render",
|
||
query=q,
|
||
page=page,
|
||
page_size=page_size,
|
||
total=total,
|
||
)
|
||
|
||
return templates.TemplateResponse(
|
||
"dashboard.html",
|
||
{
|
||
"request": request,
|
||
"user": user,
|
||
"cases": cases,
|
||
"q": q,
|
||
"page": page,
|
||
"page_size": page_size,
|
||
"total": total,
|
||
"total_pages": total_pages,
|
||
"page_numbers": page_numbers,
|
||
"start_index": (offset + 1) if total > 0 else 0,
|
||
"end_index": min(offset + len(cases), total),
|
||
},
|
||
)
|
||
|
||
|
||
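# Pagination arithmetic used above, e.g. total=95, page_size=20:
#     total_pages = (95 + 20 - 1) // 20 = 5
#     page=3 -> offset = (3 - 1) * 20 = 40, so rows 41-60 are displayed

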
@app.post("/admin/upload")
|
||
async def admin_upload_files(
|
||
request: Request,
|
||
files: List[UploadFile] = File(...),
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
Handle CSV file uploads for admin panel.
|
||
|
||
Validates uploaded files are CSV format and stores them in data-import directory.
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
results = []
|
||
errors = []
|
||
|
||
# Ensure data-import directory exists
|
||
import_dir = "data-import"
|
||
os.makedirs(import_dir, exist_ok=True)
|
||
|
||
for file in files:
|
||
try:
|
||
# Validate file type
|
||
if not file.filename.lower().endswith('.csv'):
|
||
errors.append(f"File '{file.filename}' is not a CSV file")
|
||
continue
|
||
|
||
# Generate unique filename to avoid conflicts
|
||
file_id = str(uuid.uuid4())
|
||
file_ext = os.path.splitext(file.filename)[1]
|
||
# Determine import type from original filename for better categorization later
|
||
try:
|
||
detected_type = get_import_type_from_filename(file.filename)
|
||
except ValueError:
|
||
detected_type = 'unknown'
|
||
|
||
# Prefix stored filename with detected type to preserve context
|
||
stored_filename = f"{detected_type}_{file_id}{file_ext}"
|
||
file_path = os.path.join(import_dir, stored_filename)
|
||
|
||
# Save file
|
||
contents = await file.read()
|
||
with open(file_path, "wb") as f:
|
||
f.write(contents)
|
||
|
||
# Use detected type (already derived from original name)
|
||
import_type = detected_type
|
||
|
||
results.append({
|
||
'filename': file.filename,
|
||
'stored_filename': stored_filename,
|
||
'import_type': import_type,
|
||
'file_path': file_path,
|
||
'size': len(contents)
|
||
})
|
||
|
||
except Exception as e:
|
||
errors.append(f"Error processing '{file.filename}': {str(e)}")
|
||
continue
|
||
|
||
# Log the upload operation
|
||
logger.info(
|
||
"admin_upload",
|
||
uploaded_count=len(results),
|
||
error_count=len(errors),
|
||
username=user.username,
|
||
)
|
||
|
||
return templates.TemplateResponse("admin.html", {
|
||
"request": request,
|
||
"user": user,
|
||
"upload_results": results,
|
||
"upload_errors": errors,
|
||
"show_upload_results": True
|
||
})
|
||
|
||
|
||
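# Example stored name (the UUID is random; the one shown is hypothetical):
# an upload of "ROLODEX.CSV" is detected as type 'client' and saved as
# "data-import/client_0b6e2d3a-....csv", which get_import_type_from_filename
# can later classify via its CLIENT_ prefix rule.

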
@app.post("/admin/import/{data_type}")
|
||
async def admin_import_data(
|
||
request: Request,
|
||
data_type: str,
|
||
db: Session = Depends(get_db)
|
||
):
|
||
"""
|
||
Process CSV import for specified data type.
|
||
|
||
Creates import log entry and processes the import in the background.
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Validate data type
|
||
valid_types = ['client', 'phone', 'case', 'transaction', 'document', 'payment']
|
||
if data_type not in valid_types:
|
||
return templates.TemplateResponse("admin.html", {
|
||
"request": request,
|
||
"user": user,
|
||
"error": f"Invalid data type: {data_type}"
|
||
})
|
||
|
||
# Get form data for file selection
|
||
form = await request.form()
|
||
selected_files = form.getlist("selected_files")
|
||
|
||
if not selected_files:
|
||
return templates.TemplateResponse("admin.html", {
|
||
"request": request,
|
||
"user": user,
|
||
"error": "No files selected for import"
|
||
})
|
||
|
||
import_results = []
|
||
total_success = 0
|
||
total_errors = 0
|
||
|
||
for stored_filename in selected_files:
|
||
file_path = os.path.join("data-import", stored_filename)
|
||
|
||
if not os.path.exists(file_path):
|
||
import_results.append({
|
||
'filename': stored_filename,
|
||
'status': 'error',
|
||
'message': 'File not found'
|
||
})
|
||
total_errors += 1
|
||
continue
|
||
|
||
# Create import log entry
|
||
import_log = ImportLog(
|
||
import_type=data_type,
|
||
file_name=stored_filename,
|
||
file_path=file_path,
|
||
status="running"
|
||
)
|
||
db.add(import_log)
|
||
db.commit()
|
||
|
||
try:
|
||
# Process the import
|
||
result = process_csv_import(db, data_type, file_path)
|
||
|
||
# Update import log
|
||
import_log.status = "completed" if result['errors'] else "failed"
|
||
import_log.total_rows = result['total_rows']
|
||
import_log.success_count = result['success']
|
||
import_log.error_count = len(result['errors'])
|
||
import_log.error_details = json.dumps(result['errors'])
|
||
import_log.completed_at = datetime.now()
|
||
|
||
db.commit()
|
||
|
||
import_results.append({
|
||
'filename': stored_filename,
|
||
'status': 'success' if result['success'] > 0 else 'error',
|
||
'total_rows': result['total_rows'],
|
||
'success_count': result['success'],
|
||
'error_count': len(result['errors']),
|
||
'errors': result['errors'][:10] # Show first 10 errors
|
||
})
|
||
|
||
total_success += result['success']
|
||
total_errors += len(result['errors'])
|
||
|
||
except Exception as e:
|
||
# Update import log on error
|
||
import_log.status = "failed"
|
||
import_log.error_details = json.dumps([str(e)])
|
||
import_log.completed_at = datetime.now()
|
||
db.commit()
|
||
|
||
import_results.append({
|
||
'filename': stored_filename,
|
||
'status': 'error',
|
||
'message': str(e)
|
||
})
|
||
total_errors += 1
|
||
|
||
# Log the import operation
|
||
logger.info(
|
||
"admin_import",
|
||
import_type=data_type,
|
||
success_count=total_success,
|
||
error_count=total_errors,
|
||
username=user.username,
|
||
)
|
||
|
||
return templates.TemplateResponse("admin.html", {
|
||
"request": request,
|
||
"user": user,
|
||
"import_results": import_results,
|
||
"total_success": total_success,
|
||
"total_errors": total_errors,
|
||
"show_import_results": True
|
||
})
|
||
|
||
|
||
@app.get("/admin")
|
||
async def admin_panel(request: Request, db: Session = Depends(get_db)):
|
||
"""
|
||
Admin panel - requires authentication.
|
||
|
||
Provides administrative functions like data import and system management.
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Get recent import history
|
||
recent_imports = db.query(ImportLog).order_by(ImportLog.created_at.desc()).limit(10).all()
|
||
|
||
# Get available files for import
|
||
import_dir = "data-import"
|
||
available_files = []
|
||
if os.path.exists(import_dir):
|
||
for filename in os.listdir(import_dir):
|
||
if filename.endswith('.csv'):
|
||
file_path = os.path.join(import_dir, filename)
|
||
file_size = os.path.getsize(file_path)
|
||
try:
|
||
import_type = get_import_type_from_filename(filename)
|
||
except ValueError:
|
||
import_type = 'unknown'
|
||
|
||
available_files.append({
|
||
'filename': filename,
|
||
'import_type': import_type,
|
||
'size': file_size,
|
||
'modified': datetime.fromtimestamp(os.path.getmtime(file_path))
|
||
})
|
||
|
||
# Group files by import type
|
||
files_by_type = {}
|
||
for file_info in available_files:
|
||
import_type = file_info['import_type']
|
||
if import_type not in files_by_type:
|
||
files_by_type[import_type] = []
|
||
files_by_type[import_type].append(file_info)
|
||
|
||
return templates.TemplateResponse("admin.html", {
|
||
"request": request,
|
||
"user": user,
|
||
"recent_imports": recent_imports,
|
||
"available_files": available_files,
|
||
"files_by_type": files_by_type
|
||
})
|
||
|
||
|
||
@app.get("/case/{case_id}")
|
||
async def case_detail(
|
||
request: Request,
|
||
case_id: int,
|
||
saved: bool = Query(False, description="Whether to show success message"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
"""
|
||
Case detail view.
|
||
|
||
Displays detailed information for a single case and its related client and
|
||
associated records (transactions, documents, payments).
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Fetch case with related entities eagerly loaded to avoid lazy-load issues
|
||
case_obj = (
|
||
db.query(Case)
|
||
.options(
|
||
joinedload(Case.client),
|
||
joinedload(Case.transactions),
|
||
joinedload(Case.documents),
|
||
joinedload(Case.payments),
|
||
)
|
||
.filter(Case.id == case_id)
|
||
.first()
|
||
)
|
||
|
||
if not case_obj:
|
||
logger.warning("case_not_found", case_id=case_id)
|
||
# Get any errors from session and clear them
|
||
errors = request.session.pop("case_update_errors", None)
|
||
|
||
return templates.TemplateResponse(
|
||
"case.html",
|
||
{
|
||
"request": request,
|
||
"user": user,
|
||
"case": None,
|
||
"error": "Case not found",
|
||
"saved": False,
|
||
"errors": errors or [],
|
||
},
|
||
status_code=404,
|
||
)
|
||
|
||
logger.info("case_detail", case_id=case_obj.id, file_no=case_obj.file_no)
|
||
|
||
# Get any errors from session and clear them
|
||
errors = request.session.pop("case_update_errors", None)
|
||
|
||
# Sort transactions by date then item_no for stable display
|
||
sorted_transactions = sorted(
|
||
case_obj.transactions or [],
|
||
key=lambda t: (
|
||
t.transaction_date or datetime.min,
|
||
t.item_no or 0,
|
||
)
|
||
)
|
||
case_obj.transactions = sorted_transactions
|
||
|
||
totals = compute_case_totals_from_case(case_obj)
|
||
|
||
return templates.TemplateResponse(
|
||
"case.html",
|
||
{
|
||
"request": request,
|
||
"user": user,
|
||
"case": case_obj,
|
||
"saved": saved,
|
||
"errors": errors or [],
|
||
"totals": totals,
|
||
},
|
||
)
|
||
|
||
|
||
@app.post("/case/{case_id}/update")
|
||
async def case_update(
|
||
request: Request,
|
||
case_id: int,
|
||
db: Session = Depends(get_db),
|
||
) -> RedirectResponse:
|
||
"""
|
||
Update case details.
|
||
|
||
Updates the specified fields on a case and redirects back to the case detail view.
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Get form data
|
||
form = await request.form()
|
||
|
||
# Fetch the case
|
||
case_obj = db.query(Case).filter(Case.id == case_id).first()
|
||
if not case_obj:
|
||
logger.warning("case_not_found_update", case_id=case_id)
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Validate and process fields
|
||
errors = []
|
||
update_data = {}
|
||
|
||
# Status validation
|
||
status = form.get("status")
|
||
if status is not None:
|
||
if status not in ["active", "closed"]:
|
||
errors.append("Status must be 'active' or 'closed'")
|
||
else:
|
||
update_data["status"] = status
|
||
|
||
# Case type and description (optional)
|
||
case_type = form.get("case_type")
|
||
if case_type is not None:
|
||
update_data["case_type"] = case_type.strip() if case_type.strip() else None
|
||
|
||
description = form.get("description")
|
||
if description is not None:
|
||
update_data["description"] = description.strip() if description.strip() else None
|
||
|
||
# Date validation and parsing
|
||
open_date = form.get("open_date")
|
||
if open_date is not None:
|
||
if open_date.strip():
|
||
try:
|
||
update_data["open_date"] = datetime.strptime(open_date.strip(), "%Y-%m-%d")
|
||
except ValueError:
|
||
errors.append("Open date must be in YYYY-MM-DD format")
|
||
else:
|
||
update_data["open_date"] = None
|
||
|
||
close_date = form.get("close_date")
|
||
if close_date is not None:
|
||
if close_date.strip():
|
||
try:
|
||
update_data["close_date"] = datetime.strptime(close_date.strip(), "%Y-%m-%d")
|
||
except ValueError:
|
||
errors.append("Close date must be in YYYY-MM-DD format")
|
||
else:
|
||
update_data["close_date"] = None
|
||
|
||
# If there are validation errors, redirect back with errors
|
||
if errors:
|
||
# Store errors in session for display on the case page
|
||
request.session["case_update_errors"] = errors
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Apply updates
|
||
try:
|
||
changed_fields = {}
|
||
for field, value in update_data.items():
|
||
old_value = getattr(case_obj, field)
|
||
if old_value != value:
|
||
changed_fields[field] = {"old": old_value, "new": value}
|
||
setattr(case_obj, field, value)
|
||
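        # e.g. (hypothetical): changing status "active" -> "closed" records
        # changed_fields == {"status": {"old": "active", "new": "closed"}}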

        db.commit()
        logger.info(
            "case_update",
            case_id=case_id,
            changed_fields=list(changed_fields.keys()),
            changed_details=changed_fields,
        )

        # Clear any previous errors from session
        request.session.pop("case_update_errors", None)

        return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)

    except Exception as e:
        db.rollback()
        logger.error("case_update_failed", case_id=case_id, error=str(e))

        # Store error in session for display
        request.session["case_update_errors"] = ["Failed to save changes. Please try again."]

        return RedirectResponse(url=f"/case/{case_id}", status_code=302)


@app.post("/case/{case_id}/close")
|
||
async def case_close(
|
||
request: Request,
|
||
case_id: int,
|
||
db: Session = Depends(get_db),
|
||
) -> RedirectResponse:
|
||
"""
|
||
Close a case.
|
||
|
||
Sets the case status to 'closed' and sets close_date to current date if not already set.
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Fetch the case
|
||
case_obj = db.query(Case).filter(Case.id == case_id).first()
|
||
if not case_obj:
|
||
logger.warning("case_not_found_close", case_id=case_id)
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Update case
|
||
try:
|
||
case_obj.status = "closed"
|
||
# Only set close_date if it's not already set
|
||
if not case_obj.close_date:
|
||
case_obj.close_date = datetime.now()
|
||
|
||
db.commit()
|
||
logger.info("case_closed", case_id=case_id, close_date=case_obj.close_date.isoformat() if case_obj.close_date else None)
|
||
|
||
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error("case_close_failed", case_id=case_id, error=str(e))
|
||
|
||
# Store error in session for display
|
||
request.session["case_update_errors"] = ["Failed to close case. Please try again."]
|
||
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
|
||
@app.post("/case/{case_id}/reopen")
|
||
async def case_reopen(
|
||
request: Request,
|
||
case_id: int,
|
||
db: Session = Depends(get_db),
|
||
) -> RedirectResponse:
|
||
"""
|
||
Reopen a case.
|
||
|
||
Sets the case status to 'active' and clears the close_date.
|
||
"""
|
||
# Check authentication
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Fetch the case
|
||
case_obj = db.query(Case).filter(Case.id == case_id).first()
|
||
if not case_obj:
|
||
logger.warning("case_not_found_reopen", case_id=case_id)
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
# Update case
|
||
try:
|
||
case_obj.status = "active"
|
||
case_obj.close_date = None
|
||
|
||
db.commit()
|
||
logger.info("case_reopened", case_id=case_id)
|
||
|
||
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
|
||
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error("case_reopen_failed", case_id=case_id, error=str(e))
|
||
|
||
# Store error in session for display
|
||
request.session["case_update_errors"] = ["Failed to reopen case. Please try again."]
|
||
|
||
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
||
|
||
|
||
@app.get("/rolodex")
|
||
async def rolodex_list(
|
||
request: Request,
|
||
q: str | None = Query(None, description="Search by name or company"),
|
||
phone: str | None = Query(None, description="Search by phone contains"),
|
||
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
|
||
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
"""
|
||
Rolodex list with simple search and pagination.
|
||
|
||
Filters clients by name/company and optional phone substring.
|
||
"""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
# Eager-load phones to avoid N+1 in template
|
||
query = db.query(Client).options(joinedload(Client.phones))
|
||
|
||
if q:
|
||
like = f"%{q}%"
|
||
query = query.filter(
|
||
or_(
|
||
Client.first_name.ilike(like),
|
||
Client.last_name.ilike(like),
|
||
Client.company.ilike(like),
|
||
)
|
||
)
|
||
|
||
if phone:
|
||
like_phone = f"%{phone}%"
|
||
# Use EXISTS over join to avoid duplicate rows
|
||
query = query.filter(Client.phones.any(Phone.phone_number.ilike(like_phone)))
|
||
|
||
# Order by last then first for stable display
|
||
query = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last())
|
||
|
||
total: int = query.count()
|
||
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
|
||
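    # Ceil division: e.g. total=101, page_size=20 -> (101 + 19) // 20 == 6 pages.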
    if page > total_pages:
        page = total_pages

    offset = (page - 1) * page_size
    clients = query.offset(offset).limit(page_size).all()

    start_page = max(1, page - 2)
    end_page = min(total_pages, page + 2)
    page_numbers = list(range(start_page, end_page + 1))
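    # Windowed pager: with page=5 and total_pages=12 this yields [3, 4, 5, 6, 7];
    # at the edge (page=1) it clamps to [1, 2, 3].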

    logger.info(
        "rolodex_render",
        query=q,
        phone=phone,
        page=page,
        page_size=page_size,
        total=total,
    )

    return templates.TemplateResponse(
        "rolodex.html",
        {
            "request": request,
            "user": user,
            "clients": clients,
            "q": q,
            "phone": phone,
            "page": page,
            "page_size": page_size,
            "total": total,
            "total_pages": total_pages,
            "page_numbers": page_numbers,
            "start_index": (offset + 1) if total > 0 else 0,
            "end_index": min(offset + len(clients), total),
            "enable_bulk": True,
        },
    )


@app.get("/rolodex/new")
|
||
async def rolodex_new(request: Request):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
return templates.TemplateResponse("rolodex_edit.html", {"request": request, "user": user, "client": None})
|
||
|
||
|
||
@app.get("/rolodex/{client_id}")
|
||
async def rolodex_view(client_id: int, request: Request, db: Session = Depends(get_db)):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
client = (
|
||
db.query(Client)
|
||
.options(joinedload(Client.phones), joinedload(Client.cases))
|
||
.filter(Client.id == client_id)
|
||
.first()
|
||
)
|
||
if not client:
|
||
raise HTTPException(status_code=404, detail="Client not found")
|
||
|
||
return templates.TemplateResponse("rolodex_view.html", {"request": request, "user": user, "client": client})
|
||
|
||
|
||
@app.post("/rolodex/create")
|
||
async def rolodex_create(
|
||
request: Request,
|
||
first_name: str = Form(None),
|
||
last_name: str = Form(None),
|
||
company: str = Form(None),
|
||
address: str = Form(None),
|
||
city: str = Form(None),
|
||
state: str = Form(None),
|
||
zip_code: str = Form(None),
|
||
rolodex_id: str = Form(None),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
client = Client(
|
||
first_name=(first_name or "").strip() or None,
|
||
last_name=(last_name or "").strip() or None,
|
||
company=(company or "").strip() or None,
|
||
address=(address or "").strip() or None,
|
||
city=(city or "").strip() or None,
|
||
state=(state or "").strip() or None,
|
||
zip_code=(zip_code or "").strip() or None,
|
||
rolodex_id=(rolodex_id or "").strip() or None,
|
||
)
|
||
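    # Normalization: blank or whitespace-only form values are stored as NULL;
    # e.g. ("  " or "").strip() or None -> None, while " Acme ".strip() -> "Acme".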
    db.add(client)
    db.commit()
    db.refresh(client)
    logger.info("rolodex_create", client_id=client.id, rolodex_id=client.rolodex_id)
    return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)


@app.post("/rolodex/{client_id}/update")
|
||
async def rolodex_update(
|
||
client_id: int,
|
||
request: Request,
|
||
first_name: str = Form(None),
|
||
last_name: str = Form(None),
|
||
company: str = Form(None),
|
||
address: str = Form(None),
|
||
city: str = Form(None),
|
||
state: str = Form(None),
|
||
zip_code: str = Form(None),
|
||
rolodex_id: str = Form(None),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
client = db.query(Client).filter(Client.id == client_id).first()
|
||
if not client:
|
||
raise HTTPException(status_code=404, detail="Client not found")
|
||
|
||
client.first_name = (first_name or "").strip() or None
|
||
client.last_name = (last_name or "").strip() or None
|
||
client.company = (company or "").strip() or None
|
||
client.address = (address or "").strip() or None
|
||
client.city = (city or "").strip() or None
|
||
client.state = (state or "").strip() or None
|
||
client.zip_code = (zip_code or "").strip() or None
|
||
client.rolodex_id = (rolodex_id or "").strip() or None
|
||
|
||
db.commit()
|
||
logger.info(
|
||
"rolodex_update",
|
||
client_id=client.id,
|
||
fields={
|
||
"first_name": client.first_name,
|
||
"last_name": client.last_name,
|
||
"company": client.company,
|
||
"rolodex_id": client.rolodex_id,
|
||
},
|
||
)
|
||
return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
|
||
|
||
|
||
@app.post("/rolodex/{client_id}/delete")
|
||
async def rolodex_delete(client_id: int, request: Request, db: Session = Depends(get_db)):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
client = db.query(Client).filter(Client.id == client_id).first()
|
||
if not client:
|
||
raise HTTPException(status_code=404, detail="Client not found")
|
||
|
||
db.delete(client)
|
||
db.commit()
|
||
logger.info("rolodex_delete", client_id=client_id)
|
||
return RedirectResponse(url="/rolodex", status_code=302)
|
||
|
||
|
||
@app.post("/rolodex/{client_id}/phone/add")
|
||
async def rolodex_add_phone(
|
||
client_id: int,
|
||
request: Request,
|
||
phone_number: str = Form(...),
|
||
phone_type: str = Form(None),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
client = db.query(Client).filter(Client.id == client_id).first()
|
||
if not client:
|
||
raise HTTPException(status_code=404, detail="Client not found")
|
||
|
||
phone = Phone(
|
||
client_id=client.id,
|
||
phone_number=(phone_number or "").strip(),
|
||
phone_type=(phone_type or "").strip() or None,
|
||
)
|
||
db.add(phone)
|
||
db.commit()
|
||
logger.info("rolodex_phone_add", client_id=client.id, phone_id=phone.id, number=phone.phone_number)
|
||
return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
|
||
|
||
|
||
@app.post("/rolodex/{client_id}/phone/{phone_id}/delete")
|
||
async def rolodex_delete_phone(client_id: int, phone_id: int, request: Request, db: Session = Depends(get_db)):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
phone = db.query(Phone).filter(Phone.id == phone_id, Phone.client_id == client_id).first()
|
||
if not phone:
|
||
raise HTTPException(status_code=404, detail="Phone not found")
|
||
|
||
db.delete(phone)
|
||
db.commit()
|
||
logger.info("rolodex_phone_delete", client_id=client_id, phone_id=phone_id)
|
||
return RedirectResponse(url=f"/rolodex/{client_id}", status_code=302)
|
||
|
||
|
||
@app.get("/payments")
|
||
async def payments_search(
|
||
request: Request,
|
||
from_date: str | None = Query(None, description="YYYY-MM-DD"),
|
||
to_date: str | None = Query(None, description="YYYY-MM-DD"),
|
||
file_no: str | None = Query(None, description="Case file number"),
|
||
rolodex_id: str | None = Query(None, description="Legacy client Id"),
|
||
q: str | None = Query(None, description="Description contains"),
|
||
page: int = Query(1, ge=1),
|
||
page_size: int = Query(50, ge=1, le=200),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
query = (
|
||
db.query(Payment)
|
||
.join(Case, Payment.case_id == Case.id)
|
||
.join(Client, Case.client_id == Client.id)
|
||
.order_by(Payment.payment_date.desc().nulls_last(), Payment.id.desc())
|
||
)
|
||
|
||
filters = []
|
||
if from_date:
|
||
try:
|
||
dt = datetime.strptime(from_date, "%Y-%m-%d")
|
||
filters.append(Payment.payment_date >= dt)
|
||
except ValueError:
|
||
pass
|
||
if to_date:
|
||
try:
|
||
dt = datetime.strptime(to_date, "%Y-%m-%d")
|
||
filters.append(Payment.payment_date <= dt)
|
||
except ValueError:
|
||
pass
|
||
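    # Note: to_date parses to midnight, so the upper bound is inclusive only when
    # payment_date values are stored date-only (assumed for this legacy data).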
    if file_no:
        filters.append(Case.file_no.ilike(f"%{file_no}%"))
    if rolodex_id:
        filters.append(Client.rolodex_id.ilike(f"%{rolodex_id}%"))
    if q:
        filters.append(Payment.description.ilike(f"%{q}%"))

    if filters:
        query = query.filter(and_(*filters))

    total = query.count()
    total_pages = (total + page_size - 1) // page_size if total > 0 else 1
    if page > total_pages:
        page = total_pages
    offset = (page - 1) * page_size
    payments = query.offset(offset).limit(page_size).all()

    # Totals for current result page
    page_total_amount = sum(p.amount or 0 for p in payments)

    logger.info(
        "payments_render",
        from_date=from_date,
        to_date=to_date,
        file_no=file_no,
        rolodex_id=rolodex_id,
        q=q,
        total=total,
    )

    return templates.TemplateResponse(
        "payments.html",
        {
            "request": request,
            "user": user,
            "payments": payments,
            "from_date": from_date,
            "to_date": to_date,
            "file_no": file_no,
            "rolodex_id": rolodex_id,
            "q": q,
            "page": page,
            "page_size": page_size,
            "total": total,
            "total_pages": total_pages,
            "start_index": (offset + 1) if total > 0 else 0,
            "end_index": min(offset + len(payments), total),
            "page_total_amount": page_total_amount,
        },
    )


@app.post("/reports/phone-book")
|
||
async def phone_book_report_post(request: Request):
|
||
"""Accepts selected client IDs from forms and redirects to GET for rendering."""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
form = await request.form()
|
||
client_ids = form.getlist("client_ids")
|
||
if not client_ids:
|
||
return RedirectResponse(url="/rolodex", status_code=302)
|
||
|
||
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
|
||
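    # e.g. ["3", "7"] -> "client_ids=3&client_ids=7"; FastAPI parses the repeated
    # query key back into List[int] on the GET handler.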
    return RedirectResponse(url=f"/reports/phone-book?{ids_param}", status_code=302)


@app.get("/reports/phone-book")
|
||
async def phone_book_report(
|
||
request: Request,
|
||
client_ids: List[int] | None = Query(None),
|
||
q: str | None = Query(None, description="Filter by name/company"),
|
||
format: str | None = Query(None, description="csv or pdf for export"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
query = db.query(Client).options(joinedload(Client.phones))
|
||
if client_ids:
|
||
query = query.filter(Client.id.in_(client_ids))
|
||
elif q:
|
||
like = f"%{q}%"
|
||
query = query.filter(
|
||
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
|
||
)
|
||
|
||
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
|
||
|
||
if format == "csv":
|
||
# Build CSV output
|
||
output = StringIO()
|
||
writer = csv.writer(output)
|
||
writer.writerow(["Last", "First", "Company", "Phone Type", "Phone Number"])
|
||
for c in clients:
|
||
if c.phones:
|
||
for p in c.phones:
|
||
writer.writerow([
|
||
c.last_name or "",
|
||
c.first_name or "",
|
||
c.company or "",
|
||
p.phone_type or "",
|
||
p.phone_number or "",
|
||
])
|
||
else:
|
||
writer.writerow([c.last_name or "", c.first_name or "", c.company or "", "", ""])
|
||
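        # One CSV row per phone; clients without phones still get a single row
        # with empty phone columns so they are not dropped from the export.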
        csv_bytes = output.getvalue().encode("utf-8")
        return Response(
            content=csv_bytes,
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=phone_book.csv"},
        )

    if format == "pdf":
        pdf_bytes = build_phone_book_pdf(clients)
        return Response(
            content=pdf_bytes,
            media_type="application/pdf",
            headers={"Content-Disposition": "attachment; filename=phone_book.pdf"},
        )

    logger.info("phone_book_render", count=len(clients))
    return templates.TemplateResponse(
        "report_phone_book.html",
        {"request": request, "user": user, "clients": clients, "q": q, "client_ids": client_ids or []},
    )


# ------------------------------
# Reports: Payments - Detailed
# ------------------------------

@app.get("/reports/payments-detailed")
|
||
async def payments_detailed_report(
|
||
request: Request,
|
||
from_date: str | None = Query(None, description="YYYY-MM-DD"),
|
||
to_date: str | None = Query(None, description="YYYY-MM-DD"),
|
||
file_no: str | None = Query(None, description="Case file number"),
|
||
format: str | None = Query(None, description="pdf for PDF output"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
query = (
|
||
db.query(Payment)
|
||
.join(Case, Payment.case_id == Case.id)
|
||
.join(Client, Case.client_id == Client.id)
|
||
)
|
||
|
||
filters = []
|
||
if from_date:
|
||
try:
|
||
dt = datetime.strptime(from_date, "%Y-%m-%d")
|
||
filters.append(Payment.payment_date >= dt)
|
||
except ValueError:
|
||
pass
|
||
if to_date:
|
||
try:
|
||
dt = datetime.strptime(to_date, "%Y-%m-%d")
|
||
filters.append(Payment.payment_date <= dt)
|
||
except ValueError:
|
||
pass
|
||
if file_no:
|
||
filters.append(Case.file_no.ilike(f"%{file_no}%"))
|
||
|
||
if filters:
|
||
query = query.filter(and_(*filters))
|
||
|
||
# For grouping by deposit date, order by date then id
|
||
payments = (
|
||
query.options(joinedload(Payment.case).joinedload(Case.client))
|
||
.order_by(Payment.payment_date.asc().nulls_last(), Payment.id.asc())
|
||
.all()
|
||
)
|
||
|
||
if format == "pdf":
|
||
pdf_bytes = build_payments_detailed_pdf(payments)
|
||
return Response(
|
||
content=pdf_bytes,
|
||
media_type="application/pdf",
|
||
headers={"Content-Disposition": "attachment; filename=payments_detailed.pdf"},
|
||
)
|
||
|
||
# Build preview groups for template: [{date, total, items}]
|
||
groups: list[dict[str, Any]] = []
|
||
from collections import defaultdict
|
||
grouped: dict[str, list[Payment]] = defaultdict(list)
|
||
for p in payments:
|
||
key = p.payment_date.date().isoformat() if p.payment_date else "(No Date)"
|
||
grouped[key].append(p)
|
||
overall_total = sum((p.amount or 0.0) for p in payments)
|
||
for key in sorted(grouped.keys()):
|
||
items = grouped[key]
|
||
total_amt = sum((p.amount or 0.0) for p in items)
|
||
groups.append({"date": key, "total": total_amt, "items": items})
|
||
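    # Example group (hypothetical data):
    #   {"date": "2023-04-01", "total": 150.0, "items": [<Payment>, <Payment>]}
    # "(No Date)" sorts before ISO dates because "(" precedes digits in ASCII.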

    logger.info(
        "payments_detailed_render",
        from_date=from_date,
        to_date=to_date,
        file_no=file_no,
        count=len(payments),
    )

    return templates.TemplateResponse(
        "payments_detailed.html",
        {
            "request": request,
            "user": user,
            "groups": groups,
            "overall_total": overall_total,
            "from_date": from_date,
            "to_date": to_date,
            "file_no": file_no,
        },
    )


# ------------------------------
# Reports: Phone Book (Address + Phone)
# ------------------------------

@app.post("/reports/phone-book-address")
|
||
async def phone_book_address_post(request: Request):
|
||
"""Accept selected client IDs from forms and redirect to GET for rendering."""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
form = await request.form()
|
||
client_ids = form.getlist("client_ids")
|
||
if not client_ids:
|
||
return RedirectResponse(url="/rolodex", status_code=302)
|
||
|
||
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
|
||
return RedirectResponse(url=f"/reports/phone-book-address?{ids_param}", status_code=302)
|
||
|
||
|
||
@app.get("/reports/phone-book-address")
|
||
async def phone_book_address_report(
|
||
request: Request,
|
||
client_ids: List[int] | None = Query(None),
|
||
q: str | None = Query(None, description="Filter by name/company"),
|
||
phone: str | None = Query(None, description="Phone contains"),
|
||
format: str | None = Query(None, description="csv or pdf for export"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
query = db.query(Client).options(joinedload(Client.phones))
|
||
if client_ids:
|
||
query = query.filter(Client.id.in_(client_ids))
|
||
else:
|
||
if q:
|
||
like = f"%{q}%"
|
||
query = query.filter(
|
||
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
|
||
)
|
||
if phone:
|
||
query = query.filter(Client.phones.any(Phone.phone_number.ilike(f"%{phone}%")))
|
||
|
||
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
|
||
|
||
if format == "csv":
|
||
# Build CSV output
|
||
output = StringIO()
|
||
writer = csv.writer(output)
|
||
writer.writerow(["Last", "First", "Company", "Address", "City", "State", "ZIP", "Phone Type", "Phone Number"])
|
||
for c in clients:
|
||
if c.phones:
|
||
for p in c.phones:
|
||
writer.writerow([
|
||
c.last_name or "",
|
||
c.first_name or "",
|
||
c.company or "",
|
||
c.address or "",
|
||
c.city or "",
|
||
c.state or "",
|
||
c.zip_code or "",
|
||
p.phone_type or "",
|
||
p.phone_number or "",
|
||
])
|
||
else:
|
||
writer.writerow([
|
||
c.last_name or "",
|
||
c.first_name or "",
|
||
c.company or "",
|
||
c.address or "",
|
||
c.city or "",
|
||
c.state or "",
|
||
c.zip_code or "",
|
||
"",
|
||
"",
|
||
])
|
||
csv_bytes = output.getvalue().encode("utf-8")
|
||
return Response(
|
||
content=csv_bytes,
|
||
media_type="text/csv",
|
||
headers={"Content-Disposition": "attachment; filename=phone_book_address.csv"},
|
||
)
|
||
|
||
if format == "pdf":
|
||
pdf_bytes = build_phone_book_address_pdf(clients)
|
||
return Response(
|
||
content=pdf_bytes,
|
||
media_type="application/pdf",
|
||
headers={"Content-Disposition": "attachment; filename=phone_book_address.pdf"},
|
||
)
|
||
|
||
logger.info("phone_book_address_render", count=len(clients))
|
||
return templates.TemplateResponse(
|
||
"report_phone_book_address.html",
|
||
{
|
||
"request": request,
|
||
"user": user,
|
||
"clients": clients,
|
||
"q": q,
|
||
"phone": phone,
|
||
"client_ids": client_ids or [],
|
||
},
|
||
)
|
||
|
||
|
||
# ------------------------------
# Reports: Envelope (PDF)
# ------------------------------

@app.post("/reports/envelope")
|
||
async def envelope_report_post(request: Request):
|
||
"""Accept selected client IDs and redirect to GET for PDF download."""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
form = await request.form()
|
||
client_ids = form.getlist("client_ids")
|
||
if not client_ids:
|
||
return RedirectResponse(url="/rolodex", status_code=302)
|
||
|
||
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
|
||
return RedirectResponse(url=f"/reports/envelope?{ids_param}&format=pdf", status_code=302)
|
||
|
||
|
||
@app.get("/reports/envelope")
|
||
async def envelope_report(
|
||
request: Request,
|
||
client_ids: List[int] | None = Query(None),
|
||
q: str | None = Query(None, description="Filter by name/company"),
|
||
phone: str | None = Query(None, description="Phone contains (optional)"),
|
||
format: str | None = Query("pdf", description="pdf output only"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
query = db.query(Client)
|
||
if client_ids:
|
||
query = query.filter(Client.id.in_(client_ids))
|
||
else:
|
||
if q:
|
||
like = f"%{q}%"
|
||
query = query.filter(
|
||
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
|
||
)
|
||
if phone:
|
||
# include clients that have a matching phone
|
||
query = query.join(Phone, isouter=True).filter(or_(Phone.phone_number.ilike(f"%{phone}%"), Phone.id == None)).distinct() # noqa: E711
|
||
|
||
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
|
||
|
||
# Always produce PDF
|
||
pdf_bytes = build_envelope_pdf(clients)
|
||
logger.info("envelope_pdf", count=len(clients))
|
||
return Response(
|
||
content=pdf_bytes,
|
||
media_type="application/pdf",
|
||
headers={"Content-Disposition": "attachment; filename=envelopes.pdf"},
|
||
)
|
||
|
||
|
||
# ------------------------------
# Reports: Rolodex Info (PDF)
# ------------------------------

@app.post("/reports/rolodex-info")
|
||
async def rolodex_info_post(request: Request):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
form = await request.form()
|
||
client_ids = form.getlist("client_ids")
|
||
if not client_ids:
|
||
return RedirectResponse(url="/rolodex", status_code=302)
|
||
|
||
ids_param = "&".join([f"client_ids={cid}" for cid in client_ids])
|
||
return RedirectResponse(url=f"/reports/rolodex-info?{ids_param}&format=pdf", status_code=302)
|
||
|
||
|
||
@app.get("/reports/rolodex-info")
|
||
async def rolodex_info_report(
|
||
request: Request,
|
||
client_ids: List[int] | None = Query(None),
|
||
q: str | None = Query(None, description="Filter by name/company"),
|
||
format: str | None = Query("pdf", description="pdf output only"),
|
||
db: Session = Depends(get_db),
|
||
):
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
return RedirectResponse(url="/login", status_code=302)
|
||
|
||
query = db.query(Client).options(joinedload(Client.phones))
|
||
if client_ids:
|
||
query = query.filter(Client.id.in_(client_ids))
|
||
elif q:
|
||
like = f"%{q}%"
|
||
query = query.filter(
|
||
or_(Client.first_name.ilike(like), Client.last_name.ilike(like), Client.company.ilike(like))
|
||
)
|
||
|
||
clients = query.order_by(Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last()).all()
|
||
|
||
pdf_bytes = build_rolodex_info_pdf(clients)
|
||
logger.info("rolodex_info_pdf", count=len(clients))
|
||
return Response(
|
||
content=pdf_bytes,
|
||
media_type="application/pdf",
|
||
headers={"Content-Disposition": "attachment; filename=rolodex_info.pdf"},
|
||
)
|
||
|
||
# ------------------------------
# JSON API: list/filter endpoints
# ------------------------------

def _apply_sorting(query, sort_by: str | None, sort_dir: str, allowed_map: dict[str, Any], default_order: list[Any]):
    """Apply validated sorting to a SQLAlchemy query.

    Args:
        query: Base SQLAlchemy query object
        sort_by: Optional requested sort field
        sort_dir: 'asc' or 'desc'
        allowed_map: Map of allowed sort_by -> SQLAlchemy column or list of columns
        default_order: Fallback order_by list when sort_by is not provided

    Returns:
        (query, applied_sort_by, applied_sort_dir)
    """
    if not sort_by:
        for col in default_order:
            query = query.order_by(col)
        return query, None, sort_dir

    column_expr = allowed_map.get(sort_by)
    if column_expr is None:
        raise HTTPException(status_code=400, detail=f"Invalid sort_by: '{sort_by}'. Allowed: {sorted(list(allowed_map.keys()))}")

    def _order(expr):
        return expr.asc().nulls_last() if sort_dir == "asc" else expr.desc().nulls_last()

    if isinstance(column_expr, (list, tuple)):
        for expr in column_expr:
            query = query.order_by(_order(expr))
    else:
        query = query.order_by(_order(column_expr))

    return query, sort_by, sort_dir

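# Usage sketch (mirrors the call sites below):
#   allowed = {"last_name": Client.last_name}
#   query, by, direction = _apply_sorting(query, "last_name", "desc", allowed, [Client.id.asc()])
# An unknown sort_by raises HTTPException(400); sort_by=None applies default_order.
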
@app.get("/api/rolodex", response_model=RolodexListResponse)
|
||
async def api_list_rolodex(
|
||
request: Request,
|
||
q: str | None = Query(None, description="Search by first/last/company contains"),
|
||
phone: str | None = Query(None, description="Phone number contains"),
|
||
rolodex_id: str | None = Query(None, description="Legacy Rolodex ID contains"),
|
||
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
|
||
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
|
||
sort_by: str | None = Query(None, description="Sort field: id, rolodex_id, last_name, first_name, company, created_at"),
|
||
sort_dir: str = Query("asc", description="Sort direction: asc or desc"),
|
||
db: Session = Depends(get_db),
|
||
) -> RolodexListResponse:
|
||
"""Return paginated clients with simple filters as JSON."""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
# Middleware ensures JSON 401 for /api/*, keep explicit for clarity
|
||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||
|
||
query = db.query(Client).options(joinedload(Client.phones))
|
||
|
||
if q:
|
||
like = f"%{q}%"
|
||
query = query.filter(
|
||
or_(
|
||
Client.first_name.ilike(like),
|
||
Client.last_name.ilike(like),
|
||
Client.company.ilike(like),
|
||
)
|
||
)
|
||
if phone:
|
||
query = query.filter(Client.phones.any(Phone.phone_number.ilike(f"%{phone}%")))
|
||
if rolodex_id:
|
||
query = query.filter(Client.rolodex_id.ilike(f"%{rolodex_id}%"))
|
||
|
||
# Sorting
|
||
sort_dir_norm = (sort_dir or "").lower()
|
||
if sort_dir_norm not in ("asc", "desc"):
|
||
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
|
||
|
||
allowed_sort = {
|
||
"id": Client.id,
|
||
"rolodex_id": Client.rolodex_id,
|
||
"last_name": Client.last_name,
|
||
"first_name": Client.first_name,
|
||
"company": Client.company,
|
||
"created_at": Client.created_at,
|
||
}
|
||
default_order = [Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last(), Client.id.asc()]
|
||
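    # e.g. GET /api/rolodex?sort_by=company&sort_dir=desc orders by Client.company
    # descending with NULL companies last.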
    query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)

    total: int = query.count()
    total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
    if page > total_pages:
        page = total_pages
    offset = (page - 1) * page_size

    clients = query.offset(offset).limit(page_size).all()

    logger.info(
        "api_rolodex_list",
        query=q,
        phone=phone,
        rolodex_id=rolodex_id,
        page=page,
        page_size=page_size,
        total=total,
        sort_by=applied_sort_by,
        sort_dir=applied_sort_dir,
    )

    items = [ClientOut.model_validate(c) for c in clients]
    return RolodexListResponse(
        items=items,
        pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
    )


@app.get("/api/files", response_model=FilesListResponse)
|
||
async def api_list_files(
|
||
request: Request,
|
||
q: str | None = Query(None, description="Search file no/description/client name/company"),
|
||
status: str | None = Query(None, description="Case status: active or closed"),
|
||
case_type: str | None = Query(None, description="Case type contains"),
|
||
file_no: str | None = Query(None, description="File number contains"),
|
||
client_rolodex_id: str | None = Query(None, description="Legacy client Id contains"),
|
||
from_open_date: str | None = Query(None, description="Opened on/after YYYY-MM-DD"),
|
||
to_open_date: str | None = Query(None, description="Opened on/before YYYY-MM-DD"),
|
||
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
|
||
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
|
||
sort_by: str | None = Query(None, description="Sort field: file_no, status, case_type, description, open_date, close_date, created_at, client_last_name, client_first_name, client_company"),
|
||
sort_dir: str = Query("desc", description="Sort direction: asc or desc"),
|
||
db: Session = Depends(get_db),
|
||
) -> FilesListResponse:
|
||
"""Return paginated cases with simple filters as JSON."""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||
|
||
query = (
|
||
db.query(Case)
|
||
.join(Client, Case.client_id == Client.id)
|
||
.options(joinedload(Case.client))
|
||
)
|
||
|
||
filters = []
|
||
if q:
|
||
like = f"%{q}%"
|
||
filters.append(
|
||
or_(
|
||
Case.file_no.ilike(like),
|
||
Case.description.ilike(like),
|
||
Client.first_name.ilike(like),
|
||
Client.last_name.ilike(like),
|
||
Client.company.ilike(like),
|
||
)
|
||
)
|
||
if status:
|
||
filters.append(Case.status.ilike(f"%{status}%"))
|
||
if case_type:
|
||
filters.append(Case.case_type.ilike(f"%{case_type}%"))
|
||
if file_no:
|
||
filters.append(Case.file_no.ilike(f"%{file_no}%"))
|
||
if client_rolodex_id:
|
||
filters.append(Client.rolodex_id.ilike(f"%{client_rolodex_id}%"))
|
||
if from_open_date:
|
||
try:
|
||
dt = datetime.strptime(from_open_date, "%Y-%m-%d")
|
||
filters.append(Case.open_date >= dt)
|
||
except ValueError:
|
||
pass
|
||
if to_open_date:
|
||
try:
|
||
dt = datetime.strptime(to_open_date, "%Y-%m-%d")
|
||
filters.append(Case.open_date <= dt)
|
||
except ValueError:
|
||
pass
|
||
|
||
if filters:
|
||
query = query.filter(and_(*filters))
|
||
|
||
# Sorting
|
||
sort_dir_norm = (sort_dir or "").lower()
|
||
if sort_dir_norm not in ("asc", "desc"):
|
||
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
|
||
|
||
allowed_sort = {
|
||
"file_no": Case.file_no,
|
||
"status": Case.status,
|
||
"case_type": Case.case_type,
|
||
"description": Case.description,
|
||
"open_date": Case.open_date,
|
||
"close_date": Case.close_date,
|
||
"created_at": Case.created_at,
|
||
"client_last_name": Client.last_name,
|
||
"client_first_name": Client.first_name,
|
||
"client_company": Client.company,
|
||
"id": Case.id,
|
||
}
|
||
default_order = [Case.open_date.desc().nulls_last(), Case.created_at.desc()]
|
||
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
|
||
|
||
total: int = query.count()
|
||
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
|
||
if page > total_pages:
|
||
page = total_pages
|
||
offset = (page - 1) * page_size
|
||
|
||
cases = query.offset(offset).limit(page_size).all()
|
||
|
||
logger.info(
|
||
"api_files_list",
|
||
query=q,
|
||
status=status,
|
||
case_type=case_type,
|
||
file_no=file_no,
|
||
client_rolodex_id=client_rolodex_id,
|
||
page=page,
|
||
page_size=page_size,
|
||
total=total,
|
||
sort_by=applied_sort_by,
|
||
sort_dir=applied_sort_dir,
|
||
)
|
||
|
||
items = [CaseOut.model_validate(c) for c in cases]
|
||
return FilesListResponse(
|
||
items=items,
|
||
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
|
||
)
|
||
|
||
|
||
@app.get("/api/ledger", response_model=LedgerListResponse)
|
||
async def api_list_ledger(
|
||
request: Request,
|
||
case_id: int | None = Query(None, description="Filter by case ID"),
|
||
file_no: str | None = Query(None, description="Filter by case file number contains"),
|
||
from_date: str | None = Query(None, description="On/after YYYY-MM-DD"),
|
||
to_date: str | None = Query(None, description="On/before YYYY-MM-DD"),
|
||
billed: str | None = Query(None, description="'Y' or 'N'"),
|
||
t_code: str | None = Query(None, description="Transaction code contains"),
|
||
t_type_l: str | None = Query(None, description="Legacy type flag (e.g., C/D)"),
|
||
employee_number: str | None = Query(None, description="Employee number contains"),
|
||
q: str | None = Query(None, description="Description contains"),
|
||
page: int = Query(1, ge=1),
|
||
page_size: int = Query(50, ge=1, le=200),
|
||
sort_by: str | None = Query(None, description="Sort field: transaction_date, item_no, id, amount, billed, t_code, t_type_l, employee_number, case_file_no, case_id"),
|
||
sort_dir: str = Query("desc", description="Sort direction: asc or desc"),
|
||
db: Session = Depends(get_db),
|
||
) -> LedgerListResponse:
|
||
"""Return paginated ledger (transactions) with simple filters as JSON."""
|
||
user = get_current_user_from_session(request.session)
|
||
if not user:
|
||
raise HTTPException(status_code=401, detail="Unauthorized")
|
||
|
||
query = (
|
||
db.query(Transaction)
|
||
.join(Case, Transaction.case_id == Case.id)
|
||
.options(joinedload(Transaction.case))
|
||
)
|
||
|
||
filters = []
|
||
if case_id is not None:
|
||
filters.append(Transaction.case_id == case_id)
|
||
if file_no:
|
||
filters.append(Case.file_no.ilike(f"%{file_no}%"))
|
||
if from_date:
|
||
try:
|
||
dt = datetime.strptime(from_date, "%Y-%m-%d")
|
||
filters.append(Transaction.transaction_date >= dt)
|
||
except ValueError:
|
||
pass
|
||
if to_date:
|
||
try:
|
||
dt = datetime.strptime(to_date, "%Y-%m-%d")
|
||
filters.append(Transaction.transaction_date <= dt)
|
||
except ValueError:
|
||
pass
|
||
if billed in ("Y", "N"):
|
||
filters.append(Transaction.billed == billed)
|
||
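    # billed accepts only the legacy 'Y'/'N' flags; any other value is ignored
    # rather than rejected, so unfiltered results are returned.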
    if t_code:
        filters.append(Transaction.t_code.ilike(f"%{t_code}%"))
    if t_type_l:
        filters.append(Transaction.t_type_l.ilike(f"%{t_type_l}%"))
    if employee_number:
        filters.append(Transaction.employee_number.ilike(f"%{employee_number}%"))
    if q:
        filters.append(Transaction.description.ilike(f"%{q}%"))

    if filters:
        query = query.filter(and_(*filters))

    # Sorting
    sort_dir_norm = (sort_dir or "").lower()
    if sort_dir_norm not in ("asc", "desc"):
        raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
    allowed_sort = {
        "transaction_date": Transaction.transaction_date,
        "item_no": Transaction.item_no,
        "id": Transaction.id,
        "amount": Transaction.amount,
        "billed": Transaction.billed,
        "t_code": Transaction.t_code,
        "t_type_l": Transaction.t_type_l,
        "employee_number": Transaction.employee_number,
        "case_file_no": Case.file_no,
        "case_id": Transaction.case_id,
    }
    default_order = [
        Transaction.transaction_date.desc().nulls_last(),
        Transaction.item_no.asc().nulls_last(),
        Transaction.id.desc(),
    ]
    query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)

    total: int = query.count()
    total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
    if page > total_pages:
        page = total_pages
    offset = (page - 1) * page_size

    txns = query.offset(offset).limit(page_size).all()

    logger.info(
        "api_ledger_list",
        case_id=case_id,
        file_no=file_no,
        from_date=from_date,
        to_date=to_date,
        billed=billed,
        t_code=t_code,
        t_type_l=t_type_l,
        employee_number=employee_number,
        q=q,
        page=page,
        page_size=page_size,
        total=total,
        sort_by=applied_sort_by,
        sort_dir=applied_sort_dir,
    )

    items = [
        TransactionOut(
            id=t.id,
            case_id=t.case_id,
            case_file_no=t.case.file_no if t.case else None,
            transaction_date=t.transaction_date,
            item_no=t.item_no,
            amount=t.amount,
            billed=t.billed,
            t_code=t.t_code,
            t_type_l=t.t_type_l,
            quantity=t.quantity,
            rate=t.rate,
            description=t.description,
            employee_number=t.employee_number,
        )
        for t in txns
    ]

    return LedgerListResponse(
        items=items,
        pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
    )