templates: add multi-category filter (repeatable or CSV) to GET /api/templates/search; add has_keywords filter; add categories listing endpoint with counts; update docs; add tests

This commit is contained in:
HotSwapp
2025-08-15 15:04:40 -05:00
parent 21c6b285d6
commit e3a279dba7
17 changed files with 3727 additions and 2 deletions

View File

@@ -0,0 +1,95 @@
import io
from datetime import date
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def test_crud_and_list_filters_for_pensions_tables():
# Auth override
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
client = TestClient(app)
# Seed base data via import
rolodex_csv = "Id,Last\nR2,Beta\n"
files_csv = "File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\nF-2,R2,CIVIL,Test,2024-01-01,E01,ACTIVE,100\n"
client.post("/api/import/batch-upload", files=[
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
])
# Create schedule rows
resp = client.post("/api/pensions/schedules", json={"file_no": "F-2", "vests_on": "2024-06-01", "vests_at": 50})
assert resp.status_code == 201
sched_id = resp.json()["id"]
# Filter by date range (hit)
rlist = client.get("/api/pensions/schedules", params={"file_no": "F-2", "start": "2024-01-01", "end": "2024-12-31"})
assert rlist.status_code == 200 and len(rlist.json()) >= 1
# Update
up = client.put(f"/api/pensions/schedules/{sched_id}", json={"vests_at": 75})
assert up.status_code == 200 and up.json()["vests_at"] == 75
# Create marriage history
resp = client.post("/api/pensions/marriages", json={
"file_no": "F-2",
"married_from": "2000-01-01",
"married_to": "2010-01-01",
"married_years": 10,
"service_from": "1998-01-01",
"service_to": "2010-01-01",
"service_years": 12,
"marital_percent": 40,
})
assert resp.status_code == 201
marr_id = resp.json()["id"]
# Filter by married_from
rlist = client.get("/api/pensions/marriages", params={"file_no": "F-2", "start": "1999-01-01", "end": "2001-12-31"})
assert rlist.status_code == 200 and len(rlist.json()) >= 1
# Update
up = client.put(f"/api/pensions/marriages/{marr_id}", json={"marital_percent": 50})
assert up.status_code == 200 and up.json()["marital_percent"] == 50
# Create death benefit
resp = client.post("/api/pensions/death-benefits", json={"file_no": "F-2", "lump1": 1000})
assert resp.status_code == 201
death_id = resp.json()["id"]
# List by file_no (created today)
rlist = client.get("/api/pensions/death-benefits", params={"file_no": "F-2"})
assert rlist.status_code == 200 and any(row.get("id") == death_id for row in rlist.json())
# Update
up = client.put(f"/api/pensions/death-benefits/{death_id}", json={"lump2": 500})
assert up.status_code == 200 and up.json()["lump2"] == 500
# Create separation agreement
resp = client.post("/api/pensions/separations", json={"file_no": "F-2", "agreement_date": "2024-02-01", "terms": "Terms"})
assert resp.status_code == 201
sep_id = resp.json()["id"]
# Filter by agreement_date
rlist = client.get("/api/pensions/separations", params={"file_no": "F-2", "start": "2024-01-01", "end": "2024-12-31"})
assert rlist.status_code == 200 and len(rlist.json()) >= 1
# Update
up = client.put(f"/api/pensions/separations/{sep_id}", json={"terms": "Updated"})
assert up.status_code == 200 and up.json()["terms"] == "Updated"
# Delete paths
assert client.delete(f"/api/pensions/schedules/{sched_id}").status_code == 204
assert client.delete(f"/api/pensions/marriages/{marr_id}").status_code == 204
assert client.delete(f"/api/pensions/death-benefits/{death_id}").status_code == 204
assert client.delete(f"/api/pensions/separations/{sep_id}").status_code == 204
# Cleanup override
app.dependency_overrides.pop(get_current_user, None)

View File

@@ -0,0 +1,84 @@
import io
from datetime import date
import uuid
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def _seed_file(client: TestClient, file_no: str, owner_id: str = None) -> None:
owner_id = owner_id or f"R{uuid.uuid4().hex[:6]}"
rolodex_csv = f"Id,Last\n{owner_id},Alpha\n"
files_csv = (
"File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\n"
f"{file_no},{owner_id},CIVIL,Test,{date.today():%Y-%m-%d},E01,ACTIVE,100\n"
)
client.post("/api/import/batch-upload", files=[
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
])
def _auth():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
def test_pension_crud_and_validation():
_auth()
client = TestClient(app)
file_no = f"PF-CRUD-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
# Create
create_payload = {
"file_no": file_no,
"version": "01",
"plan_id": "PID1",
"plan_name": "Plan A",
"vested_per": 50,
"tax_rate": 25,
}
r = client.post("/api/pensions/", json=create_payload)
assert r.status_code == 201
pid = r.json()["id"]
# Get
rg = client.get(f"/api/pensions/{pid}")
assert rg.status_code == 200 and rg.json()["plan_name"] == "Plan A"
# Update
ru = client.put(f"/api/pensions/{pid}", json={"plan_name": "Plan B", "vested_per": 75})
assert ru.status_code == 200 and ru.json()["plan_name"] == "Plan B" and ru.json()["vested_per"] == 75
# Validation edges: negative values or over 100 should fail
bads = [
{"vested_per": -1},
{"vested_per": 101},
{"tax_rate": -5},
{"tax_rate": 150},
{"valu": -0.01},
]
for payload in bads:
rv = client.put(f"/api/pensions/{pid}", json=payload)
assert rv.status_code == 422
# Delete
rd = client.delete(f"/api/pensions/{pid}")
assert rd.status_code == 204
r404 = client.get(f"/api/pensions/{pid}")
assert r404.status_code == 404
app.dependency_overrides.pop(get_current_user, None)

View File

@@ -0,0 +1,92 @@
import io
from datetime import date
import uuid
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def _seed_file(client: TestClient, file_no: str, owner_id: str = None) -> None:
owner_id = owner_id or f"R{uuid.uuid4().hex[:6]}"
rolodex_csv = f"Id,Last\n{owner_id},Alpha\n"
files_csv = (
"File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\n"
f"{file_no},{owner_id},CIVIL,Test,{date.today():%Y-%m-%d},E01,ACTIVE,100\n"
)
client.post("/api/import/batch-upload", files=[
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
])
def _auth():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
def test_pension_detail_nested_collections_support_pagination_and_sorting():
_auth()
client = TestClient(app)
file_no = f"PF-DET-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
# Seed related rows
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "01", "frequency": "Monthly", "vests_on": "2024-01-01", "vests_at": 10})
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "01", "frequency": "Monthly", "vests_on": "2024-02-01", "vests_at": 20})
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "01", "frequency": "Monthly", "vests_on": "2024-03-01", "vests_at": 30})
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "01", "married_from": "2001-01-01", "marital_percent": 10})
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "01", "married_from": "2002-01-01", "marital_percent": 20})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "01", "lump1": 100})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "01", "lump1": 200})
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "01", "agreement_date": "2024-02-01", "terms": "X"})
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "01", "agreement_date": "2024-03-01", "terms": "Y"})
# Call detail with pagination and sorting per section
resp = client.get(
f"/api/pensions/{file_no}/detail",
params={
"s_sort_by": "vests_on", "s_sort_dir": "asc", "s_limit": 2, "s_skip": 1,
"m_sort_by": "married_from", "m_sort_dir": "desc", "m_limit": 1,
"d_sort_by": "lump1", "d_sort_dir": "desc",
"sep_sort_by": "agreement_date", "sep_sort_dir": "asc",
},
)
assert resp.status_code == 200
body = resp.json()
# Validate schedules pagination
sched = body["schedules"]
assert isinstance(sched["items"], list)
assert sched["total"] >= 3
# After skipping first (Jan), next two should start from Feb
sched_dates = [row["vests_on"] for row in sched["items"]]
assert sched_dates[0] == "2024-02-01"
# Marriages sorted desc
marr = body["marriages"]
assert [row["married_from"] for row in marr["items"]][0] == "2002-01-01"
# Death benefits sorted desc by lump1
deaths = body["death_benefits"]
assert [row["lump1"] for row in deaths["items"]][:2] == [200, 100]
# Separations sorted asc by date
seps = body["separations"]
assert [row["agreement_date"] for row in seps["items"]][:2] == ["2024-02-01", "2024-03-01"]
app.dependency_overrides.pop(get_current_user, None)

View File

@@ -0,0 +1,158 @@
import io
from datetime import date
import uuid
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def _seed_file(client: TestClient, file_no: str, owner_id: str = None) -> None:
owner_id = owner_id or f"R{uuid.uuid4().hex[:6]}"
rolodex_csv = f"Id,Last\n{owner_id},Alpha\n"
files_csv = (
"File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\n"
f"{file_no},{owner_id},CIVIL,Test,{date.today():%Y-%m-%d},E01,ACTIVE,100\n"
)
client.post("/api/import/batch-upload", files=[
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
])
def _auth():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
def test_schedule_filters_by_version_and_vests_at_range():
_auth()
client = TestClient(app)
file_no = f"PF-SF-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "01", "vests_on": "2024-01-01", "vests_at": 10})
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "02", "vests_on": "2024-02-01", "vests_at": 20})
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "02", "vests_on": "2024-03-01", "vests_at": 30})
# Filter by version
r = client.get("/api/pensions/schedules", params={"file_no": file_no, "version": "02", "sort_by": "vests_on"})
assert r.status_code == 200
body = r.json()
assert len(body) == 2
assert {row["vests_at"] for row in body} == {20, 30}
# Filter by vests_at range
r = client.get("/api/pensions/schedules", params={"file_no": file_no, "vests_at_min": 15, "vests_at_max": 25})
assert r.status_code == 200
body = r.json()
assert len(body) == 1 and body[0]["vests_at"] == 20
app.dependency_overrides.pop(get_current_user, None)
def test_marriage_filters_by_version_and_numeric_ranges():
_auth()
client = TestClient(app)
file_no = f"PF-MF-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "01", "married_from": "2000-01-01", "married_years": 10, "service_years": 12, "marital_percent": 40})
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "02", "married_from": "2005-01-01", "married_years": 5, "service_years": 8, "marital_percent": 20})
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "02", "married_from": "2010-01-01", "married_years": 15, "service_years": 20, "marital_percent": 60})
# Version filter
r = client.get("/api/pensions/marriages", params={"file_no": file_no, "version": "02", "sort_by": "married_from"})
assert r.status_code == 200
items = r.json()
assert len(items) == 2
assert {row["marital_percent"] for row in items} == {20, 60}
# married_years range
r = client.get("/api/pensions/marriages", params={"file_no": file_no, "married_years_min": 6, "married_years_max": 12})
assert r.status_code == 200
items = r.json()
assert len(items) == 1 and items[0]["married_years"] == 10
# service_years and marital_percent ranges combined
r = client.get("/api/pensions/marriages", params={
"file_no": file_no,
"service_years_min": 10,
"service_years_max": 20,
"marital_percent_min": 50,
"marital_percent_max": 70,
})
assert r.status_code == 200
items = r.json()
assert len(items) == 1 and items[0]["marital_percent"] == 60
app.dependency_overrides.pop(get_current_user, None)
def test_death_filters_by_version_and_numeric_ranges():
_auth()
client = TestClient(app)
file_no = f"PF-DF-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "01", "lump1": 100, "lump2": 5, "growth1": 1, "growth2": 2, "disc1": 0.5, "disc2": 0.2})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "02", "lump1": 300, "lump2": 7, "growth1": 3, "growth2": 4, "disc1": 0.7, "disc2": 0.3})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "02", "lump1": 200, "lump2": 6, "growth1": 2, "growth2": 3, "disc1": 0.6, "disc2": 0.25})
# Version filter
r = client.get("/api/pensions/death-benefits", params={"file_no": file_no, "version": "02", "sort_by": "lump1", "sort_dir": "asc"})
assert r.status_code == 200
items = r.json()
assert [row["lump1"] for row in items] == [200, 300]
# Numeric ranges combined
r = client.get("/api/pensions/death-benefits", params={
"file_no": file_no,
"lump1_min": 150,
"lump1_max": 250,
"growth1_min": 1.5,
"growth1_max": 2.5,
"disc1_min": 0.55,
"disc1_max": 0.65,
})
assert r.status_code == 200
items = r.json()
assert len(items) == 1 and items[0]["lump1"] == 200
app.dependency_overrides.pop(get_current_user, None)
def test_separations_filters_by_version_and_date_range():
_auth()
client = TestClient(app)
file_no = f"PF-SepF-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "01", "agreement_date": "2024-01-01", "terms": "t1"})
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "02", "agreement_date": "2024-02-01", "terms": "t2"})
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "02", "agreement_date": "2024-03-01", "terms": "t3"})
# Version filter
r = client.get("/api/pensions/separations", params={"file_no": file_no, "version": "02", "sort_by": "agreement_date", "sort_dir": "asc"})
assert r.status_code == 200
dates = [row.get("agreement_date") for row in r.json()]
assert dates == ["2024-02-01", "2024-03-01"]
# Date range
r = client.get("/api/pensions/separations", params={"file_no": file_no, "start": "2024-01-15", "end": "2024-02-15"})
assert r.status_code == 200
dates = [row.get("agreement_date") for row in r.json()]
assert dates == ["2024-02-01"]
app.dependency_overrides.pop(get_current_user, None)

View File

@@ -0,0 +1,76 @@
import io
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def test_batch_import_includes_pension_aux_files_and_read_endpoints():
# Auth override
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
client = TestClient(app)
# Minimal seed for dependent data
rolodex_csv = "Id,Last\nR1,Alpha\n"
files_csv = "File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\nF-1,R1,CIVIL,Test,2024-01-01,E01,ACTIVE,100\n"
schedule_csv = "File_No,Version,Vests_On,Vests_At\nF-1,01,2024-01-01,100\n"
marriage_csv = (
"File_No,Version,Married_From,Married_To,Married_Years,Service_From,Service_To,Service_Years,Marital_%\n"
"F-1,01,2000-01-01,2010-01-01,10,1995-01-01,2010-01-01,15,50\n"
)
death_csv = "File_No,Version,Lump1,Lump2,Growth1,Growth2,Disc1,Disc2\nF-1,01,1000,0,0,0,0,0\n"
separate_csv = "File_No,Version,Separation_Rate\nF-1,01,Terms\n"
payload = [
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
_csv_file("SCHEDULE.csv", schedule_csv),
_csv_file("MARRIAGE.csv", marriage_csv),
_csv_file("DEATH.csv", death_csv),
_csv_file("SEPARATE.csv", separate_csv),
]
# Batch upload
resp = client.post("/api/import/batch-upload", files=payload)
assert resp.status_code == 200
body = resp.json()
results = body.get("batch_results", [])
# Ensure each target file is reported as processed with at least one row
by_name = {r.get("file_type"): r for r in results}
for name in ("SCHEDULE.csv", "MARRIAGE.csv", "DEATH.csv", "SEPARATE.csv"):
assert name in by_name
assert by_name[name].get("imported_count", 0) >= 1
# Call read endpoints
r1 = client.get("/api/pensions/schedules", params={"file_no": "F-1"})
assert r1.status_code == 200
assert isinstance(r1.json(), list)
r2 = client.get("/api/pensions/marriages", params={"file_no": "F-1"})
assert r2.status_code == 200
assert isinstance(r2.json(), list)
r3 = client.get("/api/pensions/death-benefits", params={"file_no": "F-1"})
assert r3.status_code == 200
assert isinstance(r3.json(), list)
r4 = client.get("/api/pensions/separations", params={"file_no": "F-1"})
assert r4.status_code == 200
assert isinstance(r4.json(), list)
# Cleanup override
app.dependency_overrides.pop(get_current_user, None)

View File

@@ -0,0 +1,235 @@
import io
from datetime import date
import uuid
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def _seed_file(client: TestClient, file_no: str, owner_id: str = "RS") -> None:
rolodex_csv = f"Id,Last\n{owner_id},Alpha\n"
files_csv = (
"File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\n"
f"{file_no},{owner_id},CIVIL,Test,{date.today():%Y-%m-%d},E01,ACTIVE,100\n"
)
client.post("/api/import/batch-upload", files=[
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
])
def test_pensions_schedules_pagination_and_sorting():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
client = TestClient(app)
file_no = f"PF-SCHED-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
# Create three schedules
client.post("/api/pensions/schedules", json={"file_no": file_no, "vests_on": "2024-01-01", "vests_at": 10})
client.post("/api/pensions/schedules", json={"file_no": file_no, "vests_on": "2024-02-01", "vests_at": 20})
client.post("/api/pensions/schedules", json={"file_no": file_no, "vests_on": "2024-03-01", "vests_at": 30})
# Sort by vests_on ascending
r = client.get("/api/pensions/schedules", params={
"file_no": file_no,
"sort_by": "vests_on",
"sort_dir": "asc",
})
assert r.status_code == 200
dates = [row.get("vests_on") for row in r.json()]
assert dates[:3] == ["2024-01-01", "2024-02-01", "2024-03-01"]
# Sort by vests_on descending
r = client.get("/api/pensions/schedules", params={
"file_no": file_no,
"sort_by": "vests_on",
"sort_dir": "desc",
})
assert r.status_code == 200
dates = [row.get("vests_on") for row in r.json()]
assert dates[:3] == ["2024-03-01", "2024-02-01", "2024-01-01"]
# Pagination with include_total
r = client.get("/api/pensions/schedules", params={
"file_no": file_no,
"sort_by": "vests_on",
"sort_dir": "asc",
"limit": 2,
"skip": 0,
"include_total": True,
})
assert r.status_code == 200
body = r.json()
assert isinstance(body.get("items"), list)
assert body["total"] >= 3
assert [row.get("vests_on") for row in body["items"]] == ["2024-01-01", "2024-02-01"]
r = client.get("/api/pensions/schedules", params={
"file_no": file_no,
"sort_by": "vests_on",
"sort_dir": "asc",
"limit": 2,
"skip": 2,
})
assert r.status_code == 200
tail = r.json()
assert len(tail) >= 1
assert tail[0]["vests_on"] == "2024-03-01"
app.dependency_overrides.pop(get_current_user, None)
def test_pensions_marriages_pagination_and_sorting():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
client = TestClient(app)
file_no = f"PF-MARR-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/marriages", json={
"file_no": file_no,
"married_from": "2000-01-01",
"married_to": "2005-01-01",
"marital_percent": 10,
})
client.post("/api/pensions/marriages", json={
"file_no": file_no,
"married_from": "2005-01-01",
"married_to": "2010-01-01",
"marital_percent": 20,
})
client.post("/api/pensions/marriages", json={
"file_no": file_no,
"married_from": "2010-01-01",
"married_to": "2015-01-01",
"marital_percent": 30,
})
# Sort by marital_percent desc
r = client.get("/api/pensions/marriages", params={
"file_no": file_no,
"sort_by": "marital_percent",
"sort_dir": "desc",
})
assert r.status_code == 200
percents = [row.get("marital_percent") for row in r.json()]
assert percents[:3] == [30, 20, 10]
# Pagination
r = client.get("/api/pensions/marriages", params={
"file_no": file_no,
"sort_by": "married_from",
"sort_dir": "asc",
"limit": 1,
"skip": 1,
"include_total": True,
})
assert r.status_code == 200
body = r.json()
assert body["total"] >= 3
assert body["items"][0]["married_from"] == "2005-01-01"
app.dependency_overrides.pop(get_current_user, None)
def test_pensions_death_benefits_pagination_and_sorting():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
client = TestClient(app)
file_no = f"PF-DEATH-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "lump1": 100})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "lump1": 300})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "lump1": 200})
# Sort by lump1 desc
r = client.get("/api/pensions/death-benefits", params={
"file_no": file_no,
"sort_by": "lump1",
"sort_dir": "desc",
})
assert r.status_code == 200
l1s = [row.get("lump1") for row in r.json()]
assert l1s[:3] == [300, 200, 100]
# Pagination basic
r = client.get("/api/pensions/death-benefits", params={
"file_no": file_no,
"sort_by": "lump1",
"sort_dir": "asc",
"limit": 2,
"skip": 1,
})
assert r.status_code == 200
page = r.json()
assert [row.get("lump1") for row in page] == [200, 300]
app.dependency_overrides.pop(get_current_user, None)
def test_pensions_separations_pagination_and_sorting():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
client = TestClient(app)
file_no = f"PF-SEP-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/separations", json={"file_no": file_no, "agreement_date": "2024-01-01", "terms": "t1"})
client.post("/api/pensions/separations", json={"file_no": file_no, "agreement_date": "2024-02-01", "terms": "t2"})
client.post("/api/pensions/separations", json={"file_no": file_no, "agreement_date": "2024-03-01", "terms": "t3"})
# Sort by agreement_date desc
r = client.get("/api/pensions/separations", params={
"file_no": file_no,
"sort_by": "agreement_date",
"sort_dir": "desc",
})
assert r.status_code == 200
dates = [row.get("agreement_date") for row in r.json()]
assert dates[:3] == ["2024-03-01", "2024-02-01", "2024-01-01"]
# Pagination
r = client.get("/api/pensions/separations", params={
"file_no": file_no,
"sort_by": "agreement_date",
"sort_dir": "asc",
"limit": 1,
"skip": 2,
"include_total": True,
})
assert r.status_code == 200
body = r.json()
assert body["total"] >= 3
assert body["items"][0]["agreement_date"] == "2024-03-01"
app.dependency_overrides.pop(get_current_user, None)

View File

@@ -0,0 +1,131 @@
import io
import uuid
from datetime import date
from fastapi.testclient import TestClient
from app.main import app
from app.auth.security import get_current_user
def _csv_file(name: str, text: str):
return ("files", (name, io.BytesIO(text.encode("utf-8")), "text/csv"))
def _seed_file(client: TestClient, file_no: str, owner_id: str = None) -> None:
owner_id = owner_id or f"R{uuid.uuid4().hex[:6]}"
rolodex_csv = f"Id,Last\n{owner_id},Alpha\n"
files_csv = (
"File_No,Id,File_Type,Regarding,Opened,Empl_Num,Status,Rate_Per_Hour\n"
f"{file_no},{owner_id},CIVIL,Test,{date.today():%Y-%m-%d},E01,ACTIVE,100\n"
)
client.post("/api/import/batch-upload", files=[
_csv_file("ROLODEX.csv", rolodex_csv),
_csv_file("FILES.csv", files_csv),
])
def _auth():
app.dependency_overrides[get_current_user] = lambda: {
"id": 1,
"username": "tester",
"is_admin": True,
"is_active": True,
}
def test_schedule_tokenized_search_version_and_frequency():
_auth()
client = TestClient(app)
file_no = f"PF-SS-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "A1", "frequency": "Monthly", "vests_on": "2024-01-01"})
client.post("/api/pensions/schedules", json={"file_no": file_no, "version": "B2", "frequency": "Quarterly", "vests_on": "2024-02-01"})
# Both tokens must be present across allowed columns
r = client.get("/api/pensions/schedules", params={"file_no": file_no, "search": "B2 Month"})
assert r.status_code == 200
items = r.json()
# No schedule has both 'B2' and 'Month'
assert items == []
# Single token search
r = client.get("/api/pensions/schedules", params={"file_no": file_no, "search": "Monthly"})
assert r.status_code == 200
items = r.json()
assert len(items) == 1 and items[0]["frequency"] == "Monthly"
app.dependency_overrides.pop(get_current_user, None)
def test_marriages_tokenized_search_spouse_and_notes():
_auth()
client = TestClient(app)
file_no = f"PF-MS-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "01", "spouse_name": "Jane Doe", "notes": "Alpha beta"})
client.post("/api/pensions/marriages", json={"file_no": file_no, "version": "02", "spouse_name": "John Smith", "notes": "Gamma delta"})
# Both tokens required across fields
r = client.get("/api/pensions/marriages", params={"file_no": file_no, "search": "Jane delta"})
assert r.status_code == 200
items = r.json()
assert items == []
# Single token
r = client.get("/api/pensions/marriages", params={"file_no": file_no, "search": "Gamma"})
assert r.status_code == 200
items = r.json()
assert len(items) == 1 and items[0]["spouse_name"] == "John Smith"
app.dependency_overrides.pop(get_current_user, None)
def test_death_tokenized_search_beneficiary_and_type():
_auth()
client = TestClient(app)
file_no = f"PF-DS-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "01", "beneficiary_name": "Alice", "benefit_type": "Lump Sum", "notes": "Alpha"})
client.post("/api/pensions/death-benefits", json={"file_no": file_no, "version": "02", "beneficiary_name": "Bob", "benefit_type": "Annuity", "notes": "Beta"})
# Both tokens required
r = client.get("/api/pensions/death-benefits", params={"file_no": file_no, "search": "Alice Annuity"})
assert r.status_code == 200
items = r.json()
assert items == []
# Single token
r = client.get("/api/pensions/death-benefits", params={"file_no": file_no, "search": "Annuity"})
assert r.status_code == 200
items = r.json()
assert len(items) == 1 and items[0]["benefit_type"] == "Annuity"
app.dependency_overrides.pop(get_current_user, None)
def test_separations_tokenized_search_terms_and_notes():
_auth()
client = TestClient(app)
file_no = f"PF-SSep-{uuid.uuid4().hex[:8]}"
_seed_file(client, file_no)
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "01", "agreement_date": "2024-01-01", "terms": "Alpha Clause", "notes": "First"})
client.post("/api/pensions/separations", json={"file_no": file_no, "version": "02", "agreement_date": "2024-02-01", "terms": "Beta Clause", "notes": "Second"})
# Both tokens required
r = client.get("/api/pensions/separations", params={"file_no": file_no, "search": "Alpha Second"})
assert r.status_code == 200
items = r.json()
assert items == []
# Single token
r = client.get("/api/pensions/separations", params={"file_no": file_no, "search": "Clause"})
assert r.status_code == 200
items = r.json()
assert len(items) == 2
app.dependency_overrides.pop(get_current_user, None)

378
tests/test_templates_api.py Normal file
View File

@@ -0,0 +1,378 @@
import os
import io
from fastapi.testclient import TestClient
import pytest
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
class _User:
def __init__(self):
self.id = 1
self.username = "tester"
self.is_admin = True
self.is_active = True
@pytest.fixture()
def client():
app.dependency_overrides[get_current_user] = lambda: _User()
try:
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
def _dummy_docx_bytes():
# Minimal docx that docxtpl can open. To avoid binary template creation,
# we use a pre-generated minimal DOCX header stored as bytes.
# Fallback: create in-memory empty docx using python-docx if available.
try:
from docx import Document
except Exception:
return b"PK\x03\x04" # still accepted and stored; preview will not render
d = Document()
p = d.add_paragraph()
p.add_run("Hello ")
p.add_run("{{CLIENT_NAME}}")
buf = io.BytesIO()
d.save(buf)
return buf.getvalue()
def test_upload_search_get_versions_and_preview(client: TestClient):
# Upload a DOCX template
payload = {
"name": "Engagement Letter",
"category": "GENERAL",
"description": "Test template",
"semantic_version": "1.0.0",
}
files = {
"file": ("letter.docx", _dummy_docx_bytes(), "application/vnd.openxmlformats-officedocument.wordprocessingml.document"),
}
resp = client.post("/api/templates/upload", data=payload, files=files)
assert resp.status_code == 200, resp.text
tpl = resp.json()
tpl_id = tpl["id"]
# Search
resp = client.get("/api/templates/search?q=Engagement")
assert resp.status_code == 200
assert any(item["id"] == tpl_id for item in resp.json())
# Also match by description via q
resp = client.get("/api/templates/search?q=template")
assert resp.status_code == 200
ids = {item["id"] for item in resp.json()}
assert tpl_id in ids
# Get template
resp = client.get(f"/api/templates/{tpl_id}")
assert resp.status_code == 200
# List versions
resp = client.get(f"/api/templates/{tpl_id}/versions")
assert resp.status_code == 200
versions = resp.json()
assert len(versions) >= 1
vid = versions[0]["id"]
# Preview with context that resolves CLIENT_NAME
resp = client.post(
f"/api/templates/{tpl_id}/preview",
json={"context": {"CLIENT_NAME": "Alice"}, "version_id": vid},
)
assert resp.status_code == 200, resp.text
body = resp.json()
assert "resolved" in body and body["resolved"].get("CLIENT_NAME") == "Alice"
assert isinstance(body.get("unresolved", []), list)
assert body.get("output_size", 0) >= 0
def _docx_with_tokens(text: str) -> bytes:
try:
from docx import Document
except Exception:
return b"PK\x03\x04"
d = Document()
d.add_paragraph(text)
buf = io.BytesIO()
d.save(buf)
return buf.getvalue()
def test_add_version_and_form_variable_resolution(client: TestClient):
# Upload initial template
files = {"file": ("vars.docx", _docx_with_tokens("{{OFFICE_NAME}}"), "application/vnd.openxmlformats-officedocument.wordprocessingml.document")}
resp = client.post(
"/api/templates/upload",
data={"name": "VarsTpl", "semantic_version": "1.0.0"},
files=files,
)
assert resp.status_code == 200, resp.text
tpl_id = resp.json()["id"]
# Add a new version
files2 = {"file": ("vars2.docx", _docx_with_tokens("{{OFFICE_NAME}} {{NEW_FIELD}}"), "application/vnd.openxmlformats-officedocument.wordprocessingml.document")}
resp = client.post(
f"/api/templates/{tpl_id}/versions",
data={"semantic_version": "1.1.0", "approve": True},
files=files2,
)
assert resp.status_code == 200, resp.text
# Insert a FormVariable directly
from app.database.base import SessionLocal
from app.models.additional import FormVariable
db = SessionLocal()
try:
db.merge(FormVariable(identifier="OFFICE_NAME", query="static", response="Delphi", active=1))
db.commit()
finally:
db.close()
# Preview without explicit context should resolve OFFICE_NAME from FormVariable
resp = client.post(f"/api/templates/{tpl_id}/preview", json={"context": {}})
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["resolved"].get("OFFICE_NAME") == "Delphi"
def _upload_template(client: TestClient, name: str, category: str = "GENERAL") -> int:
files = {
"file": ("t.docx", _dummy_docx_bytes(), "application/vnd.openxmlformats-officedocument.wordprocessingml.document"),
}
resp = client.post(
"/api/templates/upload",
data={"name": name, "category": category, "semantic_version": "1.0.0"},
files=files,
)
assert resp.status_code == 200, resp.text
return resp.json()["id"]
def _add_keywords(client: TestClient, template_id: int, keywords):
resp = client.post(f"/api/templates/{template_id}/keywords", json={"keywords": list(keywords)})
assert resp.status_code == 200, resp.text
def test_templates_search_keywords_mode_any_and_all(client: TestClient):
# Create three templates with overlapping keywords
t1 = _upload_template(client, "Template A")
t2 = _upload_template(client, "Template B")
t3 = _upload_template(client, "Template C")
_add_keywords(client, t1, ["divorce", "qdro"]) # both
_add_keywords(client, t2, ["divorce"]) # only divorce
_add_keywords(client, t3, ["qdro", "pension"]) # qdro + other
# ANY (default)
resp = client.get(
"/api/templates/search",
params=[("keywords", "divorce"), ("keywords", "qdro")],
)
assert resp.status_code == 200, resp.text
ids = {item["id"] for item in resp.json()}
assert {t1, t2, t3}.issubset(ids) # all three should appear
# ANY (explicit)
resp = client.get(
"/api/templates/search",
params=[("keywords", "divorce"), ("keywords", "qdro"), ("keywords_mode", "any")],
)
assert resp.status_code == 200
ids = {item["id"] for item in resp.json()}
assert {t1, t2, t3}.issubset(ids)
# ALL - must contain both divorce AND qdro
resp = client.get(
"/api/templates/search",
params=[("keywords", "divorce"), ("keywords", "qdro"), ("keywords_mode", "all")],
)
assert resp.status_code == 200, resp.text
ids = {item["id"] for item in resp.json()}
assert ids == {t1}
def test_templates_search_pagination_and_sorting(client: TestClient):
# Ensure clean state for this test
# Upload multiple templates with different names and categories
ids = []
ids.append(_upload_template(client, "Alpha", category="CAT2"))
ids.append(_upload_template(client, "Charlie", category="CAT1"))
ids.append(_upload_template(client, "Bravo", category="CAT1"))
ids.append(_upload_template(client, "Echo", category="CAT3"))
ids.append(_upload_template(client, "Delta", category="CAT2"))
# Sort by name asc, limit 2
resp = client.get(
"/api/templates/search",
params={"sort_by": "name", "sort_dir": "asc", "limit": 2},
)
assert resp.status_code == 200, resp.text
names = [item["name"] for item in resp.json()]
assert names == sorted(names) # asc
assert len(names) == 2
# Sort by name desc, skip 1, limit 3
resp = client.get(
"/api/templates/search",
params={"sort_by": "name", "sort_dir": "desc", "skip": 1, "limit": 3},
)
assert resp.status_code == 200
names_desc = [item["name"] for item in resp.json()]
assert len(names_desc) == 3
assert names_desc == sorted(names_desc, reverse=True)
# Sort by category asc (ties unresolved by name asc implicitly by DB)
resp = client.get(
"/api/templates/search",
params={"sort_by": "category", "sort_dir": "asc"},
)
assert resp.status_code == 200
categories = [item["category"] for item in resp.json()]
assert categories == sorted(categories)
# Sort by updated desc
resp = client.get(
"/api/templates/search",
params={"sort_by": "updated", "sort_dir": "desc"},
)
assert resp.status_code == 200
# We can't assert exact order of timestamps easily; just ensure we got results
assert isinstance(resp.json(), list) and len(resp.json()) >= 5
def test_templates_search_active_filtering(client: TestClient):
# Create two templates, mark one inactive directly
tid_active = _upload_template(client, "Active T")
tid_inactive = _upload_template(client, "Inactive T")
# Mark second as inactive via direct DB update
from app.database.base import SessionLocal
from app.models.templates import DocumentTemplate
db = SessionLocal()
try:
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == tid_inactive).first()
tpl.active = False
db.commit()
finally:
db.close()
# Default active_only=true should return only the active one
resp = client.get("/api/templates/search")
assert resp.status_code == 200
ids = {item["id"] for item in resp.json()}
assert tid_active in ids
assert tid_inactive not in ids
# active_only=false should return both
resp = client.get("/api/templates/search", params={"active_only": False})
assert resp.status_code == 200
ids2 = {item["id"] for item in resp.json()}
assert tid_active in ids2 and tid_inactive in ids2
def test_templates_search_category_multi_repeat_and_csv(client: TestClient):
# Upload templates across multiple categories
t_cat1_a = _upload_template(client, "C1-A", category="CAT1")
t_cat1_b = _upload_template(client, "C1-B", category="CAT1")
t_cat2 = _upload_template(client, "C2-A", category="CAT2")
t_cat3 = _upload_template(client, "C3-A", category="CAT3")
# Repeatable category parameters (?category=CAT1&category=CAT3)
resp = client.get(
"/api/templates/search",
params=[("category", "CAT1"), ("category", "CAT3")],
)
assert resp.status_code == 200, resp.text
ids = {item["id"] for item in resp.json()}
# Expect only CAT1 and CAT3 templates
assert t_cat1_a in ids and t_cat1_b in ids and t_cat3 in ids
assert t_cat2 not in ids
# CSV within a single category parameter (?category=CAT2,CAT3)
resp = client.get(
"/api/templates/search",
params={"category": "CAT2,CAT3"},
)
assert resp.status_code == 200, resp.text
ids_csv = {item["id"] for item in resp.json()}
assert t_cat2 in ids_csv and t_cat3 in ids_csv
assert t_cat1_a not in ids_csv and t_cat1_b not in ids_csv
# Unknown category should return empty set when exclusive
resp = client.get(
"/api/templates/search",
params={"category": "NON_EXISTENT"},
)
assert resp.status_code == 200
assert resp.json() == []
def test_templates_search_has_keywords_filter(client: TestClient):
# Create two templates
t1 = _upload_template(client, "HasKW")
t2 = _upload_template(client, "NoKW")
# Add keywords only to t1
_add_keywords(client, t1, ["alpha", "beta"])
# has_keywords=true should include only t1
resp = client.get("/api/templates/search", params={"has_keywords": True})
assert resp.status_code == 200, resp.text
ids_true = {item["id"] for item in resp.json()}
assert t1 in ids_true and t2 not in ids_true
# has_keywords=false should include only t2
resp = client.get("/api/templates/search", params={"has_keywords": False})
assert resp.status_code == 200
ids_false = {item["id"] for item in resp.json()}
assert t2 in ids_false and t1 not in ids_false
def test_templates_categories_listing(client: TestClient):
# Empty DB categories
resp = client.get("/api/templates/categories")
assert resp.status_code == 200
empty = resp.json()
# May contain defaults from previous tests; ensure it's a list
assert isinstance(empty, list)
# Create active/inactive across categories
t1 = _upload_template(client, "K-A1", category="K1")
t2 = _upload_template(client, "K-A2", category="K1")
t3 = _upload_template(client, "K-B1", category="K2")
# Inactivate one of K1
from app.database.base import SessionLocal
from app.models.templates import DocumentTemplate
db = SessionLocal()
try:
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == t2).first()
tpl.active = False
db.commit()
finally:
db.close()
# active_only=true (default) should count only active: K1:1, K2:1
resp = client.get("/api/templates/categories")
assert resp.status_code == 200
rows = resp.json()
by_cat = {r["category"]: r["count"] for r in rows}
assert by_cat.get("K1", 0) >= 1
assert by_cat.get("K2", 0) >= 1
# active_only=false should count both entries in K1
resp = client.get("/api/templates/categories", params={"active_only": False})
assert resp.status_code == 200
rows_all = resp.json()
by_cat_all = {r["category"]: r["count"] for r in rows_all}
assert by_cat_all.get("K1", 0) >= 2
assert by_cat_all.get("K2", 0) >= 1