""" FastAPI application entry point for Delphi Database. This module initializes the FastAPI application, sets up database connections, and provides the main application instance. """ import os import time import csv import json import uuid from contextlib import asynccontextmanager from datetime import datetime from typing import Optional, List, Dict, Any from io import StringIO from fastapi import FastAPI, Depends, Request, Query, HTTPException, UploadFile, File, Form from fastapi.responses import RedirectResponse from starlette.middleware.sessions import SessionMiddleware from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session, joinedload from sqlalchemy import or_ from dotenv import load_dotenv from starlette.middleware.base import BaseHTTPMiddleware import structlog from structlog import contextvars as structlog_contextvars from .database import create_tables, get_db, get_database_url from .models import User, Case, Client, Phone, Transaction, Document, Payment, ImportLog from .auth import authenticate_user, get_current_user_from_session from .logging_config import setup_logging # Load environment variables load_dotenv() # Get SECRET_KEY from environment variables SECRET_KEY = os.getenv("SECRET_KEY") if not SECRET_KEY: raise ValueError("SECRET_KEY environment variable must be set") # Configure structured logging setup_logging() logger = structlog.get_logger(__name__) # Configure Jinja2 templates templates = Jinja2Templates(directory="app/templates") class AuthMiddleware(BaseHTTPMiddleware): """ Simple session-based authentication middleware. Redirects unauthenticated users to /login for protected routes. """ def __init__(self, app, exempt_paths: list[str] | None = None): super().__init__(app) self.exempt_paths = exempt_paths or [] async def dispatch(self, request, call_next): path = request.url.path # Allow exempt paths and static assets if ( path in self.exempt_paths or path.startswith("/static") or path.startswith("/favicon") ): return await call_next(request) # Enforce authentication for other paths if not request.session.get("user_id"): return RedirectResponse(url="/login", status_code=302) return await call_next(request) class RequestIdMiddleware(BaseHTTPMiddleware): """ Middleware that assigns a request_id and binds request context for logging. Adds: request_id, http.method, http.path, user.id to the structlog context. Emits a JSON access log with status_code and duration_ms after response. """ async def dispatch(self, request: Request, call_next): start_time = time.perf_counter() request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4()) method = request.method path = request.url.path # user id from session if available (SessionMiddleware runs first) user_id = request.session.get("user_id") if hasattr(request, "session") else None structlog_contextvars.bind_contextvars( request_id=request_id, **{"http.method": method, "http.path": path, "user.id": user_id}, ) try: response = await call_next(request) status_code = response.status_code except Exception as exc: # noqa: BLE001 - we re-raise after logging status_code = 500 duration_ms = int((time.perf_counter() - start_time) * 1000) logger.error( "request", status_code=status_code, duration_ms=duration_ms, exc_info=True, ) structlog_contextvars.unbind_contextvars("request_id", "http.method", "http.path", "user.id") raise # Ensure response header has request id try: response.headers["X-Request-ID"] = request_id except Exception: pass duration_ms = int((time.perf_counter() - start_time) * 1000) logger.info( "request", status_code=status_code, duration_ms=duration_ms, ) structlog_contextvars.unbind_contextvars("request_id", "http.method", "http.path", "user.id") return response @asynccontextmanager async def lifespan(app: FastAPI): """ Lifespan context manager for FastAPI application. Handles startup and shutdown events: - Creates database tables on startup - Logs database connection info """ # Startup logger.info("app_start") # Create database tables create_tables() logger.info("db_tables_verified") # Log database connection info db_url = get_database_url() logger.info("db_connected", database_url=db_url) yield # Shutdown logger.info("app_shutdown") # Create FastAPI application with lifespan management app = FastAPI( title="Delphi Database", description="Legal case management database application", version="1.0.0", lifespan=lifespan ) # Add CORS middleware for cross-origin requests app.add_middleware( CORSMiddleware, allow_origins=["*"], # In production, specify allowed origins allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Register request logging and authentication middleware with exempt paths EXEMPT_PATHS = ["/", "/health", "/login", "/logout"] app.add_middleware(RequestIdMiddleware) app.add_middleware(AuthMiddleware, exempt_paths=EXEMPT_PATHS) # Add SessionMiddleware for session management (must be added LAST so it runs FIRST) app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY) # Mount static files directory app.mount("/static", StaticFiles(directory="static"), name="static") def get_import_type_from_filename(filename: str) -> str: """ Determine import type based on filename pattern. Args: filename: Name of the uploaded CSV file Returns: Import type string (client, phone, case, transaction, document, payment) """ filename_upper = filename.upper() if filename_upper.startswith('ROLODEX') or filename_upper.startswith('ROLEX'): return 'client' elif filename_upper.startswith('PHONE'): return 'phone' elif filename_upper.startswith('FILES'): return 'case' elif filename_upper.startswith('LEDGER'): return 'transaction' elif filename_upper.startswith('QDROS') or filename_upper.startswith('QDRO'): return 'document' elif filename_upper.startswith('PAYMENTS') or filename_upper.startswith('DEPOSITS'): return 'payment' else: raise ValueError(f"Unknown file type for filename: {filename}") def validate_csv_headers(headers: List[str], expected_fields: Dict[str, str]) -> Dict[str, Any]: """ Validate CSV headers against expected model fields. Args: headers: List of CSV column headers expected_fields: Dict mapping field names to descriptions Returns: Dict with validation results and field mapping """ result = { 'valid': True, 'missing_fields': [], 'field_mapping': {}, 'errors': [] } # Create mapping from CSV headers to model fields (case-insensitive) for csv_header in headers: csv_header_clean = csv_header.strip().lower() matched = False for model_field, description in expected_fields.items(): if csv_header_clean == model_field.lower(): result['field_mapping'][model_field] = csv_header matched = True break if not matched: # Try partial matches for common variations for model_field, description in expected_fields.items(): if model_field.lower() in csv_header_clean or csv_header_clean in model_field.lower(): result['field_mapping'][model_field] = csv_header matched = True break if not matched: result['errors'].append(f"Unknown header: '{csv_header}'") # Check for required fields (case-insensitive) required_fields = ['id'] # Most imports need some form of ID for required in required_fields: found = False for mapped_field in result['field_mapping']: if mapped_field.lower() == required.lower(): found = True break if not found: result['missing_fields'].append(required) if result['missing_fields'] or result['errors']: result['valid'] = False return result def parse_date(date_str: str) -> Optional[datetime]: """Parse date string into datetime object.""" if not date_str or date_str.strip() in ('', 'NULL', 'N/A'): return None # Try common date formats formats = ['%Y-%m-%d', '%m/%d/%Y', '%Y/%m/%d', '%d-%m-%Y'] for fmt in formats: try: return datetime.strptime(date_str.strip(), fmt) except ValueError: continue logger.warning("parse_date_failed", value=date_str) return None def parse_float(value: str) -> Optional[float]: """Parse string value into float.""" if not value or value.strip() in ('', 'NULL', 'N/A'): return None try: return float(value.strip()) except ValueError: logger.warning("parse_float_failed", value=value) return None def parse_int(value: str) -> Optional[int]: """Parse string value into int.""" if not value or value.strip() in ('', 'NULL', 'N/A'): return None try: return int(value.strip()) except ValueError: logger.warning("parse_int_failed", value=value) return None def import_rolodex_data(db: Session, file_path: str) -> Dict[str, Any]: """ Import ROLODEX CSV data into Client model. Expected CSV format: Id,Prefix,First,Middle,Last,Suffix,Title,A1,A2,A3,City,Abrev,St,Zip,Email,DOB,SS#,Legal_Status,Group,Memo """ result = { 'success': 0, 'errors': [], 'total_rows': 0 } expected_fields = { 'Id': 'Client ID', 'Prefix': 'Name Prefix', 'First': 'First Name', 'Middle': 'Middle Initial', 'Last': 'Last Name', 'Suffix': 'Name Suffix', 'Title': 'Company/Organization', 'A1': 'Address Line 1', 'A2': 'Address Line 2', 'A3': 'Address Line 3', 'City': 'City', 'Abrev': 'State Abbreviation', 'St': 'State', 'Zip': 'ZIP Code', 'Email': 'Email Address', 'DOB': 'Date of Birth', 'SS#': 'Social Security Number', 'Legal_Status': 'Legal Status', 'Group': 'Group', 'Memo': 'Memo/Notes' } try: with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) # Validate headers headers = reader.fieldnames or [] validation = validate_csv_headers(headers, expected_fields) if not validation['valid']: result['errors'].append(f"Header validation failed: {validation['errors']}") return result for row_num, row in enumerate(reader, start=2): # Start at 2 (header is row 1) result['total_rows'] += 1 try: # Extract and clean data rolodex_id = row.get('Id', '').strip() if not rolodex_id: result['errors'].append(f"Row {row_num}: Missing client ID") continue # Check for existing client existing = db.query(Client).filter(Client.rolodex_id == rolodex_id).first() if existing: result['errors'].append(f"Row {row_num}: Client with ID '{rolodex_id}' already exists") continue client = Client( rolodex_id=rolodex_id, first_name=row.get('First', '').strip() or None, middle_initial=row.get('Middle', '').strip() or None, last_name=row.get('Last', '').strip() or None, company=row.get('Title', '').strip() or None, address=row.get('A1', '').strip() or None, city=row.get('City', '').strip() or None, state=row.get('St', '').strip() or None, zip_code=row.get('Zip', '').strip() or None ) db.add(client) result['success'] += 1 except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") db.commit() except Exception as e: result['errors'].append(f"Import failed: {str(e)}") db.rollback() return result def import_phone_data(db: Session, file_path: str) -> Dict[str, Any]: """ Import PHONE CSV data into Phone model. Expected CSV format: Id,Phone,Location """ result = { 'success': 0, 'errors': [], 'total_rows': 0 } try: with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) headers = reader.fieldnames or [] if len(headers) < 2: result['errors'].append("Invalid CSV format: expected at least 2 columns") return result for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: client_id = row.get('Id', '').strip() if not client_id: result['errors'].append(f"Row {row_num}: Missing client ID") continue # Find the client client = db.query(Client).filter(Client.rolodex_id == client_id).first() if not client: result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found") continue phone_number = row.get('Phone', '').strip() if not phone_number: result['errors'].append(f"Row {row_num}: Missing phone number") continue phone = Phone( client_id=client.id, phone_type=row.get('Location', '').strip() or 'primary', phone_number=phone_number ) db.add(phone) result['success'] += 1 except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") db.commit() except Exception as e: result['errors'].append(f"Import failed: {str(e)}") db.rollback() return result def import_files_data(db: Session, file_path: str) -> Dict[str, Any]: """ Import FILES CSV data into Case model. Expected CSV format: File_No,Id,File_Type,Regarding,Opened,Closed,Empl_Num,Rate_Per_Hour,Status,Footer_Code,Opposing,Hours,Hours_P,Trust_Bal,Trust_Bal_P,Hourly_Fees,Hourly_Fees_P,Flat_Fees,Flat_Fees_P,Disbursements,Disbursements_P,Credit_Bal,Credit_Bal_P,Total_Charges,Total_Charges_P,Amount_Owing,Amount_Owing_P,Transferable,Memo """ result = { 'success': 0, 'errors': [], 'total_rows': 0 } expected_fields = { 'File_No': 'File Number', 'Status': 'Status', 'File_Type': 'File Type', 'Regarding': 'Regarding', 'Opened': 'Opened Date', 'Closed': 'Closed Date', 'Id': 'Client ID', 'Empl_Num': 'Employee Number', 'Rate_Per_Hour': 'Rate Per Hour', 'Footer_Code': 'Footer Code', 'Opposing': 'Opposing Party', 'Hours': 'Hours', 'Hours_P': 'Hours (Previous)', 'Trust_Bal': 'Trust Balance', 'Trust_Bal_P': 'Trust Balance (Previous)', 'Hourly_Fees': 'Hourly Fees', 'Hourly_Fees_P': 'Hourly Fees (Previous)', 'Flat_Fees': 'Flat Fees', 'Flat_Fees_P': 'Flat Fees (Previous)', 'Disbursements': 'Disbursements', 'Disbursements_P': 'Disbursements (Previous)', 'Credit_Bal': 'Credit Balance', 'Credit_Bal_P': 'Credit Balance (Previous)', 'Total_Charges': 'Total Charges', 'Total_Charges_P': 'Total Charges (Previous)', 'Amount_Owing': 'Amount Owing', 'Amount_Owing_P': 'Amount Owing (Previous)', 'Transferable': 'Transferable', 'Memo': 'Memo' } try: with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) headers = reader.fieldnames or [] validation = validate_csv_headers(headers, expected_fields) if not validation['valid']: result['errors'].append(f"Header validation failed: {validation['errors']}") return result for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = row.get('File_No', '').strip() if not file_no: result['errors'].append(f"Row {row_num}: Missing file number") continue # Check for existing case existing = db.query(Case).filter(Case.file_no == file_no).first() if existing: result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' already exists") continue # Find client by ID client_id = row.get('Id', '').strip() client = None if client_id: client = db.query(Client).filter(Client.rolodex_id == client_id).first() if not client: result['errors'].append(f"Row {row_num}: Client with ID '{client_id}' not found") continue case = Case( file_no=file_no, client_id=client.id if client else None, status=row.get('Status', '').strip() or 'active', case_type=row.get('File_Type', '').strip() or None, description=row.get('Regarding', '').strip() or None, open_date=parse_date(row.get('Opened', '')), close_date=parse_date(row.get('Closed', '')) ) db.add(case) result['success'] += 1 except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") db.commit() except Exception as e: result['errors'].append(f"Import failed: {str(e)}") db.rollback() return result def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]: """ Import LEDGER CSV data into Transaction model. Expected CSV format: File_No,Date,Item_No,Empl_Num,T_Code,T_Type,T_Type_L,Quantity,Rate,Amount,Billed,Note """ result = { 'success': 0, 'errors': [], 'total_rows': 0 } try: with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) headers = reader.fieldnames or [] if len(headers) < 3: result['errors'].append("Invalid CSV format: expected at least 3 columns") return result for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = row.get('File_No', '').strip() if not file_no: result['errors'].append(f"Row {row_num}: Missing file number") continue # Find the case case = db.query(Case).filter(Case.file_no == file_no).first() if not case: result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found") continue amount = parse_float(row.get('Amount', '0')) if amount is None: result['errors'].append(f"Row {row_num}: Invalid amount") continue transaction = Transaction( case_id=case.id, transaction_date=parse_date(row.get('Date', '')), transaction_type=row.get('T_Type', '').strip() or None, amount=amount, description=row.get('Note', '').strip() or None, reference=row.get('Item_No', '').strip() or None ) db.add(transaction) result['success'] += 1 except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") db.commit() except Exception as e: result['errors'].append(f"Import failed: {str(e)}") db.rollback() return result def import_qdros_data(db: Session, file_path: str) -> Dict[str, Any]: """ Import QDROS CSV data into Document model. Expected CSV format: File_No,Document_Type,Description,File_Name,Date """ result = { 'success': 0, 'errors': [], 'total_rows': 0 } try: with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) headers = reader.fieldnames or [] if len(headers) < 2: result['errors'].append("Invalid CSV format: expected at least 2 columns") return result for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = row.get('File_No', '').strip() if not file_no: result['errors'].append(f"Row {row_num}: Missing file number") continue # Find the case case = db.query(Case).filter(Case.file_no == file_no).first() if not case: result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found") continue document = Document( case_id=case.id, document_type=row.get('Document_Type', '').strip() or 'QDRO', file_name=row.get('File_Name', '').strip() or None, description=row.get('Description', '').strip() or None, uploaded_date=parse_date(row.get('Date', '')) ) db.add(document) result['success'] += 1 except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") db.commit() except Exception as e: result['errors'].append(f"Import failed: {str(e)}") db.rollback() return result def import_payments_data(db: Session, file_path: str) -> Dict[str, Any]: """ Import PAYMENTS CSV data into Payment model. Expected CSV format: File_No,Date,Amount,Type,Description,Check_Number """ result = { 'success': 0, 'errors': [], 'total_rows': 0 } try: with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) headers = reader.fieldnames or [] if len(headers) < 2: result['errors'].append("Invalid CSV format: expected at least 2 columns") return result for row_num, row in enumerate(reader, start=2): result['total_rows'] += 1 try: file_no = row.get('File_No', '').strip() if not file_no: result['errors'].append(f"Row {row_num}: Missing file number") continue # Find the case case = db.query(Case).filter(Case.file_no == file_no).first() if not case: result['errors'].append(f"Row {row_num}: Case with file number '{file_no}' not found") continue amount = parse_float(row.get('Amount', '0')) if amount is None: result['errors'].append(f"Row {row_num}: Invalid amount") continue payment = Payment( case_id=case.id, payment_date=parse_date(row.get('Date', '')), payment_type=row.get('Type', '').strip() or None, amount=amount, description=row.get('Description', '').strip() or None, check_number=row.get('Check_Number', '').strip() or None ) db.add(payment) result['success'] += 1 except Exception as e: result['errors'].append(f"Row {row_num}: {str(e)}") db.commit() except Exception as e: result['errors'].append(f"Import failed: {str(e)}") db.rollback() return result def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[str, Any]: """ Process CSV import based on type. Args: db: Database session import_type: Type of import (client, phone, case, transaction, document, payment) file_path: Path to CSV file Returns: Dict with import results """ import_functions = { 'client': import_rolodex_data, 'phone': import_phone_data, 'case': import_files_data, 'transaction': import_ledger_data, 'document': import_qdros_data, 'payment': import_payments_data } import_func = import_functions.get(import_type) if not import_func: return { 'success': 0, 'errors': [f"Unknown import type: {import_type}"], 'total_rows': 0 } return import_func(db, file_path) @app.get("/") async def root(): """ Root endpoint - health check. """ return {"message": "Delphi Database API is running"} @app.get("/health") async def health_check(db: Session = Depends(get_db)): """ Health check endpoint that verifies database connectivity. """ try: # Test database connection by querying user count user_count = db.query(User).count() return { "status": "healthy", "database": "connected", "users": user_count } except Exception as e: logger.error("health_check_failed", error=str(e)) return { "status": "unhealthy", "database": "error", "error": str(e) } @app.get("/login") async def login_form(request: Request): """ Display login form. If user is already logged in, redirect to dashboard. """ # Check if user is already logged in user = get_current_user_from_session(request.session) if user: return RedirectResponse(url="/dashboard", status_code=302) return templates.TemplateResponse("login.html", {"request": request}) @app.post("/login") async def login_submit(request: Request, db: Session = Depends(get_db)): """ Handle login form submission. Authenticates user credentials and sets up session. """ form = await request.form() username = form.get("username") password = form.get("password") if not username or not password: error_message = "Username and password are required" logger.warning("login_failed", username=username, reason="missing_credentials") return templates.TemplateResponse("login.html", { "request": request, "error": error_message }) # Authenticate user user = authenticate_user(username, password) if not user: error_message = "Invalid username or password" logger.warning("login_failed", username=username, reason="invalid_credentials") return templates.TemplateResponse("login.html", { "request": request, "error": error_message }) # Set up user session request.session["user_id"] = user.id request.session["user"] = {"id": user.id, "username": user.username} # Update bound context with authenticated user id structlog_contextvars.bind_contextvars(**{"user.id": user.id}) logger.info("login_success", username=username, **{"user.id": user.id}) # Redirect to dashboard after successful login return RedirectResponse(url="/dashboard", status_code=302) @app.get("/logout") async def logout(request: Request): """ Handle user logout. Clears user session and redirects to home page. """ username = request.session.get("user", {}).get("username", "unknown") request.session.clear() logger.info("logout", username=username) return RedirectResponse(url="/", status_code=302) @app.get("/dashboard") async def dashboard( request: Request, q: str | None = Query(None, description="Search by file number or client name"), page: int = Query(1, ge=1, description="Page number (1-indexed)"), page_size: int = Query(20, ge=1, le=100, description="Results per page"), db: Session = Depends(get_db), ): """ Dashboard page - lists recent cases with search and pagination. - Optional query param `q` filters by case file number or client name/company - `page` and `page_size` control pagination """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Base query: join clients for name/company access query = db.query(Case).join(Client).order_by( Case.open_date.desc(), Case.created_at.desc(), ) # Apply search filter if provided if q: like_term = f"%{q}%" query = query.filter( or_( Case.file_no.ilike(like_term), Client.first_name.ilike(like_term), Client.last_name.ilike(like_term), Client.company.ilike(like_term), ) ) # Total count for pagination total: int = query.count() # Clamp page to valid range when total is known total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1 if page > total_pages: page = total_pages # Pagination window offset = (page - 1) * page_size cases = query.offset(offset).limit(page_size).all() # Page number window for UI (current +/- 2) start_page = max(1, page - 2) end_page = min(total_pages, page + 2) page_numbers = list(range(start_page, end_page + 1)) logger.info( "dashboard_render", query=q, page=page, page_size=page_size, total=total, ) return templates.TemplateResponse( "dashboard.html", { "request": request, "user": user, "cases": cases, "q": q, "page": page, "page_size": page_size, "total": total, "total_pages": total_pages, "page_numbers": page_numbers, "start_index": (offset + 1) if total > 0 else 0, "end_index": min(offset + len(cases), total), }, ) @app.post("/admin/upload") async def admin_upload_files( request: Request, files: List[UploadFile] = File(...), db: Session = Depends(get_db) ): """ Handle CSV file uploads for admin panel. Validates uploaded files are CSV format and stores them in data-import directory. """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) results = [] errors = [] # Ensure data-import directory exists import_dir = "data-import" os.makedirs(import_dir, exist_ok=True) for file in files: try: # Validate file type if not file.filename.lower().endswith('.csv'): errors.append(f"File '{file.filename}' is not a CSV file") continue # Generate unique filename to avoid conflicts file_id = str(uuid.uuid4()) file_ext = os.path.splitext(file.filename)[1] stored_filename = f"{file_id}{file_ext}" file_path = os.path.join(import_dir, stored_filename) # Save file contents = await file.read() with open(file_path, "wb") as f: f.write(contents) # Determine import type from filename try: import_type = get_import_type_from_filename(file.filename) except ValueError as e: errors.append(f"File '{file.filename}': {str(e)}") # Clean up uploaded file os.remove(file_path) continue results.append({ 'filename': file.filename, 'stored_filename': stored_filename, 'import_type': import_type, 'file_path': file_path, 'size': len(contents) }) except Exception as e: errors.append(f"Error processing '{file.filename}': {str(e)}") continue # Log the upload operation logger.info( "admin_upload", uploaded_count=len(results), error_count=len(errors), username=user.username, ) return templates.TemplateResponse("admin.html", { "request": request, "user": user, "upload_results": results, "upload_errors": errors, "show_upload_results": True }) @app.post("/admin/import/{data_type}") async def admin_import_data( request: Request, data_type: str, db: Session = Depends(get_db) ): """ Process CSV import for specified data type. Creates import log entry and processes the import in the background. """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Validate data type valid_types = ['client', 'phone', 'case', 'transaction', 'document', 'payment'] if data_type not in valid_types: return templates.TemplateResponse("admin.html", { "request": request, "user": user, "error": f"Invalid data type: {data_type}" }) # Get form data for file selection form = await request.form() selected_files = form.getlist("selected_files") if not selected_files: return templates.TemplateResponse("admin.html", { "request": request, "user": user, "error": "No files selected for import" }) import_results = [] total_success = 0 total_errors = 0 for stored_filename in selected_files: file_path = os.path.join("data-import", stored_filename) if not os.path.exists(file_path): import_results.append({ 'filename': stored_filename, 'status': 'error', 'message': 'File not found' }) total_errors += 1 continue # Create import log entry import_log = ImportLog( import_type=data_type, file_name=stored_filename, file_path=file_path, status="running" ) db.add(import_log) db.commit() try: # Process the import result = process_csv_import(db, data_type, file_path) # Update import log import_log.status = "completed" if result['errors'] else "failed" import_log.total_rows = result['total_rows'] import_log.success_count = result['success'] import_log.error_count = len(result['errors']) import_log.error_details = json.dumps(result['errors']) import_log.completed_at = datetime.now() db.commit() import_results.append({ 'filename': stored_filename, 'status': 'success' if result['success'] > 0 else 'error', 'total_rows': result['total_rows'], 'success_count': result['success'], 'error_count': len(result['errors']), 'errors': result['errors'][:10] # Show first 10 errors }) total_success += result['success'] total_errors += len(result['errors']) except Exception as e: # Update import log on error import_log.status = "failed" import_log.error_details = json.dumps([str(e)]) import_log.completed_at = datetime.now() db.commit() import_results.append({ 'filename': stored_filename, 'status': 'error', 'message': str(e) }) total_errors += 1 # Log the import operation logger.info( "admin_import", import_type=data_type, success_count=total_success, error_count=total_errors, username=user.username, ) return templates.TemplateResponse("admin.html", { "request": request, "user": user, "import_results": import_results, "total_success": total_success, "total_errors": total_errors, "show_import_results": True }) @app.get("/admin") async def admin_panel(request: Request, db: Session = Depends(get_db)): """ Admin panel - requires authentication. Provides administrative functions like data import and system management. """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Get recent import history recent_imports = db.query(ImportLog).order_by(ImportLog.created_at.desc()).limit(10).all() # Get available files for import import_dir = "data-import" available_files = [] if os.path.exists(import_dir): for filename in os.listdir(import_dir): if filename.endswith('.csv'): file_path = os.path.join(import_dir, filename) file_size = os.path.getsize(file_path) try: import_type = get_import_type_from_filename(filename) except ValueError: import_type = 'unknown' available_files.append({ 'filename': filename, 'import_type': import_type, 'size': file_size, 'modified': datetime.fromtimestamp(os.path.getmtime(file_path)) }) # Group files by import type files_by_type = {} for file_info in available_files: import_type = file_info['import_type'] if import_type not in files_by_type: files_by_type[import_type] = [] files_by_type[import_type].append(file_info) return templates.TemplateResponse("admin.html", { "request": request, "user": user, "recent_imports": recent_imports, "available_files": available_files, "files_by_type": files_by_type }) @app.get("/case/{case_id}") async def case_detail( request: Request, case_id: int, saved: bool = Query(False, description="Whether to show success message"), db: Session = Depends(get_db), ): """ Case detail view. Displays detailed information for a single case and its related client and associated records (transactions, documents, payments). """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Fetch case with related entities eagerly loaded to avoid lazy-load issues case_obj = ( db.query(Case) .options( joinedload(Case.client), joinedload(Case.transactions), joinedload(Case.documents), joinedload(Case.payments), ) .filter(Case.id == case_id) .first() ) if not case_obj: logger.warning("case_not_found", case_id=case_id) # Get any errors from session and clear them errors = request.session.pop("case_update_errors", None) return templates.TemplateResponse( "case.html", { "request": request, "user": user, "case": None, "error": "Case not found", "saved": False, "errors": errors or [], }, status_code=404, ) logger.info("case_detail", case_id=case_obj.id, file_no=case_obj.file_no) # Get any errors from session and clear them errors = request.session.pop("case_update_errors", None) return templates.TemplateResponse( "case.html", { "request": request, "user": user, "case": case_obj, "saved": saved, "errors": errors or [], }, ) @app.post("/case/{case_id}/update") async def case_update( request: Request, case_id: int, db: Session = Depends(get_db), ) -> RedirectResponse: """ Update case details. Updates the specified fields on a case and redirects back to the case detail view. """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Get form data form = await request.form() # Fetch the case case_obj = db.query(Case).filter(Case.id == case_id).first() if not case_obj: logger.warning("case_not_found_update", case_id=case_id) return RedirectResponse(url=f"/case/{case_id}", status_code=302) # Validate and process fields errors = [] update_data = {} # Status validation status = form.get("status") if status is not None: if status not in ["active", "closed"]: errors.append("Status must be 'active' or 'closed'") else: update_data["status"] = status # Case type and description (optional) case_type = form.get("case_type") if case_type is not None: update_data["case_type"] = case_type.strip() if case_type.strip() else None description = form.get("description") if description is not None: update_data["description"] = description.strip() if description.strip() else None # Date validation and parsing open_date = form.get("open_date") if open_date is not None: if open_date.strip(): try: update_data["open_date"] = datetime.strptime(open_date.strip(), "%Y-%m-%d") except ValueError: errors.append("Open date must be in YYYY-MM-DD format") else: update_data["open_date"] = None close_date = form.get("close_date") if close_date is not None: if close_date.strip(): try: update_data["close_date"] = datetime.strptime(close_date.strip(), "%Y-%m-%d") except ValueError: errors.append("Close date must be in YYYY-MM-DD format") else: update_data["close_date"] = None # If there are validation errors, redirect back with errors if errors: # Store errors in session for display on the case page request.session["case_update_errors"] = errors return RedirectResponse(url=f"/case/{case_id}", status_code=302) # Apply updates try: changed_fields = {} for field, value in update_data.items(): old_value = getattr(case_obj, field) if old_value != value: changed_fields[field] = {"old": old_value, "new": value} setattr(case_obj, field, value) db.commit() logger.info( "case_update", case_id=case_id, changed_fields=list(update_data.keys()), changed_details=changed_fields, ) # Clear any previous errors from session request.session.pop("case_update_errors", None) return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302) except Exception as e: db.rollback() logger.error("case_update_failed", case_id=case_id, error=str(e)) # Store error in session for display request.session["case_update_errors"] = ["Failed to save changes. Please try again."] return RedirectResponse(url=f"/case/{case_id}", status_code=302) @app.post("/case/{case_id}/close") async def case_close( request: Request, case_id: int, db: Session = Depends(get_db), ) -> RedirectResponse: """ Close a case. Sets the case status to 'closed' and sets close_date to current date if not already set. """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Fetch the case case_obj = db.query(Case).filter(Case.id == case_id).first() if not case_obj: logger.warning("case_not_found_close", case_id=case_id) return RedirectResponse(url=f"/case/{case_id}", status_code=302) # Update case try: case_obj.status = "closed" # Only set close_date if it's not already set if not case_obj.close_date: case_obj.close_date = datetime.now() db.commit() logger.info("case_closed", case_id=case_id, close_date=case_obj.close_date.isoformat() if case_obj.close_date else None) return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302) except Exception as e: db.rollback() logger.error("case_close_failed", case_id=case_id, error=str(e)) # Store error in session for display request.session["case_update_errors"] = ["Failed to close case. Please try again."] return RedirectResponse(url=f"/case/{case_id}", status_code=302) @app.post("/case/{case_id}/reopen") async def case_reopen( request: Request, case_id: int, db: Session = Depends(get_db), ) -> RedirectResponse: """ Reopen a case. Sets the case status to 'active' and clears the close_date. """ # Check authentication user = get_current_user_from_session(request.session) if not user: return RedirectResponse(url="/login", status_code=302) # Fetch the case case_obj = db.query(Case).filter(Case.id == case_id).first() if not case_obj: logger.warning("case_not_found_reopen", case_id=case_id) return RedirectResponse(url=f"/case/{case_id}", status_code=302) # Update case try: case_obj.status = "active" case_obj.close_date = None db.commit() logger.info("case_reopened", case_id=case_id) return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302) except Exception as e: db.rollback() logger.error("case_reopen_failed", case_id=case_id, error=str(e)) # Store error in session for display request.session["case_update_errors"] = ["Failed to reopen case. Please try again."] return RedirectResponse(url=f"/case/{case_id}", status_code=302)