Files
delphi-database-v2/app/main.py
HotSwapp 728d26ad17 feat(case): enable editing and close/reopen actions on case detail
- Add POST /case/{id}/update route for editing case fields (status, case_type, description, open_date, close_date)
- Add POST /case/{id}/close route to set status='closed' and close_date=current date
- Add POST /case/{id}/reopen route to set status='active' and clear close_date
- Update case.html template with edit form, success/error messaging, and action buttons
- Include comprehensive validation for dates and status values
- Add proper error handling with session-based error storage
- Preserve existing view content and styling consistency
2025-10-06 19:43:21 -05:00

564 lines
17 KiB
Python

"""
FastAPI application entry point for Delphi Database.
This module initializes the FastAPI application, sets up database connections,
and provides the main application instance.
"""
import os
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, Depends, Request, Query, HTTPException
from fastapi.responses import RedirectResponse
from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_
from dotenv import load_dotenv
from starlette.middleware.base import BaseHTTPMiddleware
from .database import create_tables, get_db, get_database_url
from .models import User, Case, Client
from .auth import authenticate_user, get_current_user_from_session
# Populate the process environment via python-dotenv before any setting is read
load_dotenv()

# SECRET_KEY is mandatory: it is handed to SessionMiddleware further down,
# so the module refuses to import without it
SECRET_KEY = os.environ.get("SECRET_KEY")
if not SECRET_KEY:
    raise ValueError("SECRET_KEY environment variable must be set")

# Root logging configuration plus this module's logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Jinja2 template environment rooted at the application's template directory
templates = Jinja2Templates(directory="app/templates")
class AuthMiddleware(BaseHTTPMiddleware):
    """
    Simple session-based authentication middleware.

    Requests whose path is listed in ``exempt_paths``, or that target the
    static mount / a favicon, are passed straight through. Every other
    request must carry a ``user_id`` in its session; otherwise the client is
    redirected to ``/login``.
    """

    def __init__(self, app, exempt_paths: list[str] | None = None):
        """
        Args:
            app: The ASGI application being wrapped.
            exempt_paths: Exact request paths that never require
                authentication. ``None`` means no exact-path exemptions.
        """
        super().__init__(app)
        self.exempt_paths = exempt_paths or []

    @staticmethod
    def _is_public_asset(path: str) -> bool:
        """Return True when ``path`` targets the static mount or a favicon."""
        # Match the static mount on a path-segment boundary. A bare
        # ``startswith("/static")`` would also exempt unrelated routes such as
        # ``/statistics`` from authentication.
        return (
            path == "/static"
            or path.startswith("/static/")
            or path.startswith("/favicon")
        )

    async def dispatch(self, request, call_next):
        """
        Let public requests through and redirect anonymous ones to ``/login``.

        Reads ``request.session``, so the session middleware has to process
        the request before this middleware does.
        """
        path = request.url.path

        # Allow exempt paths and static assets
        if path in self.exempt_paths or self._is_public_asset(path):
            return await call_next(request)

        # Enforce authentication for other paths
        if not request.session.get("user_id"):
            return RedirectResponse(url="/login", status_code=302)

        return await call_next(request)
def _redact_db_url(db_url) -> str:
    """Return the database URL as text with any password replaced by ``***``."""
    # Local import keeps this helper self-contained
    from urllib.parse import urlsplit, urlunsplit

    url_text = str(db_url)
    try:
        parts = urlsplit(url_text)
        has_password = parts.password is not None
    except ValueError:
        # Unparseable URL: log a placeholder rather than risk leaking a secret
        return "<unparseable database URL>"

    if not has_password:
        return url_text

    # netloc has the form "user:password@host[:port]"; keep everything but
    # the password
    userinfo, _, hostinfo = parts.netloc.rpartition("@")
    username = userinfo.partition(":")[0]
    return urlunsplit(parts._replace(netloc=f"{username}:***@{hostinfo}"))


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan context manager for FastAPI application.

    Handles startup and shutdown events:
    - Creates database tables on startup
    - Logs database connection info (with any password masked)
    - Logs a message on shutdown
    """
    # Startup
    logger.info("Starting Delphi Database application...")

    # Create database tables
    create_tables()
    logger.info("Database tables created/verified")

    # Log database connection info. The URL can embed credentials, so it is
    # redacted before it reaches the log.
    logger.info("Database connected: %s", _redact_db_url(get_database_url()))

    yield

    # Shutdown
    logger.info("Shutting down Delphi Database application...")
# Create FastAPI application with lifespan management
app = FastAPI(
    title="Delphi Database",
    description="Legal case management database application",
    version="1.0.0",
    lifespan=lifespan,
)

# Allowed CORS origins are read from the comma-separated CORS_ALLOW_ORIGINS
# environment variable so deployments can restrict them without a code
# change. When the variable is unset or empty, every origin is allowed, which
# matches the previous hard-coded behaviour.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive for a cookie-session application; set CORS_ALLOW_ORIGINS in
# production.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "").split(",")
    if origin.strip()
] or ["*"]

# Add CORS middleware for cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Register authentication middleware with exempt paths
EXEMPT_PATHS = ["/", "/health", "/login", "/logout"]
app.add_middleware(AuthMiddleware, exempt_paths=EXEMPT_PATHS)

# Add SessionMiddleware for session management (must be added LAST so it runs FIRST)
app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)

# Mount static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/")
async def root():
    """
    Root endpoint.

    Returns a fixed JSON message indicating that the API is up. No database
    access is performed here.
    """
    payload = {"message": "Delphi Database API is running"}
    return payload
@app.get("/health")
async def health_check(db: Session = Depends(get_db)):
    """
    Health check endpoint that verifies database connectivity.

    Runs a user-count query as the connectivity probe.

    Returns:
        On success, a JSON body with ``status``, ``database`` and ``users``
        (HTTP 200). On failure, a JSON body with ``status``, ``database`` and
        ``error`` sent with HTTP 503, so that monitors relying on the status
        code can detect the outage.
    """
    # Imported here so the block stands alone without touching the file's
    # import section
    from fastapi.responses import JSONResponse

    try:
        # Test database connection by querying user count
        user_count = db.query(User).count()
    except Exception as e:
        # logger.exception records the traceback along with the message
        logger.exception("Health check failed: %s", e)
        # NOTE(review): /health is exempt from authentication, so this error
        # text is visible to anonymous callers. Consider dropping it.
        return JSONResponse(
            status_code=503,
            content={
                "status": "unhealthy",
                "database": "error",
                "error": str(e),
            },
        )

    return {
        "status": "healthy",
        "database": "connected",
        "users": user_count,
    }
@app.get("/login")
async def login_form(request: Request):
    """
    Render the login page.

    A visitor whose session already resolves to a user is sent to the
    dashboard instead of seeing the form again.
    """
    existing_user = get_current_user_from_session(request.session)
    if not existing_user:
        # No signed-in user: show the form
        return templates.TemplateResponse("login.html", {"request": request})

    return RedirectResponse(url="/dashboard", status_code=302)
@app.post("/login")
async def login_submit(request: Request, db: Session = Depends(get_db)):
    """
    Process a submitted login form.

    Reads ``username`` and ``password`` from the form body, authenticates
    them, stores the user in the session and redirects to the dashboard.
    On missing or invalid credentials the login page is re-rendered with an
    error message.
    """

    def render_failure(error_message):
        # Re-render the login page carrying the given error text
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "error": error_message},
        )

    submitted = await request.form()
    username = submitted.get("username")
    password = submitted.get("password")

    # Both fields are mandatory
    if not (username and password):
        return render_failure("Username and password are required")

    # Verify the credentials
    user = authenticate_user(username, password)
    if not user:
        return render_failure("Invalid username or password")

    # Record the authenticated user in the session
    request.session["user_id"] = user.id
    request.session["user"] = {"id": user.id, "username": user.username}
    logger.info(f"User '{username}' logged in successfully")

    # Successful login lands on the dashboard
    return RedirectResponse(url="/dashboard", status_code=302)
@app.get("/logout")
async def logout(request: Request):
    """
    Log the current user out.

    Empties the session and redirects to the home page. The username is read
    before the session is cleared so it can still be logged.
    """
    session_user = request.session.get("user", {})
    username = session_user.get("username", "unknown")

    request.session.clear()
    logger.info(f"User '{username}' logged out")

    return RedirectResponse(url="/", status_code=302)
@app.get("/dashboard")
async def dashboard(
    request: Request,
    q: str | None = Query(None, description="Search by file number or client name"),
    page: int = Query(1, ge=1, description="Page number (1-indexed)"),
    page_size: int = Query(20, ge=1, le=100, description="Results per page"),
    db: Session = Depends(get_db),
):
    """
    Dashboard page: a paginated, searchable list of cases.

    - ``q`` (optional) is matched case-insensitively as a substring of the
      case file number or the client's first name, last name or company
    - ``page`` / ``page_size`` select the result window; a ``page`` beyond
      the last page is clamped to the last page
    - Cases are ordered by open date, then creation time, newest first
    """
    # Anonymous visitors are sent to the login page
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Cases joined to their client so client columns can be searched
    case_query = db.query(Case).join(Client)

    if q:
        pattern = f"%{q}%"
        searchable_columns = (
            Case.file_no,
            Client.first_name,
            Client.last_name,
            Client.company,
        )
        case_query = case_query.filter(
            or_(*(column.ilike(pattern) for column in searchable_columns))
        )

    case_query = case_query.order_by(
        Case.open_date.desc(),
        Case.created_at.desc(),
    )

    # Number of matching rows drives the pagination maths
    total: int = case_query.count()

    # Ceiling division; an empty result set still counts as one page
    total_pages: int = max(1, (total + page_size - 1) // page_size)
    page = min(page, total_pages)

    # Fetch just the requested window
    offset = (page - 1) * page_size
    cases = case_query.offset(offset).limit(page_size).all()

    # Page links shown in the UI: the current page plus up to two either side
    first_link = max(1, page - 2)
    last_link = min(total_pages, page + 2)
    page_numbers = list(range(first_link, last_link + 1))

    logger.info(
        "Rendering dashboard: q='%s', page=%s, page_size=%s, total=%s",
        q,
        page,
        page_size,
        total,
    )

    context = {
        "request": request,
        "user": user,
        "cases": cases,
        "q": q,
        "page": page,
        "page_size": page_size,
        "total": total,
        "total_pages": total_pages,
        "page_numbers": page_numbers,
        "start_index": (offset + 1) if total > 0 else 0,
        "end_index": min(offset + len(cases), total),
    }
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/admin")
async def admin_panel(request: Request, db: Session = Depends(get_db)):
    """
    Admin panel page.

    Renders ``admin.html`` for any user resolved from the session; visitors
    without one are redirected to the login page.
    """
    user = get_current_user_from_session(request.session)
    if user:
        context = {"request": request, "user": user}
        return templates.TemplateResponse("admin.html", context)

    # No signed-in user
    return RedirectResponse(url="/login", status_code=302)
@app.get("/case/{case_id}")
async def case_detail(
    request: Request,
    case_id: int,
    saved: bool = Query(False, description="Whether to show success message"),
    db: Session = Depends(get_db),
):
    """
    Case detail page.

    Loads one case together with its client, transactions, documents and
    payments and renders ``case.html``. An unknown ``case_id`` renders the
    same template with an error message and HTTP 404. Any validation errors
    left in the session under ``case_update_errors`` are shown once and then
    removed.
    """
    # Anonymous visitors are sent to the login page
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # Load the relationships up front so the template never lazy-loads
    eager_loads = (
        joinedload(Case.client),
        joinedload(Case.transactions),
        joinedload(Case.documents),
        joinedload(Case.payments),
    )
    case_obj = (
        db.query(Case)
        .options(*eager_loads)
        .filter(Case.id == case_id)
        .first()
    )

    template_context = {"request": request, "user": user}
    if case_obj:
        logger.info("Rendering case detail: id=%s, file_no='%s'", case_obj.id, case_obj.file_no)
        template_context.update(case=case_obj, saved=saved)
        status_code = 200
    else:
        logger.warning("Case not found: id=%s", case_id)
        template_context.update(case=None, error="Case not found", saved=False)
        status_code = 404

    # Consume pending errors from the session so they are displayed only once
    pending_errors = request.session.pop("case_update_errors", None)
    template_context["errors"] = pending_errors or []

    return templates.TemplateResponse(
        "case.html",
        template_context,
        status_code=status_code,
    )
def _parse_case_date(raw: str, label: str):
    """
    Parse a submitted ``YYYY-MM-DD`` date field.

    Returns:
        A ``(value, error)`` pair. ``value`` is the parsed ``datetime``, or
        ``None`` when the field is blank (which clears the date). ``error``
        is a user-facing message when the text is not a valid date,
        otherwise ``None``.
    """
    text = raw.strip()
    if not text:
        return None, None
    try:
        return datetime.strptime(text, "%Y-%m-%d"), None
    except ValueError:
        return None, f"{label} must be in YYYY-MM-DD format"


@app.post("/case/{case_id}/update")
async def case_update(
    request: Request,
    case_id: int,
    status: str = None,
    case_type: str = None,
    description: str = None,
    open_date: str = None,
    close_date: str = None,
    db: Session = Depends(get_db),
) -> RedirectResponse:
    """
    Update case details.

    Updates the submitted fields on a case and redirects back to the case
    detail view. Field values are accepted from the query string (the
    declared parameters) and, for any field not supplied there, from the
    posted form body.

    Only submitted fields are touched. A blank ``case_type``,
    ``description``, ``open_date`` or ``close_date`` clears that field.
    ``status`` must be ``active`` or ``closed``; dates must be
    ``YYYY-MM-DD``.

    Validation and save errors are stored in the session under
    ``case_update_errors`` for display on the case page. A successful save
    redirects with ``?saved=1``.
    """
    # Check authentication
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    # The parameters above are plain defaulted arguments, which FastAPI reads
    # from the query string. An HTML form posts its fields in the request
    # body, so without this fallback every field would stay None and nothing
    # would be saved.
    form = await request.form()

    def submitted(name, query_value):
        # An explicit query-string value wins; otherwise use the posted form
        # field. Non-text values (e.g. file uploads) are ignored.
        if query_value is not None:
            return query_value
        form_value = form.get(name)
        return form_value if isinstance(form_value, str) else None

    status = submitted("status", status)
    case_type = submitted("case_type", case_type)
    description = submitted("description", description)
    open_date = submitted("open_date", open_date)
    close_date = submitted("close_date", close_date)

    # Fetch the case
    case_obj = db.query(Case).filter(Case.id == case_id).first()
    if not case_obj:
        logger.warning("Case not found for update: id=%s", case_id)
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)

    # Validate and process fields
    errors = []
    update_data = {}

    # Status validation
    if status is not None:
        if status not in ["active", "closed"]:
            errors.append("Status must be 'active' or 'closed'")
        else:
            update_data["status"] = status

    # Case type and description (optional); blank text clears the field
    if case_type is not None:
        update_data["case_type"] = case_type.strip() or None
    if description is not None:
        update_data["description"] = description.strip() or None

    # Date validation and parsing
    date_fields = (
        ("open_date", "Open date", open_date),
        ("close_date", "Close date", close_date),
    )
    for field_name, label, raw_value in date_fields:
        if raw_value is None:
            continue
        parsed, problem = _parse_case_date(raw_value, label)
        if problem:
            errors.append(problem)
        else:
            update_data[field_name] = parsed

    # If there are validation errors, redirect back with errors
    if errors:
        # Store errors in session for display on the case page
        request.session["case_update_errors"] = errors
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)

    # Apply updates
    try:
        for field, value in update_data.items():
            setattr(case_obj, field, value)
        db.commit()
        logger.info("Case updated successfully: id=%s, fields=%s", case_id, list(update_data.keys()))
        # Clear any previous errors from session
        request.session.pop("case_update_errors", None)
        return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
    except Exception as e:
        db.rollback()
        # logger.exception keeps the traceback for diagnosis
        logger.exception("Failed to update case id=%s: %s", case_id, e)
        # Store error in session for display
        request.session["case_update_errors"] = ["Failed to save changes. Please try again."]
        return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/close")
async def case_close(
    request: Request,
    case_id: int,
    db: Session = Depends(get_db),
) -> RedirectResponse:
    """
    Mark a case as closed.

    Sets ``status`` to ``closed`` and, when the case has no close date yet,
    stamps ``close_date`` with the current time. Redirects to the case page
    with ``?saved=1`` on success; on failure the transaction is rolled back
    and an error message is left in the session.
    """
    # Anonymous visitors are sent to the login page
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    detail_url = f"/case/{case_id}"

    target_case = db.query(Case).filter(Case.id == case_id).first()
    if not target_case:
        logger.warning("Case not found for close: id=%s", case_id)
        return RedirectResponse(url=detail_url, status_code=302)

    try:
        target_case.status = "closed"
        # An existing close date is preserved
        if not target_case.close_date:
            target_case.close_date = datetime.now()
        db.commit()
        logger.info("Case closed: id=%s, close_date=%s", case_id, target_case.close_date)
        return RedirectResponse(url=f"{detail_url}?saved=1", status_code=302)
    except Exception as exc:
        db.rollback()
        logger.error("Failed to close case id=%s: %s", case_id, str(exc))
        # Surface the failure on the case page via the session
        request.session["case_update_errors"] = ["Failed to close case. Please try again."]
        return RedirectResponse(url=detail_url, status_code=302)
@app.post("/case/{case_id}/reopen")
async def case_reopen(
    request: Request,
    case_id: int,
    db: Session = Depends(get_db),
) -> RedirectResponse:
    """
    Reopen a case.

    Sets ``status`` back to ``active`` and clears ``close_date``. Redirects
    to the case page with ``?saved=1`` on success; on failure the
    transaction is rolled back and an error message is left in the session.
    """
    # Anonymous visitors are sent to the login page
    user = get_current_user_from_session(request.session)
    if not user:
        return RedirectResponse(url="/login", status_code=302)

    detail_url = f"/case/{case_id}"

    target_case = db.query(Case).filter(Case.id == case_id).first()
    if not target_case:
        logger.warning("Case not found for reopen: id=%s", case_id)
        return RedirectResponse(url=detail_url, status_code=302)

    try:
        target_case.status = "active"
        target_case.close_date = None
        db.commit()
        logger.info("Case reopened: id=%s", case_id)
        return RedirectResponse(url=f"{detail_url}?saved=1", status_code=302)
    except Exception as exc:
        db.rollback()
        logger.error("Failed to reopen case id=%s: %s", case_id, str(exc))
        # Surface the failure on the case page via the session
        request.session["case_update_errors"] = ["Failed to reopen case. Please try again."]
        return RedirectResponse(url=detail_url, status_code=302)