142 lines
5.1 KiB
Python
142 lines
5.1 KiB
Python
from typing import Optional, List
|
|
from sqlalchemy import or_, and_, func, asc, desc
|
|
|
|
from app.models.rolodex import Rolodex
|
|
|
|
|
|
def apply_customer_filters(base_query, search: Optional[str], group: Optional[str], state: Optional[str], groups: Optional[List[str]], states: Optional[List[str]]):
|
|
"""Apply shared search and group/state filters to the provided base_query.
|
|
|
|
This helper is used by both list and export endpoints to keep logic in sync.
|
|
"""
|
|
s = (search or "").strip()
|
|
if s:
|
|
s_lower = s.lower()
|
|
tokens = [t for t in s_lower.split() if t]
|
|
contains_any = or_(
|
|
func.lower(Rolodex.id).contains(s_lower),
|
|
func.lower(Rolodex.last).contains(s_lower),
|
|
func.lower(Rolodex.first).contains(s_lower),
|
|
func.lower(Rolodex.middle).contains(s_lower),
|
|
func.lower(Rolodex.city).contains(s_lower),
|
|
func.lower(Rolodex.email).contains(s_lower),
|
|
)
|
|
name_tokens = [
|
|
or_(
|
|
func.lower(Rolodex.first).contains(tok),
|
|
func.lower(Rolodex.middle).contains(tok),
|
|
func.lower(Rolodex.last).contains(tok),
|
|
)
|
|
for tok in tokens
|
|
]
|
|
combined = contains_any if not name_tokens else or_(contains_any, and_(*name_tokens))
|
|
|
|
last_first_filter = None
|
|
if "," in s_lower:
|
|
last_part, first_part = [p.strip() for p in s_lower.split(",", 1)]
|
|
if last_part and first_part:
|
|
last_first_filter = and_(
|
|
func.lower(Rolodex.last).contains(last_part),
|
|
func.lower(Rolodex.first).contains(first_part),
|
|
)
|
|
elif last_part:
|
|
last_first_filter = func.lower(Rolodex.last).contains(last_part)
|
|
|
|
final_filter = or_(combined, last_first_filter) if last_first_filter is not None else combined
|
|
base_query = base_query.filter(final_filter)
|
|
|
|
effective_groups = [g for g in (groups or []) if g] or ([group] if group else [])
|
|
if effective_groups:
|
|
base_query = base_query.filter(Rolodex.group.in_(effective_groups))
|
|
|
|
effective_states = [s for s in (states or []) if s] or ([state] if state else [])
|
|
if effective_states:
|
|
base_query = base_query.filter(Rolodex.abrev.in_(effective_states))
|
|
|
|
return base_query
|
|
|
|
|
|
def apply_customer_sorting(base_query, sort_by: Optional[str], sort_dir: Optional[str]):
|
|
"""Apply shared sorting to the provided base_query.
|
|
|
|
Supported fields: id, name (last,first), city (city,state), email.
|
|
Unknown fields fall back to id. Sorting is case-insensitive for strings.
|
|
"""
|
|
normalized_sort_by = (sort_by or "id").lower()
|
|
normalized_sort_dir = (sort_dir or "asc").lower()
|
|
is_desc = normalized_sort_dir == "desc"
|
|
|
|
order_columns = []
|
|
if normalized_sort_by == "id":
|
|
order_columns = [Rolodex.id]
|
|
elif normalized_sort_by == "name":
|
|
order_columns = [Rolodex.last, Rolodex.first]
|
|
elif normalized_sort_by == "city":
|
|
order_columns = [Rolodex.city, Rolodex.abrev]
|
|
elif normalized_sort_by == "email":
|
|
order_columns = [Rolodex.email]
|
|
else:
|
|
order_columns = [Rolodex.id]
|
|
|
|
ordered = []
|
|
for col in order_columns:
|
|
try:
|
|
expr = func.lower(col) if col.type.python_type in (str,) else col # type: ignore[attr-defined]
|
|
except Exception:
|
|
expr = col
|
|
ordered.append(desc(expr) if is_desc else asc(expr))
|
|
|
|
if ordered:
|
|
base_query = base_query.order_by(*ordered)
|
|
return base_query
|
|
|
|
|
|
def prepare_customer_csv_rows(customers: List[Rolodex], fields: Optional[List[str]]):
|
|
"""Prepare CSV header and rows for the given customers and requested fields.
|
|
|
|
Returns a tuple: (header_row, rows), where header_row is a list of column
|
|
titles and rows is a list of row lists ready to be written by csv.writer.
|
|
"""
|
|
allowed_fields_in_order = ["id", "name", "group", "city", "state", "phone", "email"]
|
|
header_names = {
|
|
"id": "Customer ID",
|
|
"name": "Name",
|
|
"group": "Group",
|
|
"city": "City",
|
|
"state": "State",
|
|
"phone": "Primary Phone",
|
|
"email": "Email",
|
|
}
|
|
|
|
requested = [f.lower() for f in (fields or []) if isinstance(f, str)]
|
|
selected_fields = [f for f in allowed_fields_in_order if f in requested] if requested else allowed_fields_in_order
|
|
if not selected_fields:
|
|
selected_fields = allowed_fields_in_order
|
|
|
|
header_row = [header_names[f] for f in selected_fields]
|
|
|
|
rows: List[List[str]] = []
|
|
for c in customers:
|
|
full_name = f"{(c.first or '').strip()} {(c.last or '').strip()}".strip()
|
|
primary_phone = ""
|
|
try:
|
|
if getattr(c, "phone_numbers", None):
|
|
primary_phone = c.phone_numbers[0].phone or ""
|
|
except Exception:
|
|
primary_phone = ""
|
|
|
|
row_map = {
|
|
"id": c.id,
|
|
"name": full_name,
|
|
"group": c.group or "",
|
|
"city": c.city or "",
|
|
"state": c.abrev or "",
|
|
"phone": primary_phone,
|
|
"email": c.email or "",
|
|
}
|
|
rows.append([row_map[f] for f in selected_fields])
|
|
|
|
return header_row, rows
|
|
|
|
|