Files
delphi-database/app/api/customers.py
2025-08-08 15:55:15 -05:00

387 lines
10 KiB
Python

"""
Customer (Rolodex) API endpoints
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, func
from app.database.base import get_db
from app.models.rolodex import Rolodex, Phone
from app.models.user import User
from app.auth.security import get_current_user
router = APIRouter()
# Pydantic schemas for request/response
from pydantic import BaseModel, EmailStr
from datetime import date
class PhoneCreate(BaseModel):
location: Optional[str] = None
phone: str
class PhoneResponse(BaseModel):
id: int
location: Optional[str]
phone: str
class Config:
from_attributes = True
class CustomerBase(BaseModel):
id: str
last: str
first: Optional[str] = None
middle: Optional[str] = None
prefix: Optional[str] = None
suffix: Optional[str] = None
title: Optional[str] = None
group: Optional[str] = None
a1: Optional[str] = None
a2: Optional[str] = None
a3: Optional[str] = None
city: Optional[str] = None
abrev: Optional[str] = None
zip: Optional[str] = None
email: Optional[EmailStr] = None
dob: Optional[date] = None
ss_number: Optional[str] = None
legal_status: Optional[str] = None
memo: Optional[str] = None
class CustomerCreate(CustomerBase):
pass
class CustomerUpdate(BaseModel):
last: Optional[str] = None
first: Optional[str] = None
middle: Optional[str] = None
prefix: Optional[str] = None
suffix: Optional[str] = None
title: Optional[str] = None
group: Optional[str] = None
a1: Optional[str] = None
a2: Optional[str] = None
a3: Optional[str] = None
city: Optional[str] = None
abrev: Optional[str] = None
zip: Optional[str] = None
email: Optional[EmailStr] = None
dob: Optional[date] = None
ss_number: Optional[str] = None
legal_status: Optional[str] = None
memo: Optional[str] = None
class CustomerResponse(CustomerBase):
phone_numbers: List[PhoneResponse] = []
class Config:
from_attributes = True
@router.get("/search/phone")
async def search_by_phone(
phone: str = Query(..., description="Phone number to search for"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Search customers by phone number (legacy phone search feature)"""
phones = db.query(Phone).join(Rolodex).filter(
Phone.phone.contains(phone)
).options(joinedload(Phone.rolodex)).all()
results = []
for phone_record in phones:
results.append({
"phone": phone_record.phone,
"location": phone_record.location,
"customer": {
"id": phone_record.rolodex.id,
"name": f"{phone_record.rolodex.first or ''} {phone_record.rolodex.last}".strip(),
"city": phone_record.rolodex.city,
"state": phone_record.rolodex.abrev
}
})
return results
@router.get("/groups")
async def get_customer_groups(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get list of customer groups for filtering"""
groups = db.query(Rolodex.group).filter(
Rolodex.group.isnot(None),
Rolodex.group != ""
).distinct().all()
return [{"group": group[0]} for group in groups if group[0]]
@router.get("/states")
async def get_states(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get list of states used in the database"""
states = db.query(Rolodex.abrev).filter(
Rolodex.abrev.isnot(None),
Rolodex.abrev != ""
).distinct().all()
return [{"state": state[0]} for state in states if state[0]]
@router.get("/stats")
async def get_customer_stats(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get customer database statistics"""
total_customers = db.query(Rolodex).count()
total_phones = db.query(Phone).count()
customers_with_email = db.query(Rolodex).filter(
Rolodex.email.isnot(None),
Rolodex.email != ""
).count()
# Group breakdown
group_stats = db.query(Rolodex.group, func.count(Rolodex.id)).filter(
Rolodex.group.isnot(None),
Rolodex.group != ""
).group_by(Rolodex.group).all()
return {
"total_customers": total_customers,
"total_phone_numbers": total_phones,
"customers_with_email": customers_with_email,
"group_breakdown": [{"group": group, "count": count} for group, count in group_stats]
}
@router.get("/", response_model=List[CustomerResponse])
async def list_customers(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
search: Optional[str] = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""List customers with pagination and search"""
query = db.query(Rolodex).options(joinedload(Rolodex.phone_numbers))
if search:
query = query.filter(
or_(
Rolodex.id.contains(search),
Rolodex.last.contains(search),
Rolodex.first.contains(search),
Rolodex.city.contains(search),
Rolodex.email.contains(search)
)
)
customers = query.offset(skip).limit(limit).all()
return customers
@router.get("/{customer_id}", response_model=CustomerResponse)
async def get_customer(
customer_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get specific customer by ID"""
customer = db.query(Rolodex).options(joinedload(Rolodex.phone_numbers)).filter(
Rolodex.id == customer_id
).first()
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found"
)
return customer
@router.post("/", response_model=CustomerResponse)
async def create_customer(
customer_data: CustomerCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Create new customer"""
# Check if ID already exists
existing = db.query(Rolodex).filter(Rolodex.id == customer_data.id).first()
if existing:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Customer ID already exists"
)
customer = Rolodex(**customer_data.model_dump())
db.add(customer)
db.commit()
db.refresh(customer)
return customer
@router.put("/{customer_id}", response_model=CustomerResponse)
async def update_customer(
customer_id: str,
customer_data: CustomerUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update customer"""
customer = db.query(Rolodex).filter(Rolodex.id == customer_id).first()
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found"
)
# Update fields
for field, value in customer_data.model_dump(exclude_unset=True).items():
setattr(customer, field, value)
db.commit()
db.refresh(customer)
return customer
@router.delete("/{customer_id}")
async def delete_customer(
customer_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete customer"""
customer = db.query(Rolodex).filter(Rolodex.id == customer_id).first()
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found"
)
db.delete(customer)
db.commit()
return {"message": "Customer deleted successfully"}
@router.get("/{customer_id}/phones", response_model=List[PhoneResponse])
async def get_customer_phones(
customer_id: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get customer phone numbers"""
customer = db.query(Rolodex).filter(Rolodex.id == customer_id).first()
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found"
)
phones = db.query(Phone).filter(Phone.rolodex_id == customer_id).all()
return phones
@router.post("/{customer_id}/phones", response_model=PhoneResponse)
async def add_customer_phone(
customer_id: str,
phone_data: PhoneCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Add phone number to customer"""
customer = db.query(Rolodex).filter(Rolodex.id == customer_id).first()
if not customer:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Customer not found"
)
phone = Phone(
rolodex_id=customer_id,
location=phone_data.location,
phone=phone_data.phone
)
db.add(phone)
db.commit()
db.refresh(phone)
return phone
@router.put("/{customer_id}/phones/{phone_id}", response_model=PhoneResponse)
async def update_customer_phone(
customer_id: str,
phone_id: int,
phone_data: PhoneCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update customer phone number"""
phone = db.query(Phone).filter(
Phone.id == phone_id,
Phone.rolodex_id == customer_id
).first()
if not phone:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Phone number not found"
)
phone.location = phone_data.location
phone.phone = phone_data.phone
db.commit()
db.refresh(phone)
return phone
@router.delete("/{customer_id}/phones/{phone_id}")
async def delete_customer_phone(
customer_id: str,
phone_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Delete customer phone number"""
phone = db.query(Phone).filter(
Phone.id == phone_id,
Phone.rolodex_id == customer_id
).first()
if not phone:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Phone number not found"
)
db.delete(phone)
db.commit()
return {"message": "Phone number deleted successfully"}