from typing import Optional, List from sqlalchemy import or_, and_, func, asc, desc from app.models.rolodex import Rolodex def apply_customer_filters(base_query, search: Optional[str], group: Optional[str], state: Optional[str], groups: Optional[List[str]], states: Optional[List[str]]): """Apply shared search and group/state filters to the provided base_query. This helper is used by both list and export endpoints to keep logic in sync. """ s = (search or "").strip() if s: s_lower = s.lower() tokens = [t for t in s_lower.split() if t] contains_any = or_( func.lower(Rolodex.id).contains(s_lower), func.lower(Rolodex.last).contains(s_lower), func.lower(Rolodex.first).contains(s_lower), func.lower(Rolodex.middle).contains(s_lower), func.lower(Rolodex.city).contains(s_lower), func.lower(Rolodex.email).contains(s_lower), ) name_tokens = [ or_( func.lower(Rolodex.first).contains(tok), func.lower(Rolodex.middle).contains(tok), func.lower(Rolodex.last).contains(tok), ) for tok in tokens ] combined = contains_any if not name_tokens else or_(contains_any, and_(*name_tokens)) last_first_filter = None if "," in s_lower: last_part, first_part = [p.strip() for p in s_lower.split(",", 1)] if last_part and first_part: last_first_filter = and_( func.lower(Rolodex.last).contains(last_part), func.lower(Rolodex.first).contains(first_part), ) elif last_part: last_first_filter = func.lower(Rolodex.last).contains(last_part) final_filter = or_(combined, last_first_filter) if last_first_filter is not None else combined base_query = base_query.filter(final_filter) effective_groups = [g for g in (groups or []) if g] or ([group] if group else []) if effective_groups: base_query = base_query.filter(Rolodex.group.in_(effective_groups)) effective_states = [s for s in (states or []) if s] or ([state] if state else []) if effective_states: base_query = base_query.filter(Rolodex.abrev.in_(effective_states)) return base_query def apply_customer_sorting(base_query, sort_by: Optional[str], sort_dir: Optional[str]): """Apply shared sorting to the provided base_query. Supported fields: id, name (last,first), city (city,state), email. Unknown fields fall back to id. Sorting is case-insensitive for strings. """ normalized_sort_by = (sort_by or "id").lower() normalized_sort_dir = (sort_dir or "asc").lower() is_desc = normalized_sort_dir == "desc" order_columns = [] if normalized_sort_by == "id": order_columns = [Rolodex.id] elif normalized_sort_by == "name": order_columns = [Rolodex.last, Rolodex.first] elif normalized_sort_by == "city": order_columns = [Rolodex.city, Rolodex.abrev] elif normalized_sort_by == "email": order_columns = [Rolodex.email] else: order_columns = [Rolodex.id] ordered = [] for col in order_columns: try: expr = func.lower(col) if col.type.python_type in (str,) else col # type: ignore[attr-defined] except Exception: expr = col ordered.append(desc(expr) if is_desc else asc(expr)) if ordered: base_query = base_query.order_by(*ordered) return base_query def prepare_customer_csv_rows(customers: List[Rolodex], fields: Optional[List[str]]): """Prepare CSV header and rows for the given customers and requested fields. Returns a tuple: (header_row, rows), where header_row is a list of column titles and rows is a list of row lists ready to be written by csv.writer. """ allowed_fields_in_order = ["id", "name", "group", "city", "state", "phone", "email"] header_names = { "id": "Customer ID", "name": "Name", "group": "Group", "city": "City", "state": "State", "phone": "Primary Phone", "email": "Email", } requested = [f.lower() for f in (fields or []) if isinstance(f, str)] selected_fields = [f for f in allowed_fields_in_order if f in requested] if requested else allowed_fields_in_order if not selected_fields: selected_fields = allowed_fields_in_order header_row = [header_names[f] for f in selected_fields] rows: List[List[str]] = [] for c in customers: full_name = f"{(c.first or '').strip()} {(c.last or '').strip()}".strip() primary_phone = "" try: if getattr(c, "phone_numbers", None): primary_phone = c.phone_numbers[0].phone or "" except Exception: primary_phone = "" row_map = { "id": c.id, "name": full_name, "group": c.group or "", "city": c.city or "", "state": c.abrev or "", "phone": primary_phone, "email": c.email or "", } rows.append([row_map[f] for f in selected_fields]) return header_row, rows