Files
delphi-database/app/api/pensions.py
2025-08-15 17:19:51 -05:00

1149 lines
40 KiB
Python

"""
CRUD and list endpoints for pension-related legacy tables with basic date filters.

Tables:
- Pension (main pension record, plus a per-file detail view with nested lists)
- PensionSchedule (SCHEDULE.csv)
- MarriageHistory (MARRIAGE.csv)
- DeathBenefit (DEATH.csv)
- SeparationAgreement (SEPARATE.csv)
"""
from typing import List, Optional, Union
from datetime import date, datetime, time
from fastapi import APIRouter, Depends, HTTPException, Query, status
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from sqlalchemy import func
from app.api.search_highlight import build_query_tokens
from app.services.query_utils import apply_sorting, paginate_with_total, tokenized_ilike_filter
from app.database.base import get_db
from app.auth.security import get_current_user
from app.models.user import User
from app.models.pensions import (
PensionSchedule,
MarriageHistory,
DeathBenefit,
SeparationAgreement,
Pension,
)
# Router on which the pension endpoints declared below are registered.
router = APIRouter()
class ScheduleResponse(BaseModel):
    # Serialized form of a PensionSchedule row returned by the schedule
    # endpoints. Only `id` and `file_no` are required; the rest may be null.
    id: int
    file_no: str
    version: Optional[str] = None
    vests_on: Optional[date] = None
    vests_at: Optional[float] = None
    frequency: Optional[str] = None

    # Lets the endpoints return ORM objects directly: fields are read from
    # object attributes.
    model_config = {"from_attributes": True}
class PaginatedSchedulesResponse(BaseModel):
    # Envelope used when a total is requested: one page of schedule rows
    # together with the total count reported alongside it.
    items: List[ScheduleResponse]
    total: int
@router.get(
    "/schedules",
    response_model=Union[List[ScheduleResponse], PaginatedSchedulesResponse],
    summary="List pension schedules for a file",
    description="Filter by file number, date range, version, numeric ranges, and optional tokenized search (version, frequency). Supports pagination, sorting, and optional total count.",
    tags=["pensions", "pensions-schedules"],
)
async def list_pension_schedules(
    file_no: str = Query(..., description="Filter by file number"),
    start: Optional[date] = Query(None, description="Start date (inclusive) for vests_on"),
    end: Optional[date] = Query(None, description="End date (inclusive) for vests_on"),
    version: Optional[str] = Query(None, description="Filter by version"),
    vests_at_min: Optional[float] = Query(None, description="Minimum vests_at (inclusive)"),
    vests_at_max: Optional[float] = Query(None, description="Maximum vests_at (inclusive)"),
    search: Optional[str] = Query(None, description="Tokenized search across version and frequency"),
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    sort_by: Optional[str] = Query("id", description="Sort by: id, file_no, version, vests_on, vests_at"),
    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the pension schedule rows of one file.

    Rows are always restricted to ``file_no``. Every other filter is optional
    and only applied when its parameter is supplied. Returns a plain list of
    rows, or ``{"items": [...], "total": n}`` when ``include_total`` is true.
    ``current_user`` is resolved through ``get_current_user`` and otherwise
    unused.
    """
    # Gather the optional criteria first; they are applied below in this order.
    criteria = []
    if start is not None:
        criteria.append(PensionSchedule.vests_on >= start)
    if end is not None:
        criteria.append(PensionSchedule.vests_on <= end)
    if version is not None:
        criteria.append(PensionSchedule.version == version)
    if vests_at_min is not None:
        criteria.append(PensionSchedule.vests_at >= vests_at_min)
    if vests_at_max is not None:
        criteria.append(PensionSchedule.vests_at <= vests_at_max)
    if search:
        text_filter = tokenized_ilike_filter(
            build_query_tokens(search),
            [
                PensionSchedule.version,
                PensionSchedule.frequency,
            ],
        )
        # The helper may return None, in which case no text filter is added.
        if text_filter is not None:
            criteria.append(text_filter)

    query = db.query(PensionSchedule).filter(PensionSchedule.file_no == file_no)
    for criterion in criteria:
        query = query.filter(criterion)

    # Whitelist of sort keys, each mapped to the column list it orders by.
    sortable = {
        name: [getattr(PensionSchedule, name)]
        for name in ("id", "file_no", "version", "vests_on", "vests_at")
    }
    query = apply_sorting(query, sort_by, sort_dir, allowed=sortable)

    page, count = paginate_with_total(query, skip, limit, include_total)
    if not include_total:
        return page
    return {"items": page, "total": count or 0}
class ScheduleCreate(BaseModel):
    # Request body for creating a schedule row. `file_no` is mandatory,
    # `version` defaults to "01", and the remaining fields are optional.
    file_no: str
    version: Optional[str] = "01"
    vests_on: Optional[date] = None
    vests_at: Optional[float] = None
    frequency: Optional[str] = None
class ScheduleUpdate(BaseModel):
    # Request body for updating a schedule row. Every field is optional; the
    # update endpoint writes only the fields the client actually sent.
    version: Optional[str] = None
    vests_on: Optional[date] = None
    vests_at: Optional[float] = None
    frequency: Optional[str] = None
@router.post(
    "/schedules",
    response_model=ScheduleResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create pension schedule",
    description="Create a new pension schedule row for a file.",
    tags=["pensions", "pensions-schedules"],
)
async def create_pension_schedule(
    payload: ScheduleCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Insert a pension schedule row built from ``payload`` and return it."""
    column_values = {
        "file_no": payload.file_no,
        # A null or empty version falls back to "01".
        "version": payload.version or "01",
        "vests_on": payload.vests_on,
        "vests_at": payload.vests_at,
        "frequency": payload.frequency,
    }
    schedule = PensionSchedule(**column_values)
    db.add(schedule)
    db.commit()
    # Reload the row from the database before handing it back.
    db.refresh(schedule)
    return schedule
@router.get(
    "/schedules/{row_id}",
    response_model=ScheduleResponse,
    summary="Get pension schedule",
    description="Fetch a single pension schedule row by ID.",
    tags=["pensions", "pensions-schedules"],
)
async def get_pension_schedule(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the pension schedule row with primary key ``row_id``.

    Raises:
        HTTPException: 404 when no such row exists.
    """
    lookup = db.query(PensionSchedule).filter(PensionSchedule.id == row_id)
    schedule = lookup.first()
    if schedule is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found",
        )
    return schedule
@router.put(
    "/schedules/{row_id}",
    response_model=ScheduleResponse,
    summary="Update pension schedule",
    description="Update fields on an existing pension schedule.",
    tags=["pensions", "pensions-schedules"],
)
async def update_pension_schedule(
    row_id: int,
    payload: ScheduleUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply the fields sent in ``payload`` to an existing schedule row.

    Fields omitted from the request body are left untouched; fields sent
    explicitly (including null) are written.

    Raises:
        HTTPException: 404 when no row has the given ``row_id``.
    """
    lookup = db.query(PensionSchedule).filter(PensionSchedule.id == row_id)
    schedule = lookup.first()
    if schedule is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found",
        )

    # exclude_unset keeps only what the client actually provided.
    changes = payload.model_dump(exclude_unset=True)
    for attribute, new_value in changes.items():
        setattr(schedule, attribute, new_value)

    db.commit()
    db.refresh(schedule)
    return schedule
@router.delete(
    "/schedules/{row_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete pension schedule",
    description="Delete a pension schedule row by ID.",
    tags=["pensions", "pensions-schedules"],
)
async def delete_pension_schedule(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete the pension schedule row with primary key ``row_id``.

    Returns nothing (the route answers 204).

    Raises:
        HTTPException: 404 when no such row exists.
    """
    lookup = db.query(PensionSchedule).filter(PensionSchedule.id == row_id)
    schedule = lookup.first()
    if schedule is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found",
        )
    db.delete(schedule)
    db.commit()
class MarriageResponse(BaseModel):
    # Serialized form of a MarriageHistory row returned by the marriage
    # endpoints. Only `id` and `file_no` are required.
    id: int
    file_no: str
    # `version` can be set on create/update and is filterable and sortable in
    # the list endpoint, so it is exposed here as well (as ScheduleResponse
    # already does). Optional, so existing consumers are unaffected.
    version: Optional[str] = None
    spouse_name: Optional[str] = None
    notes: Optional[str] = None
    married_from: Optional[date] = None
    married_to: Optional[date] = None
    married_years: Optional[float] = None
    service_from: Optional[date] = None
    service_to: Optional[date] = None
    service_years: Optional[float] = None
    marital_percent: Optional[float] = None

    # Lets the endpoints return ORM objects directly: fields are read from
    # object attributes.
    model_config = {
        "from_attributes": True,
    }
class PaginatedMarriagesResponse(BaseModel):
    # Envelope used when a total is requested: one page of marriage rows
    # together with the total count reported alongside it.
    items: List[MarriageResponse]
    total: int
@router.get(
    "/marriages",
    response_model=Union[List[MarriageResponse], PaginatedMarriagesResponse],
    summary="List marriage history",
    description="Filter by file, date range, version, numeric ranges, and optional tokenized search (version, spouse_name, notes). Supports pagination and sorting.",
    tags=["pensions", "pensions-marriages"],
)
async def list_marriages(
    file_no: str = Query(..., description="Filter by file number"),
    start: Optional[date] = Query(None, description="Start date (inclusive) for married_from"),
    end: Optional[date] = Query(None, description="End date (inclusive) for married_from"),
    version: Optional[str] = Query(None, description="Filter by version"),
    married_years_min: Optional[float] = Query(None, description="Minimum married_years (inclusive)"),
    married_years_max: Optional[float] = Query(None, description="Maximum married_years (inclusive)"),
    service_years_min: Optional[float] = Query(None, description="Minimum service_years (inclusive)"),
    service_years_max: Optional[float] = Query(None, description="Maximum service_years (inclusive)"),
    marital_percent_min: Optional[float] = Query(None, description="Minimum marital_percent (inclusive)"),
    marital_percent_max: Optional[float] = Query(None, description="Maximum marital_percent (inclusive)"),
    search: Optional[str] = Query(None, description="Tokenized search across version and notes/spouse_name when present"),
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    sort_by: Optional[str] = Query("id", description="Sort by: id, file_no, version, married_from, married_to, marital_percent, service_from, service_to"),
    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the marriage history rows of one file.

    Rows are always restricted to ``file_no``. The date range applies to
    ``married_from``; each numeric ``*_min`` / ``*_max`` pair is an inclusive
    bound applied only when supplied. Returns a plain list of rows, or
    ``{"items": [...], "total": n}`` when ``include_total`` is true.
    """
    criteria = []
    if start is not None:
        criteria.append(MarriageHistory.married_from >= start)
    if end is not None:
        criteria.append(MarriageHistory.married_from <= end)
    if version is not None:
        criteria.append(MarriageHistory.version == version)

    # (column, inclusive lower bound, inclusive upper bound); None = open side.
    numeric_bounds = (
        (MarriageHistory.married_years, married_years_min, married_years_max),
        (MarriageHistory.service_years, service_years_min, service_years_max),
        (MarriageHistory.marital_percent, marital_percent_min, marital_percent_max),
    )
    for column, lower, upper in numeric_bounds:
        if lower is not None:
            criteria.append(column >= lower)
        if upper is not None:
            criteria.append(column <= upper)

    if search:
        text_filter = tokenized_ilike_filter(
            build_query_tokens(search),
            [
                MarriageHistory.version,
                MarriageHistory.spouse_name,
                MarriageHistory.notes,
            ],
        )
        # The helper may return None, in which case no text filter is added.
        if text_filter is not None:
            criteria.append(text_filter)

    query = db.query(MarriageHistory).filter(MarriageHistory.file_no == file_no)
    for criterion in criteria:
        query = query.filter(criterion)

    # Whitelist of sort keys, each mapped to the column list it orders by.
    sortable = {
        name: [getattr(MarriageHistory, name)]
        for name in (
            "id",
            "file_no",
            "version",
            "married_from",
            "married_to",
            "marital_percent",
            "service_from",
            "service_to",
        )
    }
    query = apply_sorting(query, sort_by, sort_dir, allowed=sortable)

    page, count = paginate_with_total(query, skip, limit, include_total)
    if not include_total:
        return page
    return {"items": page, "total": count or 0}
class MarriageCreate(BaseModel):
    # Request body for creating a marriage history row. `file_no` is
    # mandatory, `version` defaults to "01", and the rest is optional.
    file_no: str
    version: Optional[str] = "01"
    spouse_name: Optional[str] = None
    notes: Optional[str] = None
    married_from: Optional[date] = None
    married_to: Optional[date] = None
    married_years: Optional[float] = None
    service_from: Optional[date] = None
    service_to: Optional[date] = None
    service_years: Optional[float] = None
    marital_percent: Optional[float] = None
class MarriageUpdate(BaseModel):
    # Request body for updating a marriage history row. Every field is
    # optional; the update endpoint writes only the fields the client sent.
    version: Optional[str] = None
    spouse_name: Optional[str] = None
    notes: Optional[str] = None
    married_from: Optional[date] = None
    married_to: Optional[date] = None
    married_years: Optional[float] = None
    service_from: Optional[date] = None
    service_to: Optional[date] = None
    service_years: Optional[float] = None
    marital_percent: Optional[float] = None
@router.post(
    "/marriages",
    response_model=MarriageResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create marriage history row",
    description="Create a new marriage history record for a file.",
    tags=["pensions", "pensions-marriages"],
)
async def create_marriage(
    payload: MarriageCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Insert a marriage history row built from ``payload`` and return it.

    Only the fields the client sent are passed to the ORM constructor, except
    ``version``, which is always set and falls back to ``"01"`` when omitted,
    null or empty.
    """
    values = payload.model_dump(exclude_unset=True)
    # exclude_unset drops fields the client did not send, which would also
    # discard MarriageCreate's "01" default for `version`. Re-apply it
    # explicitly, the same way create_pension_schedule does.
    values["version"] = payload.version or "01"

    row = MarriageHistory(**values)
    db.add(row)
    db.commit()
    # Reload the row from the database before handing it back.
    db.refresh(row)
    return row
@router.get(
    "/marriages/{row_id}",
    response_model=MarriageResponse,
    summary="Get marriage history row",
    description="Fetch a single marriage history record by ID.",
    tags=["pensions", "pensions-marriages"],
)
async def get_marriage(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the marriage history row with primary key ``row_id``.

    Raises:
        HTTPException: 404 when no such row exists.
    """
    lookup = db.query(MarriageHistory).filter(MarriageHistory.id == row_id)
    marriage = lookup.first()
    if marriage is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Marriage not found",
        )
    return marriage
@router.put(
    "/marriages/{row_id}",
    response_model=MarriageResponse,
    summary="Update marriage history row",
    description="Update fields on an existing marriage history record.",
    tags=["pensions", "pensions-marriages"],
)
async def update_marriage(
    row_id: int,
    payload: MarriageUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply the fields sent in ``payload`` to an existing marriage row.

    Fields omitted from the request body are left untouched; fields sent
    explicitly (including null) are written.

    Raises:
        HTTPException: 404 when no row has the given ``row_id``.
    """
    lookup = db.query(MarriageHistory).filter(MarriageHistory.id == row_id)
    marriage = lookup.first()
    if marriage is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Marriage not found",
        )

    # exclude_unset keeps only what the client actually provided.
    changes = payload.model_dump(exclude_unset=True)
    for attribute, new_value in changes.items():
        setattr(marriage, attribute, new_value)

    db.commit()
    db.refresh(marriage)
    return marriage
@router.delete(
    "/marriages/{row_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete marriage history row",
    description="Delete a marriage history record by ID.",
    tags=["pensions", "pensions-marriages"],
)
async def delete_marriage(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete the marriage history row with primary key ``row_id``.

    Returns nothing (the route answers 204).

    Raises:
        HTTPException: 404 when no such row exists.
    """
    lookup = db.query(MarriageHistory).filter(MarriageHistory.id == row_id)
    marriage = lookup.first()
    if marriage is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Marriage not found",
        )
    db.delete(marriage)
    db.commit()
class DeathResponse(BaseModel):
    # Serialized form of a DeathBenefit row returned by the death-benefit
    # endpoints. Only `id` and `file_no` are required.
    id: int
    file_no: str
    # `version` can be set on create/update and is filterable and sortable in
    # the list endpoint, so it is exposed here as well (as ScheduleResponse
    # already does). Optional, so existing consumers are unaffected.
    version: Optional[str] = None
    beneficiary_name: Optional[str] = None
    benefit_type: Optional[str] = None
    notes: Optional[str] = None
    lump1: Optional[float] = None
    lump2: Optional[float] = None
    growth1: Optional[float] = None
    growth2: Optional[float] = None
    disc1: Optional[float] = None
    disc2: Optional[float] = None

    # Lets the endpoints return ORM objects directly: fields are read from
    # object attributes.
    model_config = {
        "from_attributes": True,
    }
class PaginatedDeathResponse(BaseModel):
    # Envelope used when a total is requested: one page of death-benefit rows
    # together with the total count reported alongside it.
    items: List[DeathResponse]
    total: int
@router.get(
    "/death-benefits",
    response_model=Union[List[DeathResponse], PaginatedDeathResponse],
    summary="List death benefits",
    description="Filter by file, date range, version, numeric ranges, and optional tokenized search (version, beneficiary_name, benefit_type, notes). Supports pagination and sorting.",
    tags=["pensions", "pensions-death"],
)
async def list_death_benefits(
    file_no: str = Query(..., description="Filter by file number"),
    start: Optional[date] = Query(None, description="Start date (inclusive) for created_at"),
    end: Optional[date] = Query(None, description="End date (inclusive) for created_at"),
    version: Optional[str] = Query(None, description="Filter by version"),
    lump1_min: Optional[float] = Query(None, description="Minimum lump1 (inclusive)"),
    lump1_max: Optional[float] = Query(None, description="Maximum lump1 (inclusive)"),
    lump2_min: Optional[float] = Query(None, description="Minimum lump2 (inclusive)"),
    lump2_max: Optional[float] = Query(None, description="Maximum lump2 (inclusive)"),
    growth1_min: Optional[float] = Query(None, description="Minimum growth1 (inclusive)"),
    growth1_max: Optional[float] = Query(None, description="Maximum growth1 (inclusive)"),
    growth2_min: Optional[float] = Query(None, description="Minimum growth2 (inclusive)"),
    growth2_max: Optional[float] = Query(None, description="Maximum growth2 (inclusive)"),
    disc1_min: Optional[float] = Query(None, description="Minimum disc1 (inclusive)"),
    disc1_max: Optional[float] = Query(None, description="Maximum disc1 (inclusive)"),
    disc2_min: Optional[float] = Query(None, description="Minimum disc2 (inclusive)"),
    disc2_max: Optional[float] = Query(None, description="Maximum disc2 (inclusive)"),
    search: Optional[str] = Query(None, description="Tokenized search across version, beneficiary_name, benefit_type, notes"),
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    sort_by: Optional[str] = Query("id", description="Sort by: id, file_no, version, lump1, lump2, growth1, growth2, disc1, disc2, created"),
    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the death benefit rows of one file.

    Rows are always restricted to ``file_no``. The date range is compared
    against the date part of ``created_at``; each numeric ``*_min`` / ``*_max``
    pair is an inclusive bound applied only when supplied. Returns a plain
    list of rows, or ``{"items": [...], "total": n}`` when ``include_total``
    is true.
    """
    criteria = []

    # Compare on the date part so that `end` covers the whole final day.
    created_on = func.date(DeathBenefit.created_at)
    if start is not None:
        criteria.append(created_on >= start)
    if end is not None:
        criteria.append(created_on <= end)
    if version is not None:
        criteria.append(DeathBenefit.version == version)

    # (column, inclusive lower bound, inclusive upper bound); None = open side.
    numeric_bounds = (
        (DeathBenefit.lump1, lump1_min, lump1_max),
        (DeathBenefit.lump2, lump2_min, lump2_max),
        (DeathBenefit.growth1, growth1_min, growth1_max),
        (DeathBenefit.growth2, growth2_min, growth2_max),
        (DeathBenefit.disc1, disc1_min, disc1_max),
        (DeathBenefit.disc2, disc2_min, disc2_max),
    )
    for column, lower, upper in numeric_bounds:
        if lower is not None:
            criteria.append(column >= lower)
        if upper is not None:
            criteria.append(column <= upper)

    if search:
        text_filter = tokenized_ilike_filter(
            build_query_tokens(search),
            [
                DeathBenefit.version,
                DeathBenefit.beneficiary_name,
                DeathBenefit.benefit_type,
                DeathBenefit.notes,
            ],
        )
        # The helper may return None, in which case no text filter is added.
        if text_filter is not None:
            criteria.append(text_filter)

    query = db.query(DeathBenefit).filter(DeathBenefit.file_no == file_no)
    for criterion in criteria:
        query = query.filter(criterion)

    # Whitelist of sort keys, each mapped to the column list it orders by.
    sortable = {
        name: [getattr(DeathBenefit, name)]
        for name in (
            "id",
            "file_no",
            "version",
            "lump1",
            "lump2",
            "growth1",
            "growth2",
            "disc1",
            "disc2",
        )
    }
    # The public sort key is "created" while the column is `created_at`.
    sortable["created"] = [DeathBenefit.created_at]
    query = apply_sorting(query, sort_by, sort_dir, allowed=sortable)

    page, count = paginate_with_total(query, skip, limit, include_total)
    if not include_total:
        return page
    return {"items": page, "total": count or 0}
class DeathCreate(BaseModel):
    # Request body for creating a death benefit row. `file_no` is mandatory,
    # `version` defaults to "01", and the rest is optional.
    file_no: str
    version: Optional[str] = "01"
    beneficiary_name: Optional[str] = None
    benefit_type: Optional[str] = None
    notes: Optional[str] = None
    lump1: Optional[float] = None
    lump2: Optional[float] = None
    growth1: Optional[float] = None
    growth2: Optional[float] = None
    disc1: Optional[float] = None
    disc2: Optional[float] = None
class DeathUpdate(BaseModel):
    # Request body for updating a death benefit row. Every field is optional;
    # the update endpoint writes only the fields the client sent.
    version: Optional[str] = None
    beneficiary_name: Optional[str] = None
    benefit_type: Optional[str] = None
    notes: Optional[str] = None
    lump1: Optional[float] = None
    lump2: Optional[float] = None
    growth1: Optional[float] = None
    growth2: Optional[float] = None
    disc1: Optional[float] = None
    disc2: Optional[float] = None
@router.post(
    "/death-benefits",
    response_model=DeathResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create death benefit",
    description="Create a new death benefit record for a file.",
    tags=["pensions", "pensions-death"],
)
async def create_death_benefit(
    payload: DeathCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Insert a death benefit row built from ``payload`` and return it.

    Only the fields the client sent are passed to the ORM constructor, except
    ``version``, which is always set and falls back to ``"01"`` when omitted,
    null or empty.
    """
    values = payload.model_dump(exclude_unset=True)
    # exclude_unset drops fields the client did not send, which would also
    # discard DeathCreate's "01" default for `version`. Re-apply it
    # explicitly, the same way create_pension_schedule does.
    values["version"] = payload.version or "01"

    row = DeathBenefit(**values)
    db.add(row)
    db.commit()
    # Reload the row from the database before handing it back.
    db.refresh(row)
    return row
@router.get(
    "/death-benefits/{row_id}",
    response_model=DeathResponse,
    summary="Get death benefit",
    description="Fetch a single death benefit record by ID.",
    tags=["pensions", "pensions-death"],
)
async def get_death_benefit(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the death benefit row with primary key ``row_id``.

    Raises:
        HTTPException: 404 when no such row exists.
    """
    lookup = db.query(DeathBenefit).filter(DeathBenefit.id == row_id)
    benefit = lookup.first()
    if benefit is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Death benefit not found",
        )
    return benefit
@router.put(
    "/death-benefits/{row_id}",
    response_model=DeathResponse,
    summary="Update death benefit",
    description="Update fields on an existing death benefit record.",
    tags=["pensions", "pensions-death"],
)
async def update_death_benefit(
    row_id: int,
    payload: DeathUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply the fields sent in ``payload`` to an existing death benefit row.

    Fields omitted from the request body are left untouched; fields sent
    explicitly (including null) are written.

    Raises:
        HTTPException: 404 when no row has the given ``row_id``.
    """
    lookup = db.query(DeathBenefit).filter(DeathBenefit.id == row_id)
    benefit = lookup.first()
    if benefit is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Death benefit not found",
        )

    # exclude_unset keeps only what the client actually provided.
    changes = payload.model_dump(exclude_unset=True)
    for attribute, new_value in changes.items():
        setattr(benefit, attribute, new_value)

    db.commit()
    db.refresh(benefit)
    return benefit
@router.delete(
    "/death-benefits/{row_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete death benefit",
    description="Delete a death benefit record by ID.",
    tags=["pensions", "pensions-death"],
)
async def delete_death_benefit(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete the death benefit row with primary key ``row_id``.

    Returns nothing (the route answers 204).

    Raises:
        HTTPException: 404 when no such row exists.
    """
    lookup = db.query(DeathBenefit).filter(DeathBenefit.id == row_id)
    benefit = lookup.first()
    if benefit is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Death benefit not found",
        )
    db.delete(benefit)
    db.commit()
class SeparationResponse(BaseModel):
    # Serialized form of a SeparationAgreement row returned by the separation
    # endpoints. Only `id` and `file_no` are required.
    id: int
    file_no: str
    # `version` is filterable and sortable in the list endpoint, so it is
    # exposed here as well (as ScheduleResponse already does). Optional, so
    # existing consumers are unaffected.
    version: Optional[str] = None
    agreement_date: Optional[date] = None
    terms: Optional[str] = None
    notes: Optional[str] = None

    # Lets the endpoints return ORM objects directly: fields are read from
    # object attributes.
    model_config = {
        "from_attributes": True,
    }
class PaginatedSeparationsResponse(BaseModel):
    # Envelope used when a total is requested: one page of separation
    # agreement rows together with the total count reported alongside it.
    items: List[SeparationResponse]
    total: int
# NOTE(review): summary/description/tags were missing on this route only; the
# tag name follows the "pensions-<resource>" pattern of the sibling list
# endpoints - confirm it matches the tags used by the separation CRUD routes.
@router.get(
    "/separations",
    response_model=Union[List[SeparationResponse], PaginatedSeparationsResponse],
    summary="List separation agreements",
    description="Filter by file, date range, version, and optional tokenized search (version, terms, notes). Supports pagination and sorting.",
    tags=["pensions", "pensions-separations"],
)
async def list_separations(
    file_no: str = Query(..., description="Filter by file number"),
    start: Optional[date] = Query(None, description="Start date (inclusive) for agreement_date"),
    end: Optional[date] = Query(None, description="End date (inclusive) for agreement_date"),
    version: Optional[str] = Query(None, description="Filter by version"),
    search: Optional[str] = Query(None, description="Tokenized search across version, terms, and notes"),
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    sort_by: Optional[str] = Query("id", description="Sort by: id, file_no, version, agreement_date"),
    sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List the separation agreement rows of one file.

    Rows are always restricted to ``file_no``. The date range applies to
    ``agreement_date``; every optional filter is applied only when supplied.
    Returns a plain list of rows, or ``{"items": [...], "total": n}`` when
    ``include_total`` is true.
    """
    q = db.query(SeparationAgreement).filter(SeparationAgreement.file_no == file_no)
    if start is not None:
        q = q.filter(SeparationAgreement.agreement_date >= start)
    if end is not None:
        q = q.filter(SeparationAgreement.agreement_date <= end)
    if version is not None:
        q = q.filter(SeparationAgreement.version == version)
    if search:
        tokens = build_query_tokens(search)
        filter_expr = tokenized_ilike_filter(
            tokens,
            [
                SeparationAgreement.version,
                SeparationAgreement.terms,
                SeparationAgreement.notes,
            ],
        )
        # The helper may return None, in which case no text filter is added.
        if filter_expr is not None:
            q = q.filter(filter_expr)

    # Whitelist of sort keys, each mapped to the column list it orders by.
    q = apply_sorting(
        q,
        sort_by,
        sort_dir,
        allowed={
            "id": [SeparationAgreement.id],
            "file_no": [SeparationAgreement.file_no],
            "version": [SeparationAgreement.version],
            "agreement_date": [SeparationAgreement.agreement_date],
        },
    )
    items, total = paginate_with_total(q, skip, limit, include_total)
    if include_total:
        return {"items": items, "total": total or 0}
    return items
# -----------------------------
# Pension detail with nested lists
# -----------------------------
class PensionResponse(BaseModel):
    # Serialized form of a main Pension row. Only `file_no` is required; even
    # `id` is optional here.
    id: Optional[int] = None
    file_no: str
    version: Optional[str] = None
    plan_id: Optional[str] = None
    plan_name: Optional[str] = None
    title: Optional[str] = None
    first: Optional[str] = None
    last: Optional[str] = None
    birth: Optional[date] = None
    race: Optional[str] = None
    sex: Optional[str] = None
    info: Optional[str] = None
    valu: Optional[float] = None
    accrued: Optional[float] = None
    vested_per: Optional[float] = None
    start_age: Optional[int] = None
    cola: Optional[float] = None
    max_cola: Optional[float] = None
    withdrawal: Optional[str] = None
    pre_dr: Optional[float] = None
    post_dr: Optional[float] = None
    tax_rate: Optional[float] = None

    # Lets the endpoints return ORM objects directly: fields are read from
    # object attributes.
    model_config = {
        "from_attributes": True,
    }
class PensionDetailResponse(BaseModel):
    # Payload of the per-file detail endpoint: the representative Pension row
    # (null when the file has none) plus one paginated envelope per related
    # table.
    pension: Optional[PensionResponse] = None
    schedules: PaginatedSchedulesResponse
    marriages: PaginatedMarriagesResponse
    death_benefits: PaginatedDeathResponse
    separations: PaginatedSeparationsResponse
def _paginated_section(
    db: Session,
    model,
    file_no: str,
    *,
    date_column,
    start: Optional[date],
    end: Optional[date],
    version: Optional[str],
    search: Optional[str],
    search_columns: list,
    sortable: dict,
    sort_by: Optional[str],
    sort_dir: Optional[str],
    skip: int,
    limit: int,
) -> dict:
    """Build one nested list of the pension detail payload.

    Queries ``model`` rows for ``file_no``, applies the optional inclusive
    date range on ``date_column``, the optional exact ``version`` match and the
    optional tokenized search over ``search_columns``, then sorts using the
    ``sortable`` whitelist and paginates with a total.

    Returns:
        ``{"items": <page of rows>, "total": <total or 0>}``.
    """
    query = db.query(model).filter(model.file_no == file_no)
    if start is not None:
        query = query.filter(date_column >= start)
    if end is not None:
        query = query.filter(date_column <= end)
    if version is not None:
        query = query.filter(model.version == version)
    if search:
        tokens = build_query_tokens(search)
        text_filter = tokenized_ilike_filter(tokens, search_columns)
        # The helper may return None, in which case no text filter is added.
        if text_filter is not None:
            query = query.filter(text_filter)
    query = apply_sorting(query, sort_by, sort_dir, allowed=sortable)
    # The detail payload always carries totals, hence the hard-coded True.
    items, total = paginate_with_total(query, skip, limit, True)
    return {"items": items, "total": total or 0}


@router.get(
    "/{file_no}/detail",
    response_model=PensionDetailResponse,
    summary="Pension detail with nested lists",
    description="Return a representative Pension record for a file along with nested lists (schedules, marriages, death benefits, separations), each with independent pagination, sorting, and filtering controls.",
    tags=["pensions", "pensions-detail"],
)
async def get_pension_detail(
    file_no: str,
    # Schedules controls
    s_start: Optional[date] = Query(None),
    s_end: Optional[date] = Query(None),
    s_version: Optional[str] = Query(None),
    s_search: Optional[str] = Query(None),
    s_skip: int = Query(0, ge=0),
    s_limit: int = Query(50, ge=1, le=200),
    s_sort_by: Optional[str] = Query("vests_on"),
    s_sort_dir: Optional[str] = Query("asc"),
    # Marriages controls
    m_start: Optional[date] = Query(None),
    m_end: Optional[date] = Query(None),
    m_version: Optional[str] = Query(None),
    m_search: Optional[str] = Query(None),
    m_skip: int = Query(0, ge=0),
    m_limit: int = Query(50, ge=1, le=200),
    m_sort_by: Optional[str] = Query("id"),
    m_sort_dir: Optional[str] = Query("asc"),
    # Death benefits controls
    d_start: Optional[date] = Query(None),
    d_end: Optional[date] = Query(None),
    d_version: Optional[str] = Query(None),
    d_search: Optional[str] = Query(None),
    d_skip: int = Query(0, ge=0),
    d_limit: int = Query(50, ge=1, le=200),
    d_sort_by: Optional[str] = Query("id"),
    d_sort_dir: Optional[str] = Query("asc"),
    # Separations controls
    sep_start: Optional[date] = Query(None),
    sep_end: Optional[date] = Query(None),
    sep_version: Optional[str] = Query(None),
    sep_search: Optional[str] = Query(None),
    sep_skip: int = Query(0, ge=0),
    sep_limit: int = Query(50, ge=1, le=200),
    sep_sort_by: Optional[str] = Query("id"),
    sep_sort_dir: Optional[str] = Query("asc"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one file's pension record together with its related lists.

    The ``s_*``, ``m_*``, ``d_*`` and ``sep_*`` parameters control the
    schedules, marriages, death benefits and separations lists independently.
    ``pension`` is null in the response when the file has no Pension row; the
    nested lists are still returned in that case.
    """
    # Load a representative pension record for the file (latest by updated_at/id)
    pension_row = (
        db.query(Pension)
        .filter(Pension.file_no == file_no)
        .order_by(Pension.updated_at.desc().nullslast(), Pension.id.desc())
        .first()
    )

    schedules = _paginated_section(
        db,
        PensionSchedule,
        file_no,
        date_column=PensionSchedule.vests_on,
        start=s_start,
        end=s_end,
        version=s_version,
        search=s_search,
        search_columns=[PensionSchedule.version, PensionSchedule.frequency],
        sortable={
            "id": [PensionSchedule.id],
            "file_no": [PensionSchedule.file_no],
            "version": [PensionSchedule.version],
            "vests_on": [PensionSchedule.vests_on],
            "vests_at": [PensionSchedule.vests_at],
        },
        sort_by=s_sort_by,
        sort_dir=s_sort_dir,
        skip=s_skip,
        limit=s_limit,
    )

    marriages = _paginated_section(
        db,
        MarriageHistory,
        file_no,
        date_column=MarriageHistory.married_from,
        start=m_start,
        end=m_end,
        version=m_version,
        search=m_search,
        search_columns=[
            MarriageHistory.version,
            MarriageHistory.spouse_name,
            MarriageHistory.notes,
        ],
        sortable={
            "id": [MarriageHistory.id],
            "file_no": [MarriageHistory.file_no],
            "version": [MarriageHistory.version],
            "married_from": [MarriageHistory.married_from],
            "married_to": [MarriageHistory.married_to],
            "marital_percent": [MarriageHistory.marital_percent],
            "service_from": [MarriageHistory.service_from],
            "service_to": [MarriageHistory.service_to],
        },
        sort_by=m_sort_by,
        sort_dir=m_sort_dir,
        skip=m_skip,
        limit=m_limit,
    )

    death_benefits = _paginated_section(
        db,
        DeathBenefit,
        file_no,
        # Compared on the date part of created_at so `d_end` covers its whole day.
        date_column=func.date(DeathBenefit.created_at),
        start=d_start,
        end=d_end,
        version=d_version,
        search=d_search,
        search_columns=[
            DeathBenefit.version,
            DeathBenefit.beneficiary_name,
            DeathBenefit.benefit_type,
            DeathBenefit.notes,
        ],
        sortable={
            "id": [DeathBenefit.id],
            "file_no": [DeathBenefit.file_no],
            "version": [DeathBenefit.version],
            "lump1": [DeathBenefit.lump1],
            "lump2": [DeathBenefit.lump2],
            "growth1": [DeathBenefit.growth1],
            "growth2": [DeathBenefit.growth2],
            "disc1": [DeathBenefit.disc1],
            "disc2": [DeathBenefit.disc2],
            "created": [DeathBenefit.created_at],
        },
        sort_by=d_sort_by,
        sort_dir=d_sort_dir,
        skip=d_skip,
        limit=d_limit,
    )

    separations = _paginated_section(
        db,
        SeparationAgreement,
        file_no,
        date_column=SeparationAgreement.agreement_date,
        start=sep_start,
        end=sep_end,
        version=sep_version,
        search=sep_search,
        search_columns=[
            SeparationAgreement.version,
            SeparationAgreement.terms,
            SeparationAgreement.notes,
        ],
        sortable={
            "id": [SeparationAgreement.id],
            "file_no": [SeparationAgreement.file_no],
            "version": [SeparationAgreement.version],
            "agreement_date": [SeparationAgreement.agreement_date],
        },
        sort_by=sep_sort_by,
        sort_dir=sep_sort_dir,
        skip=sep_skip,
        limit=sep_limit,
    )

    return {
        "pension": pension_row,
        "schedules": schedules,
        "marriages": marriages,
        "death_benefits": death_benefits,
        "separations": separations,
    }
# -----------------------------
# Pension CRUD (main table)
# -----------------------------
class PensionCreate(BaseModel):
    # Request body for creating a main Pension row. `file_no` is mandatory and
    # `version` defaults to "01". Numeric fields, when given, must be
    # non-negative; `vested_per` and `tax_rate` are additionally capped at 100.
    file_no: str
    version: Optional[str] = "01"
    plan_id: Optional[str] = None
    plan_name: Optional[str] = None
    title: Optional[str] = None
    first: Optional[str] = None
    last: Optional[str] = None
    birth: Optional[date] = None
    race: Optional[str] = None
    sex: Optional[str] = None
    info: Optional[str] = None
    valu: Optional[float] = Field(default=None, ge=0)
    accrued: Optional[float] = Field(default=None, ge=0)
    vested_per: Optional[float] = Field(default=None, ge=0, le=100)
    start_age: Optional[int] = Field(default=None, ge=0)
    cola: Optional[float] = Field(default=None, ge=0)
    max_cola: Optional[float] = Field(default=None, ge=0)
    withdrawal: Optional[str] = None
    pre_dr: Optional[float] = Field(default=None, ge=0)
    post_dr: Optional[float] = Field(default=None, ge=0)
    tax_rate: Optional[float] = Field(default=None, ge=0, le=100)
class PensionUpdate(BaseModel):
    # Request body for updating a main Pension row. Every field is optional;
    # the update endpoint writes only the fields the client sent. Numeric
    # fields carry the same range limits as on creation.
    version: Optional[str] = None
    plan_id: Optional[str] = None
    plan_name: Optional[str] = None
    title: Optional[str] = None
    first: Optional[str] = None
    last: Optional[str] = None
    birth: Optional[date] = None
    race: Optional[str] = None
    sex: Optional[str] = None
    info: Optional[str] = None
    valu: Optional[float] = Field(default=None, ge=0)
    accrued: Optional[float] = Field(default=None, ge=0)
    vested_per: Optional[float] = Field(default=None, ge=0, le=100)
    start_age: Optional[int] = Field(default=None, ge=0)
    cola: Optional[float] = Field(default=None, ge=0)
    max_cola: Optional[float] = Field(default=None, ge=0)
    withdrawal: Optional[str] = None
    pre_dr: Optional[float] = Field(default=None, ge=0)
    post_dr: Optional[float] = Field(default=None, ge=0)
    tax_rate: Optional[float] = Field(default=None, ge=0, le=100)
@router.post(
    "/",
    response_model=PensionResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create Pension",
    description="Create a main Pension record for a file.",
    tags=["pensions", "pensions-main"],
)
async def create_pension(
    payload: PensionCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Insert a new Pension row built from the request payload and return it.

    Args:
        payload: Validated creation fields.
        db: SQLAlchemy session provided by the ``get_db`` dependency.
        current_user: Result of the ``get_current_user`` dependency; not read
            in the body (presumably present to enforce authentication —
            confirm against ``app.auth.security``).

    Returns:
        The newly persisted ``Pension`` row, refreshed from the database and
        serialized through ``PensionResponse``.
    """
    # Forward only the fields the client actually sent, so omitted fields are
    # not pushed to the ORM constructor as explicit None values.
    values = payload.model_dump(exclude_unset=True)

    # exclude_unset also discards schema defaults, which would silently drop
    # the declared version default ("01"). Re-apply it only when the client
    # omitted the field; an explicitly supplied value (even null) is kept.
    values.setdefault("version", payload.version)

    row = Pension(**values)
    db.add(row)
    db.commit()
    # Reload so database-generated values (e.g. the primary key) are populated.
    db.refresh(row)
    return row
@router.get(
    "/{pension_id}",
    response_model=PensionResponse,
    summary="Get Pension",
    description="Fetch a main Pension record by ID.",
    tags=["pensions", "pensions-main"],
)
async def get_pension(
    pension_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the Pension row whose ``id`` equals ``pension_id``.

    Raises:
        HTTPException: 404 with detail "Pension not found" when no row matches.
    """
    by_id = db.query(Pension).filter(Pension.id == pension_id)
    pension = by_id.first()
    if pension:
        return pension
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Pension not found",
    )
@router.put(
    "/{pension_id}",
    response_model=PensionResponse,
    summary="Update Pension",
    description="Update fields on an existing Pension record.",
    tags=["pensions", "pensions-main"],
)
async def update_pension(
    pension_id: int,
    payload: PensionUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Apply the fields present in ``payload`` to an existing Pension row.

    Only fields the client explicitly sent are written; fields omitted from
    the request are left untouched.

    Raises:
        HTTPException: 404 with detail "Pension not found" when no row matches.
    """
    by_id = db.query(Pension).filter(Pension.id == pension_id)
    pension = by_id.first()
    if not pension:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Pension not found",
        )

    # exclude_unset restricts the update to what the client actually sent.
    changes = payload.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(pension, attr_name, new_value)

    db.commit()
    db.refresh(pension)
    return pension
@router.delete(
    "/{pension_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete Pension",
    description="Delete a Pension record by ID.",
    tags=["pensions", "pensions-main"],
)
async def delete_pension(
    pension_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete the Pension row whose ``id`` equals ``pension_id``.

    Returns nothing; the route responds with 204 No Content.

    Raises:
        HTTPException: 404 with detail "Pension not found" when no row matches.
    """
    by_id = db.query(Pension).filter(Pension.id == pension_id)
    pension = by_id.first()
    if not pension:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Pension not found",
        )

    db.delete(pension)
    db.commit()
    # Falls through returning None: a 204 response carries no body.
# Request body for creating a SeparationAgreement record.
# Only `file_no` is required; `version` defaults to "01" and the remaining
# fields default to None.
class SeparationCreate(BaseModel):
    file_no: str
    version: Optional[str] = Field(default="01")
    agreement_date: Optional[date] = Field(default=None)
    terms: Optional[str] = Field(default=None)
    notes: Optional[str] = Field(default=None)
# Request body for updating a SeparationAgreement record.
# Every field is optional and defaults to None, so a client may send any
# subset. There is no `file_no` here, so it cannot be changed through this
# schema.
class SeparationUpdate(BaseModel):
    version: Optional[str] = Field(default=None)
    agreement_date: Optional[date] = Field(default=None)
    terms: Optional[str] = Field(default=None)
    notes: Optional[str] = Field(default=None)
# Create a SeparationAgreement row from the request payload and return it
# (HTTP 201). `current_user` is not read in the body; it is presumably there to
# enforce authentication via get_current_user (confirm in app.auth.security).
# NOTE: documented with comments rather than a docstring because this route has
# no explicit `description=`, so a docstring would surface in the OpenAPI docs.
@router.post("/separations", response_model=SeparationResponse, status_code=status.HTTP_201_CREATED)
async def create_separation(
    payload: SeparationCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    # Forward only the fields the client actually sent, so omitted fields are
    # not pushed to the ORM constructor as explicit None values.
    values = payload.model_dump(exclude_unset=True)

    # exclude_unset also discards schema defaults, which would silently drop
    # the declared version default ("01"). Re-apply it only when the client
    # omitted the field; an explicitly supplied value (even null) is kept.
    values.setdefault("version", payload.version)

    row = SeparationAgreement(**values)
    db.add(row)
    db.commit()
    # Reload so database-generated values (e.g. the primary key) are populated.
    db.refresh(row)
    return row
# Fetch a single SeparationAgreement by primary key.
# Responds 404 ("Separation agreement not found") when no row matches.
@router.get("/separations/{row_id}", response_model=SeparationResponse)
async def get_separation(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    by_id = db.query(SeparationAgreement).filter(SeparationAgreement.id == row_id)
    agreement = by_id.first()
    if agreement:
        return agreement
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Separation agreement not found",
    )
# Apply the fields present in the payload to an existing SeparationAgreement.
# Fields omitted from the request are left untouched.
# Responds 404 ("Separation agreement not found") when no row matches.
@router.put("/separations/{row_id}", response_model=SeparationResponse)
async def update_separation(
    row_id: int,
    payload: SeparationUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    by_id = db.query(SeparationAgreement).filter(SeparationAgreement.id == row_id)
    agreement = by_id.first()
    if not agreement:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Separation agreement not found",
        )

    # exclude_unset restricts the update to what the client actually sent.
    changes = payload.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(agreement, attr_name, new_value)

    db.commit()
    db.refresh(agreement)
    return agreement
# Delete a SeparationAgreement by primary key; responds 204 with no body.
# Responds 404 ("Separation agreement not found") when no row matches.
@router.delete("/separations/{row_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_separation(
    row_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    by_id = db.query(SeparationAgreement).filter(SeparationAgreement.id == row_id)
    agreement = by_id.first()
    if not agreement:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Separation agreement not found",
        )

    db.delete(agreement)
    db.commit()
    # Falls through returning None: a 204 response carries no body.