fixes and refactor

This commit is contained in:
HotSwapp
2025-08-14 19:16:28 -05:00
parent 5111079149
commit bfc04a6909
61 changed files with 5689 additions and 767 deletions

View File

@@ -93,9 +93,78 @@ def test_lookup_crud_file_types_and_statuses_and_audit(client_admin: TestClient)
assert resp.status_code == 200
# Verify audit logs endpoint is accessible and returns structure
resp = client_admin.get("/api/admin/audit/logs")
resp = client_admin.get("/api/admin/audit/logs", params={"include_total": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"total", "logs"}
assert set(body.keys()) == {"total", "items"}
assert isinstance(body["items"], list)
def test_printer_setup_crud(client_admin: TestClient):
# Create a printer
resp = client_admin.post(
"/api/admin/printers",
json={
"printer_name": "TestPrinter",
"description": "Test",
"driver": "Generic",
"port": "LPT1",
"default_printer": True,
"page_break": "\f",
"setup_st": "^[[0m",
"reset_st": "^[[0m",
"b_bold": "^[[1m",
"e_bold": "^[[22m",
"b_underline": "^[[4m",
"e_underline": "^[[24m",
"phone_book": True,
"rolodex_info": False,
"envelope": True,
"file_cabinet": True,
"accounts": False,
"statements": True,
"calendar": False,
},
)
assert resp.status_code == 200
printer = resp.json()
assert printer["printer_name"] == "TestPrinter"
assert printer["default_printer"] is True
# Update printer flags
resp = client_admin.put(
"/api/admin/printers/TestPrinter",
json={
"default_printer": False,
"statements": False,
"calendar": True,
},
)
assert resp.status_code == 200
updated = resp.json()
assert updated["default_printer"] is False
assert updated["statements"] is False
assert updated["calendar"] is True
# Get printer by name
resp = client_admin.get("/api/admin/printers/TestPrinter")
assert resp.status_code == 200
fetched = resp.json()
assert fetched["printer_name"] == "TestPrinter"
# List printers includes our printer
resp = client_admin.get("/api/admin/printers")
assert resp.status_code == 200
names = [p["printer_name"] for p in resp.json()]
assert "TestPrinter" in names
# Delete the printer
resp = client_admin.delete("/api/admin/printers/TestPrinter")
assert resp.status_code == 200
# Verify it's gone
resp = client_admin.get("/api/admin/printers")
assert resp.status_code == 200
names = [p["printer_name"] for p in resp.json()]
assert "TestPrinter" not in names

View File

@@ -1,7 +1,7 @@
import os
import sys
from pathlib import Path
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import pytest
from jose import jwt
@@ -43,8 +43,8 @@ def test_jwt_rotation_decode(monkeypatch):
# Sign token with old key
payload = {
"sub": "tester",
"exp": datetime.utcnow() + timedelta(minutes=5),
"iat": datetime.utcnow(),
"exp": datetime.now(timezone.utc) + timedelta(minutes=5),
"iat": datetime.now(timezone.utc),
"type": "access",
}
token = jwt.encode(payload, old_key, algorithm=settings.algorithm)

View File

@@ -0,0 +1,80 @@
import pytest
try:
from hypothesis import given, strategies as st, settings
except Exception: # pragma: no cover
pytest.skip("Hypothesis not installed; skipping property-based tests.", allow_module_level=True)
from app.services.customers_search import apply_customer_filters, apply_customer_sorting
class FakeQuery:
def __init__(self):
self.filters = []
self.orderings = []
def filter(self, *args):
self.filters.extend(args)
return self
def order_by(self, *args):
self.orderings.extend(args)
return self
def _expected_filter_count(search, group, groups, state, states):
s = (search or "").strip()
search_filter = 1 if s else 0
eff_groups = [g for g in (groups or []) if g] or ([group] if group else [])
groups_filter = 1 if eff_groups else 0
eff_states = [s for s in (states or []) if s] or ([state] if state else [])
states_filter = 1 if eff_states else 0
return search_filter + groups_filter + states_filter
@settings(deadline=None, max_examples=100)
@given(
search=st.text(min_size=0, max_size=200),
group=st.one_of(st.none(), st.text(min_size=0, max_size=20)),
state=st.one_of(st.none(), st.text(min_size=0, max_size=10)),
groups=st.one_of(
st.none(),
st.lists(st.one_of(st.none(), st.text(min_size=0, max_size=10)), max_size=5),
),
states=st.one_of(
st.none(),
st.lists(st.one_of(st.none(), st.text(min_size=0, max_size=10)), max_size=5),
),
)
def test_apply_customer_filters_property(search, group, groups, state, states):
q = FakeQuery()
q = apply_customer_filters(q, search=search, group=group, state=state, groups=groups, states=states)
assert len(q.filters) == _expected_filter_count(search, group, groups, state, states)
@settings(deadline=None, max_examples=100)
@given(
sort_by=st.one_of(
st.none(),
st.sampled_from(["id", "name", "city", "email", "ID", "NAME", "CITY", "EMAIL"]),
st.text(min_size=0, max_size=15),
),
sort_dir=st.one_of(
st.none(),
st.sampled_from(["asc", "ASC", "desc", "DESC", ""]),
st.text(min_size=0, max_size=10),
),
)
def test_apply_customer_sorting_property(sort_by, sort_dir):
q = FakeQuery()
q = apply_customer_sorting(q, sort_by=sort_by, sort_dir=sort_dir)
sb = (sort_by or "id").lower()
expected_order_cols = 2 if sb == "name" else 1
assert len(q.orderings) == expected_order_cols

View File

@@ -0,0 +1,135 @@
from types import SimpleNamespace
from sqlalchemy.dialects import sqlite
from app.services.customers_search import (
apply_customer_filters,
apply_customer_sorting,
prepare_customer_csv_rows,
)
class FakeQuery:
"""Lightweight stand-in for SQLAlchemy Query that captures filters and orderings.
We only need to verify that our helper functions add the expected number of
filter/order_by clauses and roughly target the expected columns. We do not
execute any SQL.
"""
def __init__(self):
self.filters = []
self.orderings = []
def filter(self, *args):
self.filters.extend(args)
return self
def order_by(self, *args):
self.orderings.extend(args)
return self
def compile_sql(expr):
"""Compile a SQLAlchemy expression to a SQLite SQL string for simple assertions."""
try:
return str(expr.compile(dialect=sqlite.dialect()))
except Exception:
return str(expr)
def test_apply_customer_filters_search_and_comma_pattern():
q = FakeQuery()
q = apply_customer_filters(q, search="Smith, John", group=None, state=None, groups=None, states=None)
# One filter clause added (combined search filter)
assert len(q.filters) == 1
sql = compile_sql(q.filters[0])
assert "last" in sql and "first" in sql
def test_apply_customer_filters_groups_and_states():
q = FakeQuery()
q = apply_customer_filters(q, search=None, group="A", state="NY", groups=None, states=None)
# Two filter clauses added: group and state
assert len(q.filters) == 2
sql_group = compile_sql(q.filters[0])
sql_state = compile_sql(q.filters[1])
assert "group" in sql_group
assert "abrev" in sql_state or "state" in sql_state
def test_apply_customer_filters_multi_groups_priority():
q = FakeQuery()
q = apply_customer_filters(q, search=None, group="A", state=None, groups=["X", "Y"], states=None)
# Only one filter (multi-groups) should be applied for groups
assert len(q.filters) == 1
assert "IN" in compile_sql(q.filters[0])
def test_apply_customer_sorting_fields_and_direction():
# name sorting => two orderings
q1 = FakeQuery()
q1 = apply_customer_sorting(q1, sort_by="name", sort_dir="asc")
assert len(q1.orderings) == 2
assert "last" in compile_sql(q1.orderings[0])
assert "first" in compile_sql(q1.orderings[1])
# id sorting desc => one ordering and DESC direction in SQL
q2 = FakeQuery()
q2 = apply_customer_sorting(q2, sort_by="id", sort_dir="desc")
assert len(q2.orderings) == 1
assert "DESC" in compile_sql(q2.orderings[0]).upper()
# unknown field falls back to id
q3 = FakeQuery()
q3 = apply_customer_sorting(q3, sort_by="unknown", sort_dir="asc")
assert len(q3.orderings) == 1
assert "id" in compile_sql(q3.orderings[0]).lower()
def test_prepare_customer_csv_rows_default_and_selected_fields():
cust1 = SimpleNamespace(
id="001",
first="John",
last="Smith",
group="G1",
city="New York",
abrev="NY",
email="john@example.com",
phone_numbers=[SimpleNamespace(phone="123-456-7890")],
)
cust2 = SimpleNamespace(
id="002",
first="Jane",
last="Doe",
group="G2",
city="Boston",
abrev="MA",
email="jane@example.com",
phone_numbers=[],
)
# Default fields
header, rows = prepare_customer_csv_rows([cust1, cust2], fields=None)
assert header == [
"Customer ID",
"Name",
"Group",
"City",
"State",
"Primary Phone",
"Email",
]
assert rows[0][0] == "001"
assert rows[0][1] == "John Smith"
assert rows[0][2] == "G1"
assert rows[0][3] == "New York"
assert rows[0][4] == "NY"
assert rows[0][5] == "123-456-7890"
assert rows[0][6] == "john@example.com"
# Selected subset of fields
header_sel, rows_sel = prepare_customer_csv_rows([cust1], fields=["id", "name", "email"]) # any case ok
assert header_sel == ["Customer ID", "Name", "Email"]
assert rows_sel[0] == ["001", "John Smith", "john@example.com"]

View File

@@ -0,0 +1,82 @@
import json
import shutil
import subprocess
import sys
from pathlib import Path
from app.api.search_highlight import build_query_tokens, highlight_text
def _run_node_highlight(value: str, query: str):
"""Invoke Node to run client highlight.js and return tokens and html.
Skips DOM and sanitizer loading by providing a minimal window with an
escape() function that mirrors server escaping behavior.
"""
node_path = shutil.which("node")
if not node_path:
return None
repo_root = Path(__file__).resolve().parents[1]
highlight_js_path = repo_root / "static/js/highlight.js"
if not highlight_js_path.exists():
return None
payload = json.dumps({"value": value, "query": query})
script = f"""
const fs = require('fs');
global.window = {{}};
// Provide escape that matches server: replace &, <, >, ", '
window.htmlSanitizer = {{
escape: function(text) {{
const str = String(text == null ? '' : text);
return str
.replace(/&/g, '&amp;')
.replace(/</g, '&lt;')
.replace(/>/g, '&gt;')
.replace(/"/g, '&quot;')
.replace(/'/g, '&#39;');
}}
}};
require('{highlight_js_path.as_posix()}');
const input = JSON.parse(process.argv[2]);
const tokens = window.highlightUtils.buildTokens(input.query);
const html = window.highlightUtils.highlight(input.value, tokens);
process.stdout.write(JSON.stringify({{ tokens, html }}));
"""
res = subprocess.run(
[node_path, "-e", script, payload],
cwd=str(repo_root),
capture_output=True,
text=True,
)
if res.returncode != 0:
return None
return json.loads(res.stdout)
def test_highlight_parity_with_client_when_node_available():
"""Compare tokens and highlighted HTML between server and client implementations.
This test is skipped when Node is unavailable.
"""
samples = [
("Hello John Smith", "john smith"),
("<b>A&B</b> and C", "a b"),
("Anna and Ann went", "ann anna"),
("He said \"Hello\" & it's fine", "hello"),
("Case 12345", "case 123"),
]
for value, query in samples:
client = _run_node_highlight(value, query)
if client is None:
# Skip gracefully if Node not present or script failed
import pytest
pytest.skip("Node or client highlight not available")
server_tokens = build_query_tokens(query)
server_html = highlight_text(value, server_tokens)
assert client["tokens"] == server_tokens
assert client["html"] == server_html

158
tests/test_mortality.py Normal file
View File

@@ -0,0 +1,158 @@
import os
import sys
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
# Ensure required env vars for app import/config
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
# Ensure repository root on sys.path for direct test runs
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
from tests.helpers import assert_http_error # noqa: E402
from app.database.base import SessionLocal # noqa: E402
from app.models.pensions import LifeTable, NumberTable # noqa: E402
from app.services.mortality import ( # noqa: E402
get_life_values,
get_number_value,
InvalidCodeError,
)
@pytest.fixture(scope="module")
def client():
# Override auth to bypass JWT for these tests
class _User:
def __init__(self):
self.id = "test"
self.username = "tester"
self.is_admin = True
self.is_active = True
app.dependency_overrides[get_current_user] = lambda: _User()
try:
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
def _seed_life_and_number():
db = SessionLocal()
try:
# Seed a life table row for age 65
db.query(LifeTable).filter(LifeTable.age == 65).delete()
lt = LifeTable(
age=65,
le_wm=14.5,
na_wm=87000.0,
le_af=20.1,
na_af=92000.0,
le_ha=18.2,
na_ha=88000.0,
)
db.add(lt)
# Seed a number table row for month 305
db.query(NumberTable).filter(NumberTable.month == 305).delete()
nt = NumberTable(
month=305,
na_wm=80000.0,
na_af=90000.0,
na_ha=85000.0,
)
db.add(nt)
db.commit()
finally:
db.close()
def test_service_helpers_success_invalid_and_not_found():
_seed_life_and_number()
db = SessionLocal()
try:
# Success cases
res = get_life_values(db, age=65, sex="M", race="W")
assert res and res["le"] == 14.5 and res["na"] == 87000.0
res = get_life_values(db, age=65, sex="F", race="A")
assert res and res["le"] == 20.1 and res["na"] == 92000.0
res = get_life_values(db, age=65, sex="A", race="H")
assert res and res["le"] == 18.2 and res["na"] == 88000.0
nres = get_number_value(db, month=305, sex="M", race="W")
assert nres and nres["na"] == 80000.0
nres = get_number_value(db, month=305, sex="F", race="A")
assert nres and nres["na"] == 90000.0
nres = get_number_value(db, month=305, sex="A", race="H")
assert nres and nres["na"] == 85000.0
# Invalid codes
with pytest.raises(InvalidCodeError):
get_life_values(db, age=65, sex="X", race="W")
with pytest.raises(InvalidCodeError):
get_number_value(db, month=305, sex="M", race="Z")
# Not found
assert get_life_values(db, age=9999, sex="M", race="W") is None
assert get_number_value(db, month=99999, sex="M", race="W") is None
finally:
db.close()
def test_api_life_valid_invalid_not_found(client: TestClient):
_seed_life_and_number()
# Valid lookups
resp = client.get("/api/mortality/life/65", params={"sex": "M", "race": "W"})
assert resp.status_code == 200
data = resp.json()
assert data["le"] == 14.5 and data["na"] == 87000.0
resp = client.get("/api/mortality/life/65", params={"sex": "F", "race": "A"})
assert resp.status_code == 200
assert resp.json()["le"] == 20.1
# Invalid code -> 400 wrapped error
resp = client.get("/api/mortality/life/65", params={"sex": "X", "race": "W"})
assert_http_error(resp, 400, "Invalid sex code")
# Not found -> 404 wrapped error
resp = client.get("/api/mortality/life/9999", params={"sex": "M", "race": "W"})
assert_http_error(resp, 404, "Age not found")
def test_api_number_valid_invalid_not_found(client: TestClient):
_seed_life_and_number()
# Valid lookup
resp = client.get("/api/mortality/number/305", params={"sex": "M", "race": "W"})
assert resp.status_code == 200
assert resp.json()["na"] == 80000.0
# Invalid code -> 400
resp = client.get("/api/mortality/number/305", params={"sex": "M", "race": "Z"})
assert_http_error(resp, 400, "Invalid race code")
# Not found -> 404
resp = client.get("/api/mortality/number/99999", params={"sex": "M", "race": "W"})
assert_http_error(resp, 404, "Month not found")
def test_api_validation_negative_inputs(client: TestClient):
# Negative age -> 422 validation envelope
resp = client.get("/api/mortality/life/-1", params={"sex": "M", "race": "W"})
from tests.helpers import assert_validation_error
assert_validation_error(resp, "age")
# Negative month -> 422 validation envelope
resp = client.get("/api/mortality/number/-5", params={"sex": "F", "race": "A"})
assert_validation_error(resp, "month")

View File

@@ -0,0 +1,201 @@
import os
import uuid
from datetime import date
import pytest
from fastapi.testclient import TestClient
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
from app.main import app # noqa: E402
from app.auth.security import get_current_user, get_admin_user # noqa: E402
class _User:
def __init__(self, admin: bool = False):
self.id = 1
self.username = "tester"
self.is_admin = admin
self.is_active = True
@pytest.fixture()
def client():
app.dependency_overrides[get_current_user] = lambda: _User(True)
app.dependency_overrides[get_admin_user] = lambda: _User(True)
try:
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
app.dependency_overrides.pop(get_admin_user, None)
def _create_customer(client: TestClient) -> str:
cid = f"PGN-{uuid.uuid4().hex[:8]}"
resp = client.post("/api/customers/", json={"id": cid, "last": "Paginate", "email": f"{cid}@ex.com"})
assert resp.status_code == 200
return cid
def test_files_include_total_shape(client: TestClient):
owner_id = _create_customer(client)
for _ in range(2):
fno = f"P-{uuid.uuid4().hex[:6]}"
payload = {
"file_no": fno,
"id": owner_id,
"regarding": "Pagination Test",
"empl_num": "E01",
"file_type": "CIVIL",
"opened": date.today().isoformat(),
"status": "ACTIVE",
"rate_per_hour": 100.0,
}
resp = client.post("/api/files/", json=payload)
assert resp.status_code == 200
resp = client.get("/api/files/", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
assert body["total"] >= len(body["items"]) >= 1
def test_templates_include_total_shape(client: TestClient):
tid = f"PGT-{uuid.uuid4().hex[:6]}"
resp = client.post(
"/api/documents/templates/",
json={"form_id": tid, "form_name": "TName", "category": "GENERAL", "content": "C"},
)
assert resp.status_code == 200
resp = client.get("/api/documents/templates/", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
def test_users_include_total_shape(client: TestClient):
# Admin endpoint: just validate shape
resp = client.get("/api/admin/users", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
def test_support_tickets_include_total_shape(client: TestClient):
# Ensure at least one ticket exists
payload = {
"subject": "Pagination test subject",
"description": "A sufficiently long description for validation.",
"category": "bug_report",
"priority": "medium",
"contact_name": "Tester",
"contact_email": "tester@example.com",
}
resp = client.post("/api/support/tickets", json=payload)
assert resp.status_code == 200
# Validate include_total shape
resp = client.get("/api/support/tickets", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
assert body["total"] >= len(body["items"]) >= 0
def test_my_support_tickets_include_total_shape(client: TestClient):
# Even if empty, should return the same shape
resp = client.get("/api/support/my-tickets", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
assert body["total"] >= 0
def test_qdros_by_file_include_total_shape(client: TestClient):
# Create minimal file and a qdro
import uuid
owner_id = _create_customer(client)
fno = f"P-{uuid.uuid4().hex[:6]}"
resp = client.post(
"/api/files/",
json={
"file_no": fno,
"id": owner_id,
"regarding": "QDRO Pagination Test",
"empl_num": "E01",
"file_type": "CIVIL",
"opened": date.today().isoformat(),
"status": "ACTIVE",
"rate_per_hour": 100.0,
},
)
assert resp.status_code == 200
resp = client.post(
"/api/documents/qdros/",
json={"file_no": fno, "form_name": "FormX", "status": "DRAFT"},
)
assert resp.status_code == 200
# Validate include_total on file-specific qdros
resp = client.get(f"/api/documents/qdros/{fno}", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
def test_ledger_by_file_include_total_shape(client: TestClient):
# Create minimal file and a ledger entry via financial quick helper
import uuid
owner_id = _create_customer(client)
fno = f"P-{uuid.uuid4().hex[:6]}"
resp = client.post(
"/api/files/",
json={
"file_no": fno,
"id": owner_id,
"regarding": "Ledger Pagination Test",
"empl_num": "E01",
"file_type": "CIVIL",
"opened": date.today().isoformat(),
"status": "ACTIVE",
"rate_per_hour": 100.0,
},
)
assert resp.status_code == 200
# Quick time entry
resp = client.post(
"/api/financial/time-entry/quick",
params={"file_no": fno, "hours": 1.5, "description": "Work"},
)
assert resp.status_code == 200
# Validate include_total on file ledger
resp = client.get(f"/api/financial/ledger/{fno}", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)
def test_customer_phones_include_total_shape(client: TestClient):
# Create customer and a couple of phones
owner_id = _create_customer(client)
for ph in ["555-1000", "555-1001"]:
resp = client.post(f"/api/customers/{owner_id}/phones", json={"phone": ph, "location": "Home"})
assert resp.status_code == 200
resp = client.get(f"/api/customers/{owner_id}/phones", params={"include_total": True, "limit": 1})
assert resp.status_code == 200
body = resp.json()
assert set(body.keys()) == {"items", "total"}
assert isinstance(body["items"], list)

View File

@@ -22,6 +22,7 @@ from tests.helpers import assert_validation_error # noqa: E402
from app.api.financial import LedgerCreate # noqa: E402
from app.database.base import SessionLocal # noqa: E402
from app.models.qdro import QDRO # noqa: E402
from app.config import settings # noqa: E402
@pytest.fixture(scope="module")
@@ -37,6 +38,8 @@ def client():
app.dependency_overrides[get_current_user] = lambda: _User()
try:
# Disable cache for search API tests unless explicitly testing caching
settings.cache_enabled = False
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
@@ -284,3 +287,181 @@ def test_global_search_highlights_mixed_case_for_customer_file_qdro(client: Test
assert q is not None and isinstance(q.get("highlight"), str)
assert "<strong>" in q["highlight"]
assert f"<strong>{token_mixed}</strong>" in q["highlight"]
def test_file_search_whole_words_and_exact_phrase(client: TestClient):
token = f"FW-{uuid.uuid4().hex[:6]}"
owner_id = _create_customer(client, f"Owner-{token}")
f_exact = _create_file(client, owner_id, regarding_token="The apple pie is fresh")
f_plural = _create_file(client, owner_id, regarding_token="The apple pies are fresh")
# whole_words=True should match 'pie' but not 'pies'
payload = {
"query": "pie",
"search_types": ["file"],
"whole_words": True,
"limit": 50,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
results = resp.json()["results"]
ids = {r["id"] for r in results}
assert f_exact in ids
assert f_plural not in ids
# exact_phrase should match the exact wording only
payload = {
"query": "apple pie",
"search_types": ["file"],
"exact_phrase": True,
"limit": 50,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
results = resp.json()["results"]
ids = {r["id"] for r in results}
assert f_exact in ids
assert f_plural not in ids
# default (substring) matching should include both
payload = {
"query": "pie",
"search_types": ["file"],
"limit": 50,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
results = resp.json()["results"]
ids = {r["id"] for r in results}
assert f_exact in ids and f_plural in ids
def test_ledger_search_whole_words(client: TestClient):
token = f"LW-{uuid.uuid4().hex[:6]}"
# Create a file for ledger linkage
owner_id = _create_customer(client, f"Owner-{token}")
file_no = _create_file(client, owner_id, regarding_token=token)
# Ledger entries: 'retainer' vs 'retained'
resp = client.post(
"/api/financial/ledger/",
json=LedgerCreate(
file_no=file_no,
date=date.today().isoformat(),
t_code="NOTE",
t_type="2",
empl_num="E01",
quantity=0.0,
rate=0.0,
amount=0.0,
billed="N",
note="retainer fee approved",
).model_dump(mode="json"),
)
assert resp.status_code == 200
resp = client.post(
"/api/financial/ledger/",
json=LedgerCreate(
file_no=file_no,
date=date.today().isoformat(),
t_code="NOTE",
t_type="2",
empl_num="E01",
quantity=0.0,
rate=0.0,
amount=0.0,
billed="N",
note="retained amount on file",
).model_dump(mode="json"),
)
assert resp.status_code == 200
payload = {
"query": "retainer",
"search_types": ["ledger"],
"whole_words": True,
"limit": 50,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
results = resp.json()["results"]
# Should contain the entry with 'retainer fee approved' and exclude 'retained amount on file'
texts = [r.get("description", "") for r in results]
assert any("retainer fee approved" in t for t in texts)
assert all("retained amount on file" not in t for t in texts)
def test_qdro_search_whole_words_and_exact_phrase(client: TestClient):
token = f"QW-{uuid.uuid4().hex[:6]}"
owner_id = _create_customer(client, f"Owner-{token}")
file_no = _create_file(client, owner_id, regarding_token=token)
q1 = _create_qdro_with_form_name(file_no, form_name="Order for benefit under plan")
q2 = _create_qdro_with_form_name(file_no, form_name="Order benefiting alternate payee")
# whole_words=True should match 'benefit' but not 'benefiting'
payload = {
"query": "benefit",
"search_types": ["qdro"],
"whole_words": True,
"limit": 50,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
results = resp.json()["results"]
ids = {r["id"] for r in results}
assert q1 in ids
assert q2 not in ids
# exact_phrase should only match the precise phrase
payload = {
"query": "Order for benefit",
"search_types": ["qdro"],
"exact_phrase": True,
"limit": 50,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
results = resp.json()["results"]
ids = {r["id"] for r in results}
assert q1 in ids
assert q2 not in ids
def test_advanced_facets_include_state_and_transaction_type(client: TestClient):
token = f"FAC-{uuid.uuid4().hex[:6]}"
# Ensure at least one TX customer
_ = _create_customer(client, f"Facet-{token}")
# Ensure at least one ledger with t_type '2'
owner_id = _create_customer(client, f"Owner-{token}")
file_no = _create_file(client, owner_id, regarding_token=token)
resp = client.post(
"/api/financial/ledger/",
json=LedgerCreate(
file_no=file_no,
date=date.today().isoformat(),
t_code="NOTE",
t_type="2",
empl_num="E01",
quantity=0.0,
rate=0.0,
amount=0.0,
billed="N",
note="Fee for facets token",
).model_dump(mode="json"),
)
assert resp.status_code == 200
# Query can be empty; we'll aggregate facets across returned results
payload = {
"search_types": ["customer", "ledger"],
"limit": 200,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
data = resp.json()
facets = data.get("facets", {})
assert "state" in facets and isinstance(facets["state"], dict)
assert any(k in ("TX", "Tx", "tx") for k in facets["state"].keys())
assert "transaction_type" in facets and isinstance(facets["transaction_type"], dict)
assert "2" in facets["transaction_type"] or 2 in facets["transaction_type"]

View File

@@ -0,0 +1,93 @@
import os
import sys
from pathlib import Path
from time import sleep
import pytest
from fastapi.testclient import TestClient
# Ensure required env vars for app import/config
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
from app.config import settings # noqa: E402
from app.services.cache import invalidate_search_cache # noqa: E402
@pytest.fixture(scope="module")
def client():
class _User:
def __init__(self):
self.id = "cache-tester"
self.username = "tester"
self.is_admin = True
self.is_active = True
app.dependency_overrides[get_current_user] = lambda: _User()
# Enable cache for this test module if redis is configured
settings.cache_enabled = True
yield TestClient(app)
app.dependency_overrides.pop(get_current_user, None)
@pytest.mark.skipif(not settings.redis_url, reason="Redis not configured for caching tests")
def test_advanced_search_caches_by_criteria_and_user(client: TestClient):
criteria = {
"query": "cache-token",
"search_types": ["customer"],
"limit": 10,
"offset": 0,
}
# First call: cold cache
r1 = client.post("/api/search/advanced", json=criteria)
assert r1.status_code == 200
d1 = r1.json()
# Second call: should be served from cache and identical
r2 = client.post("/api/search/advanced", json=criteria)
assert r2.status_code == 200
d2 = r2.json()
assert d1 == d2
@pytest.mark.skipif(not settings.redis_url, reason="Redis not configured for caching tests")
def test_advanced_search_cache_invalidation_on_data_change(client: TestClient):
criteria = {
"query": "invalidate-token",
"search_types": ["customer"],
"limit": 10,
"offset": 0,
}
r1 = client.post("/api/search/advanced", json=criteria)
assert r1.status_code == 200
d1 = r1.json()
# Mutate data via customers API which triggers invalidation
resp = client.post("/api/customers/", json={
"id": "CACHE-INVALIDATE-1",
"last": "Cache",
"first": "Invalidate",
"email": "invalidate@example.com",
"city": "Austin",
"abrev": "TX",
})
assert resp.status_code == 200
# Best-effort async invalidation; give Redis a moment if needed
sleep(0.05)
r2 = client.post("/api/search/advanced", json=criteria)
assert r2.status_code == 200
d2 = r2.json()
# Total_results or results content may change; at minimum the payload should not be the same object
assert d1 != d2

View File

@@ -184,3 +184,26 @@ def test_create_qdro_highlight_requires_full_query_in_single_field():
out = create_qdro_highlight(qdro, 'plan 123')
assert out == ''
def test_highlight_text_escapes_html_in_source_and_tokens():
# Source contains HTML, should be escaped, not interpreted
out = highlight_text('<script>alert(1)</script> Alpha & Beta', ['alpha', 'beta'])
# Tags are escaped; only <strong> wrappers exist
assert '&lt;script&gt;alert(1)&lt;/script&gt;' in out
assert '<strong>Alpha</strong>' in out
assert '<strong>Beta</strong>' in out
assert '<script>' not in out and '</script>' not in out
def test_highlight_text_handles_quotes_and_apostrophes_safely():
out = highlight_text('He said "Hello" & it\'s fine', ['hello'])
# Quotes and ampersand should be escaped
assert '&quot;<strong>Hello</strong>&quot;' in out
assert '&#39;s' in out
assert '&amp;' in out
def test_highlight_text_no_tokens_returns_escaped_source():
out = highlight_text('<b>bold</b>', [])
assert out == '&lt;b&gt;bold&lt;/b&gt;'

View File

@@ -0,0 +1,101 @@
import os
import uuid
from datetime import date
import pytest
from fastapi.testclient import TestClient
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
class _User:
def __init__(self):
self.id = 1
self.username = "tester"
self.is_admin = True
self.is_active = True
@pytest.fixture()
def client():
app.dependency_overrides[get_current_user] = lambda: _User()
try:
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
def _create_customer_and_file(client: TestClient):
cust_id = f"DOCSS-{uuid.uuid4().hex[:8]}"
resp = client.post("/api/customers/", json={"id": cust_id, "last": "DocSS", "email": "dss@example.com"})
assert resp.status_code == 200
file_no = f"D-{uuid.uuid4().hex[:6]}"
payload = {
"file_no": file_no,
"id": cust_id,
"regarding": "Doc matter",
"empl_num": "E01",
"file_type": "CIVIL",
"opened": date.today().isoformat(),
"status": "ACTIVE",
"rate_per_hour": 100.0,
}
resp = client.post("/api/files/", json=payload)
assert resp.status_code == 200
return cust_id, file_no
def test_templates_tokenized_search_and_sort(client: TestClient):
# Create templates
t1 = f"TMP-{uuid.uuid4().hex[:6]}"
t2 = f"TMP-{uuid.uuid4().hex[:6]}"
resp = client.post(
"/api/documents/templates/",
json={"form_id": t1, "form_name": "Alpha Letter", "category": "GENERAL", "content": "Hello"},
)
assert resp.status_code == 200
resp = client.post(
"/api/documents/templates/",
json={"form_id": t2, "form_name": "Beta Memo", "category": "GENERAL", "content": "Hello"},
)
assert resp.status_code == 200
# Tokenized search for both tokens only matches when both present
resp = client.get("/api/documents/templates/", params={"search": "Alpha Letter"})
assert resp.status_code == 200
items = resp.json()
ids = {i["form_id"] for i in items}
assert t1 in ids and t2 not in ids
# Sorting by form_name desc
resp = client.get("/api/documents/templates/", params={"sort_by": "form_name", "sort_dir": "desc"})
assert resp.status_code == 200
items = resp.json()
if len(items) >= 2:
assert items[0]["form_name"] >= items[1]["form_name"]
def test_qdros_tokenized_search(client: TestClient):
_, file_no = _create_customer_and_file(client)
# Create QDROs
q1 = {"file_no": file_no, "version": "01", "status": "DRAFT", "form_name": "Alpha Order", "notes": "Beta token present"}
q2 = {"file_no": file_no, "version": "02", "status": "DRAFT", "form_name": "Gamma", "notes": "Beta only"}
resp = client.post("/api/documents/qdros/", json=q1)
assert resp.status_code == 200
resp = client.post("/api/documents/qdros/", json=q2)
assert resp.status_code == 200
# Only the one containing both tokens should match
resp = client.get("/api/documents/qdros/", params={"search": "Alpha Beta"})
assert resp.status_code == 200
items = resp.json()
names = {i.get("form_name") for i in items}
assert "Alpha Order" in names
assert "Gamma" not in names

View File

@@ -0,0 +1,94 @@
import os
import uuid
from datetime import date, timedelta
import pytest
from fastapi.testclient import TestClient
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
class _User:
def __init__(self):
self.id = "test"
self.username = "tester"
self.is_admin = True
self.is_active = True
@pytest.fixture()
def client():
app.dependency_overrides[get_current_user] = lambda: _User()
try:
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
def _create_customer(client: TestClient) -> str:
cid = f"FSSR-{uuid.uuid4().hex[:8]}"
resp = client.post("/api/customers/", json={"id": cid, "last": "SearchSort", "email": f"{cid}@example.com"})
assert resp.status_code == 200
return cid
def _create_file(client: TestClient, file_no: str, owner_id: str, regarding: str, opened: date):
payload = {
"file_no": file_no,
"id": owner_id,
"regarding": regarding,
"empl_num": "E01",
"file_type": "CIVIL",
"opened": opened.isoformat(),
"status": "ACTIVE",
"rate_per_hour": 100.0,
"memo": "test search/sort",
}
resp = client.post("/api/files/", json=payload)
assert resp.status_code == 200
def test_files_tokenized_search_sort_and_pagination(client: TestClient):
owner_id = _create_customer(client)
base_day = date.today()
f1 = f"FS-{uuid.uuid4().hex[:6]}"
f2 = f"FS-{uuid.uuid4().hex[:6]}"
# f1 contains both tokens across a single field
_create_file(client, f1, owner_id, regarding="Alpha project Beta milestone", opened=base_day - timedelta(days=1))
# f2 contains only one token
_create_file(client, f2, owner_id, regarding="Only Alpha token here", opened=base_day)
# Tokenized search: both tokens required (AND-of-OR across fields)
resp = client.get("/api/files/", params={"search": "Alpha Beta"})
assert resp.status_code == 200
items = resp.json()
file_nos = {it["file_no"] for it in items}
assert f1 in file_nos and f2 not in file_nos
# Sorting by opened desc should put f2 first if both were present; we restrict to both-token result (just f1)
resp = client.get("/api/files/", params={"search": "Alpha Beta", "sort_by": "opened", "sort_dir": "desc"})
assert resp.status_code == 200
items = resp.json()
assert len(items) >= 1 and items[0]["file_no"] == f1
# Pagination over a broader query (single-token) to verify skip/limit
resp = client.get(
"/api/files/",
params={"search": "Alpha", "sort_by": "file_no", "sort_dir": "asc", "limit": 1, "skip": 0},
)
assert resp.status_code == 200
first_page = resp.json()
assert len(first_page) == 1
resp = client.get(
"/api/files/",
params={"search": "Alpha", "sort_by": "file_no", "sort_dir": "asc", "limit": 1, "skip": 1},
)
second_page = resp.json()
assert len(second_page) >= 0 # may be 0 or 1 depending on other fixtures

View File

@@ -0,0 +1,177 @@
import os
import sys
from pathlib import Path
import pytest
from fastapi.testclient import TestClient
# Ensure required env vars for app import/config
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
# Ensure repository root on sys.path for direct test runs
ROOT = Path(__file__).resolve().parents[1]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
from tests.helpers import assert_validation_error # noqa: E402
from app.config import settings # noqa: E402
@pytest.fixture(scope="module")
def client():
# Override auth to bypass JWT for these tests
class _User:
def __init__(self):
self.id = "test"
self.username = "tester"
self.is_admin = True
self.is_active = True
app.dependency_overrides[get_current_user] = lambda: _User()
# Disable cache to make validation tests deterministic
settings.cache_enabled = False
try:
yield TestClient(app)
finally:
app.dependency_overrides.pop(get_current_user, None)
def test_advanced_search_invalid_search_types(client: TestClient):
payload = {
"query": "anything",
"search_types": ["customer", "bogus"],
}
resp = client.post("/api/search/advanced", json=payload)
assert_validation_error(resp, "search_types")
def test_advanced_search_invalid_sort_options(client: TestClient):
# Invalid sort_by
payload = {
"query": "x",
"search_types": ["customer"],
"sort_by": "nope",
}
resp = client.post("/api/search/advanced", json=payload)
assert_validation_error(resp, "sort_by")
# Invalid sort_order
payload = {
"query": "x",
"search_types": ["customer"],
"sort_order": "sideways",
}
resp = client.post("/api/search/advanced", json=payload)
assert_validation_error(resp, "sort_order")
def test_advanced_search_limit_bounds(client: TestClient):
# Too low
payload = {
"query": "x",
"search_types": ["customer"],
"limit": 0,
}
resp = client.post("/api/search/advanced", json=payload)
assert_validation_error(resp, "limit")
# Too high
payload["limit"] = 201
resp = client.post("/api/search/advanced", json=payload)
assert_validation_error(resp, "limit")
def test_advanced_search_conflicting_flags_exact_phrase_and_whole_words(client: TestClient):
payload = {
"query": "apple pie",
"search_types": ["file"],
"exact_phrase": True,
"whole_words": True,
}
resp = client.post("/api/search/advanced", json=payload)
# Cannot rely on field location for model-level validation, check message text in details
assert resp.status_code == 422
body = resp.json()
assert body.get("success") is False
assert body.get("error", {}).get("code") == "validation_error"
msgs = [d.get("msg", "") for d in body.get("error", {}).get("details", [])]
assert any("exact_phrase and whole_words" in m for m in msgs)
def test_advanced_search_inverted_date_range(client: TestClient):
payload = {
"search_types": ["file"],
"date_field": "created",
"date_from": "2024-02-01",
"date_to": "2024-01-31",
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 422
body = resp.json()
assert body.get("success") is False
assert body.get("error", {}).get("code") == "validation_error"
msgs = [d.get("msg", "") for d in body.get("error", {}).get("details", [])]
assert any("date_from must be less than or equal to date_to" in m for m in msgs)
def test_advanced_search_inverted_amount_range(client: TestClient):
payload = {
"search_types": ["file"],
"amount_field": "amount",
"amount_min": 100.0,
"amount_max": 50.0,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 422
body = resp.json()
assert body.get("success") is False
assert body.get("error", {}).get("code") == "validation_error"
msgs = [d.get("msg", "") for d in body.get("error", {}).get("details", [])]
assert any("amount_min must be less than or equal to amount_max" in m for m in msgs)
def test_advanced_search_date_field_supported_per_type(client: TestClient):
# 'opened' is only valid for files
payload = {
"search_types": ["customer", "ledger"],
"date_field": "opened",
"date_from": "2024-01-01",
"date_to": "2024-12-31",
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 422
body = resp.json()
msgs = [d.get("msg", "") for d in body.get("error", {}).get("details", [])]
assert any("date_field 'opened' is not supported" in m for m in msgs)
# Valid when 'file' included
payload["search_types"] = ["file"]
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200
def test_advanced_search_amount_field_supported_per_type(client: TestClient):
# 'amount' is only valid for ledger
payload = {
"search_types": ["file"],
"amount_field": "amount",
"amount_min": 1,
"amount_max": 10,
}
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 422
body = resp.json()
msgs = [d.get("msg", "") for d in body.get("error", {}).get("details", [])]
assert any("amount_field 'amount' is not supported" in m for m in msgs)
# Valid when 'ledger' included
payload["search_types"] = ["ledger"]
resp = client.post("/api/search/advanced", json=payload)
assert resp.status_code == 200

View File

@@ -119,4 +119,9 @@ def test_ticket_lifecycle_and_404s_with_audit(client: TestClient):
assert resp.status_code == 200
assert isinstance(resp.json(), list)
# Search should filter results
resp = client.get("/api/support/tickets", params={"search": "Support issue"})
assert resp.status_code == 200
assert isinstance(resp.json(), list)