Files
delphi-database/app/api/support.py
2025-08-14 19:16:28 -05:00

563 lines
20 KiB
Python

"""
Support ticket API endpoints
"""
import logging
import secrets
from datetime import datetime, timezone
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, status, Request, Query
from sqlalchemy import func, desc, and_, or_
from sqlalchemy.orm import Session, joinedload

from app.api.search_highlight import build_query_tokens
from app.auth.security import get_current_user, get_admin_user
from app.database.base import get_db
from app.models import User, SupportTicket, TicketResponse as TicketResponseModel, TicketStatus, TicketPriority, TicketCategory
from app.services.audit import audit_service
from app.services.query_utils import apply_sorting, paginate_with_total, tokenized_ilike_filter
router = APIRouter()
# Pydantic models for API
from pydantic import BaseModel, Field, EmailStr
from pydantic.config import ConfigDict
class TicketCreate(BaseModel):
    """Create new support ticket"""

    # Short summary of the problem: 5 to 200 characters, required.
    subject: str = Field(min_length=5, max_length=200)
    # Free-text description: at least 10 characters, required.
    description: str = Field(min_length=10)
    # Classification; these defaults apply when the client omits the fields.
    category: TicketCategory = TicketCategory.BUG_REPORT
    priority: TicketPriority = TicketPriority.MEDIUM
    # Contact details supplied with the ticket; both required.
    contact_name: str = Field(min_length=1, max_length=100)
    contact_email: EmailStr
    # Optional client-side context.
    current_page: str | None = None
    browser_info: str | None = None
class TicketUpdate(BaseModel):
    """Update existing ticket"""

    # Every field defaults to None, so a client may send any subset of them.
    # The length limits mirror the ones used when a ticket is created.
    subject: str | None = Field(default=None, min_length=5, max_length=200)
    description: str | None = Field(default=None, min_length=10)
    category: TicketCategory | None = None
    priority: TicketPriority | None = None
    status: TicketStatus | None = None
    # Integer id stored on the ticket's ``assigned_to`` attribute.
    assigned_to: int | None = None
class ResponseCreate(BaseModel):
    """Create ticket response"""

    # Body of the reply; must not be empty.
    message: str = Field(min_length=1)
    # Internal responses are filtered out of the ticket list views.
    is_internal: bool = False
class TicketResponseOut(BaseModel):
    """Ticket response model"""

    # Allow population from attribute-bearing objects as well as dicts.
    model_config = ConfigDict(from_attributes=True)

    id: int
    ticket_id: int
    message: str
    is_internal: bool
    # Author details are nullable but must always be present in the payload.
    author_name: str | None
    author_email: str | None
    created_at: datetime
class TicketDetail(BaseModel):
    """Detailed ticket information"""

    # Allow population from attribute-bearing objects as well as dicts.
    model_config = ConfigDict(from_attributes=True)

    # Identity and content.
    id: int
    ticket_number: str
    subject: str
    description: str

    # Classification and workflow state.
    category: TicketCategory
    priority: TicketPriority
    status: TicketStatus

    # Contact details supplied with the ticket.
    contact_name: str
    contact_email: str

    # Request context; nullable, and blanked out by the user-facing listing.
    current_page: str | None
    browser_info: str | None
    ip_address: str | None

    # Lifecycle timestamps.
    created_at: datetime
    updated_at: datetime | None
    resolved_at: datetime | None

    # Assignment and display names computed by the endpoints.
    assigned_to: int | None
    assigned_admin_name: str | None
    submitter_name: str | None

    # Responses attached to the ticket; empty list when none are provided.
    responses: list[TicketResponseOut] = Field(default_factory=list)
class PaginatedTicketsResponse(BaseModel):
    # Envelope returned by the list endpoints when ``include_total`` is true:
    # the current page of tickets plus the total reported by the paginator.
    items: list[TicketDetail]
    total: int
def generate_ticket_number() -> str:
    """Build a candidate ticket number of the form ``ST-<year>-<XXXX>``.

    ``<year>`` is the current UTC year and ``<XXXX>`` is four random uppercase
    hexadecimal digits, e.g. ``ST-2025-3FA9``. The suffix is random rather
    than sequential, and this function does not check the result against
    existing tickets; the caller is responsible for retrying on a clash.
    """
    current_year = datetime.now(timezone.utc).year
    # Two random bytes render as four hex characters.
    suffix = secrets.token_hex(2).upper()
    return f"ST-{current_year}-{suffix}"
@router.post("/tickets", response_model=dict)
async def create_support_ticket(
    ticket_data: TicketCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: Optional[User] = Depends(lambda: None)  # Allow anonymous submissions
):
    """Create new support ticket (public endpoint).

    Args:
        ticket_data: Validated ticket fields from the request body.
        request: Incoming request; supplies the client address and the
            ``User-Agent`` header.
        db: Database session.
        current_user: Always ``None`` here, because the dependency is a stub
            that returns ``None``; the ticket is stored without a ``user_id``.

    Returns:
        A dict with a confirmation message, the generated ticket number, the
        new ticket's id and the literal status ``"created"``.
    """
    # NOTE(review): because current_user is always None, tickets created here
    # never match the ``user_id`` filter used by ``/my-tickets``. Fixing that
    # needs an optional-auth dependency, which is not visible in this file.

    # Draw candidate numbers until one is not already stored.
    # NOTE(review): check-then-insert is not atomic; presumably a unique
    # constraint on ticket_number backs this up - confirm.
    ticket_number = generate_ticket_number()
    while db.query(SupportTicket).filter(SupportTicket.ticket_number == ticket_number).first():
        ticket_number = generate_ticket_number()

    # request.client can be None when the server does not know the peer
    # address, so it must not be dereferenced unconditionally.
    client_ip = request.client.host if request.client else None
    user_agent = request.headers.get("User-Agent", "")

    new_ticket = SupportTicket(
        ticket_number=ticket_number,
        subject=ticket_data.subject,
        description=ticket_data.description,
        category=ticket_data.category,
        priority=ticket_data.priority,
        contact_name=ticket_data.contact_name,
        contact_email=ticket_data.contact_email,
        current_page=ticket_data.current_page,
        # Fall back to the User-Agent header when the client sent no browser info.
        browser_info=ticket_data.browser_info or user_agent,
        ip_address=client_ip,
        user_id=current_user.id if current_user else None,
        status=TicketStatus.OPEN,
        created_at=datetime.now(timezone.utc),
    )
    db.add(new_ticket)
    db.commit()
    db.refresh(new_ticket)

    # Read the values needed for the response now, so a failure inside the
    # audit call cannot affect what is returned to the client.
    created_id = new_ticket.id
    created_number = new_ticket.ticket_number

    # Audit logging is best-effort: the ticket is already committed, so an
    # audit failure must not fail the request, but it should not be invisible.
    try:
        audit_service.log_action(
            db=db,
            action="CREATE",
            resource_type="SUPPORT_TICKET",
            user=current_user,
            resource_id=str(created_id),
            details={"ticket_number": created_number},
            request=request,
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "Audit logging failed for CREATE on support ticket %s",
            created_id,
            exc_info=True,
        )

    return {
        "message": "Support ticket created successfully",
        "ticket_number": created_number,
        "ticket_id": created_id,
        "status": "created"
    }
@router.get("/tickets", response_model=List[TicketDetail] | PaginatedTicketsResponse)
async def list_tickets(
    status: Optional[TicketStatus] = Query(None, description="Filter by ticket status"),
    priority: Optional[TicketPriority] = Query(None, description="Filter by ticket priority"),
    category: Optional[TicketCategory] = Query(None, description="Filter by ticket category"),
    assigned_to_me: bool = Query(False, description="Only include tickets assigned to the current admin"),
    search: Optional[str] = Query(None, description="Tokenized search across number, subject, description, contact name/email, current page, and IP"),
    skip: int = Query(0, ge=0, description="Offset for pagination"),
    limit: int = Query(50, ge=1, le=200, description="Page size"),
    sort_by: Optional[str] = Query(None, description="Sort by: created, updated, resolved, priority, status, subject"),
    sort_dir: Optional[str] = Query("desc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """List support tickets (admin only).

    Filters by status, priority, category and assignment, optionally narrows
    by a tokenized text search, then sorts and paginates.

    Returns:
        A list of ticket dicts, or ``{"items": [...], "total": n}`` when
        ``include_total`` is true. Internal responses are left out of each
        ticket's ``responses`` list.
    """
    query = db.query(SupportTicket).options(
        joinedload(SupportTicket.submitter),
        joinedload(SupportTicket.assigned_admin),
        # Load each response's author with the ticket. The serializer below
        # reads resp.author for every response, which would otherwise trigger
        # one lazy load per response. This matches the single-ticket endpoint.
        joinedload(SupportTicket.responses).joinedload(TicketResponseModel.author),
    )

    # Apply filters
    if status:
        query = query.filter(SupportTicket.status == status)
    if priority:
        query = query.filter(SupportTicket.priority == priority)
    if category:
        query = query.filter(SupportTicket.category == category)
    if assigned_to_me:
        query = query.filter(SupportTicket.assigned_to == current_user.id)

    # Search across key text fields
    if search:
        tokens = build_query_tokens(search)
        filter_expr = tokenized_ilike_filter(tokens, [
            SupportTicket.ticket_number,
            SupportTicket.subject,
            SupportTicket.description,
            SupportTicket.contact_name,
            SupportTicket.contact_email,
            SupportTicket.current_page,
            SupportTicket.ip_address,
        ])
        # The helper may return None, in which case no search filter is added.
        if filter_expr is not None:
            query = query.filter(filter_expr)

    # Sorting (whitelisted)
    query = apply_sorting(
        query,
        sort_by,
        sort_dir,
        allowed={
            "created": [SupportTicket.created_at],
            "updated": [SupportTicket.updated_at],
            "resolved": [SupportTicket.resolved_at],
            "priority": [SupportTicket.priority],
            "status": [SupportTicket.status],
            "subject": [SupportTicket.subject],
        },
    )

    # NOTE(review): presumably applies skip/limit and only computes the total
    # when include_total is true - confirm in app.services.query_utils.
    tickets, total = paginate_with_total(query, skip, limit, include_total)

    # Format response
    result = [
        {
            "id": ticket.id,
            "ticket_number": ticket.ticket_number,
            "subject": ticket.subject,
            "description": ticket.description,
            "category": ticket.category,
            "priority": ticket.priority,
            "status": ticket.status,
            "contact_name": ticket.contact_name,
            "contact_email": ticket.contact_email,
            "current_page": ticket.current_page,
            "browser_info": ticket.browser_info,
            "ip_address": ticket.ip_address,
            "created_at": ticket.created_at,
            "updated_at": ticket.updated_at,
            "resolved_at": ticket.resolved_at,
            "assigned_to": ticket.assigned_to,
            "assigned_admin_name": f"{ticket.assigned_admin.first_name} {ticket.assigned_admin.last_name}" if ticket.assigned_admin else None,
            # Fall back to the contact name when no submitter is linked.
            "submitter_name": f"{ticket.submitter.first_name} {ticket.submitter.last_name}" if ticket.submitter else ticket.contact_name,
            "responses": [
                {
                    "id": resp.id,
                    "ticket_id": resp.ticket_id,
                    "message": resp.message,
                    "is_internal": resp.is_internal,
                    "author_name": f"{resp.author.first_name} {resp.author.last_name}" if resp.author else resp.author_name,
                    "author_email": resp.author.email if resp.author else resp.author_email,
                    "created_at": resp.created_at
                }
                for resp in ticket.responses if not resp.is_internal  # Only show public responses
            ]
        }
        for ticket in tickets
    ]

    if include_total:
        return {"items": result, "total": total or 0}
    return result
@router.get("/tickets/{ticket_id}", response_model=TicketDetail)
async def get_ticket(
    ticket_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return a single ticket with all of its responses (admin only).

    Unlike the list endpoints, internal responses are included here.

    Raises:
        HTTPException: 404 when no ticket has the given id.
    """
    eager_options = (
        joinedload(SupportTicket.submitter),
        joinedload(SupportTicket.assigned_admin),
        joinedload(SupportTicket.responses).joinedload(TicketResponseModel.author),
    )
    ticket = (
        db.query(SupportTicket)
        .options(*eager_options)
        .filter(SupportTicket.id == ticket_id)
        .first()
    )
    if not ticket:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Ticket not found"
        )

    # Columns that are copied onto the payload unchanged.
    plain_fields = (
        "id",
        "ticket_number",
        "subject",
        "description",
        "category",
        "priority",
        "status",
        "contact_name",
        "contact_email",
        "current_page",
        "browser_info",
        "ip_address",
        "created_at",
        "updated_at",
        "resolved_at",
        "assigned_to",
    )
    payload = {name: getattr(ticket, name) for name in plain_fields}

    # Display names derived from the related user rows, when present.
    admin = ticket.assigned_admin
    payload["assigned_admin_name"] = (
        f"{admin.first_name} {admin.last_name}" if admin else None
    )
    submitter = ticket.submitter
    payload["submitter_name"] = (
        f"{submitter.first_name} {submitter.last_name}" if submitter else ticket.contact_name
    )

    # Every response is included, internal ones too.
    responses = []
    for resp in ticket.responses:
        author = resp.author
        if author:
            author_name = f"{author.first_name} {author.last_name}"
            author_email = author.email
        else:
            # No linked user: use the name/email stored on the response itself.
            author_name = resp.author_name
            author_email = resp.author_email
        responses.append({
            "id": resp.id,
            "ticket_id": resp.ticket_id,
            "message": resp.message,
            "is_internal": resp.is_internal,
            "author_name": author_name,
            "author_email": author_email,
            "created_at": resp.created_at
        })
    payload["responses"] = responses
    return payload
@router.put("/tickets/{ticket_id}")
async def update_ticket(
    ticket_id: int,
    ticket_data: TicketUpdate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Update ticket (admin only).

    Only fields the client actually sent are applied. The first time the
    request sets the status to RESOLVED while ``resolved_at`` is empty, the
    resolution time is stamped; ``updated_at`` is always refreshed.

    Returns:
        ``{"message": "Ticket updated successfully"}``.

    Raises:
        HTTPException: 404 when no ticket has the given id.
    """
    ticket = db.query(SupportTicket).filter(SupportTicket.id == ticket_id).first()
    if not ticket:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Ticket not found"
        )

    # Track changes for audit
    changes = {}
    # exclude_unset keeps fields the client sent explicitly (including null)
    # and drops the ones it omitted.
    update_data = ticket_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        if not hasattr(ticket, field):
            continue
        previous = getattr(ticket, field)
        if previous != value:
            changes[field] = {"from": previous, "to": value}
            setattr(ticket, field, value)

    # Set resolved timestamp if status changed to resolved
    if ticket_data.status == TicketStatus.RESOLVED and ticket.resolved_at is None:
        ticket.resolved_at = datetime.now(timezone.utc)
        changes["resolved_at"] = {"from": None, "to": ticket.resolved_at}

    ticket.updated_at = datetime.now(timezone.utc)
    db.commit()

    # Audit logging is best-effort: the update is already committed, so a
    # failure here must not fail the request, but it should not be invisible.
    # NOTE(review): ``changes`` can hold enum members and datetimes; confirm
    # the audit service can serialize them.
    try:
        audit_service.log_action(
            db=db,
            action="UPDATE",
            resource_type="SUPPORT_TICKET",
            user=current_user,
            resource_id=str(ticket_id),
            details={"changes": changes} if changes else None,
            request=request,
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "Audit logging failed for UPDATE on support ticket %s",
            ticket_id,
            exc_info=True,
        )

    return {"message": "Ticket updated successfully"}
@router.post("/tickets/{ticket_id}/responses")
async def add_response(
    ticket_id: int,
    response_data: ResponseCreate,
    request: Request,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Add response to ticket (admin only).

    Stores the response under the current admin's user id and refreshes the
    ticket's ``updated_at``.

    Returns:
        A dict with a confirmation message and the new ``response_id``.

    Raises:
        HTTPException: 404 when no ticket has the given id.
    """
    ticket = db.query(SupportTicket).filter(SupportTicket.id == ticket_id).first()
    if not ticket:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Ticket not found"
        )

    # Create response
    response = TicketResponseModel(
        ticket_id=ticket_id,
        message=response_data.message,
        is_internal=response_data.is_internal,
        user_id=current_user.id,
        created_at=datetime.now(timezone.utc),
    )
    db.add(response)

    # Update ticket timestamp
    ticket.updated_at = datetime.now(timezone.utc)
    db.commit()
    db.refresh(response)

    # Read the id now, so a failure inside the audit call cannot affect what
    # is returned to the client.
    response_id = response.id

    # Audit logging is best-effort: the response is already committed, so a
    # failure here must not fail the request, but it should not be invisible.
    try:
        audit_service.log_action(
            db=db,
            action="ADD_RESPONSE",
            resource_type="SUPPORT_TICKET",
            user=current_user,
            resource_id=str(ticket_id),
            details={"response_id": response_id, "is_internal": response_data.is_internal},
            request=request,
        )
    except Exception:
        logging.getLogger(__name__).warning(
            "Audit logging failed for ADD_RESPONSE on support ticket %s",
            ticket_id,
            exc_info=True,
        )

    return {"message": "Response added successfully", "response_id": response_id}
@router.get("/my-tickets", response_model=List[TicketDetail] | PaginatedTicketsResponse)
async def get_my_tickets(
    status: Optional[TicketStatus] = Query(None, description="Filter by ticket status"),
    search: Optional[str] = Query(None, description="Tokenized search across number, subject, description"),
    skip: int = Query(0, ge=0, description="Offset for pagination"),
    limit: int = Query(20, ge=1, le=200, description="Page size"),
    sort_by: Optional[str] = Query(None, description="Sort by: created, updated, resolved, priority, status, subject"),
    sort_dir: Optional[str] = Query("desc", description="Sort direction: asc or desc"),
    include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get current user's tickets.

    Only tickets whose ``user_id`` equals the current user's id are returned.
    Browser info, IP address, assignment details and author emails are blanked
    out, and internal responses are omitted.

    Returns:
        A list of ticket dicts, or ``{"items": [...], "total": n}`` when
        ``include_total`` is true.
    """
    query = db.query(SupportTicket).options(
        # Load each response's author with the ticket. The serializer below
        # reads resp.author for every response, which would otherwise trigger
        # one lazy load per response.
        joinedload(SupportTicket.responses).joinedload(TicketResponseModel.author),
    ).filter(SupportTicket.user_id == current_user.id)

    if status:
        query = query.filter(SupportTicket.status == status)

    # Search within user's tickets
    if search:
        tokens = build_query_tokens(search)
        filter_expr = tokenized_ilike_filter(tokens, [
            SupportTicket.ticket_number,
            SupportTicket.subject,
            SupportTicket.description,
        ])
        # The helper may return None, in which case no search filter is added.
        if filter_expr is not None:
            query = query.filter(filter_expr)

    # Sorting (whitelisted)
    query = apply_sorting(
        query,
        sort_by,
        sort_dir,
        allowed={
            "created": [SupportTicket.created_at],
            "updated": [SupportTicket.updated_at],
            "resolved": [SupportTicket.resolved_at],
            "priority": [SupportTicket.priority],
            "status": [SupportTicket.status],
            "subject": [SupportTicket.subject],
        },
    )

    # NOTE(review): presumably applies skip/limit and only computes the total
    # when include_total is true - confirm in app.services.query_utils.
    tickets, total = paginate_with_total(query, skip, limit, include_total)

    # Every ticket here belongs to the caller, so the name is computed once.
    submitter_name = f"{current_user.first_name} {current_user.last_name}"

    # Format response (exclude internal responses for regular users)
    result = [
        {
            "id": ticket.id,
            "ticket_number": ticket.ticket_number,
            "subject": ticket.subject,
            "description": ticket.description,
            "category": ticket.category,
            "priority": ticket.priority,
            "status": ticket.status,
            "contact_name": ticket.contact_name,
            "contact_email": ticket.contact_email,
            "current_page": ticket.current_page,
            "browser_info": None,  # Don't expose to user
            "ip_address": None,  # Don't expose to user
            "created_at": ticket.created_at,
            "updated_at": ticket.updated_at,
            "resolved_at": ticket.resolved_at,
            "assigned_to": None,  # Don't expose to user
            "assigned_admin_name": None,
            "submitter_name": submitter_name,
            "responses": [
                {
                    "id": resp.id,
                    "ticket_id": resp.ticket_id,
                    "message": resp.message,
                    "is_internal": False,  # Always show as public to user
                    "author_name": f"{resp.author.first_name} {resp.author.last_name}" if resp.author else "Support Team",
                    "author_email": None,  # Don't expose admin emails
                    "created_at": resp.created_at
                }
                for resp in ticket.responses if not resp.is_internal  # Only show public responses
            ]
        }
        for ticket in tickets
    ]

    if include_total:
        return {"items": result, "total": total or 0}
    return result
@router.get("/stats")
async def get_ticket_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_admin_user)
):
    """Return ticket counts for the admin dashboard (admin only).

    Returns:
        A dict of integer counts: all tickets, tickets per status (open,
        in progress, resolved), unresolved HIGH and URGENT tickets, and
        tickets created within the last seven days.
    """
    from datetime import timedelta

    def count_where(*criteria):
        # Count ticket ids, narrowed by the given criteria when there are any.
        counter = db.query(func.count(SupportTicket.id))
        if criteria:
            counter = counter.filter(*criteria)
        return counter.scalar()

    not_resolved = SupportTicket.status != TicketStatus.RESOLVED

    # Totals by status
    total_count = count_where()
    open_count = count_where(SupportTicket.status == TicketStatus.OPEN)
    in_progress_count = count_where(SupportTicket.status == TicketStatus.IN_PROGRESS)
    resolved_count = count_where(SupportTicket.status == TicketStatus.RESOLVED)

    # Tickets by priority, ignoring anything already resolved
    high_count = count_where(
        and_(SupportTicket.priority == TicketPriority.HIGH, not_resolved)
    )
    urgent_count = count_where(
        and_(SupportTicket.priority == TicketPriority.URGENT, not_resolved)
    )

    # Recent tickets (last 7 days)
    cutoff = datetime.now(timezone.utc) - timedelta(days=7)
    recent_count = count_where(SupportTicket.created_at >= cutoff)

    return {
        "total_tickets": total_count,
        "open_tickets": open_count,
        "in_progress_tickets": in_progress_count,
        "resolved_tickets": resolved_count,
        "high_priority_tickets": high_count,
        "urgent_tickets": urgent_count,
        "recent_tickets": recent_count
    }