coming together
This commit is contained in:
@@ -1,185 +1,110 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script for the customers module
|
||||
"""
|
||||
import requests
|
||||
import pytest
|
||||
import json
|
||||
"""Tests for Customers API using FastAPI TestClient (no live server required)."""
|
||||
import os
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
|
||||
BASE_URL = "http://localhost:6920"
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
# Ensure required env vars for app import/config
|
||||
os.environ.setdefault("SECRET_KEY", "x" * 32)
|
||||
os.environ.setdefault("DATABASE_URL", "sqlite:////tmp/delphi_test.sqlite")
|
||||
|
||||
from app.main import app # noqa: E402
|
||||
from app.auth.security import get_current_user # noqa: E402
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def token():
|
||||
"""Obtain an access token from the running server, or skip if unavailable."""
|
||||
try:
|
||||
response = requests.post(f"{BASE_URL}/api/auth/login", json={
|
||||
"username": "admin",
|
||||
"password": "admin123"
|
||||
}, timeout=3)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
if data and data.get("access_token"):
|
||||
return data["access_token"]
|
||||
except Exception:
|
||||
pass
|
||||
pytest.skip("Auth server not available; skipping integration tests")
|
||||
def client():
|
||||
# Override auth to bypass JWT for these tests
|
||||
app.dependency_overrides[get_current_user] = lambda: {
|
||||
"id": "test",
|
||||
"username": "tester",
|
||||
"is_admin": True,
|
||||
"is_active": True,
|
||||
}
|
||||
|
||||
def test_auth():
|
||||
"""Test authentication"""
|
||||
print("🔐 Testing authentication...")
|
||||
|
||||
# First, create an admin user if needed
|
||||
try:
|
||||
response = requests.post(f"{BASE_URL}/api/auth/register", json={
|
||||
"username": "admin",
|
||||
"email": "admin@delphicg.local",
|
||||
"password": "admin123",
|
||||
"full_name": "System Administrator",
|
||||
"is_admin": True
|
||||
})
|
||||
print(f"Registration: {response.status_code}")
|
||||
except Exception as e:
|
||||
print(f"Registration may already exist: {e}")
|
||||
|
||||
# Login
|
||||
response = requests.post(f"{BASE_URL}/api/auth/login", json={
|
||||
"username": "admin",
|
||||
"password": "admin123"
|
||||
})
|
||||
|
||||
if response.status_code == 200:
|
||||
token_data = response.json()
|
||||
token = token_data["access_token"]
|
||||
print(f"✅ Login successful, token: {token[:20]}...")
|
||||
return token
|
||||
else:
|
||||
print(f"❌ Login failed: {response.status_code} - {response.text}")
|
||||
return None
|
||||
yield TestClient(app)
|
||||
finally:
|
||||
app.dependency_overrides.pop(get_current_user, None)
|
||||
|
||||
def test_customers_api(token):
|
||||
"""Test customers API endpoints"""
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
print("\n📋 Testing Customers API...")
|
||||
|
||||
# Test getting customers list (should be empty initially)
|
||||
response = requests.get(f"{BASE_URL}/api/customers/", headers=headers)
|
||||
print(f"Get customers: {response.status_code}")
|
||||
if response.status_code == 200:
|
||||
customers = response.json()
|
||||
print(f"Found {len(customers)} customers")
|
||||
|
||||
# Test creating a customer
|
||||
test_customer = {
|
||||
"id": "TEST001",
|
||||
|
||||
def test_customers_crud_and_queries(client: TestClient):
|
||||
unique_id = f"TEST-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
# List (empty or not, but should 200)
|
||||
resp = client.get("/api/customers/?limit=5")
|
||||
assert resp.status_code == 200
|
||||
assert isinstance(resp.json(), list)
|
||||
|
||||
# Create
|
||||
payload = {
|
||||
"id": unique_id,
|
||||
"last": "Doe",
|
||||
"first": "John",
|
||||
"middle": "Q",
|
||||
"prefix": "Mr.",
|
||||
"title": "Attorney",
|
||||
"group": "Client",
|
||||
"a1": "123 Main Street",
|
||||
"a2": "Suite 100",
|
||||
"city": "Dallas",
|
||||
"abrev": "TX",
|
||||
"zip": "75201",
|
||||
"email": "john.doe@example.com",
|
||||
"legal_status": "Petitioner",
|
||||
"memo": "Test customer created by automated test"
|
||||
"memo": "Created by pytest",
|
||||
}
|
||||
|
||||
response = requests.post(f"{BASE_URL}/api/customers/", json=test_customer, headers=headers)
|
||||
print(f"Create customer: {response.status_code}")
|
||||
if response.status_code == 200:
|
||||
created_customer = response.json()
|
||||
print(f"✅ Created customer: {created_customer['id']} - {created_customer['last']}")
|
||||
customer_id = created_customer['id']
|
||||
else:
|
||||
print(f"❌ Create failed: {response.text}")
|
||||
return
|
||||
|
||||
# Test adding phone numbers
|
||||
phone1 = {"location": "Office", "phone": "(214) 555-0100"}
|
||||
phone2 = {"location": "Mobile", "phone": "(214) 555-0101"}
|
||||
|
||||
for phone in [phone1, phone2]:
|
||||
response = requests.post(f"{BASE_URL}/api/customers/{customer_id}/phones",
|
||||
json=phone, headers=headers)
|
||||
print(f"Add phone {phone['location']}: {response.status_code}")
|
||||
|
||||
# Test getting customer with phones
|
||||
response = requests.get(f"{BASE_URL}/api/customers/{customer_id}", headers=headers)
|
||||
if response.status_code == 200:
|
||||
customer = response.json()
|
||||
print(f"✅ Customer has {len(customer['phone_numbers'])} phone numbers")
|
||||
for phone in customer['phone_numbers']:
|
||||
print(f" {phone['location']}: {phone['phone']}")
|
||||
|
||||
# Test search functionality
|
||||
response = requests.get(f"{BASE_URL}/api/customers/?search=Doe", headers=headers)
|
||||
if response.status_code == 200:
|
||||
results = response.json()
|
||||
print(f"✅ Search for 'Doe' found {len(results)} results")
|
||||
|
||||
# Test phone search
|
||||
response = requests.get(f"{BASE_URL}/api/customers/search/phone?phone=214", headers=headers)
|
||||
if response.status_code == 200:
|
||||
results = response.json()
|
||||
print(f"✅ Phone search for '214' found {len(results)} results")
|
||||
|
||||
# Test stats
|
||||
response = requests.get(f"{BASE_URL}/api/customers/stats", headers=headers)
|
||||
if response.status_code == 200:
|
||||
stats = response.json()
|
||||
print(f"✅ Stats: {stats['total_customers']} customers, {stats['total_phone_numbers']} phones")
|
||||
print(f" Groups: {[g['group'] + ':' + str(g['count']) for g in stats['group_breakdown']]}")
|
||||
|
||||
# Test updating customer
|
||||
update_data = {"memo": f"Updated at {datetime.now().isoformat()}"}
|
||||
response = requests.put(f"{BASE_URL}/api/customers/{customer_id}",
|
||||
json=update_data, headers=headers)
|
||||
print(f"Update customer: {response.status_code}")
|
||||
|
||||
print(f"\n✅ All customer API tests completed successfully!")
|
||||
return customer_id
|
||||
resp = client.post("/api/customers/", json=payload)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["id"] == unique_id
|
||||
assert body["last"] == "Doe"
|
||||
|
||||
def test_web_page():
|
||||
"""Test the web page loads"""
|
||||
print("\n🌐 Testing web page...")
|
||||
|
||||
# Test health endpoint
|
||||
response = requests.get(f"{BASE_URL}/health")
|
||||
print(f"Health check: {response.status_code}")
|
||||
|
||||
# Test customers page (will require authentication in browser)
|
||||
response = requests.get(f"{BASE_URL}/customers")
|
||||
print(f"Customers page: {response.status_code}")
|
||||
if response.status_code == 200:
|
||||
print("✅ Customers page loads successfully")
|
||||
else:
|
||||
print(f"Note: Customers page requires authentication (status {response.status_code})")
|
||||
# Add phones
|
||||
for phone in ("(214) 555-0100", "(214) 555-0101"):
|
||||
resp = client.post(
|
||||
f"/api/customers/{unique_id}/phones",
|
||||
json={"location": "Office", "phone": phone},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Retrieve and assert phones present
|
||||
resp = client.get(f"/api/customers/{unique_id}")
|
||||
assert resp.status_code == 200
|
||||
customer = resp.json()
|
||||
assert customer["id"] == unique_id
|
||||
assert len(customer.get("phone_numbers", [])) >= 2
|
||||
|
||||
# Search by last name
|
||||
resp = client.get("/api/customers/?search=Doe")
|
||||
assert resp.status_code == 200
|
||||
assert any(c["id"] == unique_id for c in resp.json())
|
||||
|
||||
# Phone search
|
||||
resp = client.get("/api/customers/search/phone", params={"phone": "214"})
|
||||
assert resp.status_code == 200
|
||||
assert isinstance(resp.json(), list)
|
||||
|
||||
# Stats endpoint returns expected fields
|
||||
resp = client.get("/api/customers/stats")
|
||||
assert resp.status_code == 200
|
||||
stats = resp.json()
|
||||
assert {"total_customers", "total_phone_numbers", "customers_with_email", "group_breakdown"} <= set(stats.keys())
|
||||
|
||||
# Update
|
||||
resp = client.put(
|
||||
f"/api/customers/{unique_id}",
|
||||
json={"memo": f"Updated at {datetime.utcnow().isoformat()}"},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Cleanup
|
||||
resp = client.delete(f"/api/customers/{unique_id}")
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
def test_pages_and_health(client: TestClient):
|
||||
# Health
|
||||
resp = client.get("/health")
|
||||
assert resp.status_code == 200
|
||||
assert resp.json().get("status") == "healthy"
|
||||
|
||||
# Customers page should load HTML
|
||||
resp = client.get("/customers")
|
||||
assert resp.status_code in (200, 401, 403) or resp.headers.get("content-type", "").startswith("text/html")
|
||||
|
||||
def main():
|
||||
print("🚀 Testing Delphi Database Customers Module")
|
||||
print("=" * 50)
|
||||
|
||||
# Test authentication
|
||||
token = test_auth()
|
||||
if not token:
|
||||
print("❌ Cannot proceed without authentication")
|
||||
return
|
||||
|
||||
# Test API endpoints
|
||||
customer_id = test_customers_api(token)
|
||||
|
||||
# Test web interface
|
||||
test_web_page()
|
||||
|
||||
print("\n🎉 Customer module testing completed!")
|
||||
print(f"🌐 Visit http://localhost:6920/customers to see the web interface")
|
||||
print(f"📚 API docs available at http://localhost:6920/docs")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user