Files
delphi-database/app/api/templates.py

490 lines
17 KiB
Python

"""
Document Template API (MVP)
Endpoints:
- POST /api/templates/upload
- GET /api/templates/search
- GET /api/templates/{id}
- POST /api/templates/{id}/versions
- GET /api/templates/{id}/versions
- POST /api/templates/{id}/preview
"""
from __future__ import annotations
from typing import List, Optional, Dict, Any, Union
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Query
from sqlalchemy.orm import Session
from sqlalchemy import func, or_, exists
import hashlib
from app.database.base import get_db
from app.auth.security import get_current_user
from app.models.user import User
from app.models.templates import DocumentTemplate, DocumentTemplateVersion, TemplateKeyword
from app.services.storage import get_default_storage
from app.services.template_merge import extract_tokens_from_bytes, build_context, resolve_tokens, render_docx
from app.services.query_utils import paginate_with_total
router = APIRouter()
from pydantic import BaseModel, Field
class TemplateResponse(BaseModel):
id: int
name: str
description: Optional[str] = None
category: Optional[str] = None
active: bool
current_version_id: Optional[int] = None
class VersionResponse(BaseModel):
id: int
template_id: int
semantic_version: str
mime_type: str
size: int
checksum: str
changelog: Optional[str] = None
is_approved: bool
class SearchResponseItem(BaseModel):
id: int
name: str
category: Optional[str] = None
active: bool
latest_version: Optional[str] = None
class KeywordsRequest(BaseModel):
keywords: List[str]
class KeywordsResponse(BaseModel):
keywords: List[str]
class PreviewRequest(BaseModel):
context: Dict[str, Any] = Field(default_factory=dict)
version_id: Optional[int] = None
class PreviewResponse(BaseModel):
resolved: Dict[str, Any]
unresolved: List[str]
output_mime_type: str
output_size: int
class CategoryCount(BaseModel):
category: Optional[str] = None
count: int
class PaginatedSearchResponse(BaseModel):
items: List[SearchResponseItem]
total: int
class PaginatedCategoriesResponse(BaseModel):
items: List[CategoryCount]
total: int
@router.post("/upload", response_model=TemplateResponse)
async def upload_template(
name: str = Form(...),
category: Optional[str] = Form("GENERAL"),
description: Optional[str] = Form(None),
semantic_version: str = Form("1.0.0"),
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if file.content_type not in {"application/vnd.openxmlformats-officedocument.wordprocessingml.document", "application/pdf"}:
raise HTTPException(status_code=400, detail="Only .docx or .pdf templates are supported")
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail="No file uploaded")
sha256 = hashlib.sha256(content).hexdigest()
storage = get_default_storage()
storage_path = storage.save_bytes(content=content, filename_hint=file.filename or "template.bin", subdir="templates")
template = DocumentTemplate(name=name, description=description, category=category, active=True, created_by=getattr(current_user, "username", None))
db.add(template)
db.flush() # get id
version = DocumentTemplateVersion(
template_id=template.id,
semantic_version=semantic_version,
storage_path=storage_path,
mime_type=file.content_type,
size=len(content),
checksum=sha256,
changelog=None,
created_by=getattr(current_user, "username", None),
is_approved=True,
)
db.add(version)
db.flush()
template.current_version_id = version.id
db.commit()
db.refresh(template)
return TemplateResponse(
id=template.id,
name=template.name,
description=template.description,
category=template.category,
active=template.active,
current_version_id=template.current_version_id,
)
@router.get("/search", response_model=Union[List[SearchResponseItem], PaginatedSearchResponse])
async def search_templates(
q: Optional[str] = None,
category: Optional[List[str]] = Query(
None,
description=(
"Filter by category. Repeat the parameter (e.g., ?category=A&category=B) "
"or pass a comma-separated list (e.g., ?category=A,B)."
),
),
keywords: Optional[List[str]] = Query(None),
keywords_mode: str = Query("any", description="Keyword match mode: any|all (default any)"),
has_keywords: Optional[bool] = Query(
None,
description=(
"When true, only templates that have one or more keywords are returned; "
"when false, only templates with no keywords are returned."
),
),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
sort_by: Optional[str] = Query("name", description="Sort by: name | category | updated"),
sort_dir: Optional[str] = Query("asc", description="Sort direction: asc or desc"),
active_only: bool = Query(True, description="When true (default), only active templates are returned"),
include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
query = db.query(DocumentTemplate)
if active_only:
query = query.filter(DocumentTemplate.active == True)
if q:
like = f"%{q}%"
query = query.filter(
or_(
DocumentTemplate.name.ilike(like),
DocumentTemplate.description.ilike(like),
)
)
# Category filtering (supports repeatable param and CSV within each value)
if category:
raw_values = category or []
categories: List[str] = []
for value in raw_values:
parts = [part.strip() for part in (value or "").split(",")]
for part in parts:
if part:
categories.append(part)
unique_categories = sorted(set(categories))
if unique_categories:
query = query.filter(DocumentTemplate.category.in_(unique_categories))
if keywords:
normalized = [kw.strip().lower() for kw in keywords if kw and kw.strip()]
unique_keywords = sorted(set(normalized))
if unique_keywords:
mode = (keywords_mode or "any").lower()
if mode not in ("any", "all"):
mode = "any"
query = query.join(TemplateKeyword, TemplateKeyword.template_id == DocumentTemplate.id)
if mode == "any":
query = query.filter(TemplateKeyword.keyword.in_(unique_keywords)).distinct()
else:
query = query.filter(TemplateKeyword.keyword.in_(unique_keywords))
query = query.group_by(DocumentTemplate.id)
query = query.having(func.count(func.distinct(TemplateKeyword.keyword)) == len(unique_keywords))
# Has keywords filter (independent of specific keyword matches)
if has_keywords is not None:
kw_exists = exists().where(TemplateKeyword.template_id == DocumentTemplate.id)
if has_keywords:
query = query.filter(kw_exists)
else:
query = query.filter(~kw_exists)
# Sorting
sort_key = (sort_by or "name").lower()
direction = (sort_dir or "asc").lower()
if sort_key not in ("name", "category", "updated"):
sort_key = "name"
if direction not in ("asc", "desc"):
direction = "asc"
if sort_key == "name":
order_col = DocumentTemplate.name
elif sort_key == "category":
order_col = DocumentTemplate.category
else: # updated
order_col = func.coalesce(DocumentTemplate.updated_at, DocumentTemplate.created_at)
if direction == "asc":
query = query.order_by(order_col.asc())
else:
query = query.order_by(order_col.desc())
# Pagination with optional total
templates, total = paginate_with_total(query, skip, limit, include_total)
items: List[SearchResponseItem] = []
for tpl in templates:
latest_version = None
if tpl.current_version_id:
ver = db.query(DocumentTemplateVersion).filter(DocumentTemplateVersion.id == tpl.current_version_id).first()
if ver:
latest_version = ver.semantic_version
items.append(
SearchResponseItem(
id=tpl.id,
name=tpl.name,
category=tpl.category,
active=tpl.active,
latest_version=latest_version,
)
)
if include_total:
return {"items": items, "total": int(total or 0)}
return items
@router.get("/categories", response_model=Union[List[CategoryCount], PaginatedCategoriesResponse])
async def list_template_categories(
active_only: bool = Query(True, description="When true (default), only active templates are counted"),
include_total: bool = Query(False, description="When true, returns {items, total} instead of a plain list"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
query = db.query(DocumentTemplate.category, func.count(DocumentTemplate.id).label("count"))
if active_only:
query = query.filter(DocumentTemplate.active == True)
rows = query.group_by(DocumentTemplate.category).order_by(DocumentTemplate.category.asc()).all()
items = [CategoryCount(category=row[0], count=row[1]) for row in rows]
if include_total:
return {"items": items, "total": len(items)}
return items
@router.get("/{template_id}", response_model=TemplateResponse)
async def get_template(
template_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
if not tpl:
raise HTTPException(status_code=404, detail="Template not found")
return TemplateResponse(
id=tpl.id,
name=tpl.name,
description=tpl.description,
category=tpl.category,
active=tpl.active,
current_version_id=tpl.current_version_id,
)
@router.get("/{template_id}/versions", response_model=List[VersionResponse])
async def list_versions(
template_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
versions = (
db.query(DocumentTemplateVersion)
.filter(DocumentTemplateVersion.template_id == template_id)
.order_by(DocumentTemplateVersion.created_at.desc())
.all()
)
return [
VersionResponse(
id=v.id,
template_id=v.template_id,
semantic_version=v.semantic_version,
mime_type=v.mime_type,
size=v.size,
checksum=v.checksum,
changelog=v.changelog,
is_approved=v.is_approved,
)
for v in versions
]
@router.post("/{template_id}/versions", response_model=VersionResponse)
async def add_version(
template_id: int,
semantic_version: str = Form("1.0.0"),
changelog: Optional[str] = Form(None),
approve: bool = Form(True),
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
if not tpl:
raise HTTPException(status_code=404, detail="Template not found")
content = await file.read()
if not content:
raise HTTPException(status_code=400, detail="No file uploaded")
sha256 = hashlib.sha256(content).hexdigest()
storage = get_default_storage()
storage_path = storage.save_bytes(content=content, filename_hint=file.filename or "template.bin", subdir="templates")
version = DocumentTemplateVersion(
template_id=template_id,
semantic_version=semantic_version,
storage_path=storage_path,
mime_type=file.content_type,
size=len(content),
checksum=sha256,
changelog=changelog,
created_by=getattr(current_user, "username", None),
is_approved=bool(approve),
)
db.add(version)
db.flush()
if approve:
tpl.current_version_id = version.id
db.commit()
return VersionResponse(
id=version.id,
template_id=version.template_id,
semantic_version=version.semantic_version,
mime_type=version.mime_type,
size=version.size,
checksum=version.checksum,
changelog=version.changelog,
is_approved=version.is_approved,
)
@router.post("/{template_id}/preview", response_model=PreviewResponse)
async def preview_template(
template_id: int,
payload: PreviewRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
if not tpl:
raise HTTPException(status_code=404, detail="Template not found")
version_id = payload.version_id or tpl.current_version_id
if not version_id:
raise HTTPException(status_code=400, detail="Template has no versions")
ver = db.query(DocumentTemplateVersion).filter(DocumentTemplateVersion.id == version_id).first()
if not ver:
raise HTTPException(status_code=404, detail="Version not found")
storage = get_default_storage()
content = storage.open_bytes(ver.storage_path)
tokens = extract_tokens_from_bytes(content)
context = build_context(payload.context or {})
resolved, unresolved = resolve_tokens(db, tokens, context)
output_bytes = content
output_mime = ver.mime_type
if ver.mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
output_bytes = render_docx(content, resolved)
output_mime = ver.mime_type
# We don't store preview output; just return metadata and resolution state
return PreviewResponse(
resolved=resolved,
unresolved=unresolved,
output_mime_type=output_mime,
output_size=len(output_bytes),
)
@router.get("/{template_id}/keywords", response_model=KeywordsResponse)
async def list_keywords(
template_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
if not tpl:
raise HTTPException(status_code=404, detail="Template not found")
kws = (
db.query(TemplateKeyword)
.filter(TemplateKeyword.template_id == template_id)
.order_by(TemplateKeyword.keyword.asc())
.all()
)
return KeywordsResponse(keywords=[k.keyword for k in kws])
@router.post("/{template_id}/keywords", response_model=KeywordsResponse)
async def add_keywords(
template_id: int,
payload: KeywordsRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
if not tpl:
raise HTTPException(status_code=404, detail="Template not found")
to_add = []
for kw in (payload.keywords or []):
normalized = (kw or "").strip().lower()
if not normalized:
continue
exists = (
db.query(TemplateKeyword)
.filter(TemplateKeyword.template_id == template_id, TemplateKeyword.keyword == normalized)
.first()
)
if not exists:
to_add.append(TemplateKeyword(template_id=template_id, keyword=normalized))
if to_add:
db.add_all(to_add)
db.commit()
kws = (
db.query(TemplateKeyword)
.filter(TemplateKeyword.template_id == template_id)
.order_by(TemplateKeyword.keyword.asc())
.all()
)
return KeywordsResponse(keywords=[k.keyword for k in kws])
@router.delete("/{template_id}/keywords/{keyword}", response_model=KeywordsResponse)
async def remove_keyword(
template_id: int,
keyword: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
if not tpl:
raise HTTPException(status_code=404, detail="Template not found")
normalized = (keyword or "").strip().lower()
if normalized:
db.query(TemplateKeyword).filter(
TemplateKeyword.template_id == template_id,
TemplateKeyword.keyword == normalized,
).delete(synchronize_session=False)
db.commit()
kws = (
db.query(TemplateKeyword)
.filter(TemplateKeyword.template_id == template_id)
.order_by(TemplateKeyword.keyword.asc())
.all()
)
return KeywordsResponse(keywords=[k.keyword for k in kws])