Files
delphi-database/tests/test_billing_statements_api.py
2025-08-15 22:04:43 -05:00

1512 lines
55 KiB
Python

import os
from datetime import date
import pytest
from fastapi.testclient import TestClient
# Ensure required env vars for app import/config.
# These defaults are installed before the app import that follows;
# setdefault leaves any value already present in the environment untouched.
os.environ.setdefault("SECRET_KEY", "x" * 32)
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
from app.main import app # noqa: E402
from app.auth.security import get_current_user # noqa: E402
@pytest.fixture(scope="module")
def client():
    """Yield a module-scoped TestClient with the auth dependency stubbed out.

    ``get_current_user`` is overridden with a factory returning an active
    admin stand-in so these tests bypass JWT handling. The override is
    removed again when the fixture is torn down.
    """

    class _User:
        # Stand-in user object carrying only the attributes assigned here.
        def __init__(self):
            self.id, self.username = "test", "tester"
            self.is_admin = self.is_active = True

    def _fake_current_user():
        return _User()

    app.dependency_overrides[get_current_user] = _fake_current_user
    try:
        yield TestClient(app)
    finally:
        # Drop the override even when a test in the module failed.
        app.dependency_overrides.pop(get_current_user, None)
def _create_customer(client: TestClient) -> str:
    """Create a customer with a random ``BILL-CUST-`` id and return that id.

    Asserts that ``POST /api/customers/`` answers with HTTP 200.
    """
    import uuid

    new_id = f"BILL-CUST-{uuid.uuid4().hex[:8]}"
    response = client.post(
        "/api/customers/",
        json={"id": new_id, "last": "Billing", "email": "billing@example.com"},
    )
    assert response.status_code == 200
    return new_id
def _create_file(client: TestClient, owner_id: str) -> str:
    """Create a file owned by ``owner_id`` and return its file number.

    The file number is ``B-`` followed by six random hex characters.
    Asserts that ``POST /api/files/`` answers with HTTP 200.
    """
    import uuid

    new_file_no = f"B-{uuid.uuid4().hex[:6]}"
    body = dict(
        file_no=new_file_no,
        id=owner_id,
        regarding="Billing matter",
        empl_num="E01",
        file_type="CIVIL",
        opened=date.today().isoformat(),
        status="ACTIVE",
        rate_per_hour=150.0,
        memo="Created by pytest",
    )
    response = client.post("/api/files/", json=body)
    assert response.status_code == 200
    return new_file_no
def test_statement_empty_account(client: TestClient):
    """A file with no ledger entries produces an all-zero statement snapshot."""
    file_no = _create_file(client, _create_customer(client))

    response = client.get(f"/api/billing/statements/{file_no}")
    assert response.status_code == 200

    snapshot = response.json()
    assert snapshot["file_no"] == file_no
    for total_key in (
        "charges_billed",
        "charges_unbilled",
        "charges_total",
        "payments",
        "current_balance",
    ):
        assert snapshot["totals"][total_key] == 0

    pending = snapshot["unbilled_entries"]
    assert isinstance(pending, list)
    assert len(pending) == 0
def test_statement_with_mixed_entries_and_rounding(client: TestClient):
    """Snapshot totals combine billed, unbilled and payment ledger entries."""
    file_no = _create_file(client, _create_customer(client))

    # One row per ledger entry, posted in this order:
    # (t_code, t_type, quantity, rate, amount, billed, note)
    ledger_rows = [
        ("TIME", "2", 1.25, 150.0, 187.5, "N", "Work 1"),   # unbilled time: 1.25h * 150
        ("FLAT", "3", 0.0, 0.0, 300.0, "Y", "Flat fee"),    # billed flat fee
        ("MISC", "4", 0.0, 0.0, 49.995, "N", "Expense"),    # unbilled disbursement
        ("PMT", "5", 0.0, 0.0, 100.0, "Y", "Payment"),      # payment
    ]
    for t_code, t_type, quantity, rate, amount, billed, note in ledger_rows:
        entry = {
            "file_no": file_no,
            "date": date.today().isoformat(),
            "t_code": t_code,
            "t_type": t_type,
            "empl_num": "E01",
            "quantity": quantity,
            "rate": rate,
            "amount": amount,
            "billed": billed,
            "note": note,
        }
        posted = client.post("/api/financial/ledger/", json=entry)
        assert posted.status_code == 200

    # Snapshot
    response = client.get(f"/api/billing/statements/{file_no}")
    assert response.status_code == 200
    snapshot = response.json()

    # Expected figures: 187.5 + 49.995 is reported as 237.5 for unbilled
    # charges; total = 300 + 237.5; balance = total - 100 payment.
    expected_totals = {
        "charges_billed": 300.0,
        "charges_unbilled": 237.5,
        "charges_total": 537.5,
        "payments": 100.0,
        "current_balance": 437.5,
    }
    for total_key, expected in expected_totals.items():
        assert snapshot["totals"][total_key] == expected

    # The two entries flagged billed="N" are the unbilled ones.
    pending = snapshot["unbilled_entries"]
    assert len(pending) == 2
    seen_codes = {row["t_code"] for row in pending}
    assert {"TIME", "MISC"} <= seen_codes
def test_generate_statement_missing_file_returns_404(client: TestClient):
    """Generating a statement for an unknown file number answers HTTP 404."""
    request_body = {"file_no": "NOFILE-123"}
    response = client.post("/api/billing/statements/generate", json=request_body)
    assert response.status_code == 404
def test_generate_statement_empty_ledger(client: TestClient):
    """Generating for a file without ledger rows still writes an HTML export."""
    file_no = _create_file(client, _create_customer(client))

    response = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert response.status_code == 200, response.text

    meta = response.json()
    assert meta["file_no"] == file_no
    assert meta["unbilled_count"] == 0
    assert meta["totals"]["charges_total"] == 0
    assert meta["totals"]["payments"] == 0

    # The reported export path must be an existing .html file.
    export_path = meta["export_path"]
    assert isinstance(export_path, str)
    assert export_path.endswith(".html")
    assert os.path.exists(export_path)

    # Minimal content check: the word "Statement" and the file number appear.
    with open(export_path, "r", encoding="utf-8") as handle:
        rendered = handle.read()
    assert "Statement" in rendered
    assert file_no in rendered
def test_generate_statement_populated(client: TestClient):
    """Generated statement metadata reflects a mix of ledger entries."""
    file_no = _create_file(client, _create_customer(client))

    # Same entry mix as the snapshot test, posted in this order:
    # (t_code, t_type, quantity, rate, amount, billed, note)
    ledger_rows = [
        ("TIME", "2", 1.25, 150.0, 187.5, "N", "Work 1"),   # unbilled time: 1.25h * 150
        ("FLAT", "3", 0.0, 0.0, 300.0, "Y", "Flat fee"),    # billed flat fee
        ("MISC", "4", 0.0, 0.0, 49.995, "N", "Expense"),    # unbilled disbursement
        ("PMT", "5", 0.0, 0.0, 100.0, "Y", "Payment"),      # payment
    ]
    for t_code, t_type, quantity, rate, amount, billed, note in ledger_rows:
        entry = {
            "file_no": file_no,
            "date": date.today().isoformat(),
            "t_code": t_code,
            "t_type": t_type,
            "empl_num": "E01",
            "quantity": quantity,
            "rate": rate,
            "amount": amount,
            "billed": billed,
            "note": note,
        }
        assert client.post("/api/financial/ledger/", json=entry).status_code == 200

    # Generate
    response = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert response.status_code == 200, response.text

    meta = response.json()
    assert meta["file_no"] == file_no
    assert meta["unbilled_count"] == 2
    expected_totals = {
        "charges_billed": 300.0,
        "charges_unbilled": 237.5,
        "charges_total": 537.5,
        "payments": 100.0,
        "current_balance": 437.5,
    }
    for total_key, expected in expected_totals.items():
        assert meta["totals"][total_key] == expected

    # The export must exist on disk and carry an .html filename.
    assert os.path.exists(meta["export_path"])
    assert meta["filename"].endswith(".html")
def test_list_statements_empty(client: TestClient):
    """Listing statements for a file with none generated returns an empty list."""
    file_no = _create_file(client, _create_customer(client))

    response = client.get(f"/api/billing/statements/{file_no}/list")
    assert response.status_code == 200

    listing = response.json()
    assert isinstance(listing, list)
    assert len(listing) == 0
def test_list_statements_with_generated_files(client: TestClient):
    """Listing returns both generated statements with filename/size/created."""
    import time

    file_no = _create_file(client, _create_customer(client))
    generate_url = "/api/billing/statements/generate"

    # Generate two statements, more than a second apart so that any
    # second-resolution timestamp in the filenames differs.
    first_resp = client.post(generate_url, json={"file_no": file_no})
    assert first_resp.status_code == 200
    first_meta = first_resp.json()

    time.sleep(1.1)  # Ensure different seconds in filename

    second_resp = client.post(
        generate_url, json={"file_no": file_no, "period": "2024-01"}
    )
    assert second_resp.status_code == 200
    second_meta = second_resp.json()

    # List all statements
    list_resp = client.get(f"/api/billing/statements/{file_no}/list")
    assert list_resp.status_code == 200
    listing = list_resp.json()
    assert len(listing) == 2

    # Every item exposes the expected metadata fields and is non-empty.
    for item in listing:
        assert "filename" in item
        assert "size" in item
        assert "created" in item
        assert item["size"] > 0

    # NOTE(review): the API is expected to sort newest first, but only the
    # presence of both filenames is asserted here, not their order.
    listed_names = [item["filename"] for item in listing]
    assert second_meta["filename"] in listed_names
    assert first_meta["filename"] in listed_names
def test_list_statements_with_period_filter(client: TestClient):
    """Listing with ``?period=2024-01`` returns only that period's statement."""
    file_no = _create_file(client, _create_customer(client))
    generate_url = "/api/billing/statements/generate"

    # One statement without a period, one for 2024-01.
    unscoped = client.post(generate_url, json={"file_no": file_no})
    assert unscoped.status_code == 200
    scoped = client.post(generate_url, json={"file_no": file_no, "period": "2024-01"})
    assert scoped.status_code == 200

    # Filter by period
    list_resp = client.get(f"/api/billing/statements/{file_no}/list?period=2024-01")
    assert list_resp.status_code == 200
    items = list_resp.json()
    assert len(items) == 1
    assert items[0]["filename"] == scoped.json()["filename"]
def test_list_statements_file_not_found(client: TestClient):
    """Listing statements for an unknown file number answers HTTP 404."""
    response = client.get("/api/billing/statements/NONEXISTENT/list")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "File not found" in error_message
def test_download_statement_latest(client: TestClient):
    """Download without a period returns the generated statement as HTML."""
    file_no = _create_file(client, _create_customer(client))

    # Generate a statement
    generated = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert generated.status_code == 200
    meta = generated.json()

    # Download latest
    download = client.get(f"/api/billing/statements/{file_no}/download")
    assert download.status_code == 200
    headers = download.headers
    assert headers["content-type"] == "text/html; charset=utf-8"
    assert "content-disposition" in headers
    assert meta["filename"] in headers["content-disposition"]

    # The body is the HTML statement for this file.
    html_text = download.content.decode("utf-8")
    assert "Statement" in html_text
    assert file_no in html_text
def test_download_statement_with_period_filter(client: TestClient):
    """Download with ``?period=2024-01`` serves that period's statement."""
    file_no = _create_file(client, _create_customer(client))
    generate_url = "/api/billing/statements/generate"

    # One statement without a period, one for 2024-01.
    unscoped = client.post(generate_url, json={"file_no": file_no})
    assert unscoped.status_code == 200
    scoped = client.post(generate_url, json={"file_no": file_no, "period": "2024-01"})
    assert scoped.status_code == 200
    scoped_meta = scoped.json()

    # Download with period filter
    download = client.get(f"/api/billing/statements/{file_no}/download?period=2024-01")
    assert download.status_code == 200
    assert scoped_meta["filename"] in download.headers["content-disposition"]
def test_download_statement_no_files(client: TestClient):
    """Download answers HTTP 404 when no statement was generated for the file."""
    file_no = _create_file(client, _create_customer(client))

    response = client.get(f"/api/billing/statements/{file_no}/download")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "No statements found" in error_message
def test_download_statement_no_matching_period(client: TestClient):
    """Download answers HTTP 404 when no statement matches the requested period."""
    file_no = _create_file(client, _create_customer(client))

    # Generate statement without period
    generated = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert generated.status_code == 200

    # Ask for a period that was never generated.
    response = client.get(f"/api/billing/statements/{file_no}/download?period=2024-01")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "No statements found for requested period" in error_message
def test_download_statement_file_not_found(client: TestClient):
    """Download for an unknown file number answers HTTP 404."""
    response = client.get("/api/billing/statements/NONEXISTENT/download")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "File not found" in error_message
def test_delete_statement_success(client: TestClient):
    """Deleting a generated statement removes its export file from disk."""
    file_no = _create_file(client, _create_customer(client))

    # Generate a statement
    generated = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert generated.status_code == 200
    meta = generated.json()
    statement_name = meta["filename"]

    # The export is on disk before the delete ...
    assert os.path.exists(meta["export_path"])

    # Delete statement
    deleted = client.delete(f"/api/billing/statements/{file_no}/{statement_name}")
    assert deleted.status_code == 200
    outcome = deleted.json()
    assert outcome["message"] == "Statement deleted successfully"
    assert outcome["filename"] == statement_name

    # ... and gone afterwards.
    assert not os.path.exists(meta["export_path"])
def test_delete_statement_file_not_found(client: TestClient):
    """Deleting a statement name that does not exist answers HTTP 404."""
    file_no = _create_file(client, _create_customer(client))

    response = client.delete(f"/api/billing/statements/{file_no}/nonexistent.html")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "Statement not found" in error_message
def test_delete_statement_invalid_file_no(client: TestClient):
    """Deleting under an unknown file number answers HTTP 404 'File not found'."""
    response = client.delete("/api/billing/statements/NONEXISTENT/test.html")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "File not found" in error_message
def test_delete_statement_security_invalid_filename(client: TestClient):
    """A filename outside the expected statement pattern is reported as not found."""
    file_no = _create_file(client, _create_customer(client))

    # Try to delete a file that doesn't match the expected pattern
    response = client.delete(f"/api/billing/statements/{file_no}/malicious_file.html")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "Statement not found" in error_message
def test_delete_statement_security_wrong_file_no(client: TestClient):
    """A statement cannot be deleted through another file's delete URL."""
    owner_a = _create_customer(client)
    file_a = _create_file(client, owner_a)
    owner_b = _create_customer(client)
    file_b = _create_file(client, owner_b)

    # Generate a statement that belongs to the first file.
    generated = client.post(
        "/api/billing/statements/generate", json={"file_no": file_a}
    )
    assert generated.status_code == 200
    meta = generated.json()
    statement_name = meta["filename"]

    # Address that statement through the second file's URL.
    response = client.delete(f"/api/billing/statements/{file_b}/{statement_name}")
    assert response.status_code == 404
    error_message = response.json()["error"]["message"]
    assert "Statement not found" in error_message

    # The first file's export must be untouched.
    assert os.path.exists(meta["export_path"])
def test_delete_statement_security_path_traversal(client: TestClient):
    """A delete URL containing ``..`` segments must answer HTTP 404.

    Only the status code is checked: the error message differs from the
    other delete tests because the request is rejected at the routing level
    rather than by the delete handler.
    """
    owner_id = _create_customer(client)
    file_no = _create_file(client, owner_id)

    # Try path traversal attack
    # NOTE(review): the HTTP client may collapse the ".." segments before the
    # request reaches the app — confirm whether a percent-encoded variant
    # should be exercised as well.
    resp = client.delete(f"/api/billing/statements/{file_no}/../../../etc/passwd")
    assert resp.status_code == 404

    # The body is still parsed so a non-JSON error response fails the test,
    # but its contents are intentionally not inspected.
    resp.json()
# Integration Tests - Complete Workflow
def test_complete_billing_workflow_single_statement(client: TestClient):
    """Walk one statement through generate -> list -> download -> delete."""
    # Setup
    file_no = _create_file(client, _create_customer(client))

    # 1. Generate statement
    generated = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert generated.status_code == 200
    statement_name = generated.json()["filename"]

    # 2. The listing shows exactly that statement.
    listed = client.get(f"/api/billing/statements/{file_no}/list")
    assert listed.status_code == 200
    items = listed.json()
    assert len(items) == 1
    only_item = items[0]
    assert only_item["filename"] == statement_name
    assert only_item["size"] > 0

    # 3. Download returns the HTML statement.
    download = client.get(f"/api/billing/statements/{file_no}/download")
    assert download.status_code == 200
    assert download.headers["content-type"] == "text/html; charset=utf-8"
    assert statement_name in download.headers["content-disposition"]
    html_text = download.content.decode("utf-8")
    assert "Statement" in html_text
    assert file_no in html_text

    # 4. Delete statement
    deleted = client.delete(f"/api/billing/statements/{file_no}/{statement_name}")
    assert deleted.status_code == 200
    outcome = deleted.json()
    assert outcome["message"] == "Statement deleted successfully"
    assert outcome["filename"] == statement_name

    # 5. After deletion the listing is empty.
    listed_after = client.get(f"/api/billing/statements/{file_no}/list")
    assert listed_after.status_code == 200
    assert len(listed_after.json()) == 0

    # 6. After deletion the download fails.
    download_after = client.get(f"/api/billing/statements/{file_no}/download")
    assert download_after.status_code == 404
def test_complete_billing_workflow_multiple_statements_with_periods(client: TestClient):
    """Exercise listing, downloading and deleting across three statements."""
    import time

    # Setup
    file_no = _create_file(client, _create_customer(client))
    generate_url = "/api/billing/statements/generate"

    # 1. Three statements (no period, 2024-01, 2024-02), spaced more than a
    #    second apart so their timestamps differ.
    resp_first = client.post(generate_url, json={"file_no": file_no})
    assert resp_first.status_code == 200
    meta_first = resp_first.json()

    time.sleep(1.1)  # Ensure different timestamps
    resp_jan = client.post(generate_url, json={"file_no": file_no, "period": "2024-01"})
    assert resp_jan.status_code == 200
    meta_jan = resp_jan.json()

    time.sleep(1.1)
    resp_feb = client.post(generate_url, json={"file_no": file_no, "period": "2024-02"})
    assert resp_feb.status_code == 200
    meta_feb = resp_feb.json()

    # 2. Unfiltered listing has all three, with the last generated one first.
    listed_all = client.get(f"/api/billing/statements/{file_no}/list")
    assert listed_all.status_code == 200
    all_items = listed_all.json()
    assert len(all_items) == 3
    all_names = [item["filename"] for item in all_items]
    assert meta_feb["filename"] == all_names[0]

    # 3. Filtered listing contains only the 2024-01 statement.
    listed_jan = client.get(f"/api/billing/statements/{file_no}/list?period=2024-01")
    assert listed_jan.status_code == 200
    jan_items = listed_jan.json()
    assert len(jan_items) == 1
    assert jan_items[0]["filename"] == meta_jan["filename"]

    # 4. Filtered download serves the 2024-01 statement.
    download_jan = client.get(f"/api/billing/statements/{file_no}/download?period=2024-01")
    assert download_jan.status_code == 200
    assert meta_jan["filename"] in download_jan.headers["content-disposition"]

    # 5. Unfiltered download serves the last generated statement.
    download_latest = client.get(f"/api/billing/statements/{file_no}/download")
    assert download_latest.status_code == 200
    assert meta_feb["filename"] in download_latest.headers["content-disposition"]

    # 6. Delete the middle (2024-01) statement.
    deleted = client.delete(f"/api/billing/statements/{file_no}/{meta_jan['filename']}")
    assert deleted.status_code == 200

    # 7. Only the other two remain listed.
    listed_after = client.get(f"/api/billing/statements/{file_no}/list")
    assert listed_after.status_code == 200
    remaining_items = listed_after.json()
    assert len(remaining_items) == 2
    remaining_names = [item["filename"] for item in remaining_items]
    assert meta_first["filename"] in remaining_names
    assert meta_feb["filename"] in remaining_names
    assert meta_jan["filename"] not in remaining_names

    # 8. The 2024-01 filter now lists nothing.
    listed_jan_after = client.get(f"/api/billing/statements/{file_no}/list?period=2024-01")
    assert listed_jan_after.status_code == 200
    assert len(listed_jan_after.json()) == 0

    # 9. The 2024-01 download now fails.
    download_jan_after = client.get(f"/api/billing/statements/{file_no}/download?period=2024-01")
    assert download_jan_after.status_code == 404
def test_complete_billing_workflow_with_ledger_data(client: TestClient):
    """A generated statement reflects a posted ledger entry in metadata and HTML."""
    # Setup
    file_no = _create_file(client, _create_customer(client))

    # One unbilled time entry: 2.5h at 200.0 = 500.0.
    ledger_entry = dict(
        file_no=file_no,
        date=date.today().isoformat(),
        t_code="TIME",
        t_type="2",
        empl_num="E01",
        quantity=2.5,
        rate=200.0,
        amount=500.0,
        billed="N",
        note="Legal research",
    )
    posted = client.post("/api/financial/ledger/", json=ledger_entry)
    assert posted.status_code == 200

    # Generate statement
    generated = client.post(
        "/api/billing/statements/generate", json={"file_no": file_no}
    )
    assert generated.status_code == 200
    meta = generated.json()

    # The metadata counts the entry as unbilled.
    assert meta["totals"]["charges_unbilled"] == 500.0
    assert meta["unbilled_count"] == 1

    # The downloaded HTML mentions the entry's note, amount, quantity and rate.
    download = client.get(f"/api/billing/statements/{file_no}/download")
    assert download.status_code == 200
    html_text = download.content.decode("utf-8")
    for expected_fragment in ("Legal research", "500.00", "2.5", "200.00"):
        assert expected_fragment in html_text

    # The listing shows a single statement larger than 1000 bytes.
    listed = client.get(f"/api/billing/statements/{file_no}/list")
    assert listed.status_code == 200
    items = listed.json()
    assert len(items) == 1
    assert items[0]["size"] > 1000  # Statement with data should be larger
def test_cross_file_security_integration(client: TestClient):
    """Statements of one file are invisible to and undeletable via another file."""
    # Setup two different files
    owner_a = _create_customer(client)
    file_a = _create_file(client, owner_a)
    owner_b = _create_customer(client)
    file_b = _create_file(client, owner_b)
    generate_url = "/api/billing/statements/generate"

    # One statement per file.
    generated_a = client.post(generate_url, json={"file_no": file_a})
    assert generated_a.status_code == 200
    meta_a = generated_a.json()
    generated_b = client.post(generate_url, json={"file_no": file_b})
    assert generated_b.status_code == 200
    meta_b = generated_b.json()

    # 1. The first file's listing contains only its own statement.
    listed_a = client.get(f"/api/billing/statements/{file_a}/list")
    assert listed_a.status_code == 200
    items_a = listed_a.json()
    assert len(items_a) == 1
    assert items_a[0]["filename"] == meta_a["filename"]

    # 2. The second file's listing contains only its own statement.
    listed_b = client.get(f"/api/billing/statements/{file_b}/list")
    assert listed_b.status_code == 200
    items_b = listed_b.json()
    assert len(items_b) == 1
    assert items_b[0]["filename"] == meta_b["filename"]

    # 3. Downloading for the first file serves the first file's statement.
    download_a = client.get(f"/api/billing/statements/{file_a}/download")
    assert download_a.status_code == 200
    assert meta_a["filename"] in download_a.headers["content-disposition"]

    # 4. Deleting the first file's statement via the second file's URL fails.
    cross_delete = client.delete(f"/api/billing/statements/{file_b}/{meta_a['filename']}")
    assert cross_delete.status_code == 404

    # 5. The first file's statement is still listed.
    listed_a_after = client.get(f"/api/billing/statements/{file_a}/list")
    assert listed_a_after.status_code == 200
    assert len(listed_a_after.json()) == 1

    # 6. Deleting through the owning file's URL succeeds.
    proper_delete = client.delete(f"/api/billing/statements/{file_a}/{meta_a['filename']}")
    assert proper_delete.status_code == 200

    # 7. The first file's listing is empty; the second file's is unchanged.
    listed_a_final = client.get(f"/api/billing/statements/{file_a}/list")
    assert listed_a_final.status_code == 200
    assert len(listed_a_final.json()) == 0
    listed_b_final = client.get(f"/api/billing/statements/{file_b}/list")
    assert listed_b_final.status_code == 200
    assert len(listed_b_final.json()) == 1
# Batch Statement Generation Tests
def test_batch_generate_statements_empty_list(client: TestClient):
    """Batch generation rejects an empty ``file_numbers`` list with HTTP 400."""
    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": []}
    )
    assert response.status_code == 400
    error_message = response.json()["error"]["message"]
    assert "At least one file number must be provided" in error_message
def test_batch_generate_statements_too_many_files(client: TestClient):
    """Batch generation rejects a 51-entry list with a 422 validation error."""
    oversized = [f"FILE-{i}" for i in range(51)]  # 51 files exceeds limit
    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": oversized}
    )
    assert response.status_code == 422  # Pydantic validation error
    assert any(
        marker in response.text.lower() for marker in ("max_length", "validation")
    )
def test_batch_generate_statements_single_file_success(client: TestClient):
    """A one-file batch reports full success and carries statement metadata."""
    file_no = _create_file(client, _create_customer(client))

    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": [file_no]}
    )
    assert response.status_code == 200

    summary = response.json()
    assert summary["total_files"] == 1
    assert summary["successful"] == 1
    assert summary["failed"] == 0
    assert summary["success_rate"] == 100.0
    assert "batch_id" in summary
    assert summary["batch_id"].startswith("batch_")
    assert len(summary["results"]) == 1

    # The single result describes the requested file.
    outcome = summary["results"][0]
    assert outcome["file_no"] == file_no
    assert outcome["status"] == "success"
    statement_meta = outcome["statement_meta"]
    assert statement_meta is not None
    assert statement_meta["file_no"] == file_no
    assert statement_meta["filename"].endswith(".html")
def test_batch_generate_statements_multiple_files_success(client: TestClient):
    """A batch of three existing files succeeds for every file."""
    # Create three customer/file pairs, one after the other.
    file_numbers = [
        _create_file(client, _create_customer(client)) for _ in range(3)
    ]

    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": file_numbers}
    )
    assert response.status_code == 200

    summary = response.json()
    assert summary["total_files"] == 3
    assert summary["successful"] == 3
    assert summary["failed"] == 0
    assert summary["success_rate"] == 100.0
    assert len(summary["results"]) == 3

    # Exactly the requested files were processed.
    processed = [outcome["file_no"] for outcome in summary["results"]]
    assert set(processed) == set(file_numbers)

    for outcome in summary["results"]:
        assert outcome["status"] == "success"
        assert outcome["statement_meta"] is not None
        assert outcome["statement_meta"]["filename"].endswith(".html")
def test_batch_generate_statements_mixed_success_failure(client: TestClient):
    """A batch mixing one real file with two unknown ones reports 1 of 3."""
    # Create one existing file
    existing_file = _create_file(client, _create_customer(client))

    # Mix of existing and non-existing files
    requested = [existing_file, "NONEXISTENT-1", "NONEXISTENT-2"]
    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": requested}
    )
    assert response.status_code == 200

    summary = response.json()
    assert summary["total_files"] == 3
    assert summary["successful"] == 1
    assert summary["failed"] == 2
    assert summary["success_rate"] == 33.33
    assert len(summary["results"]) == 3

    # Index the per-file outcomes by file number.
    by_file = {outcome["file_no"]: outcome for outcome in summary["results"]}

    # Existing file should succeed
    good = by_file[existing_file]
    assert good["status"] == "success"
    assert good["statement_meta"] is not None

    # Non-existing files should fail
    missing_one = by_file["NONEXISTENT-1"]
    assert missing_one["status"] == "failed"
    assert "not found" in missing_one["message"].lower()
    assert missing_one["error_details"] is not None

    missing_two = by_file["NONEXISTENT-2"]
    assert missing_two["status"] == "failed"
    assert "not found" in missing_two["message"].lower()
def test_batch_generate_statements_all_failures(client: TestClient):
    """A batch of only unknown files answers 200 with every result failed."""
    requested = ["NONEXISTENT-1", "NONEXISTENT-2", "NONEXISTENT-3"]
    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": requested}
    )
    assert response.status_code == 200

    summary = response.json()
    assert summary["total_files"] == 3
    assert summary["successful"] == 0
    assert summary["failed"] == 3
    assert summary["success_rate"] == 0.0
    assert len(summary["results"]) == 3

    # All should be failures
    for outcome in summary["results"]:
        assert outcome["status"] == "failed"
        assert "not found" in outcome["message"].lower()
        assert outcome["error_details"] is not None
def test_batch_generate_statements_with_period(client: TestClient):
    """The batch-level ``period`` is reported on every generated statement."""
    file_numbers = [
        _create_file(client, _create_customer(client)) for _ in range(2)
    ]

    request_body = {"file_numbers": file_numbers, "period": "2024-01"}
    response = client.post("/api/billing/statements/batch-generate", json=request_body)
    assert response.status_code == 200

    summary = response.json()
    assert summary["total_files"] == 2
    assert summary["successful"] == 2
    assert summary["failed"] == 0

    # Check that period was applied to all statements
    for outcome in summary["results"]:
        assert outcome["status"] == "success"
        assert outcome["statement_meta"]["period"] == "2024-01"
def test_batch_generate_statements_deduplicates_files(client: TestClient):
    """Repeating a file number in the request processes that file only once."""
    file_no = _create_file(client, _create_customer(client))

    # Pass same file number multiple times
    repeated = [file_no] * 3
    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": repeated}
    )
    assert response.status_code == 200

    # Should only process once, not three times
    summary = response.json()
    assert summary["total_files"] == 1
    assert summary["successful"] == 1
    assert summary["failed"] == 0
    assert len(summary["results"]) == 1
    assert summary["results"][0]["file_no"] == file_no
def test_batch_generate_statements_timing_and_metadata(client: TestClient):
    """The batch response carries timing fields and a four-part batch id."""
    from datetime import datetime

    file_no = _create_file(client, _create_customer(client))
    response = client.post(
        "/api/billing/statements/batch-generate", json={"file_numbers": [file_no]}
    )
    assert response.status_code == 200
    summary = response.json()

    # Check timing fields
    for timing_key in ("started_at", "completed_at", "processing_time_seconds"):
        assert timing_key in summary
    elapsed = summary["processing_time_seconds"]
    assert isinstance(elapsed, (int, float))
    assert elapsed >= 0

    # Check batch ID format
    assert "batch_id" in summary
    batch_id = summary["batch_id"]
    assert batch_id.startswith("batch_")
    assert len(batch_id.split("_")) == 4  # batch_YYYYMMDD_HHMMSS_hash

    # Parse timestamps; a trailing "Z" is rewritten to an explicit UTC offset
    # before being handed to fromisoformat.
    def _parse(stamp):
        return datetime.fromisoformat(stamp.replace('Z', '+00:00'))

    started_at = _parse(summary["started_at"])
    completed_at = _parse(summary["completed_at"])
    assert completed_at >= started_at
def test_batch_generate_statements_with_ledger_data(client: TestClient):
    """Per-file batch metadata reflects each file's own ledger entries."""
    # First file gets one unbilled time entry: 1.5h at 200.0 = 300.0.
    file_with_entry = _create_file(client, _create_customer(client))
    ledger_entry = dict(
        file_no=file_with_entry,
        date=date.today().isoformat(),
        t_code="TIME",
        t_type="2",
        empl_num="E01",
        quantity=1.5,
        rate=200.0,
        amount=300.0,
        billed="N",
        note="Legal work",
    )
    assert client.post("/api/financial/ledger/", json=ledger_entry).status_code == 200

    # Second file has no ledger entries.
    file_without_entry = _create_file(client, _create_customer(client))

    response = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [file_with_entry, file_without_entry]},
    )
    assert response.status_code == 200
    summary = response.json()
    assert summary["successful"] == 2
    assert summary["failed"] == 0

    # Index the per-file outcomes by file number.
    by_file = {outcome["file_no"]: outcome for outcome in summary["results"]}

    populated_meta = by_file[file_with_entry]["statement_meta"]
    assert populated_meta["unbilled_count"] == 1
    assert populated_meta["totals"]["charges_unbilled"] == 300.0

    empty_meta = by_file[file_without_entry]["statement_meta"]
    assert empty_meta["unbilled_count"] == 0
    assert empty_meta["totals"]["charges_unbilled"] == 0.0
def test_batch_generate_statements_file_system_verification(client: TestClient):
    """Every successful batch result points at a real file on disk.

    For each result whose status is ``"success"``, the reported
    ``export_path`` must exist, and the file's text must contain the word
    "Statement" as well as the result's own file number.
    """
    targets = [_create_file(client, _create_customer(client)) for _ in range(2)]

    batch_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": targets},
    )
    assert batch_resp.status_code == 200
    payload = batch_resp.json()
    assert payload["successful"] == 2

    for entry in payload["results"]:
        # Only successful entries are inspected on disk.
        if entry["status"] != "success":
            continue

        export_path = entry["statement_meta"]["export_path"]
        assert os.path.exists(export_path)

        with open(export_path, "r", encoding="utf-8") as handle:
            text = handle.read()
        assert "Statement" in text
        assert entry["file_no"] in text
def test_batch_generate_statements_performance_within_limits(client: TestClient):
    """Batch generation of ten files is fast and reports a plausible duration.

    The request is timed from the client side. The test requires that:

    * all ten files are generated successfully,
    * the round trip takes less than 30 seconds, and
    * the server-reported ``processing_time_seconds`` is within 5 seconds of
      the duration measured here.
    """
    import time

    file_count = 10
    file_numbers = [
        _create_file(client, _create_customer(client)) for _ in range(file_count)
    ]

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # (or go negative) if the system wall clock is adjusted mid-request, which
    # time.time() does not guarantee.
    started = time.perf_counter()
    resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": file_numbers},
    )
    actual_processing_time = time.perf_counter() - started

    assert resp.status_code == 200
    data = resp.json()
    assert data["total_files"] == file_count
    assert data["successful"] == file_count
    assert data["failed"] == 0

    # Processing should be reasonably fast (less than 30 seconds for 10 files).
    assert actual_processing_time < 30

    # Reported processing time should be close to the measured one.
    reported_time = data["processing_time_seconds"]
    assert abs(reported_time - actual_processing_time) < 5  # 5 second tolerance
# Integration Tests - Batch + Existing Endpoints
def test_batch_generate_then_list_and_download(client: TestClient):
    """End-to-end flow: batch generate, then list, then download.

    After generating statements for two files in one batch, each generated
    filename must appear in that file's statement listing, and the download
    endpoint must answer 200 with the filename in its
    ``content-disposition`` header.
    """
    first_file = _create_file(client, _create_customer(client))
    second_file = _create_file(client, _create_customer(client))

    generated = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [first_file, second_file]},
    )
    assert generated.status_code == 200
    summary = generated.json()
    assert summary["successful"] == 2

    for entry in summary["results"]:
        file_no = entry["file_no"]
        expected_name = entry["statement_meta"]["filename"]

        # The listing for this file must include the statement just generated.
        listing_resp = client.get(f"/api/billing/statements/{file_no}/list")
        assert listing_resp.status_code == 200
        listing = listing_resp.json()
        assert len(listing) >= 1
        listed_names = {item["filename"] for item in listing}
        assert expected_name in listed_names

        # The download endpoint must serve that same filename.
        download_resp = client.get(f"/api/billing/statements/{file_no}/download")
        assert download_resp.status_code == 200
        disposition = download_resp.headers["content-disposition"]
        assert expected_name in disposition
def test_batch_generate_with_existing_statements(client: TestClient):
    """A batch run adds a new statement next to one that already exists.

    A statement is first generated through the single-file endpoint and then
    again through the batch endpoint for the same file. The two filenames
    must differ, and the file's listing must contain exactly those two.
    """
    import time

    file_no = _create_file(client, _create_customer(client))

    # First statement: single-file endpoint.
    single_resp = client.post(
        "/api/billing/statements/generate",
        json={"file_no": file_no},
    )
    assert single_resp.status_code == 200
    name_from_single = single_resp.json()["filename"]

    # Brief pause so the two generations do not share a timestamp; the
    # original author notes that the filenames rely on microsecond timestamps.
    time.sleep(0.001)

    # Second statement: batch endpoint, same file.
    batch_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [file_no]},
    )
    assert batch_resp.status_code == 200
    batch_body = batch_resp.json()
    assert batch_body["successful"] == 1
    name_from_batch = batch_body["results"][0]["statement_meta"]["filename"]

    assert name_from_single != name_from_batch

    # Both statements must be listed, and nothing else.
    listing_resp = client.get(f"/api/billing/statements/{file_no}/list")
    assert listing_resp.status_code == 200
    listing = listing_resp.json()
    assert len(listing) == 2
    listed_names = [item["filename"] for item in listing]
    for expected_name in (name_from_single, name_from_batch):
        assert expected_name in listed_names
# Progress Polling Tests
def test_batch_progress_polling_successful_operation(client: TestClient):
    """Progress for a fully successful two-file batch reads as completed.

    The test expects the batch to be finished by the time the POST returns,
    so a single progress poll must show status ``"completed"``, two of two
    files processed successfully, a 100.0 success rate, populated timing
    fields, and a completed per-file entry with statement metadata for each
    file.
    """
    file_numbers = [_create_file(client, _create_customer(client)) for _ in range(2)]

    start_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": file_numbers},
    )
    assert start_resp.status_code == 200
    batch_id = start_resp.json()["batch_id"]

    poll_resp = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert poll_resp.status_code == 200
    progress = poll_resp.json()

    # Batch-level summary values that must match exactly.
    expected_summary = {
        "batch_id": batch_id,
        "status": "completed",
        "total_files": 2,
        "processed_files": 2,
        "successful_files": 2,
        "failed_files": 0,
        "success_rate": 100.0,
    }
    for field, value in expected_summary.items():
        assert progress[field] == value

    # Batch-level fields that only need to be populated.
    for field in ("started_at", "completed_at", "processing_time_seconds"):
        assert progress[field] is not None

    per_file = progress["files"]
    assert len(per_file) == 2

    # Each file entry must be one we submitted, completed and fully populated.
    for entry in per_file:
        assert entry["file_no"] in file_numbers
        assert entry["status"] == "completed"
        for field in ("started_at", "completed_at", "statement_meta"):
            assert entry[field] is not None
def test_batch_progress_polling_mixed_results(client: TestClient):
    """Progress reflects one success and one failure in the same batch.

    The batch contains one real file and the unknown file number
    ``"NONEXISTENT-1"``. The batch itself must still complete, with a 50.0
    success rate, and the unknown file must be reported as failed with a
    "not found" error message.
    """
    existing_file = _create_file(client, _create_customer(client))
    submitted = [existing_file, "NONEXISTENT-1"]

    start_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": submitted},
    )
    assert start_resp.status_code == 200
    batch_id = start_resp.json()["batch_id"]

    poll_resp = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert poll_resp.status_code == 200
    progress = poll_resp.json()

    # Batch-level summary: both files processed, one each way.
    expected_summary = {
        "batch_id": batch_id,
        "status": "completed",
        "total_files": 2,
        "processed_files": 2,
        "successful_files": 1,
        "failed_files": 1,
        "success_rate": 50.0,
    }
    for field, value in expected_summary.items():
        assert progress[field] == value

    # Index per-file entries by file number.
    by_file = {}
    for entry in progress["files"]:
        by_file[entry["file_no"]] = entry

    # The real file succeeded and carries statement metadata.
    succeeded = by_file[existing_file]
    assert succeeded["status"] == "completed"
    assert succeeded["statement_meta"] is not None

    # The unknown file failed with an explanatory message.
    failed = by_file["NONEXISTENT-1"]
    assert failed["status"] == "failed"
    assert failed["error_message"] is not None
    assert "not found" in failed["error_message"].lower()
def test_batch_progress_polling_nonexistent_batch(client: TestClient):
    """Polling an unknown batch ID yields 404 with a "not found" message."""
    unknown_poll = client.get(
        "/api/billing/statements/batch-progress/nonexistent-batch-id"
    )
    assert unknown_poll.status_code == 404

    # The error payload nests the human-readable text under error.message.
    message = unknown_poll.json()["error"]["message"]
    assert "not found" in message.lower()
def test_list_active_batches_empty(client: TestClient):
    """The batch-list endpoint answers 200 with a JSON list.

    Batches created by other tests may still be present, so only the type of
    the payload is checked, not its contents or length.
    """
    listing_resp = client.get("/api/billing/statements/batch-list")
    assert listing_resp.status_code == 200

    payload = listing_resp.json()
    assert isinstance(payload, list)
def test_batch_cancellation_nonexistent(client: TestClient):
    """Cancelling an unknown batch ID yields 404 with a "not found" message."""
    cancel_resp = client.delete(
        "/api/billing/statements/batch-progress/nonexistent-batch-id"
    )
    assert cancel_resp.status_code == 404

    # The error payload nests the human-readable text under error.message.
    message = cancel_resp.json()["error"]["message"]
    assert "not found" in message.lower()
def test_batch_cancellation_already_completed(client: TestClient):
    """Cancelling a batch that has already finished is rejected with 400.

    A one-file batch is run first; the subsequent DELETE on its progress
    resource must return 400 and an error message containing
    "cannot cancel".
    """
    file_no = _create_file(client, _create_customer(client))

    # Run a batch to completion so there is something finished to cancel.
    start_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [file_no]},
    )
    assert start_resp.status_code == 200
    batch_id = start_resp.json()["batch_id"]

    cancel_resp = client.delete(f"/api/billing/statements/batch-progress/{batch_id}")
    assert cancel_resp.status_code == 400
    message = cancel_resp.json()["error"]["message"]
    assert "cannot cancel" in message.lower()
def test_progress_store_cleanup_mechanism(client: TestClient):
    """Progress for a freshly finished batch is still retrievable.

    This is a smoke check around the progress store rather than a test of
    the cleanup itself: exercising the time-based cleanup would require
    mocking time or the retention period, which this test does not do. It
    only confirms that a just-created batch can be polled successfully.
    """
    file_no = _create_file(client, _create_customer(client))

    start_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [file_no]},
    )
    assert start_resp.status_code == 200
    batch_id = start_resp.json()["batch_id"]

    # The entry must not have been cleaned up yet.
    poll_resp = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert poll_resp.status_code == 200
def test_batch_progress_timing_metadata(client: TestClient):
    """Batch progress exposes well-formed, logically ordered timing data.

    The progress payload must contain ``started_at``, ``updated_at``,
    ``completed_at`` and ``processing_time_seconds``. The three timestamps
    must parse as ISO 8601, neither ``updated_at`` nor ``completed_at`` may
    precede ``started_at``, and the processing time must be a non-negative
    number.
    """
    from datetime import datetime

    def _parse_iso(stamp):
        # fromisoformat() on older Pythons rejects a trailing "Z", so it is
        # rewritten as an explicit UTC offset first.
        return datetime.fromisoformat(stamp.replace('Z', '+00:00'))

    file_no = _create_file(client, _create_customer(client))

    start_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [file_no]},
    )
    assert start_resp.status_code == 200
    batch_id = start_resp.json()["batch_id"]

    poll_resp = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert poll_resp.status_code == 200
    progress = poll_resp.json()

    # All timing fields must be present.
    for field in ("started_at", "updated_at", "completed_at", "processing_time_seconds"):
        assert field in progress

    # Parsing doubles as the ISO-format validity check.
    began, touched, finished = (
        _parse_iso(progress[field])
        for field in ("started_at", "updated_at", "completed_at")
    )

    # Neither later timestamp may precede the start.
    assert touched >= began
    assert finished >= began

    duration = progress["processing_time_seconds"]
    assert isinstance(duration, (int, float))
    assert duration >= 0
def test_batch_progress_estimated_completion_logic(client: TestClient):
    """A finished batch reports no estimate and no file in flight.

    Once a one-file batch has run, its progress payload must have both
    ``estimated_completion`` and ``current_file`` set to ``None``.
    """
    file_no = _create_file(client, _create_customer(client))

    start_resp = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": [file_no]},
    )
    assert start_resp.status_code == 200
    batch_id = start_resp.json()["batch_id"]

    poll_resp = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert poll_resp.status_code == 200
    progress = poll_resp.json()

    # Nothing is left to estimate or process for a completed operation.
    for field in ("estimated_completion", "current_file"):
        assert progress[field] is None
def test_batch_progress_file_level_details(client: TestClient):
    """Per-file progress entries carry accurate statement metadata.

    Two files are batched: the first has one unbilled TIME ledger entry with
    amount 350.0, the second has no ledger rows. Every per-file progress
    entry must be completed, timestamped, and carry statement metadata whose
    ``file_no`` matches the entry, whose filename ends in ``.html`` and whose
    size is positive. The entry for the first file must additionally report
    one unbilled item totalling 350.0.
    """
    owner_id1 = _create_customer(client)
    file_no1 = _create_file(client, owner_id1)

    # Add a single unbilled ledger entry to the first file.
    time_entry = {
        "file_no": file_no1,
        "date": date.today().isoformat(),
        "t_code": "TIME",
        "t_type": "2",
        "empl_num": "E01",
        "quantity": 2.0,
        "rate": 175.0,
        "amount": 350.0,
        "billed": "N",
        "note": "Progress test work",
    }
    assert client.post("/api/financial/ledger/", json=time_entry).status_code == 200

    owner_id2 = _create_customer(client)
    file_no2 = _create_file(client, owner_id2)
    file_numbers = [file_no1, file_no2]

    resp_batch = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": file_numbers},
    )
    assert resp_batch.status_code == 200
    batch_id = resp_batch.json()["batch_id"]

    resp_progress = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert resp_progress.status_code == 200
    progress_data = resp_progress.json()

    assert len(progress_data["files"]) == 2
    for file_progress in progress_data["files"]:
        assert file_progress["file_no"] in file_numbers
        assert file_progress["status"] == "completed"
        assert file_progress["started_at"] is not None
        assert file_progress["completed_at"] is not None

        # The status was just asserted to be "completed", so the metadata
        # checks run unconditionally; an extra status guard here would have
        # an unreachable skip branch.
        statement_meta = file_progress["statement_meta"]
        assert statement_meta is not None
        assert statement_meta["file_no"] == file_progress["file_no"]
        assert statement_meta["filename"].endswith(".html")
        assert statement_meta["size"] > 0

        # Only the file that received a ledger entry has unbilled charges.
        if file_progress["file_no"] == file_no1:
            assert statement_meta["unbilled_count"] == 1
            assert statement_meta["totals"]["charges_unbilled"] == 350.0
def test_batch_progress_api_integration_workflow(client: TestClient):
    """Full workflow: start a batch, poll its progress, list the results.

    Three files are batched with ``period`` set to ``"2024-01"``. The test
    checks that:

    1. the batch response carries a ``batch_``-prefixed ID and the right
       file count,
    2. the progress endpoint is reachable for that ID,
    3. the progress summary agrees with the batch response,
    4. every file completed and its statement metadata carries the
       requested period, and
    5. the statement generated by this batch appears, with positive size,
       in each file's statement listing.
    """
    file_numbers = [_create_file(client, _create_customer(client)) for _ in range(3)]

    resp_batch = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": file_numbers, "period": "2024-01"},
    )
    assert resp_batch.status_code == 200
    batch_data = resp_batch.json()
    batch_id = batch_data["batch_id"]

    # 1. The initial response identifies the batch.
    assert batch_id.startswith("batch_")
    assert batch_data["total_files"] == 3

    # 2. Poll progress; the batch is expected to be finished already.
    resp_progress = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert resp_progress.status_code == 200
    progress_data = resp_progress.json()

    # 3. The progress summary must agree with the batch response.
    assert progress_data["batch_id"] == batch_id
    assert progress_data["total_files"] == batch_data["total_files"]
    assert progress_data["successful_files"] == batch_data["successful"]
    assert progress_data["failed_files"] == batch_data["failed"]
    assert progress_data["success_rate"] == batch_data["success_rate"]

    # 4. Every file was processed and the period was applied.
    assert len(progress_data["files"]) == 3
    batch_filenames = {}  # file_no -> filename produced by this batch
    for file_progress in progress_data["files"]:
        assert file_progress["file_no"] in file_numbers
        assert file_progress["status"] == "completed"
        # A completed file must carry metadata. Asserting it (rather than
        # guarding with `if`) stops the period check from being skipped
        # silently when the metadata is missing.
        statement_meta = file_progress["statement_meta"]
        assert statement_meta is not None
        assert statement_meta["period"] == "2024-01"
        batch_filenames[file_progress["file_no"]] = statement_meta["filename"]

    # 5. The generated statements exist and can be listed.
    for file_no in file_numbers:
        assert file_no in batch_filenames

        resp_list = client.get(f"/api/billing/statements/{file_no}/list")
        assert resp_list.status_code == 200
        statements = resp_list.json()
        assert len(statements) >= 1

        # Pick out the statement this batch produced for the file.
        wanted = batch_filenames[file_no]
        batch_statement = next(
            (stmt for stmt in statements if stmt["filename"] == wanted),
            None,
        )
        assert batch_statement is not None
        assert batch_statement["size"] > 0
def test_batch_progress_error_handling_and_recovery(client: TestClient):
    """Progress tracking copes with invalid and duplicated file numbers.

    The request lists one valid file twice plus two unknown file numbers.
    The batch is expected to deduplicate to three files, complete overall,
    report one success and two failures, and give each unknown file a
    "not found" error message with no statement metadata.
    """
    valid_file = _create_file(client, _create_customer(client))
    # The valid file appears twice on purpose to exercise deduplication.
    file_numbers = [valid_file, "INVALID-1", "INVALID-2", valid_file]

    resp_batch = client.post(
        "/api/billing/statements/batch-generate",
        json={"file_numbers": file_numbers},
    )
    assert resp_batch.status_code == 200
    batch_data = resp_batch.json()
    batch_id = batch_data["batch_id"]

    # Duplicates collapse: valid_file, INVALID-1, INVALID-2.
    assert batch_data["total_files"] == 3

    resp_progress = client.get(f"/api/billing/statements/batch-progress/{batch_id}")
    assert resp_progress.status_code == 200
    progress_data = resp_progress.json()

    # Overall batch status.
    expected_summary = (
        ("status", "completed"),
        ("total_files", 3),
        ("processed_files", 3),
        ("successful_files", 1),
        ("failed_files", 2),
    )
    for field, value in expected_summary:
        assert progress_data[field] == value

    # Individual file results, indexed by file number.
    files_by_no = {}
    for entry in progress_data["files"]:
        files_by_no[entry["file_no"]] = entry

    # The valid file succeeds cleanly.
    valid_entry = files_by_no[valid_file]
    assert valid_entry["status"] == "completed"
    assert valid_entry["statement_meta"] is not None
    assert valid_entry["error_message"] is None

    # Each invalid file fails and explains why.
    for invalid_file in ("INVALID-1", "INVALID-2"):
        invalid_entry = files_by_no[invalid_file]
        assert invalid_entry["status"] == "failed"
        assert invalid_entry["statement_meta"] is None
        assert invalid_entry["error_message"] is not None
        assert "not found" in invalid_entry["error_message"].lower()