"""Tests for the admin API endpoints (``/api/admin/...``).

Authentication is faked by overriding the ``get_current_user`` and
``get_admin_user`` dependencies on the FastAPI ``app`` with stub users.
"""

import os
import uuid

import pytest
from fastapi.testclient import TestClient

# Defaults are installed before the application import below
# (presumably the app reads them at import time -- TODO confirm).
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")

from app.main import app  # noqa: E402
from app.auth.security import get_current_user, get_admin_user  # noqa: E402
from tests.helpers import assert_http_error  # noqa: E402


class _User:
    """Minimal stand-in for an authenticated user object.

    Admin stubs get id 1 / username "admin" and are approvers; regular
    stubs get id 2 / username "user" and are not.
    """

    def __init__(self, is_admin: bool):
        if is_admin:
            self.id = 1
            self.username = "admin"
        else:
            self.id = 2
            self.username = "user"
        self.is_admin = is_admin
        self.is_active = True
        self.first_name = "Test"
        self.last_name = "User"
        # Approver status simply mirrors the admin flag.
        self.is_approver = is_admin


def _admin_identity():
    """Dependency override returning a fresh admin stub."""
    return _User(True)


def _regular_identity():
    """Dependency override returning a fresh non-admin stub."""
    return _User(False)


@pytest.fixture()
def client_admin():
    """Yield a TestClient whose requests are treated as coming from an admin."""
    overrides = app.dependency_overrides
    overrides[get_current_user] = _admin_identity
    overrides[get_admin_user] = _admin_identity
    try:
        yield TestClient(app)
    finally:
        # Always drop the overrides so later tests start clean.
        overrides.pop(get_current_user, None)
        overrides.pop(get_admin_user, None)


@pytest.fixture()
def client_user():
    """Yield a TestClient whose requests are treated as a non-admin user."""
    overrides = app.dependency_overrides
    overrides[get_current_user] = _regular_identity
    try:
        yield TestClient(app)
    finally:
        overrides.pop(get_current_user, None)


def test_admin_only_access(client_user: TestClient):
    """Admin endpoints reject unauthenticated and non-admin callers with 403."""
    # The fixture is used only for its teardown; the override it installed is
    # removed here to simulate an unauthenticated caller.
    app.dependency_overrides.pop(get_current_user, None)
    anonymous = TestClient(app)
    health = anonymous.get("/api/admin/health")
    assert_http_error(health, 403, "Not authenticated")

    # Authenticated, but not an admin: admin endpoints must still refuse.
    app.dependency_overrides[get_current_user] = _regular_identity
    listing = anonymous.get("/api/admin/users")
    assert_http_error(listing, 403, "Not enough permissions")


def test_lookup_crud_file_types_and_statuses_and_audit(client_admin: TestClient):
    """Exercise lookup listing, system-setting CRUD and the audit log endpoint."""
    # Lookup tables can be listed.
    tables = client_admin.get("/api/admin/lookups/tables")
    assert tables.status_code == 200
    assert "tables" in tables.json()

    # A system setting serves as a simple admin CRUD target.
    setting_key = f"test_setting_{uuid.uuid4().hex[:6]}"
    new_setting = {
        "setting_key": setting_key,
        "setting_value": "on",
        "description": "pytest",
        "setting_type": "STRING",
    }
    created = client_admin.post("/api/admin/settings", json=new_setting)
    assert created.status_code == 200
    assert created.json()["setting"]["setting_key"] == setting_key

    setting_url = f"/api/admin/settings/{setting_key}"

    # Update
    changes = {"setting_value": "off", "description": "changed"}
    updated = client_admin.put(setting_url, json=changes)
    assert updated.status_code == 200
    assert updated.json()["setting"]["setting_value"] == "off"

    # Read
    fetched = client_admin.get(setting_url)
    assert fetched.status_code == 200
    assert fetched.json()["setting_key"] == setting_key

    # Delete
    removed = client_admin.delete(setting_url)
    assert removed.status_code == 200

    # The audit log endpoint is reachable and returns the expected envelope.
    audit = client_admin.get("/api/admin/audit/logs", params={"include_total": 1})
    assert audit.status_code == 200
    envelope = audit.json()
    assert set(envelope.keys()) == {"total", "items"}
    assert isinstance(envelope["items"], list)


def test_printer_setup_crud(client_admin: TestClient):
    """Create, update, fetch, list and delete a printer definition."""
    # NOTE(review): the printer name is fixed, so a leftover row from an
    # aborted run could presumably interfere -- confirm against the endpoint.
    printers_url = "/api/admin/printers"
    printer_url = "/api/admin/printers/TestPrinter"

    # Create
    printer_spec = {
        "printer_name": "TestPrinter",
        "description": "Test",
        "driver": "Generic",
        "port": "LPT1",
        "default_printer": True,
        "page_break": "\f",
        "setup_st": "^[[0m",
        "reset_st": "^[[0m",
        "b_bold": "^[[1m",
        "e_bold": "^[[22m",
        "b_underline": "^[[4m",
        "e_underline": "^[[24m",
        "phone_book": True,
        "rolodex_info": False,
        "envelope": True,
        "file_cabinet": True,
        "accounts": False,
        "statements": True,
        "calendar": False,
    }
    created = client_admin.post(printers_url, json=printer_spec)
    assert created.status_code == 200
    created_body = created.json()
    assert created_body["printer_name"] == "TestPrinter"
    assert created_body["default_printer"] is True

    # Update a few flags
    flag_changes = {
        "default_printer": False,
        "statements": False,
        "calendar": True,
    }
    changed = client_admin.put(printer_url, json=flag_changes)
    assert changed.status_code == 200
    changed_body = changed.json()
    assert changed_body["default_printer"] is False
    assert changed_body["statements"] is False
    assert changed_body["calendar"] is True

    # Fetch by name
    single = client_admin.get(printer_url)
    assert single.status_code == 200
    assert single.json()["printer_name"] == "TestPrinter"

    # The listing includes the new printer
    listing = client_admin.get(printers_url)
    assert listing.status_code == 200
    listed_names = [entry["printer_name"] for entry in listing.json()]
    assert "TestPrinter" in listed_names

    # Delete
    removed = client_admin.delete(printer_url)
    assert removed.status_code == 200

    # The listing no longer includes it
    listing = client_admin.get(printers_url)
    assert listing.status_code == 200
    listed_names = [entry["printer_name"] for entry in listing.json()]
    assert "TestPrinter" not in listed_names


def test_qdro_notification_routes_admin_crud(client_admin: TestClient):
    """List, create and delete a per-file QDRO notification route."""
    routes_url = "/api/admin/qdro/notification-routes"
    file_routes_url = "/api/admin/qdro/notification-routes?scope=file"

    # Listing works before anything is created.
    initial = client_admin.get(routes_url)
    assert initial.status_code == 200
    assert "items" in initial.json()

    # Create a per-file route
    file_no = "ROUTE-123"
    route = {
        "scope": "file",
        "identifier": file_no,
        "email_to": "a@example.com,b@example.com",
        "webhook_url": "https://hooks.example.com/qdro",
        "webhook_secret": "sekret",
    }
    created = client_admin.post(routes_url, json=route)
    assert created.status_code == 200, created.text

    # It shows up in the file-scoped listing with a non-empty email_to
    listing = client_admin.get(file_routes_url)
    assert listing.status_code == 200
    routes = listing.json().get("items")
    assert any(r["identifier"] == file_no and r["email_to"] for r in routes)

    # Delete it
    removed = client_admin.delete(f"/api/admin/qdro/notification-routes/file/{file_no}")
    assert removed.status_code == 200

    # It is gone from the listing
    listing = client_admin.get(file_routes_url)
    assert listing.status_code == 200
    routes = listing.json().get("items")
    assert not any(r["identifier"] == file_no for r in routes)


def test_approver_toggle_admin_only(client_admin: TestClient):
    """Admins can toggle a user's approver flag; non-admins get 403."""
    # Create a user to operate on
    username = f"u_{uuid.uuid4().hex[:6]}"
    new_user = {
        "username": username,
        "email": f"{username}@example.com",
        "password": "secret123",
        "first_name": "A",
        "last_name": "B",
        "is_admin": False,
        "is_active": True,
        "is_approver": False,
    }
    created = client_admin.post("/api/admin/users", json=new_user)
    assert created.status_code == 200, created.text
    user_id = created.json()["id"]

    approver_url = f"/api/admin/users/{user_id}/approver"

    # Toggle on
    turned_on = client_admin.post(approver_url, json={"is_approver": True})
    assert turned_on.status_code == 200, turned_on.text
    assert turned_on.json()["is_approver"] is True

    # Toggle off
    turned_off = client_admin.post(approver_url, json={"is_approver": False})
    assert turned_off.status_code == 200, turned_off.text
    assert turned_off.json()["is_approver"] is False

    # A non-admin caller must be refused.
    app.dependency_overrides[get_current_user] = _regular_identity
    # Remove the admin override so the real permission check is exercised.
    saved_admin_override = app.dependency_overrides.pop(get_admin_user, None)
    try:
        plain_client = TestClient(app)
        denied = plain_client.post(approver_url, json={"is_approver": True})
        assert_http_error(denied, 403, "Not enough permissions")
    finally:
        if saved_admin_override is not None:
            app.dependency_overrides[get_admin_user] = saved_admin_override
        app.dependency_overrides.pop(get_current_user, None)