248 lines
8.1 KiB
Python
248 lines
8.1 KiB
Python
import os
|
|
import uuid
|
|
|
|
import pytest
|
|
from fastapi.testclient import TestClient
|
|
|
|
os.environ.setdefault("SECRET_KEY", "x" * 32)
|
|
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
|
|
|
|
from app.main import app # noqa: E402
|
|
from app.auth.security import get_current_user, get_admin_user # noqa: E402
|
|
from tests.helpers import assert_http_error # noqa: E402
|
|
|
|
|
|
class _User:
|
|
def __init__(self, is_admin: bool):
|
|
self.id = 1 if is_admin else 2
|
|
self.username = "admin" if is_admin else "user"
|
|
self.is_admin = is_admin
|
|
self.is_active = True
|
|
self.first_name = "Test"
|
|
self.last_name = "User"
|
|
self.is_approver = is_admin
|
|
|
|
|
|
@pytest.fixture()
|
|
def client_admin():
|
|
app.dependency_overrides[get_current_user] = lambda: _User(True)
|
|
app.dependency_overrides[get_admin_user] = lambda: _User(True)
|
|
try:
|
|
yield TestClient(app)
|
|
finally:
|
|
app.dependency_overrides.pop(get_current_user, None)
|
|
app.dependency_overrides.pop(get_admin_user, None)
|
|
|
|
|
|
@pytest.fixture()
|
|
def client_user():
|
|
app.dependency_overrides[get_current_user] = lambda: _User(False)
|
|
try:
|
|
yield TestClient(app)
|
|
finally:
|
|
app.dependency_overrides.pop(get_current_user, None)
|
|
|
|
|
|
def test_admin_only_access(client_user: TestClient):
|
|
# Drop auth to simulate unauthenticated
|
|
app.dependency_overrides.pop(get_current_user, None)
|
|
c = TestClient(app)
|
|
resp = c.get("/api/admin/health")
|
|
assert_http_error(resp, 403, "Not authenticated")
|
|
|
|
# Authenticated non-admin should get 403 from admin endpoints
|
|
app.dependency_overrides[get_current_user] = lambda: _User(False)
|
|
resp = c.get("/api/admin/users")
|
|
assert_http_error(resp, 403, "Not enough permissions")
|
|
|
|
|
|
def test_lookup_crud_file_types_and_statuses_and_audit(client_admin: TestClient):
|
|
# List lookup tables
|
|
resp = client_admin.get("/api/admin/lookups/tables")
|
|
assert resp.status_code == 200
|
|
assert "tables" in resp.json()
|
|
|
|
# Create a system setting (as a simple admin CRUD target)
|
|
skey = f"test_setting_{uuid.uuid4().hex[:6]}"
|
|
resp = client_admin.post(
|
|
"/api/admin/settings",
|
|
json={
|
|
"setting_key": skey,
|
|
"setting_value": "on",
|
|
"description": "pytest",
|
|
"setting_type": "STRING",
|
|
},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["setting"]["setting_key"] == skey
|
|
|
|
# Update the setting
|
|
resp = client_admin.put(
|
|
f"/api/admin/settings/{skey}",
|
|
json={"setting_value": "off", "description": "changed"},
|
|
)
|
|
assert resp.status_code == 200
|
|
assert resp.json()["setting"]["setting_value"] == "off"
|
|
|
|
# Read the setting
|
|
resp = client_admin.get(f"/api/admin/settings/{skey}")
|
|
assert resp.status_code == 200
|
|
assert resp.json()["setting_key"] == skey
|
|
|
|
# Delete the setting
|
|
resp = client_admin.delete(f"/api/admin/settings/{skey}")
|
|
assert resp.status_code == 200
|
|
|
|
# Verify audit logs endpoint is accessible and returns structure
|
|
resp = client_admin.get("/api/admin/audit/logs", params={"include_total": 1})
|
|
assert resp.status_code == 200
|
|
body = resp.json()
|
|
assert set(body.keys()) == {"total", "items"}
|
|
assert isinstance(body["items"], list)
|
|
|
|
|
|
def test_printer_setup_crud(client_admin: TestClient):
|
|
# Create a printer
|
|
resp = client_admin.post(
|
|
"/api/admin/printers",
|
|
json={
|
|
"printer_name": "TestPrinter",
|
|
"description": "Test",
|
|
"driver": "Generic",
|
|
"port": "LPT1",
|
|
"default_printer": True,
|
|
"page_break": "\f",
|
|
"setup_st": "^[[0m",
|
|
"reset_st": "^[[0m",
|
|
"b_bold": "^[[1m",
|
|
"e_bold": "^[[22m",
|
|
"b_underline": "^[[4m",
|
|
"e_underline": "^[[24m",
|
|
"phone_book": True,
|
|
"rolodex_info": False,
|
|
"envelope": True,
|
|
"file_cabinet": True,
|
|
"accounts": False,
|
|
"statements": True,
|
|
"calendar": False,
|
|
},
|
|
)
|
|
assert resp.status_code == 200
|
|
printer = resp.json()
|
|
assert printer["printer_name"] == "TestPrinter"
|
|
assert printer["default_printer"] is True
|
|
|
|
# Update printer flags
|
|
resp = client_admin.put(
|
|
"/api/admin/printers/TestPrinter",
|
|
json={
|
|
"default_printer": False,
|
|
"statements": False,
|
|
"calendar": True,
|
|
},
|
|
)
|
|
assert resp.status_code == 200
|
|
updated = resp.json()
|
|
assert updated["default_printer"] is False
|
|
assert updated["statements"] is False
|
|
assert updated["calendar"] is True
|
|
|
|
# Get printer by name
|
|
resp = client_admin.get("/api/admin/printers/TestPrinter")
|
|
assert resp.status_code == 200
|
|
fetched = resp.json()
|
|
assert fetched["printer_name"] == "TestPrinter"
|
|
|
|
# List printers includes our printer
|
|
resp = client_admin.get("/api/admin/printers")
|
|
assert resp.status_code == 200
|
|
names = [p["printer_name"] for p in resp.json()]
|
|
assert "TestPrinter" in names
|
|
|
|
# Delete the printer
|
|
resp = client_admin.delete("/api/admin/printers/TestPrinter")
|
|
assert resp.status_code == 200
|
|
# Verify it's gone
|
|
resp = client_admin.get("/api/admin/printers")
|
|
assert resp.status_code == 200
|
|
names = [p["printer_name"] for p in resp.json()]
|
|
assert "TestPrinter" not in names
|
|
|
|
|
|
def test_qdro_notification_routes_admin_crud(client_admin: TestClient):
|
|
# Initially list should succeed
|
|
resp = client_admin.get("/api/admin/qdro/notification-routes")
|
|
assert resp.status_code == 200
|
|
assert "items" in resp.json()
|
|
|
|
# Create a per-file route
|
|
file_no = "ROUTE-123"
|
|
payload = {
|
|
"scope": "file",
|
|
"identifier": file_no,
|
|
"email_to": "a@example.com,b@example.com",
|
|
"webhook_url": "https://hooks.example.com/qdro",
|
|
"webhook_secret": "sekret",
|
|
}
|
|
resp = client_admin.post("/api/admin/qdro/notification-routes", json=payload)
|
|
assert resp.status_code == 200, resp.text
|
|
|
|
# Verify appears in list
|
|
resp = client_admin.get("/api/admin/qdro/notification-routes?scope=file")
|
|
assert resp.status_code == 200
|
|
items = resp.json().get("items")
|
|
assert any(it["identifier"] == file_no and it["email_to"] for it in items)
|
|
|
|
# Delete route
|
|
resp = client_admin.delete(f"/api/admin/qdro/notification-routes/file/{file_no}")
|
|
assert resp.status_code == 200
|
|
# Verify gone
|
|
resp = client_admin.get("/api/admin/qdro/notification-routes?scope=file")
|
|
assert resp.status_code == 200
|
|
items = resp.json().get("items")
|
|
assert not any(it["identifier"] == file_no for it in items)
|
|
|
|
def test_approver_toggle_admin_only(client_admin: TestClient):
|
|
# Create a user
|
|
uname = f"u_{uuid.uuid4().hex[:6]}"
|
|
resp = client_admin.post(
|
|
"/api/admin/users",
|
|
json={
|
|
"username": uname,
|
|
"email": f"{uname}@example.com",
|
|
"password": "secret123",
|
|
"first_name": "A",
|
|
"last_name": "B",
|
|
"is_admin": False,
|
|
"is_active": True,
|
|
"is_approver": False,
|
|
},
|
|
)
|
|
assert resp.status_code == 200, resp.text
|
|
user_id = resp.json()["id"]
|
|
|
|
# Toggle approver on
|
|
resp = client_admin.post(f"/api/admin/users/{user_id}/approver", json={"is_approver": True})
|
|
assert resp.status_code == 200, resp.text
|
|
assert resp.json()["is_approver"] is True
|
|
|
|
# Toggle approver off
|
|
resp = client_admin.post(f"/api/admin/users/{user_id}/approver", json={"is_approver": False})
|
|
assert resp.status_code == 200, resp.text
|
|
assert resp.json()["is_approver"] is False
|
|
|
|
# Non-admin should be forbidden
|
|
app.dependency_overrides[get_current_user] = lambda: _User(False)
|
|
# Ensure admin override is not present so permission is enforced
|
|
prev_admin_override = app.dependency_overrides.pop(get_admin_user, None)
|
|
try:
|
|
c = TestClient(app)
|
|
resp = c.post(f"/api/admin/users/{user_id}/approver", json={"is_approver": True})
|
|
assert_http_error(resp, 403, "Not enough permissions")
|
|
finally:
|
|
if prev_admin_override is not None:
|
|
app.dependency_overrides[get_admin_user] = prev_admin_override
|
|
app.dependency_overrides.pop(get_current_user, None)
|
|
|
|
|