"""
FastAPI application entry point for Delphi Database.

This module initializes the FastAPI application, sets up database connections,
and provides the main application instance.
"""
|
|
|
|
import os
|
|
import time
|
|
import csv
|
|
import json
|
|
import uuid
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from io import StringIO
|
|
|
|
from fastapi import FastAPI, Depends, Request, Query, HTTPException, UploadFile, File, Form
|
|
from fastapi.responses import RedirectResponse, Response
|
|
from starlette.middleware.sessions import SessionMiddleware
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.templating import Jinja2Templates
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import or_, and_
|
|
from dotenv import load_dotenv
|
|
from starlette.middleware.base import BaseHTTPMiddleware
|
|
import structlog
|
|
from structlog import contextvars as structlog_contextvars
|
|
|
|
from .database import create_tables, get_db, get_database_url
|
|
from .models import User, Case, Client, Phone, Transaction, Document, Payment, ImportLog
|
|
from .auth import authenticate_user, get_current_user_from_session
|
|
from .logging_config import setup_logging
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Get SECRET_KEY from environment variables
|
|
SECRET_KEY = os.getenv("SECRET_KEY")
|
|
if not SECRET_KEY:
|
|
raise ValueError("SECRET_KEY environment variable must be set")
|
|
|
|
# Configure structured logging
|
|
setup_logging()
|
|
logger = structlog.get_logger(__name__)
|
|
|
|
# Configure Jinja2 templates
|
|
templates = Jinja2Templates(directory="app/templates")
|
|
|
|
|
|
class AuthMiddleware(BaseHTTPMiddleware):
    """
    Simple session-based authentication middleware.

    Redirects unauthenticated users to /login for protected routes.
    """

    def __init__(self, app, exempt_paths: list[str] | None = None):
        super().__init__(app)
        # Paths that never require a login (e.g. /login itself, /health).
        self.exempt_paths = exempt_paths or []

    def _is_public(self, path: str) -> bool:
        """Return True when *path* is served without authentication."""
        if path in self.exempt_paths:
            return True
        # Static assets and favicons are always public.
        return path.startswith(("/static", "/favicon"))

    async def dispatch(self, request, call_next):
        # Anything not explicitly public requires a logged-in session.
        if not self._is_public(request.url.path) and not request.session.get("user_id"):
            return RedirectResponse(url="/login", status_code=302)
        return await call_next(request)
|
|
|
|
|
|
class RequestIdMiddleware(BaseHTTPMiddleware):
    """
    Middleware that assigns a request_id and binds request context for logging.

    Adds: request_id, http.method, http.path, user.id to the structlog context.
    Emits a JSON access log with status_code and duration_ms after response.
    """

    # Context keys bound for the lifetime of a single request.
    _CONTEXT_KEYS = ("request_id", "http.method", "http.path", "user.id")

    async def dispatch(self, request: Request, call_next):
        start_time = time.perf_counter()

        # Honor an inbound X-Request-ID so ids can propagate across services.
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())

        # user id from session if available (SessionMiddleware runs first)
        user_id = request.session.get("user_id") if hasattr(request, "session") else None

        structlog_contextvars.bind_contextvars(
            request_id=request_id,
            **{
                "http.method": request.method,
                "http.path": request.url.path,
                "user.id": user_id,
            },
        )

        # A single `finally` guarantees the bound context is removed on every
        # exit path. The previous version duplicated the unbind call in the
        # error and success branches and would leak bound context if the
        # access log or header assignment itself raised.
        try:
            try:
                response = await call_next(request)
            except Exception:  # noqa: BLE001 - logged, then re-raised
                logger.error(
                    "request",
                    status_code=500,
                    duration_ms=int((time.perf_counter() - start_time) * 1000),
                    exc_info=True,
                )
                raise

            # Ensure response header has request id (best-effort).
            try:
                response.headers["X-Request-ID"] = request_id
            except Exception:
                pass

            logger.info(
                "request",
                status_code=response.status_code,
                duration_ms=int((time.perf_counter() - start_time) * 1000),
            )
            return response
        finally:
            structlog_contextvars.unbind_contextvars(*self._CONTEXT_KEYS)
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for the FastAPI application.

    Startup: verifies database tables exist and logs connection info.
    Shutdown: logs application shutdown.
    """
    logger.info("app_start")

    # Make sure the schema exists before serving any requests.
    create_tables()
    logger.info("db_tables_verified")

    # Record which database this process is talking to.
    logger.info("db_connected", database_url=get_database_url())

    yield

    logger.info("app_shutdown")
|
|
|
|
|
|
# Create FastAPI application with lifespan management
app = FastAPI(
    title="Delphi Database",
    description="Legal case management database application",
    version="1.0.0",
    lifespan=lifespan
)

# Add CORS middleware for cross-origin requests.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive (browsers reject "*" with credentials) — pin origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify allowed origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register request logging and authentication middleware with exempt paths.
# Middleware added later runs earlier, so the request passes through
# SessionMiddleware first, then AuthMiddleware, then RequestIdMiddleware.
EXEMPT_PATHS = ["/", "/health", "/login", "/logout"]
app.add_middleware(RequestIdMiddleware)
app.add_middleware(AuthMiddleware, exempt_paths=EXEMPT_PATHS)

# Add SessionMiddleware for session management (must be added LAST so it runs FIRST)
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)

# Mount static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")
|
|
|
|
|
|
def get_import_type_from_filename(filename: str) -> str:
    """
    Determine import type based on filename pattern.

    Args:
        filename: Name of the uploaded CSV file

    Returns:
        Import type string (client, phone, case, transaction, document, payment)

    Raises:
        ValueError: If the filename does not match any known prefix.
    """
    # Ordered (prefixes -> type) table; first match wins.
    prefix_map = [
        (('ROLODEX', 'ROLEX'), 'client'),
        (('PHONE',), 'phone'),
        (('FILES',), 'case'),
        (('LEDGER',), 'transaction'),
        (('QDROS', 'QDRO'), 'document'),
        (('PAYMENTS', 'DEPOSITS'), 'payment'),
    ]

    filename_upper = filename.upper()
    for prefixes, import_type in prefix_map:
        if filename_upper.startswith(prefixes):
            return import_type

    # Bug fix: the original f-string contained no placeholder, so the
    # offending filename was never included in the error message.
    raise ValueError(f"Unknown file type for filename: {filename}")
|
|
|
|
|
|
def validate_csv_headers(headers: List[str], expected_fields: Dict[str, str]) -> Dict[str, Any]:
    """
    Validate CSV headers against expected model fields.

    Args:
        headers: List of CSV column headers
        expected_fields: Dict mapping field names to descriptions

    Returns:
        Dict with validation results and field mapping
    """
    outcome: Dict[str, Any] = {
        'valid': True,
        'missing_fields': [],
        'field_mapping': {},
        'errors': [],
    }

    for header in headers:
        normalized = header.strip().lower()

        # Pass 1: exact case-insensitive match against a model field.
        exact = next(
            (field for field in expected_fields if field.lower() == normalized),
            None,
        )
        if exact is not None:
            outcome['field_mapping'][exact] = header
            continue

        # Pass 2: partial (substring either way) match for common variations.
        partial = next(
            (
                field
                for field in expected_fields
                if field.lower() in normalized or normalized in field.lower()
            ),
            None,
        )
        if partial is not None:
            outcome['field_mapping'][partial] = header
        else:
            outcome['errors'].append(f"Unknown header: '{header}'")

    # Most imports need some form of ID column (case-insensitive check).
    for needed in ['id']:
        if not any(mapped.lower() == needed.lower() for mapped in outcome['field_mapping']):
            outcome['missing_fields'].append(needed)

    if outcome['missing_fields'] or outcome['errors']:
        outcome['valid'] = False

    return outcome
|
|
|
|
|
|
def parse_date(date_str: str) -> Optional[datetime]:
    """Parse a date string in one of the supported formats; None if unparseable."""
    if not date_str:
        return None

    cleaned = date_str.strip()
    # Treat common "no value" markers as missing.
    if cleaned in ('', 'NULL', 'N/A'):
        return None

    # Accept the handful of formats seen in the legacy exports.
    for fmt in ('%Y-%m-%d', '%m/%d/%Y', '%Y/%m/%d', '%d-%m-%Y'):
        try:
            return datetime.strptime(cleaned, fmt)
        except ValueError:
            pass

    logger.warning("parse_date_failed", value=date_str)
    return None
|
|
|
|
|
|
def parse_float(value: str) -> Optional[float]:
    """Convert *value* to a float; None for blank/NULL/N-A or invalid input."""
    if not value:
        return None

    text = value.strip()
    if text in ('', 'NULL', 'N/A'):
        return None

    try:
        return float(text)
    except ValueError:
        logger.warning("parse_float_failed", value=value)
        return None
|
|
|
|
|
|
def parse_int(value: str) -> Optional[int]:
    """Convert *value* to an int; None for blank/NULL/N-A or invalid input."""
    if not value:
        return None

    text = value.strip()
    if text in ('', 'NULL', 'N/A'):
        return None

    try:
        return int(text)
    except ValueError:
        logger.warning("parse_int_failed", value=value)
        return None
|
|
|
|
|
|
def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import ROLODEX CSV data into Client model.

    Expected CSV format: Id,Prefix,First,Middle,Last,Suffix,Title,A1,A2,A3,City,Abrev,St,Zip,Email,DOB,SS#,Legal_Status,Group,Memo

    Args:
        db: Database session.
        file_path: Path to the CSV file on disk.

    Returns:
        Dict with 'success' (rows imported), 'errors' (list of messages)
        and 'total_rows' (data rows seen).
    """
    result: Dict[str, Any] = {'success': 0, 'errors': [], 'total_rows': 0}

    expected_fields = {
        'Id': 'Client ID',
        'Prefix': 'Name Prefix',
        'First': 'First Name',
        'Middle': 'Middle Initial',
        'Last': 'Last Name',
        'Suffix': 'Name Suffix',
        'Title': 'Company/Organization',
        'A1': 'Address Line 1',
        'A2': 'Address Line 2',
        'A3': 'Address Line 3',
        'City': 'City',
        'Abrev': 'State Abbreviation',
        'St': 'State',
        'Zip': 'ZIP Code',
        'Email': 'Email Address',
        'DOB': 'Date of Birth',
        'SS#': 'Social Security Number',
        'Legal_Status': 'Legal Status',
        'Group': 'Group',
        'Memo': 'Memo/Notes'
    }

    try:
        # newline='' is required by the csv module so that quoted fields
        # containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)

            # Validate headers before touching any rows.
            validation = validate_csv_headers(reader.fieldnames or [], expected_fields)
            if not validation['valid']:
                result['errors'].append(f"Header validation failed: {validation['errors']}")
                return result

            # Row 1 is the header, so data rows start at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1

                try:
                    # DictReader pads short rows with None (restval), so guard
                    # with `or ''` instead of relying on the .get() default.
                    rolodex_id = (row.get('Id') or '').strip()
                    if not rolodex_id:
                        result['errors'].append(f"Row {row_num}: Missing client ID")
                        continue

                    # Skip IDs already present so re-imports are idempotent.
                    existing = db.query(Client).filter(Client.rolodex_id == rolodex_id).first()
                    if existing:
                        result['errors'].append(f"Row {row_num}: Client with ID '{rolodex_id}' already exists")
                        continue

                    db.add(Client(
                        rolodex_id=rolodex_id,
                        first_name=(row.get('First') or '').strip() or None,
                        middle_initial=(row.get('Middle') or '').strip() or None,
                        last_name=(row.get('Last') or '').strip() or None,
                        company=(row.get('Title') or '').strip() or None,
                        address=(row.get('A1') or '').strip() or None,
                        city=(row.get('City') or '').strip() or None,
                        state=(row.get('St') or '').strip() or None,
                        zip_code=(row.get('Zip') or '').strip() or None,
                    ))
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            # Single commit: all-or-nothing for the successfully parsed rows.
            db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result
|
|
|
|
|
|
def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PHONE CSV data into Phone model.

    Expected CSV format: Id,Phone,Location

    Args:
        db: Database session.
        file_path: Path to the CSV file on disk.

    Returns:
        Dict with 'success', 'errors' and 'total_rows'.
    """
    result: Dict[str, Any] = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        # newline='' is required by the csv module so that quoted fields
        # containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)

            headers = reader.fieldnames or []
            if len(headers) < 2:
                result['errors'].append("Invalid CSV format: expected at least 2 columns")
                return result

            # Row 1 is the header, so data rows start at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1

                try:
                    # DictReader pads short rows with None, so guard with `or ''`.
                    client_id = (row.get('Id') or '').strip()
                    if not client_id:
                        result['errors'].append(f"Row {row_num}: Missing client ID")
                        continue

                    # Phones can only attach to an already-imported client.
                    client = db.query(Client).filter(Client.rolodex_id == client_id).first()
                    if not client:
                        result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
                        continue

                    phone_number = (row.get('Phone') or '').strip()
                    if not phone_number:
                        result['errors'].append(f"Row {row_num}: Missing phone number")
                        continue

                    db.add(Phone(
                        client_id=client.id,
                        phone_type=(row.get('Location') or '').strip() or 'primary',
                        phone_number=phone_number,
                    ))
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result
|
|
|
|
|
|
def import_files_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import FILES CSV data into Case model.

    Expected CSV format: File_No,Id,File_Type,Regarding,Opened,Closed,Empl_Num,Rate_Per_Hour,Status,Footer_Code,Opposing,Hours,Hours_P,Trust_Bal,Trust_Bal_P,Hourly_Fees,Hourly_Fees_P,Flat_Fees,Flat_Fees_P,Disbursements,Disbursements_P,Credit_Bal,Credit_Bal_P,Total_Charges,Total_Charges_P,Amount_Owing,Amount_Owing_P,Transferable,Memo

    Args:
        db: Database session.
        file_path: Path to the CSV file on disk.

    Returns:
        Dict with 'success', 'errors' and 'total_rows'.
    """
    result: Dict[str, Any] = {'success': 0, 'errors': [], 'total_rows': 0}

    expected_fields = {
        'File_No': 'File Number',
        'Status': 'Status',
        'File_Type': 'File Type',
        'Regarding': 'Regarding',
        'Opened': 'Opened Date',
        'Closed': 'Closed Date',
        'Id': 'Client ID',
        'Empl_Num': 'Employee Number',
        'Rate_Per_Hour': 'Rate Per Hour',
        'Footer_Code': 'Footer Code',
        'Opposing': 'Opposing Party',
        'Hours': 'Hours',
        'Hours_P': 'Hours (Previous)',
        'Trust_Bal': 'Trust Balance',
        'Trust_Bal_P': 'Trust Balance (Previous)',
        'Hourly_Fees': 'Hourly Fees',
        'Hourly_Fees_P': 'Hourly Fees (Previous)',
        'Flat_Fees': 'Flat Fees',
        'Flat_Fees_P': 'Flat Fees (Previous)',
        'Disbursements': 'Disbursements',
        'Disbursements_P': 'Disbursements (Previous)',
        'Credit_Bal': 'Credit Balance',
        'Credit_Bal_P': 'Credit Balance (Previous)',
        'Total_Charges': 'Total Charges',
        'Total_Charges_P': 'Total Charges (Previous)',
        'Amount_Owing': 'Amount Owing',
        'Amount_Owing_P': 'Amount Owing (Previous)',
        'Transferable': 'Transferable',
        'Memo': 'Memo'
    }

    try:
        # newline='' is required by the csv module so that quoted fields
        # containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)

            validation = validate_csv_headers(reader.fieldnames or [], expected_fields)
            if not validation['valid']:
                result['errors'].append(f"Header validation failed: {validation['errors']}")
                return result

            # Row 1 is the header, so data rows start at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1

                try:
                    # DictReader pads short rows with None, so guard with `or ''`.
                    file_no = (row.get('File_No') or '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue

                    # Skip file numbers already present (idempotent re-import).
                    existing = db.query(Case).filter(Case.file_no == file_no).first()
                    if existing:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' already exists")
                        continue

                    # A case may legitimately have no client; when an ID is
                    # given it must resolve to an imported client.
                    client_id = (row.get('Id') or '').strip()
                    client = None
                    if client_id:
                        client = db.query(Client).filter(Client.rolodex_id == client_id).first()
                        if not client:
                            result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found")
                            continue

                    db.add(Case(
                        file_no=file_no,
                        client_id=client.id if client else None,
                        status=(row.get('Status') or '').strip() or 'active',
                        case_type=(row.get('File_Type') or '').strip() or None,
                        description=(row.get('Regarding') or '').strip() or None,
                        open_date=parse_date(row.get('Opened', '')),
                        close_date=parse_date(row.get('Closed', '')),
                    ))
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result
|
|
|
|
|
|
def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import LEDGER CSV data into Transaction model.

    Expected CSV format: File_No,Date,Item_No,Empl_Num,T_Code,T_Type,T_Type_L,Quantity,Rate,Amount,Billed,Note

    Args:
        db: Database session.
        file_path: Path to the CSV file on disk.

    Returns:
        Dict with 'success', 'errors' and 'total_rows'.
    """
    result: Dict[str, Any] = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        # newline='' is required by the csv module so that quoted fields
        # containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)

            headers = reader.fieldnames or []
            if len(headers) < 3:
                result['errors'].append("Invalid CSV format: expected at least 3 columns")
                return result

            # Row 1 is the header, so data rows start at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1

                try:
                    # DictReader pads short rows with None, so guard with `or ''`.
                    file_no = (row.get('File_No') or '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue

                    # Transactions must attach to an already-imported case.
                    case = db.query(Case).filter(Case.file_no == file_no).first()
                    if not case:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                        continue

                    amount = parse_float(row.get('Amount', '0'))
                    if amount is None:
                        result['errors'].append(f"Row {row_num}: Invalid amount")
                        continue

                    db.add(Transaction(
                        case_id=case.id,
                        transaction_date=parse_date(row.get('Date', '')),
                        transaction_type=(row.get('T_Type') or '').strip() or None,
                        amount=amount,
                        description=(row.get('Note') or '').strip() or None,
                        reference=(row.get('Item_No') or '').strip() or None,
                    ))
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result
|
|
|
|
|
|
def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import QDROS CSV data into Document model.

    Expected CSV format: File_No,Document_Type,Description,File_Name,Date

    Args:
        db: Database session.
        file_path: Path to the CSV file on disk.

    Returns:
        Dict with 'success', 'errors' and 'total_rows'.
    """
    result: Dict[str, Any] = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        # newline='' is required by the csv module so that quoted fields
        # containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)

            headers = reader.fieldnames or []
            if len(headers) < 2:
                result['errors'].append("Invalid CSV format: expected at least 2 columns")
                return result

            # Row 1 is the header, so data rows start at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1

                try:
                    # DictReader pads short rows with None, so guard with `or ''`.
                    file_no = (row.get('File_No') or '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue

                    # Documents must attach to an already-imported case.
                    case = db.query(Case).filter(Case.file_no == file_no).first()
                    if not case:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                        continue

                    db.add(Document(
                        case_id=case.id,
                        document_type=(row.get('Document_Type') or '').strip() or 'QDRO',
                        file_name=(row.get('File_Name') or '').strip() or None,
                        description=(row.get('Description') or '').strip() or None,
                        uploaded_date=parse_date(row.get('Date', '')),
                    ))
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result
|
|
|
|
|
|
def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]:
    """
    Import PAYMENTS CSV data into Payment model.

    Expected CSV format: File_No,Date,Amount,Type,Description,Check_Number

    Args:
        db: Database session.
        file_path: Path to the CSV file on disk.

    Returns:
        Dict with 'success', 'errors' and 'total_rows'.
    """
    result: Dict[str, Any] = {'success': 0, 'errors': [], 'total_rows': 0}

    try:
        # newline='' is required by the csv module so that quoted fields
        # containing embedded newlines are parsed correctly.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            reader = csv.DictReader(file)

            headers = reader.fieldnames or []
            if len(headers) < 2:
                result['errors'].append("Invalid CSV format: expected at least 2 columns")
                return result

            # Row 1 is the header, so data rows start at 2.
            for row_num, row in enumerate(reader, start=2):
                result['total_rows'] += 1

                try:
                    # DictReader pads short rows with None, so guard with `or ''`.
                    file_no = (row.get('File_No') or '').strip()
                    if not file_no:
                        result['errors'].append(f"Row {row_num}: Missing file number")
                        continue

                    # Payments must attach to an already-imported case.
                    case = db.query(Case).filter(Case.file_no == file_no).first()
                    if not case:
                        result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found")
                        continue

                    amount = parse_float(row.get('Amount', '0'))
                    if amount is None:
                        result['errors'].append(f"Row {row_num}: Invalid amount")
                        continue

                    db.add(Payment(
                        case_id=case.id,
                        payment_date=parse_date(row.get('Date', '')),
                        payment_type=(row.get('Type') or '').strip() or None,
                        amount=amount,
                        description=(row.get('Description') or '').strip() or None,
                        check_number=(row.get('Check_Number') or '').strip() or None,
                    ))
                    result['success'] += 1

                except Exception as e:
                    result['errors'].append(f"Row {row_num}: {str(e)}")

            db.commit()

    except Exception as e:
        result['errors'].append(f"Import failed: {str(e)}")
        db.rollback()

    return result
|
|
|
|
|
|
def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[str, Any]:
    """
    Dispatch a CSV import to the handler registered for its type.

    Args:
        db: Database session
        import_type: Type of import (client, phone, case, transaction, document, payment)
        file_path: Path to CSV file

    Returns:
        Dict with import results
    """
    handlers = {
        'client': import_rolodex_data,
        'phone': import_phone_data,
        'case': import_files_data,
        'transaction': import_ledger_data,
        'document': import_qdros_data,
        'payment': import_payments_data,
    }

    try:
        handler = handlers[import_type]
    except KeyError:
        # Mirror the per-file result shape so callers can treat this uniformly.
        return {
            'success': 0,
            'errors': [f"Unknown import type: {import_type}"],
            'total_rows': 0,
        }

    return handler(db, file_path)
|
|
|
|
|
|
@app.get("/")
async def root():
    """Root endpoint: send visitors straight to the login page."""
    return RedirectResponse(url="/login", status_code=302)
|
|
|
|
|
|
@app.get("/health")
async def health_check(db: Session = Depends(get_db)):
    """
    Health check endpoint that verifies database connectivity.

    Runs a cheap count query; any failure is reported as unhealthy
    rather than raised.
    """
    try:
        # A simple count exercises the full connection path.
        user_count = db.query(User).count()
    except Exception as e:
        logger.error("health_check_failed", error=str(e))
        return {
            "status": "unhealthy",
            "database": "error",
            "error": str(e)
        }

    return {
        "status": "healthy",
        "database": "connected",
        "users": user_count
    }
|
|
|
|
|
|
@app.get("/login")
async def login_form(request: Request):
    """
    Display the login form.

    Already-authenticated users are sent straight to the dashboard.
    """
    if get_current_user_from_session(request.session):
        return RedirectResponse(url="/dashboard", status_code=302)

    return templates.TemplateResponse("login.html", {"request": request})
|
|
|
|
|
|
@app.post("/login")
async def login_submit(request: Request, db: Session = Depends(get_db)):
    """
    Handle login form submission.

    On failure the login template is re-rendered with an error message;
    on success a session is established and the user is redirected to
    the dashboard.
    """
    form = await request.form()
    username = form.get("username")
    password = form.get("password")

    def _rejected(reason: str, message: str):
        """Log the failure and re-render the form with an error."""
        logger.warning("login_failed", username=username, reason=reason)
        return templates.TemplateResponse("login.html", {
            "request": request,
            "error": message
        })

    if not username or not password:
        return _rejected("missing_credentials", "Username and password are required")

    user = authenticate_user(username, password)
    if not user:
        return _rejected("invalid_credentials", "Invalid username or password")

    # Establish the session consumed by AuthMiddleware.
    request.session["user_id"] = user.id
    request.session["user"] = {"id": user.id, "username": user.username}

    # Update bound logging context with the authenticated user id.
    structlog_contextvars.bind_contextvars(**{"user.id": user.id})
    logger.info("login_success", username=username, **{"user.id": user.id})

    return RedirectResponse(url="/dashboard", status_code=302)
|
|
|
|
|
|
@app.get("/logout")
async def logout(request: Request):
    """
    Handle user logout: clear the session and return to the home page.
    """
    # Capture the username for the audit log before wiping the session.
    session_user = request.session.get("user", {})
    username = session_user.get("username", "unknown")

    request.session.clear()
    logger.info("logout", username=username)

    return RedirectResponse(url="/", status_code=302)
|
|
|
|
|
|
@app.get("/dashboard")
async def dashboard(
    request: Request,
    q: str | None = Query(None, description="Search by file number or client name"),
    page: int = Query(1, ge=1, description="Page number (1-indexed)"),
    page_size: int = Query(20, ge=1, le=100, description="Results per page"),
    db: Session = Depends(get_db),
):
    """
    Dashboard page - lists recent cases with search and pagination.

    - Optional query param `q` filters by case file number or client name/company
    - `page` and `page_size` control pagination
    """
    # Check authentication (belt-and-braces: AuthMiddleware also guards this path)
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Base query: join clients for name/company access.
    # Note: inner join — cases with no matching client row are excluded.
    query = db.query(Case).join(Client).order_by(
        Case.open_date.desc(),
        Case.created_at.desc(),
    )

    # Apply search filter if provided (case-insensitive substring match)
    if q:
        like_term = f"%{q}%"
        query = query.filter(
            or_(
                Case.file_no.ilike(like_term),
                Client.first_name.ilike(like_term),
                Client.last_name.ilike(like_term),
                Client.company.ilike(like_term),
            )
        )

    # Total count for pagination (runs a separate COUNT query)
    total: int = query.count()

    # Clamp page to valid range when total is known; ceiling division,
    # with an empty result set still yielding one (empty) page.
    total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
    if page > total_pages:
        page = total_pages

    # Pagination window
    offset = (page - 1) * page_size
    cases = query.offset(offset).limit(page_size).all()

    # Page number window for UI (current +/- 2)
    start_page = max(1, page - 2)
    end_page = min(total_pages, page + 2)
    page_numbers = list(range(start_page, end_page + 1))

    logger.info(
        "dashboard_render",
        query=q,
        page=page,
        page_size=page_size,
        total=total,
    )

    # start_index/end_index are 1-based positions of the visible slice —
    # presumably used by the template for "showing X-Y of Z" text; confirm
    # against dashboard.html.
    return templates.TemplateResponse(
        "dashboard.html",
        {
            "request": request,
            "user": user,
            "cases": cases,
            "q": q,
            "page": page,
            "page_size": page_size,
            "total": total,
            "total_pages": total_pages,
            "page_numbers": page_numbers,
            "start_index": (offset + 1) if total > 0 else 0,
            "end_index": min(offset + len(cases), total),
        },
    )
|
|
|
|
|
|
@app.post("/admin/upload")
async def admin_upload_files(
    request: Request,
    files: List[UploadFile] = File(...),
    db: Session = Depends(get_db)
):
    """
    Handle CSV file uploads for admin panel.

    Validates uploaded files are CSV format and stores them in data-import directory.

    Args:
        request: Incoming request; its session is used for authentication.
        files: One or more uploaded files; only names ending in .csv are accepted.
        db: Database session (injected; not used in this handler).

    Returns:
        The admin template rendered with per-file results and error messages,
        or a redirect to /login for unauthenticated users.
    """
    # Check authentication (belt-and-braces: AuthMiddleware also guards this path)
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    results = []
    errors = []

    # Ensure data-import directory exists (relative to the working directory)
    import_dir = "data-import"
    os.makedirs(import_dir, exist_ok=True)

    for file in files:
        try:
            # Validate file type by extension only.
            # NOTE(review): file.filename may be None for some clients; the
            # .lower() call would then raise AttributeError, which the outer
            # except converts into an error entry — confirm this is intended.
            if not file.filename.lower().endswith('.csv'):
                errors.append(f"File '{file.filename}' is not a CSV file")
                continue

            # Generate unique filename to avoid conflicts (and path traversal
            # from attacker-controlled names — only the extension is reused)
            file_id = str(uuid.uuid4())
            file_ext = os.path.splitext(file.filename)[1]
            stored_filename = f"{file_id}{file_ext}"
            file_path = os.path.join(import_dir, stored_filename)

            # Save file; the whole body is read into memory before writing
            contents = await file.read()
            with open(file_path, "wb") as f:
                f.write(contents)

            # Determine import type from filename
            try:
                import_type = get_import_type_from_filename(file.filename)
            except ValueError as e:
                errors.append(f"File '{file.filename}': {str(e)}")
                # Clean up uploaded file since it cannot be imported
                os.remove(file_path)
                continue

            results.append({
                'filename': file.filename,
                'stored_filename': stored_filename,
                'import_type': import_type,
                'file_path': file_path,
                'size': len(contents)
            })

        except Exception as e:
            errors.append(f"Error processing '{file.filename}': {str(e)}")
            continue

    # Log the upload operation
    logger.info(
        "admin_upload",
        uploaded_count=len(results),
        error_count=len(errors),
        username=user.username,
    )

    return templates.TemplateResponse("admin.html", {
        "request": request,
        "user": user,
        "upload_results": results,
        "upload_errors": errors,
        "show_upload_results": True
    })
|
|
|
|
|
|
@app.post("/admin/import/{data_type}")
async def admin_import_data(
    request: Request,
    data_type: str,
    db: Session = Depends(get_db)
):
    """
    Process CSV import for specified data type.

    Reads the selected files from the submitted form, creates an ImportLog
    entry per file, runs the import synchronously, and renders the results.
    """
    # Check authentication
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Validate data type
    valid_types = ['client', 'phone', 'case', 'transaction', 'document', 'payment']
    if data_type not in valid_types:
        return templates.TemplateResponse("admin.html", {
            "request": request,
            "user": user,
            "error": f"Invalid data type: {data_type}"
        })

    # Get form data for file selection
    form = await request.form()
    selected_files = form.getlist("selected_files")

    if not selected_files:
        return templates.TemplateResponse("admin.html", {
            "request": request,
            "user": user,
            "error": "No files selected for import"
        })

    import_results = []
    total_success = 0
    total_errors = 0

    for stored_filename in selected_files:
        # The filename comes straight from user-submitted form data and is
        # joined into a path below — reject any value containing path
        # components so a crafted name cannot escape the data-import dir.
        if os.path.basename(stored_filename) != stored_filename or stored_filename in ('.', '..'):
            import_results.append({
                'filename': stored_filename,
                'status': 'error',
                'message': 'Invalid file name'
            })
            total_errors += 1
            continue

        file_path = os.path.join("data-import", stored_filename)

        if not os.path.exists(file_path):
            import_results.append({
                'filename': stored_filename,
                'status': 'error',
                'message': 'File not found'
            })
            total_errors += 1
            continue

        # Create import log entry (status flips below once the run finishes)
        import_log = ImportLog(
            import_type=data_type,
            file_name=stored_filename,
            file_path=file_path,
            status="running"
        )
        db.add(import_log)
        db.commit()

        try:
            # Process the import
            result = process_csv_import(db, data_type, file_path)

            # Update import log. NOTE: a run with any row errors is marked
            # "failed", a clean run "completed" (the previous condition was
            # inverted and recorded the opposite).
            import_log.status = "failed" if result['errors'] else "completed"
            import_log.total_rows = result['total_rows']
            import_log.success_count = result['success']
            import_log.error_count = len(result['errors'])
            import_log.error_details = json.dumps(result['errors'])
            import_log.completed_at = datetime.now()

            db.commit()

            import_results.append({
                'filename': stored_filename,
                'status': 'success' if result['success'] > 0 else 'error',
                'total_rows': result['total_rows'],
                'success_count': result['success'],
                'error_count': len(result['errors']),
                'errors': result['errors'][:10]  # Show first 10 errors
            })

            total_success += result['success']
            total_errors += len(result['errors'])

        except Exception as e:
            # Update import log on error
            import_log.status = "failed"
            import_log.error_details = json.dumps([str(e)])
            import_log.completed_at = datetime.now()
            db.commit()

            import_results.append({
                'filename': stored_filename,
                'status': 'error',
                'message': str(e)
            })
            total_errors += 1

    # Log the import operation
    logger.info(
        "admin_import",
        import_type=data_type,
        success_count=total_success,
        error_count=total_errors,
        username=user.username,
    )

    return templates.TemplateResponse("admin.html", {
        "request": request,
        "user": user,
        "import_results": import_results,
        "total_success": total_success,
        "total_errors": total_errors,
        "show_import_results": True
    })
|
|
|
|
|
|
@app.get("/admin")
async def admin_panel(request: Request, db: Session = Depends(get_db)):
    """
    Admin panel - requires authentication.

    Shows the ten most recent import runs plus the CSV files currently
    sitting in the data-import directory, grouped by detected import type.
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Most recent import runs first.
    recent_imports = (
        db.query(ImportLog)
        .order_by(ImportLog.created_at.desc())
        .limit(10)
        .all()
    )

    import_dir = "data-import"
    available_files = []
    if os.path.exists(import_dir):
        for name in (n for n in os.listdir(import_dir) if n.endswith('.csv')):
            path = os.path.join(import_dir, name)
            try:
                detected_type = get_import_type_from_filename(name)
            except ValueError:
                detected_type = 'unknown'

            available_files.append({
                'filename': name,
                'import_type': detected_type,
                'size': os.path.getsize(path),
                'modified': datetime.fromtimestamp(os.path.getmtime(path)),
            })

    # Bucket the discovered files by import type for the template.
    files_by_type = {}
    for info in available_files:
        files_by_type.setdefault(info['import_type'], []).append(info)

    return templates.TemplateResponse("admin.html", {
        "request": request,
        "user": user,
        "recent_imports": recent_imports,
        "available_files": available_files,
        "files_by_type": files_by_type
    })
|
|
|
|
|
|
@app.get("/case/{case_id}")
async def case_detail(
    request: Request,
    case_id: int,
    saved: bool = Query(False, description="Whether to show success message"),
    db: Session = Depends(get_db),
):
    """
    Case detail view.

    Renders one case with its client, transactions, documents and payments
    eagerly loaded; renders a 404 page when the case does not exist.
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Eager-load every relationship the template touches so rendering
    # never triggers lazy loads.
    eager_opts = (
        joinedload(Case.client),
        joinedload(Case.transactions),
        joinedload(Case.documents),
        joinedload(Case.payments),
    )
    case_obj = db.query(Case).options(*eager_opts).filter(Case.id == case_id).first()

    # Validation errors stashed by the update handler, consumed on display.
    errors = request.session.pop("case_update_errors", None)

    if case_obj is None:
        logger.warning("case_not_found", case_id=case_id)
        return templates.TemplateResponse(
            "case.html",
            {
                "request": request,
                "user": user,
                "case": None,
                "error": "Case not found",
                "saved": False,
                "errors": errors or [],
            },
            status_code=404,
        )

    logger.info("case_detail", case_id=case_obj.id, file_no=case_obj.file_no)

    return templates.TemplateResponse(
        "case.html",
        {
            "request": request,
            "user": user,
            "case": case_obj,
            "saved": saved,
            "errors": errors or [],
        },
    )
|
|
|
|
|
|
def _parse_case_date(raw: str, label: str, errors: List[str]) -> Optional[datetime]:
    """Parse a YYYY-MM-DD form value; append a message to *errors* on failure."""
    try:
        return datetime.strptime(raw, "%Y-%m-%d")
    except ValueError:
        errors.append(f"{label} must be in YYYY-MM-DD format")
        return None


@app.post("/case/{case_id}/update")
async def case_update(
    request: Request,
    case_id: int,
    db: Session = Depends(get_db),
) -> RedirectResponse:
    """
    Update case details.

    Validates the submitted form fields, applies any changes to the case,
    and redirects back to the case detail view. Validation errors are
    stashed in the session for display by the detail page.
    """
    # Check authentication
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    form = await request.form()

    case_obj = db.query(Case).filter(Case.id == case_id).first()
    if not case_obj:
        logger.warning("case_not_found_update", case_id=case_id)
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)

    errors: List[str] = []
    update_data: Dict[str, Any] = {}

    # Status: only the two known values are accepted.
    status = form.get("status")
    if status is not None:
        if status not in ["active", "closed"]:
            errors.append("Status must be 'active' or 'closed'")
        else:
            update_data["status"] = status

    # Optional free-text fields; blank input clears the value to NULL.
    case_type = form.get("case_type")
    if case_type is not None:
        update_data["case_type"] = case_type.strip() or None

    description = form.get("description")
    if description is not None:
        update_data["description"] = description.strip() or None

    # Dates: blank clears the value; otherwise must parse as YYYY-MM-DD.
    # Shared helper replaces the previously duplicated try/except blocks.
    for field, label in (("open_date", "Open date"), ("close_date", "Close date")):
        raw = form.get(field)
        if raw is None:
            continue
        raw = raw.strip()
        if not raw:
            update_data[field] = None
            continue
        parsed = _parse_case_date(raw, label, errors)
        if parsed is not None:
            update_data[field] = parsed

    # If there are validation errors, redirect back with errors
    if errors:
        request.session["case_update_errors"] = errors
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)

    # Apply updates
    try:
        changed_fields = {}
        for field, value in update_data.items():
            old_value = getattr(case_obj, field)
            if old_value != value:
                changed_fields[field] = {"old": old_value, "new": value}
                setattr(case_obj, field, value)

        db.commit()
        logger.info(
            "case_update",
            case_id=case_id,
            changed_fields=list(update_data.keys()),
            changed_details=changed_fields,
        )

        # Clear any previous errors from session
        request.session.pop("case_update_errors", None)

        return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)

    except Exception as e:
        db.rollback()
        logger.error("case_update_failed", case_id=case_id, error=str(e))

        # Store error in session for display
        request.session["case_update_errors"] = ["Failed to save changes. Please try again."]

        return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
|
|
|
|
|
@app.post("/case/{case_id}/close")
async def case_close(
    request: Request,
    case_id: int,
    db: Session = Depends(get_db),
) -> RedirectResponse:
    """
    Close a case.

    Marks the case 'closed'; close_date is stamped with the current time
    only when it was not already set.
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    case_obj = db.query(Case).filter(Case.id == case_id).first()
    if case_obj is None:
        logger.warning("case_not_found_close", case_id=case_id)
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)

    try:
        case_obj.status = "closed"
        if not case_obj.close_date:
            # Preserve an existing close date; only stamp when missing.
            case_obj.close_date = datetime.now()

        db.commit()
        logger.info(
            "case_closed",
            case_id=case_id,
            close_date=case_obj.close_date.isoformat() if case_obj.close_date else None,
        )
        return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)

    except Exception as e:
        db.rollback()
        logger.error("case_close_failed", case_id=case_id, error=str(e))
        # Surface a friendly message on the detail page via the session.
        request.session["case_update_errors"] = ["Failed to close case. Please try again."]
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
|
|
|
|
|
@app.post("/case/{case_id}/reopen")
async def case_reopen(
    request: Request,
    case_id: int,
    db: Session = Depends(get_db),
) -> RedirectResponse:
    """
    Reopen a case.

    Flips the case status back to 'active' and clears any close_date.
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    case_obj = db.query(Case).filter(Case.id == case_id).first()
    if case_obj is None:
        logger.warning("case_not_found_reopen", case_id=case_id)
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)

    try:
        case_obj.status = "active"
        case_obj.close_date = None

        db.commit()
        logger.info("case_reopened", case_id=case_id)
        return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)

    except Exception as e:
        db.rollback()
        logger.error("case_reopen_failed", case_id=case_id, error=str(e))
        # Surface a friendly message on the detail page via the session.
        request.session["case_update_errors"] = ["Failed to reopen case. Please try again."]
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)
|
|
|
|
|
|
@app.get("/rolodex")
async def rolodex_list(
    request: Request,
    q: str | None = Query(None, description="Search by name or company"),
    phone: str | None = Query(None, description="Search by phone contains"),
    page: int = Query(1, ge=1, description="Page number (1-indexed)"),
    page_size: int = Query(20, ge=1, le=100, description="Results per page"),
    db: Session = Depends(get_db),
):
    """
    Rolodex list with simple search and pagination.

    Optionally narrows clients by a name/company substring and by a phone
    number substring, then renders one page of results.
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Phones are eager-loaded so the template never triggers N+1 queries.
    query = db.query(Client).options(joinedload(Client.phones))

    if q:
        pattern = f"%{q}%"
        query = query.filter(
            or_(
                Client.first_name.ilike(pattern),
                Client.last_name.ilike(pattern),
                Client.company.ilike(pattern),
            )
        )

    if phone:
        # EXISTS subquery instead of a join keeps result rows unique.
        query = query.filter(Client.phones.any(Phone.phone_number.ilike(f"%{phone}%")))

    # Stable ordering: last name then first name, NULLs sorted last.
    query = query.order_by(
        Client.last_name.asc().nulls_last(),
        Client.first_name.asc().nulls_last(),
    )

    total: int = query.count()
    total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
    page = min(page, total_pages)

    offset = (page - 1) * page_size
    clients = query.offset(offset).limit(page_size).all()

    # Window of up to five page links centered on the current page.
    page_numbers = list(range(max(1, page - 2), min(total_pages, page + 2) + 1))

    logger.info(
        "rolodex_render",
        query=q,
        phone=phone,
        page=page,
        page_size=page_size,
        total=total,
    )

    return templates.TemplateResponse(
        "rolodex.html",
        {
            "request": request,
            "user": user,
            "clients": clients,
            "q": q,
            "phone": phone,
            "page": page,
            "page_size": page_size,
            "total": total,
            "total_pages": total_pages,
            "page_numbers": page_numbers,
            "start_index": (offset + 1) if total > 0 else 0,
            "end_index": min(offset + len(clients), total),
            "enable_bulk": True,
        },
    )
|
|
|
|
|
|
@app.get("/rolodex/new")
async def rolodex_new(request: Request):
    """Render the blank client-edit form for creating a new rolodex entry."""
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    context = {"request": request, "user": user, "client": None}
    return templates.TemplateResponse("rolodex_edit.html", context)
|
|
|
|
|
|
@app.get("/rolodex/{client_id}")
async def rolodex_view(client_id: int, request: Request, db: Session = Depends(get_db)):
    """Render a single client with phones and cases eager-loaded; 404 if missing."""
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    eager_opts = (joinedload(Client.phones), joinedload(Client.cases))
    client = db.query(Client).options(*eager_opts).filter(Client.id == client_id).first()
    if client is None:
        raise HTTPException(status_code=404, detail="Client not found")

    context = {"request": request, "user": user, "client": client}
    return templates.TemplateResponse("rolodex_view.html", context)
|
|
|
|
|
|
@app.post("/rolodex/create")
async def rolodex_create(
    request: Request,
    first_name: str = Form(None),
    last_name: str = Form(None),
    company: str = Form(None),
    address: str = Form(None),
    city: str = Form(None),
    state: str = Form(None),
    zip_code: str = Form(None),
    rolodex_id: str = Form(None),
    db: Session = Depends(get_db),
):
    """Create a new rolodex client from form fields and redirect to its page."""
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    def normalize(value):
        # Collapse missing or whitespace-only input to NULL.
        return (value or "").strip() or None

    client = Client(
        first_name=normalize(first_name),
        last_name=normalize(last_name),
        company=normalize(company),
        address=normalize(address),
        city=normalize(city),
        state=normalize(state),
        zip_code=normalize(zip_code),
        rolodex_id=normalize(rolodex_id),
    )
    db.add(client)
    db.commit()
    db.refresh(client)

    logger.info("rolodex_create", client_id=client.id, rolodex_id=client.rolodex_id)
    return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
|
|
|
|
|
|
@app.post("/rolodex/{client_id}/update")
async def rolodex_update(
    client_id: int,
    request: Request,
    first_name: str = Form(None),
    last_name: str = Form(None),
    company: str = Form(None),
    address: str = Form(None),
    city: str = Form(None),
    state: str = Form(None),
    zip_code: str = Form(None),
    rolodex_id: str = Form(None),
    db: Session = Depends(get_db),
):
    """Update every editable field on a client from form input; 404 if missing."""
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    client = db.query(Client).filter(Client.id == client_id).first()
    if client is None:
        raise HTTPException(status_code=404, detail="Client not found")

    # Trim each submitted value; blank input clears the column to NULL.
    submitted = {
        "first_name": first_name,
        "last_name": last_name,
        "company": company,
        "address": address,
        "city": city,
        "state": state,
        "zip_code": zip_code,
        "rolodex_id": rolodex_id,
    }
    for attr, raw in submitted.items():
        setattr(client, attr, (raw or "").strip() or None)

    db.commit()
    logger.info(
        "rolodex_update",
        client_id=client.id,
        fields={
            "first_name": client.first_name,
            "last_name": client.last_name,
            "company": client.company,
            "rolodex_id": client.rolodex_id,
        },
    )
    return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
|
|
|
|
|
|
@app.post("/rolodex/{client_id}/delete")
async def rolodex_delete(client_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete a client and redirect to the rolodex list; 404 if missing."""
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    target = db.query(Client).filter(Client.id == client_id).first()
    if target is None:
        raise HTTPException(status_code=404, detail="Client not found")

    db.delete(target)
    db.commit()
    logger.info("rolodex_delete", client_id=client_id)
    return RedirectResponse(url="/rolodex", status_code=302)
|
|
|
|
|
|
@app.post("/rolodex/{client_id}/phone/add")
async def rolodex_add_phone(
    client_id: int,
    request: Request,
    phone_number: str = Form(...),
    phone_type: str = Form(None),
    db: Session = Depends(get_db),
):
    """
    Add a phone number to a client.

    Raises:
        HTTPException: 404 if the client does not exist; 400 if the phone
            number is blank after trimming whitespace.
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    client = db.query(Client).filter(Client.id == client_id).first()
    if not client:
        raise HTTPException(status_code=404, detail="Client not found")

    number = (phone_number or "").strip()
    # Previously a whitespace-only submission was stored as an empty phone
    # record; reject it explicitly instead.
    if not number:
        raise HTTPException(status_code=400, detail="Phone number is required")

    phone = Phone(
        client_id=client.id,
        phone_number=number,
        phone_type=(phone_type or "").strip() or None,
    )
    db.add(phone)
    db.commit()
    logger.info("rolodex_phone_add", client_id=client.id, phone_id=phone.id, number=phone.phone_number)
    return RedirectResponse(url=f"/rolodex/{client.id}", status_code=302)
|
|
|
|
|
|
@app.post("/rolodex/{client_id}/phone/{phone_id}/delete")
async def rolodex_delete_phone(client_id: int, phone_id: int, request: Request, db: Session = Depends(get_db)):
    """Delete one phone record belonging to the given client; 404 if missing."""
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Scope the lookup to the client so a phone can't be deleted via a
    # mismatched client id in the URL.
    target = (
        db.query(Phone)
        .filter(Phone.id == phone_id, Phone.client_id == client_id)
        .first()
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Phone not found")

    db.delete(target)
    db.commit()
    logger.info("rolodex_phone_delete", client_id=client_id, phone_id=phone_id)
    return RedirectResponse(url=f"/rolodex/{client_id}", status_code=302)
|
|
|
|
|
|
@app.get("/payments")
async def payments_search(
    request: Request,
    from_date: str | None = Query(None, description="YYYY-MM-DD"),
    to_date: str | None = Query(None, description="YYYY-MM-DD"),
    file_no: str | None = Query(None, description="Case file number"),
    rolodex_id: str | None = Query(None, description="Legacy client Id"),
    q: str | None = Query(None, description="Description contains"),
    page: int = Query(1, ge=1),
    page_size: int = Query(50, ge=1, le=200),
    db: Session = Depends(get_db),
):
    """
    Payment search with date-range, case, client and description filters.

    Renders one page of payments ordered newest-first, plus the amount
    total for the current page. Malformed date filters are ignored
    (best-effort filtering) rather than rejected.
    """
    from datetime import timedelta  # stdlib; local to keep file-level imports untouched

    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    query = (
        db.query(Payment)
        .join(Case, Payment.case_id == Case.id)
        .join(Client, Case.client_id == Client.id)
        .order_by(Payment.payment_date.desc().nulls_last(), Payment.id.desc())
    )

    filters = []
    if from_date:
        try:
            dt = datetime.strptime(from_date, "%Y-%m-%d")
            filters.append(Payment.payment_date >= dt)
        except ValueError:
            # Best-effort: ignore a malformed date rather than erroring.
            pass
    if to_date:
        try:
            dt = datetime.strptime(to_date, "%Y-%m-%d")
            # Make the upper bound inclusive of the whole day: if
            # payment_date carries a time component, `<= midnight` would
            # silently drop same-day payments recorded after 00:00.
            filters.append(Payment.payment_date < dt + timedelta(days=1))
        except ValueError:
            pass
    if file_no:
        filters.append(Case.file_no.ilike(f"%{file_no}%"))
    if rolodex_id:
        filters.append(Client.rolodex_id.ilike(f"%{rolodex_id}%"))
    if q:
        filters.append(Payment.description.ilike(f"%{q}%"))

    if filters:
        query = query.filter(and_(*filters))

    total = query.count()
    total_pages = (total + page_size - 1) // page_size if total > 0 else 1
    if page > total_pages:
        page = total_pages
    offset = (page - 1) * page_size
    payments = query.offset(offset).limit(page_size).all()

    # Totals for current result page
    page_total_amount = sum(p.amount or 0 for p in payments)

    logger.info(
        "payments_render",
        from_date=from_date,
        to_date=to_date,
        file_no=file_no,
        rolodex_id=rolodex_id,
        q=q,
        total=total,
    )

    return templates.TemplateResponse(
        "payments.html",
        {
            "request": request,
            "user": user,
            "payments": payments,
            "from_date": from_date,
            "to_date": to_date,
            "file_no": file_no,
            "rolodex_id": rolodex_id,
            "q": q,
            "page": page,
            "page_size": page_size,
            "total": total,
            "total_pages": total_pages,
            "start_index": (offset + 1) if total > 0 else 0,
            "end_index": min(offset + len(payments), total),
            "page_total_amount": page_total_amount,
        },
    )
|
|
|
|
|
|
@app.post("/reports/phone-book")
async def phone_book_report_post(request: Request):
    """Accepts selected client IDs from forms and redirects to GET for rendering."""
    from urllib.parse import urlencode  # stdlib; local to keep file-level imports untouched

    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    form = await request.form()
    client_ids = form.getlist("client_ids")
    if not client_ids:
        return RedirectResponse(url="/rolodex", status_code=302)

    # urlencode percent-escapes each value, so form values containing
    # reserved characters cannot corrupt the redirect query string (the
    # previous manual "&".join did no escaping).
    ids_param = urlencode([("client_ids", cid) for cid in client_ids])
    return RedirectResponse(url=f"/reports/phone-book?{ids_param}", status_code=302)
|
|
|
|
|
|
@app.get("/reports/phone-book")
async def phone_book_report(
    request: Request,
    client_ids: List[int] | None = Query(None),
    q: str | None = Query(None, description="Filter by name/company"),
    format: str | None = Query(None, description="csv for CSV output"),
    db: Session = Depends(get_db),
):
    """
    Phone book report.

    Selects clients either by explicit ids or by a name/company substring,
    then renders them as HTML — or as a CSV download when format=csv, with
    one row per phone (or one blank-phone row for clients without phones).
    """
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    query = db.query(Client).options(joinedload(Client.phones))
    if client_ids:
        query = query.filter(Client.id.in_(client_ids))
    elif q:
        pattern = f"%{q}%"
        query = query.filter(
            or_(
                Client.first_name.ilike(pattern),
                Client.last_name.ilike(pattern),
                Client.company.ilike(pattern),
            )
        )

    clients = query.order_by(
        Client.last_name.asc().nulls_last(),
        Client.first_name.asc().nulls_last(),
    ).all()

    if format == "csv":
        buffer = StringIO()
        writer = csv.writer(buffer)
        writer.writerow(["Last", "First", "Company", "Phone Type", "Phone Number"])
        for client in clients:
            base = [client.last_name or "", client.first_name or "", client.company or ""]
            if client.phones:
                for entry in client.phones:
                    writer.writerow(base + [entry.phone_type or "", entry.phone_number or ""])
            else:
                writer.writerow(base + ["", ""])

        return Response(
            content=buffer.getvalue().encode("utf-8"),
            media_type="text/csv",
            headers={"Content-Disposition": "attachment; filename=phone_book.csv"},
        )

    logger.info("phone_book_render", count=len(clients))
    return templates.TemplateResponse(
        "report_phone_book.html",
        {"request": request, "user": user, "clients": clients, "q": q, "client_ids": client_ids or []},
    )