452 lines
16 KiB
Python
452 lines
16 KiB
Python
"""
|
|
Support ticket API endpoints
|
|
"""
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Request
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import func, desc, and_, or_
|
|
from datetime import datetime
|
|
import secrets
|
|
|
|
from app.database.base import get_db
|
|
from app.models import User, SupportTicket, TicketResponse, TicketStatus, TicketPriority, TicketCategory
|
|
from app.auth.security import get_current_user, get_admin_user
|
|
|
|
router = APIRouter()
|
|
|
|
# Pydantic models for API
|
|
from pydantic import BaseModel, Field, EmailStr
|
|
|
|
|
|
class TicketCreate(BaseModel):
|
|
"""Create new support ticket"""
|
|
subject: str = Field(..., min_length=5, max_length=200)
|
|
description: str = Field(..., min_length=10)
|
|
category: TicketCategory = TicketCategory.BUG_REPORT
|
|
priority: TicketPriority = TicketPriority.MEDIUM
|
|
contact_name: str = Field(..., min_length=1, max_length=100)
|
|
contact_email: EmailStr
|
|
current_page: Optional[str] = None
|
|
browser_info: Optional[str] = None
|
|
|
|
|
|
class TicketUpdate(BaseModel):
|
|
"""Update existing ticket"""
|
|
subject: Optional[str] = Field(None, min_length=5, max_length=200)
|
|
description: Optional[str] = Field(None, min_length=10)
|
|
category: Optional[TicketCategory] = None
|
|
priority: Optional[TicketPriority] = None
|
|
status: Optional[TicketStatus] = None
|
|
assigned_to: Optional[int] = None
|
|
|
|
|
|
class ResponseCreate(BaseModel):
|
|
"""Create ticket response"""
|
|
message: str = Field(..., min_length=1)
|
|
is_internal: bool = False
|
|
|
|
|
|
class TicketResponse(BaseModel):
|
|
"""Ticket response model"""
|
|
id: int
|
|
ticket_id: int
|
|
message: str
|
|
is_internal: bool
|
|
author_name: Optional[str]
|
|
author_email: Optional[str]
|
|
created_at: datetime
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
class TicketDetail(BaseModel):
|
|
"""Detailed ticket information"""
|
|
id: int
|
|
ticket_number: str
|
|
subject: str
|
|
description: str
|
|
category: TicketCategory
|
|
priority: TicketPriority
|
|
status: TicketStatus
|
|
contact_name: str
|
|
contact_email: str
|
|
current_page: Optional[str]
|
|
browser_info: Optional[str]
|
|
ip_address: Optional[str]
|
|
created_at: datetime
|
|
updated_at: Optional[datetime]
|
|
resolved_at: Optional[datetime]
|
|
assigned_to: Optional[int]
|
|
assigned_admin_name: Optional[str]
|
|
submitter_name: Optional[str]
|
|
responses: List[TicketResponse] = []
|
|
|
|
class Config:
|
|
from_attributes = True
|
|
|
|
|
|
def generate_ticket_number() -> str:
|
|
"""Generate unique ticket number like ST-2024-001"""
|
|
year = datetime.now().year
|
|
random_suffix = secrets.token_hex(2).upper()
|
|
return f"ST-{year}-{random_suffix}"
|
|
|
|
|
|
@router.post("/tickets", response_model=dict)
|
|
async def create_support_ticket(
|
|
ticket_data: TicketCreate,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: Optional[User] = Depends(lambda: None) # Allow anonymous submissions
|
|
):
|
|
"""Create new support ticket (public endpoint)"""
|
|
|
|
# current_user is already set from dependency injection above
|
|
# No need to call get_current_user again
|
|
|
|
# Generate unique ticket number
|
|
ticket_number = generate_ticket_number()
|
|
while db.query(SupportTicket).filter(SupportTicket.ticket_number == ticket_number).first():
|
|
ticket_number = generate_ticket_number()
|
|
|
|
# Get client info from request
|
|
client_ip = request.client.host
|
|
user_agent = request.headers.get("User-Agent", "")
|
|
|
|
# Create ticket
|
|
new_ticket = SupportTicket(
|
|
ticket_number=ticket_number,
|
|
subject=ticket_data.subject,
|
|
description=ticket_data.description,
|
|
category=ticket_data.category,
|
|
priority=ticket_data.priority,
|
|
contact_name=ticket_data.contact_name,
|
|
contact_email=ticket_data.contact_email,
|
|
current_page=ticket_data.current_page,
|
|
browser_info=ticket_data.browser_info or user_agent,
|
|
ip_address=client_ip,
|
|
user_id=current_user.id if current_user else None,
|
|
status=TicketStatus.OPEN,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
|
|
db.add(new_ticket)
|
|
db.commit()
|
|
db.refresh(new_ticket)
|
|
|
|
return {
|
|
"message": "Support ticket created successfully",
|
|
"ticket_number": new_ticket.ticket_number,
|
|
"ticket_id": new_ticket.id,
|
|
"status": "created"
|
|
}
|
|
|
|
|
|
@router.get("/tickets", response_model=List[TicketDetail])
|
|
async def list_tickets(
|
|
status: Optional[TicketStatus] = None,
|
|
priority: Optional[TicketPriority] = None,
|
|
category: Optional[TicketCategory] = None,
|
|
assigned_to_me: bool = False,
|
|
skip: int = 0,
|
|
limit: int = 50,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""List support tickets (admin only)"""
|
|
|
|
query = db.query(SupportTicket).options(
|
|
joinedload(SupportTicket.submitter),
|
|
joinedload(SupportTicket.assigned_admin),
|
|
joinedload(SupportTicket.responses)
|
|
)
|
|
|
|
# Apply filters
|
|
if status:
|
|
query = query.filter(SupportTicket.status == status)
|
|
if priority:
|
|
query = query.filter(SupportTicket.priority == priority)
|
|
if category:
|
|
query = query.filter(SupportTicket.category == category)
|
|
if assigned_to_me:
|
|
query = query.filter(SupportTicket.assigned_to == current_user.id)
|
|
|
|
tickets = query.order_by(desc(SupportTicket.created_at)).offset(skip).limit(limit).all()
|
|
|
|
# Format response
|
|
result = []
|
|
for ticket in tickets:
|
|
ticket_dict = {
|
|
"id": ticket.id,
|
|
"ticket_number": ticket.ticket_number,
|
|
"subject": ticket.subject,
|
|
"description": ticket.description,
|
|
"category": ticket.category,
|
|
"priority": ticket.priority,
|
|
"status": ticket.status,
|
|
"contact_name": ticket.contact_name,
|
|
"contact_email": ticket.contact_email,
|
|
"current_page": ticket.current_page,
|
|
"browser_info": ticket.browser_info,
|
|
"ip_address": ticket.ip_address,
|
|
"created_at": ticket.created_at,
|
|
"updated_at": ticket.updated_at,
|
|
"resolved_at": ticket.resolved_at,
|
|
"assigned_to": ticket.assigned_to,
|
|
"assigned_admin_name": f"{ticket.assigned_admin.first_name} {ticket.assigned_admin.last_name}" if ticket.assigned_admin else None,
|
|
"submitter_name": f"{ticket.submitter.first_name} {ticket.submitter.last_name}" if ticket.submitter else ticket.contact_name,
|
|
"responses": [
|
|
{
|
|
"id": resp.id,
|
|
"ticket_id": resp.ticket_id,
|
|
"message": resp.message,
|
|
"is_internal": resp.is_internal,
|
|
"author_name": f"{resp.author.first_name} {resp.author.last_name}" if resp.author else resp.author_name,
|
|
"author_email": resp.author.email if resp.author else resp.author_email,
|
|
"created_at": resp.created_at
|
|
}
|
|
for resp in ticket.responses if not resp.is_internal # Only show public responses
|
|
]
|
|
}
|
|
result.append(ticket_dict)
|
|
|
|
return result
|
|
|
|
|
|
@router.get("/tickets/{ticket_id}", response_model=TicketDetail)
|
|
async def get_ticket(
|
|
ticket_id: int,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Get ticket details (admin only)"""
|
|
|
|
ticket = db.query(SupportTicket).options(
|
|
joinedload(SupportTicket.submitter),
|
|
joinedload(SupportTicket.assigned_admin),
|
|
joinedload(SupportTicket.responses).joinedload(TicketResponse.author)
|
|
).filter(SupportTicket.id == ticket_id).first()
|
|
|
|
if not ticket:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Ticket not found"
|
|
)
|
|
|
|
# Format response (include internal responses for admins)
|
|
return {
|
|
"id": ticket.id,
|
|
"ticket_number": ticket.ticket_number,
|
|
"subject": ticket.subject,
|
|
"description": ticket.description,
|
|
"category": ticket.category,
|
|
"priority": ticket.priority,
|
|
"status": ticket.status,
|
|
"contact_name": ticket.contact_name,
|
|
"contact_email": ticket.contact_email,
|
|
"current_page": ticket.current_page,
|
|
"browser_info": ticket.browser_info,
|
|
"ip_address": ticket.ip_address,
|
|
"created_at": ticket.created_at,
|
|
"updated_at": ticket.updated_at,
|
|
"resolved_at": ticket.resolved_at,
|
|
"assigned_to": ticket.assigned_to,
|
|
"assigned_admin_name": f"{ticket.assigned_admin.first_name} {ticket.assigned_admin.last_name}" if ticket.assigned_admin else None,
|
|
"submitter_name": f"{ticket.submitter.first_name} {ticket.submitter.last_name}" if ticket.submitter else ticket.contact_name,
|
|
"responses": [
|
|
{
|
|
"id": resp.id,
|
|
"ticket_id": resp.ticket_id,
|
|
"message": resp.message,
|
|
"is_internal": resp.is_internal,
|
|
"author_name": f"{resp.author.first_name} {resp.author.last_name}" if resp.author else resp.author_name,
|
|
"author_email": resp.author.email if resp.author else resp.author_email,
|
|
"created_at": resp.created_at
|
|
}
|
|
for resp in ticket.responses
|
|
]
|
|
}
|
|
|
|
|
|
@router.put("/tickets/{ticket_id}")
|
|
async def update_ticket(
|
|
ticket_id: int,
|
|
ticket_data: TicketUpdate,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Update ticket (admin only)"""
|
|
|
|
ticket = db.query(SupportTicket).filter(SupportTicket.id == ticket_id).first()
|
|
if not ticket:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Ticket not found"
|
|
)
|
|
|
|
# Track changes for audit
|
|
changes = {}
|
|
update_data = ticket_data.model_dump(exclude_unset=True)
|
|
|
|
for field, value in update_data.items():
|
|
if hasattr(ticket, field) and getattr(ticket, field) != value:
|
|
changes[field] = {"from": getattr(ticket, field), "to": value}
|
|
setattr(ticket, field, value)
|
|
|
|
# Set resolved timestamp if status changed to resolved
|
|
if ticket_data.status == TicketStatus.RESOLVED and ticket.resolved_at is None:
|
|
ticket.resolved_at = datetime.utcnow()
|
|
changes["resolved_at"] = {"from": None, "to": ticket.resolved_at}
|
|
|
|
ticket.updated_at = datetime.utcnow()
|
|
db.commit()
|
|
|
|
# Log the update (audit logging can be added later)
|
|
# TODO: Add audit logging for ticket updates
|
|
|
|
return {"message": "Ticket updated successfully"}
|
|
|
|
|
|
@router.post("/tickets/{ticket_id}/responses")
|
|
async def add_response(
|
|
ticket_id: int,
|
|
response_data: ResponseCreate,
|
|
request: Request,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Add response to ticket (admin only)"""
|
|
|
|
ticket = db.query(SupportTicket).filter(SupportTicket.id == ticket_id).first()
|
|
if not ticket:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="Ticket not found"
|
|
)
|
|
|
|
# Create response
|
|
response = TicketResponse(
|
|
ticket_id=ticket_id,
|
|
message=response_data.message,
|
|
is_internal=response_data.is_internal,
|
|
user_id=current_user.id,
|
|
created_at=datetime.utcnow()
|
|
)
|
|
|
|
db.add(response)
|
|
|
|
# Update ticket timestamp
|
|
ticket.updated_at = datetime.utcnow()
|
|
|
|
db.commit()
|
|
db.refresh(response)
|
|
|
|
# Log the response (audit logging can be added later)
|
|
# TODO: Add audit logging for ticket responses
|
|
|
|
return {"message": "Response added successfully", "response_id": response.id}
|
|
|
|
|
|
@router.get("/my-tickets", response_model=List[TicketDetail])
|
|
async def get_my_tickets(
|
|
status: Optional[TicketStatus] = None,
|
|
skip: int = 0,
|
|
limit: int = 20,
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_current_user)
|
|
):
|
|
"""Get current user's tickets"""
|
|
|
|
query = db.query(SupportTicket).options(
|
|
joinedload(SupportTicket.responses)
|
|
).filter(SupportTicket.user_id == current_user.id)
|
|
|
|
if status:
|
|
query = query.filter(SupportTicket.status == status)
|
|
|
|
tickets = query.order_by(desc(SupportTicket.created_at)).offset(skip).limit(limit).all()
|
|
|
|
# Format response (exclude internal responses for regular users)
|
|
result = []
|
|
for ticket in tickets:
|
|
ticket_dict = {
|
|
"id": ticket.id,
|
|
"ticket_number": ticket.ticket_number,
|
|
"subject": ticket.subject,
|
|
"description": ticket.description,
|
|
"category": ticket.category,
|
|
"priority": ticket.priority,
|
|
"status": ticket.status,
|
|
"contact_name": ticket.contact_name,
|
|
"contact_email": ticket.contact_email,
|
|
"current_page": ticket.current_page,
|
|
"browser_info": None, # Don't expose to user
|
|
"ip_address": None, # Don't expose to user
|
|
"created_at": ticket.created_at,
|
|
"updated_at": ticket.updated_at,
|
|
"resolved_at": ticket.resolved_at,
|
|
"assigned_to": None, # Don't expose to user
|
|
"assigned_admin_name": None,
|
|
"submitter_name": f"{current_user.first_name} {current_user.last_name}",
|
|
"responses": [
|
|
{
|
|
"id": resp.id,
|
|
"ticket_id": resp.ticket_id,
|
|
"message": resp.message,
|
|
"is_internal": False, # Always show as public to user
|
|
"author_name": f"{resp.author.first_name} {resp.author.last_name}" if resp.author else "Support Team",
|
|
"author_email": None, # Don't expose admin emails
|
|
"created_at": resp.created_at
|
|
}
|
|
for resp in ticket.responses if not resp.is_internal # Only show public responses
|
|
]
|
|
}
|
|
result.append(ticket_dict)
|
|
|
|
return result
|
|
|
|
|
|
@router.get("/stats")
|
|
async def get_ticket_stats(
|
|
db: Session = Depends(get_db),
|
|
current_user: User = Depends(get_admin_user)
|
|
):
|
|
"""Get support ticket statistics (admin only)"""
|
|
|
|
total_tickets = db.query(func.count(SupportTicket.id)).scalar()
|
|
open_tickets = db.query(func.count(SupportTicket.id)).filter(
|
|
SupportTicket.status == TicketStatus.OPEN
|
|
).scalar()
|
|
in_progress_tickets = db.query(func.count(SupportTicket.id)).filter(
|
|
SupportTicket.status == TicketStatus.IN_PROGRESS
|
|
).scalar()
|
|
resolved_tickets = db.query(func.count(SupportTicket.id)).filter(
|
|
SupportTicket.status == TicketStatus.RESOLVED
|
|
).scalar()
|
|
|
|
# Tickets by priority
|
|
high_priority = db.query(func.count(SupportTicket.id)).filter(
|
|
and_(SupportTicket.priority == TicketPriority.HIGH, SupportTicket.status != TicketStatus.RESOLVED)
|
|
).scalar()
|
|
urgent_tickets = db.query(func.count(SupportTicket.id)).filter(
|
|
and_(SupportTicket.priority == TicketPriority.URGENT, SupportTicket.status != TicketStatus.RESOLVED)
|
|
).scalar()
|
|
|
|
# Recent tickets (last 7 days)
|
|
from datetime import timedelta
|
|
week_ago = datetime.utcnow() - timedelta(days=7)
|
|
recent_tickets = db.query(func.count(SupportTicket.id)).filter(
|
|
SupportTicket.created_at >= week_ago
|
|
).scalar()
|
|
|
|
return {
|
|
"total_tickets": total_tickets,
|
|
"open_tickets": open_tickets,
|
|
"in_progress_tickets": in_progress_tickets,
|
|
"resolved_tickets": resolved_tickets,
|
|
"high_priority_tickets": high_priority,
|
|
"urgent_tickets": urgent_tickets,
|
|
"recent_tickets": recent_tickets
|
|
} |