API: Standardized JSON list responses with Pydantic schemas and Pagination; add sort_by/sort_dir validation with whitelists; consistent JSON 401 for /api/*; structured logging for sorting/pagination; add pydantic dep; add Docker smoke script and README docs.

This commit is contained in:
HotSwapp
2025-10-07 16:05:09 -05:00
parent c68ba45ceb
commit 1eb8ba8edd
9 changed files with 537 additions and 1 deletions

View File

@@ -16,7 +16,7 @@ from typing import Optional, List, Dict, Any
from io import StringIO
from fastapi import FastAPI, Depends, Request, Query, HTTPException, UploadFile, File, Form
from fastapi.responses import RedirectResponse, Response
from fastapi.responses import RedirectResponse, Response, JSONResponse
from starlette.middleware.sessions import SessionMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
@@ -32,6 +32,16 @@ from .database import create_tables, get_db, get_database_url
from .models import User, Case, Client, Phone, Transaction, Document, Payment, ImportLog
from .auth import authenticate_user, get_current_user_from_session
from .logging_config import setup_logging
from .schemas import (
ClientOut,
PhoneOut,
CaseOut,
TransactionOut,
Pagination,
RolodexListResponse,
FilesListResponse,
LedgerListResponse,
)
# Load environment variables
load_dotenv()
@@ -73,6 +83,9 @@ class AuthMiddleware(BaseHTTPMiddleware):
# Enforce authentication for other paths
if not request.session.get("user_id"):
# Return JSON 401 for API routes, redirect for HTML routes
if path.startswith("/api/"):
return JSONResponse(status_code=401, content={"detail": "Unauthorized"})
return RedirectResponse(url="/login", status_code=302)
return await call_next(request)
@@ -2185,3 +2198,364 @@ async def phone_book_report(
"report_phone_book.html",
{"request": request, "user": user, "clients": clients, "q": q, "client_ids": client_ids or []},
)
# ------------------------------
# JSON API: list/filter endpoints
# ------------------------------
def _apply_sorting(query, sort_by: str | None, sort_dir: str, allowed_map: dict[str, Any], default_order: list[Any]):
"""Apply validated sorting to a SQLAlchemy query.
Args:
query: Base SQLAlchemy query object
sort_by: Optional requested sort field
sort_dir: 'asc' or 'desc'
allowed_map: Map of allowed sort_by -> SQLAlchemy column or list of columns
default_order: Fallback order_by list when sort_by is not provided
Returns:
(query, applied_sort_by, applied_sort_dir)
"""
if not sort_by:
for col in default_order:
query = query.order_by(col)
return query, None, sort_dir
column_expr = allowed_map.get(sort_by)
if column_expr is None:
raise HTTPException(status_code=400, detail=f"Invalid sort_by: '{sort_by}'. Allowed: {sorted(list(allowed_map.keys()))}")
def _order(expr):
return expr.asc().nulls_last() if sort_dir == "asc" else expr.desc().nulls_last()
if isinstance(column_expr, (list, tuple)):
for expr in column_expr:
query = query.order_by(_order(expr))
else:
query = query.order_by(_order(column_expr))
return query, sort_by, sort_dir
@app.get("/api/rolodex", response_model=RolodexListResponse)
async def api_list_rolodex(
request: Request,
q: str | None = Query(None, description="Search by first/last/company contains"),
phone: str | None = Query(None, description="Phone number contains"),
rolodex_id: str | None = Query(None, description="Legacy Rolodex ID contains"),
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
sort_by: str | None = Query(None, description="Sort field: id, rolodex_id, last_name, first_name, company, created_at"),
sort_dir: str = Query("asc", description="Sort direction: asc or desc"),
db: Session = Depends(get_db),
) -> RolodexListResponse:
"""Return paginated clients with simple filters as JSON."""
user = get_current_user_from_session(request.session)
if not user:
# Middleware ensures JSON 401 for /api/*, keep explicit for clarity
raise HTTPException(status_code=401, detail="Unauthorized")
query = db.query(Client).options(joinedload(Client.phones))
if q:
like = f"%{q}%"
query = query.filter(
or_(
Client.first_name.ilike(like),
Client.last_name.ilike(like),
Client.company.ilike(like),
)
)
if phone:
query = query.filter(Client.phones.any(Phone.phone_number.ilike(f"%{phone}%")))
if rolodex_id:
query = query.filter(Client.rolodex_id.ilike(f"%{rolodex_id}%"))
# Sorting
sort_dir_norm = (sort_dir or "").lower()
if sort_dir_norm not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
allowed_sort = {
"id": Client.id,
"rolodex_id": Client.rolodex_id,
"last_name": Client.last_name,
"first_name": Client.first_name,
"company": Client.company,
"created_at": Client.created_at,
}
default_order = [Client.last_name.asc().nulls_last(), Client.first_name.asc().nulls_last(), Client.id.asc()]
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
clients = query.offset(offset).limit(page_size).all()
logger.info(
"api_rolodex_list",
query=q,
phone=phone,
rolodex_id=rolodex_id,
page=page,
page_size=page_size,
total=total,
sort_by=applied_sort_by,
sort_dir=applied_sort_dir,
)
items = [ClientOut.model_validate(c) for c in clients]
return RolodexListResponse(
items=items,
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
)
@app.get("/api/files", response_model=FilesListResponse)
async def api_list_files(
request: Request,
q: str | None = Query(None, description="Search file no/description/client name/company"),
status: str | None = Query(None, description="Case status: active or closed"),
case_type: str | None = Query(None, description="Case type contains"),
file_no: str | None = Query(None, description="File number contains"),
client_rolodex_id: str | None = Query(None, description="Legacy client Id contains"),
from_open_date: str | None = Query(None, description="Opened on/after YYYY-MM-DD"),
to_open_date: str | None = Query(None, description="Opened on/before YYYY-MM-DD"),
page: int = Query(1, ge=1, description="Page number (1-indexed)"),
page_size: int = Query(20, ge=1, le=100, description="Results per page"),
sort_by: str | None = Query(None, description="Sort field: file_no, status, case_type, description, open_date, close_date, created_at, client_last_name, client_first_name, client_company"),
sort_dir: str = Query("desc", description="Sort direction: asc or desc"),
db: Session = Depends(get_db),
) -> FilesListResponse:
"""Return paginated cases with simple filters as JSON."""
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
query = (
db.query(Case)
.join(Client, Case.client_id == Client.id)
.options(joinedload(Case.client))
)
filters = []
if q:
like = f"%{q}%"
filters.append(
or_(
Case.file_no.ilike(like),
Case.description.ilike(like),
Client.first_name.ilike(like),
Client.last_name.ilike(like),
Client.company.ilike(like),
)
)
if status:
filters.append(Case.status.ilike(f"%{status}%"))
if case_type:
filters.append(Case.case_type.ilike(f"%{case_type}%"))
if file_no:
filters.append(Case.file_no.ilike(f"%{file_no}%"))
if client_rolodex_id:
filters.append(Client.rolodex_id.ilike(f"%{client_rolodex_id}%"))
if from_open_date:
try:
dt = datetime.strptime(from_open_date, "%Y-%m-%d")
filters.append(Case.open_date >= dt)
except ValueError:
pass
if to_open_date:
try:
dt = datetime.strptime(to_open_date, "%Y-%m-%d")
filters.append(Case.open_date <= dt)
except ValueError:
pass
if filters:
query = query.filter(and_(*filters))
# Sorting
sort_dir_norm = (sort_dir or "").lower()
if sort_dir_norm not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
allowed_sort = {
"file_no": Case.file_no,
"status": Case.status,
"case_type": Case.case_type,
"description": Case.description,
"open_date": Case.open_date,
"close_date": Case.close_date,
"created_at": Case.created_at,
"client_last_name": Client.last_name,
"client_first_name": Client.first_name,
"client_company": Client.company,
"id": Case.id,
}
default_order = [Case.open_date.desc().nulls_last(), Case.created_at.desc()]
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
cases = query.offset(offset).limit(page_size).all()
logger.info(
"api_files_list",
query=q,
status=status,
case_type=case_type,
file_no=file_no,
client_rolodex_id=client_rolodex_id,
page=page,
page_size=page_size,
total=total,
sort_by=applied_sort_by,
sort_dir=applied_sort_dir,
)
items = [CaseOut.model_validate(c) for c in cases]
return FilesListResponse(
items=items,
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
)
@app.get("/api/ledger", response_model=LedgerListResponse)
async def api_list_ledger(
request: Request,
case_id: int | None = Query(None, description="Filter by case ID"),
file_no: str | None = Query(None, description="Filter by case file number contains"),
from_date: str | None = Query(None, description="On/after YYYY-MM-DD"),
to_date: str | None = Query(None, description="On/before YYYY-MM-DD"),
billed: str | None = Query(None, description="'Y' or 'N'"),
t_code: str | None = Query(None, description="Transaction code contains"),
t_type_l: str | None = Query(None, description="Legacy type flag (e.g., C/D)"),
employee_number: str | None = Query(None, description="Employee number contains"),
q: str | None = Query(None, description="Description contains"),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
sort_by: str | None = Query(None, description="Sort field: transaction_date, item_no, id, amount, billed, t_code, t_type_l, employee_number, case_file_no, case_id"),
sort_dir: str = Query("desc", description="Sort direction: asc or desc"),
db: Session = Depends(get_db),
) -> LedgerListResponse:
"""Return paginated ledger (transactions) with simple filters as JSON."""
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
query = (
db.query(Transaction)
.join(Case, Transaction.case_id == Case.id)
.options(joinedload(Transaction.case))
)
filters = []
if case_id is not None:
filters.append(Transaction.case_id == case_id)
if file_no:
filters.append(Case.file_no.ilike(f"%{file_no}%"))
if from_date:
try:
dt = datetime.strptime(from_date, "%Y-%m-%d")
filters.append(Transaction.transaction_date >= dt)
except ValueError:
pass
if to_date:
try:
dt = datetime.strptime(to_date, "%Y-%m-%d")
filters.append(Transaction.transaction_date <= dt)
except ValueError:
pass
if billed in ("Y", "N"):
filters.append(Transaction.billed == billed)
if t_code:
filters.append(Transaction.t_code.ilike(f"%{t_code}%"))
if t_type_l:
filters.append(Transaction.t_type_l.ilike(f"%{t_type_l}%"))
if employee_number:
filters.append(Transaction.employee_number.ilike(f"%{employee_number}%"))
if q:
filters.append(Transaction.description.ilike(f"%{q}%"))
if filters:
query = query.filter(and_(*filters))
# Sorting
sort_dir_norm = (sort_dir or "").lower()
if sort_dir_norm not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid sort_dir. Allowed: 'asc' or 'desc'")
allowed_sort = {
"transaction_date": Transaction.transaction_date,
"item_no": Transaction.item_no,
"id": Transaction.id,
"amount": Transaction.amount,
"billed": Transaction.billed,
"t_code": Transaction.t_code,
"t_type_l": Transaction.t_type_l,
"employee_number": Transaction.employee_number,
"case_file_no": Case.file_no,
"case_id": Transaction.case_id,
}
default_order = [
Transaction.transaction_date.desc().nulls_last(),
Transaction.item_no.asc().nulls_last(),
Transaction.id.desc(),
]
query, applied_sort_by, applied_sort_dir = _apply_sorting(query, sort_by, sort_dir_norm, allowed_sort, default_order)
total: int = query.count()
total_pages: int = (total + page_size - 1) // page_size if total > 0 else 1
if page > total_pages:
page = total_pages
offset = (page - 1) * page_size
txns = query.offset(offset).limit(page_size).all()
logger.info(
"api_ledger_list",
case_id=case_id,
file_no=file_no,
from_date=from_date,
to_date=to_date,
billed=billed,
t_code=t_code,
t_type_l=t_type_l,
employee_number=employee_number,
q=q,
page=page,
page_size=page_size,
total=total,
sort_by=applied_sort_by,
sort_dir=applied_sort_dir,
)
items = [
TransactionOut(
id=t.id,
case_id=t.case_id,
case_file_no=t.case.file_no if t.case else None,
transaction_date=t.transaction_date,
item_no=t.item_no,
amount=t.amount,
billed=t.billed,
t_code=t.t_code,
t_type_l=t.t_type_l,
quantity=t.quantity,
rate=t.rate,
description=t.description,
employee_number=t.employee_number,
)
for t in txns
]
return LedgerListResponse(
items=items,
pagination=Pagination(page=page, page_size=page_size, total=total, total_pages=total_pages),
)