420 lines
17 KiB
Python
420 lines
17 KiB
Python
"""
Advanced Template Processing API

This module provides enhanced template processing capabilities including:
- Conditional content blocks (IF/ENDIF sections)
- Loop functionality for data tables (FOR/ENDFOR sections)
- Rich variable formatting with filters
- Template function support
- PDF generation from DOCX templates
- Advanced variable resolution
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import List, Optional, Dict, Any, Union
|
|
from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Query
|
|
from fastapi.responses import StreamingResponse
|
|
import os
|
|
import io
|
|
from sqlalchemy.orm import Session
|
|
from pydantic import BaseModel, Field
|
|
|
|
from app.database.base import get_db
|
|
from app.auth.security import get_current_user
|
|
from app.models.user import User
|
|
from app.models.templates import DocumentTemplate, DocumentTemplateVersion
|
|
from app.services.storage import get_default_storage
|
|
from app.services.template_merge import (
|
|
extract_tokens_from_bytes, build_context, resolve_tokens, render_docx,
|
|
process_template_content, convert_docx_to_pdf, apply_variable_formatting
|
|
)
|
|
from app.core.logging import get_logger
|
|
|
|
logger = get_logger("advanced_templates")
|
|
router = APIRouter()
|
|
|
|
|
|
class AdvancedGenerateRequest(BaseModel):
    """Advanced template generation request with enhanced features"""

    # Caller-supplied values; generate_advanced_document hands these to
    # build_context before resolving the template's tokens.
    context: Dict[str, Any] = Field(default_factory=dict)

    # Specific template version to render. When omitted (or falsy) the
    # endpoint falls back to the template's current_version_id.
    version_id: Optional[int] = Field(default=None)

    # Compared case-insensitively against "PDF" by the endpoint.
    output_format: str = Field("DOCX", description="Output format: DOCX, PDF")

    # Feature switches, all on by default.
    enable_conditionals: bool = Field(True, description="Enable conditional sections processing")
    enable_loops: bool = Field(True, description="Enable loop sections processing")
    enable_formatting: bool = Field(True, description="Enable variable formatting")
    enable_functions: bool = Field(True, description="Enable template functions")
|
|
|
|
|
|
class AdvancedGenerateResponse(BaseModel):
    """Enhanced generation response with processing details"""

    # The two values returned by resolve_tokens in generate_advanced_document.
    resolved: Dict[str, Any]
    unresolved: List[str]

    # MIME type and byte length of the produced output.
    output_mime_type: str
    output_size: int

    # Free-form diagnostics (enabled features, token counts, PDF conversion
    # status, any processing error message).
    processing_details: Dict[str, Any] = Field(default_factory=lambda: {})
|
|
|
|
|
|
class BatchAdvancedGenerateRequest(BaseModel):
    """Batch generation request using advanced template features"""

    # NOTE(review): the endpoint consuming this model is not visible in this
    # part of the file; the field notes below are inferred from names and the
    # single-document request model, so confirm against the handler.

    # Template (and optional version) to render for every item.
    template_id: int
    version_id: Optional[int] = Field(default=None)

    # Presumably one generated document per entry - TODO confirm.
    file_nos: List[str]

    output_format: str = Field("DOCX", description="Output format: DOCX, PDF")

    # Optional extra context; None when the caller supplies nothing.
    context: Optional[Dict[str, Any]] = Field(default=None)

    # Feature switches, all on by default.
    enable_conditionals: bool = Field(True, description="Enable conditional sections processing")
    enable_loops: bool = Field(True, description="Enable loop sections processing")
    enable_formatting: bool = Field(True, description="Enable variable formatting")
    enable_functions: bool = Field(True, description="Enable template functions")

    # Off by default; presumably requests a single ZIP of all outputs.
    bundle_zip: bool = Field(default=False)
|
|
|
|
|
|
class BatchAdvancedGenerateResponse(BaseModel):
    """Batch generation response with per-item results"""

    # NOTE(review): the producing endpoint is not visible in this part of the
    # file; confirm the exact shape of each entry against the handler.

    template_name: str

    # One free-form dict per processed item.
    results: List[Dict[str, Any]]

    # Bundle location and size; both stay None unless the handler sets them.
    bundle_url: Optional[str] = Field(default=None)
    bundle_size: Optional[int] = Field(default=None)

    # Free-form aggregate diagnostics; empty dict by default.
    processing_summary: Dict[str, Any] = Field(default_factory=lambda: {})
|
|
|
|
|
|
class TemplateAnalysisRequest(BaseModel):
    """Request for analyzing template features"""

    # Version to analyze; analyze_template falls back to the template's
    # current_version_id when this is omitted (or falsy).
    version_id: Optional[int] = Field(default=None)
|
|
|
|
|
|
class TemplateAnalysisResponse(BaseModel):
    """Template analysis response showing capabilities"""

    # Tokens returned by extract_tokens_from_bytes for the template.
    variables: List[str]

    # Entries rendered by analyze_template as "<name> | <format spec>".
    formatted_variables: List[str]

    # Conditional entries carry "condition", "line_start" and "complexity";
    # loop entries carry "variable", "collection" and "line_start".
    conditional_blocks: List[Dict[str, Any]]
    loop_blocks: List[Dict[str, Any]]

    # Entries rendered by analyze_template as "<function>(<args>)".
    function_calls: List[str]

    # Weighted count of the features above (see analyze_template).
    complexity_score: int

    # Human-readable suggestions derived from the counts.
    recommendations: List[str]
|
|
|
|
|
|
@router.post("/{template_id}/generate-advanced", response_model=AdvancedGenerateResponse)
async def generate_advanced_document(
    template_id: int,
    payload: AdvancedGenerateRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate a document from a stored template and report how it went.

    The template version's bytes are loaded from storage, its tokens are
    resolved against the supplied context, DOCX templates are rendered, and
    the result is converted to PDF when requested. Only metadata about the
    output (MIME type, size, diagnostics) is returned, not the bytes.

    Args:
        template_id: Primary key of the DocumentTemplate to render.
        payload: Context, optional version id, output format and feature
            switches.
        db: Database session dependency.
        current_user: Authenticated user dependency; not otherwise used here.

    Returns:
        AdvancedGenerateResponse with the resolved/unresolved tokens, output
        MIME type and size, and a ``processing_details`` dict.

    Raises:
        HTTPException: 404 when the template, version or stored file cannot
            be found; 400 when the template has no version to render.
    """
    docx_mime = "application/vnd.openxmlformats-officedocument.wordprocessingml.document"

    # Locate the template and the version to render.
    tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
    if not tpl:
        raise HTTPException(status_code=404, detail="Template not found")

    version_id = payload.version_id or tpl.current_version_id
    if not version_id:
        raise HTTPException(status_code=400, detail="Template has no versions")

    # NOTE(review): the version is looked up by id only; nothing here checks
    # that it belongs to `template_id`. Confirm whether a caller-supplied
    # version_id from another template should be rejected.
    ver = db.query(DocumentTemplateVersion).filter(DocumentTemplateVersion.id == version_id).first()
    if not ver:
        raise HTTPException(status_code=404, detail="Version not found")

    # Load the raw template bytes; any storage failure is reported as 404,
    # but the underlying cause is logged and chained rather than discarded.
    storage = get_default_storage()
    try:
        content = storage.open_bytes(ver.storage_path)
    except Exception as exc:
        logger.warning(f"Could not load template file for version {version_id}: {exc}")
        raise HTTPException(status_code=404, detail="Template file not found") from exc

    # Extract tokens and resolve them against the request context.
    tokens = extract_tokens_from_bytes(content)
    context = build_context(payload.context or {}, "template", str(template_id))
    resolved, unresolved = resolve_tokens(db, tokens, context)

    processing_details = {
        "features_enabled": {
            "conditionals": payload.enable_conditionals,
            "loops": payload.enable_loops,
            "formatting": payload.enable_formatting,
            "functions": payload.enable_functions,
        },
        "tokens_found": len(tokens),
        "variables_resolved": len(resolved),
        "variables_unresolved": len(unresolved),
    }

    # Default to the untouched template so a failed or skipped render still
    # yields a well-formed response.
    output_bytes = content
    output_mime = ver.mime_type

    if ver.mime_type == docx_mime:
        advanced_requested = any((
            payload.enable_conditionals,
            payload.enable_loops,
            payload.enable_formatting,
            payload.enable_functions,
        ))
        try:
            if advanced_requested:
                logger.info("Advanced template processing enabled - using enhanced rendering")

            # Both the advanced and the standard path render through
            # render_docx; the flag only records which mode was requested.
            output_bytes = render_docx(content, resolved)
            processing_details["advanced_features_used"] = advanced_requested

            # Convert to PDF if requested; a falsy result means the
            # conversion failed and the rendered DOCX is returned instead.
            if payload.output_format.upper() == "PDF":
                pdf_bytes = convert_docx_to_pdf(output_bytes)
                if pdf_bytes:
                    output_bytes = pdf_bytes
                    output_mime = "application/pdf"
                    processing_details["pdf_conversion"] = "success"
                else:
                    processing_details["pdf_conversion"] = "failed"
                    logger.warning("PDF conversion failed, returning DOCX")
        except Exception as e:
            # Best effort: report the failure in the response instead of
            # failing the request, but keep the traceback in the logs.
            logger.exception(f"Error processing template: {e}")
            processing_details["processing_error"] = str(e)

    return AdvancedGenerateResponse(
        resolved=resolved,
        unresolved=unresolved,
        output_mime_type=output_mime,
        output_size=len(output_bytes),
        processing_details=processing_details,
    )
|
|
|
|
|
|
@router.post("/{template_id}/analyze", response_model=TemplateAnalysisResponse)
async def analyze_template(
    template_id: int,
    payload: TemplateAnalysisRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Analyze a stored template and summarize the features it uses.

    The template version's bytes are loaded from storage, reduced to plain
    text, and scanned with the pattern constants from
    ``app.services.template_merge`` for formatted variables, conditional
    blocks, loop blocks and function calls. A weighted complexity score and a
    list of recommendations are derived from the counts.

    Args:
        template_id: Primary key of the DocumentTemplate to analyze.
        payload: Optional version id; the template's current version is used
            when it is omitted.
        db: Database session dependency.
        current_user: Authenticated user dependency; not otherwise used here.

    Returns:
        TemplateAnalysisResponse describing the detected features.

    Raises:
        HTTPException: 404 when the template, version or stored file cannot
            be found; 400 when the template has no version to analyze.
    """
    # Locate the template and the version to analyze.
    tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first()
    if not tpl:
        raise HTTPException(status_code=404, detail="Template not found")

    version_id = payload.version_id or tpl.current_version_id
    if not version_id:
        raise HTTPException(status_code=400, detail="Template has no versions")

    ver = db.query(DocumentTemplateVersion).filter(DocumentTemplateVersion.id == version_id).first()
    if not ver:
        raise HTTPException(status_code=404, detail="Version not found")

    # Load the raw template bytes; any storage failure is reported as 404,
    # but the underlying cause is logged and chained rather than discarded.
    storage = get_default_storage()
    try:
        content = storage.open_bytes(ver.storage_path)
    except Exception as exc:
        logger.warning(f"Could not load template file for version {version_id}: {exc}")
        raise HTTPException(status_code=404, detail="Template file not found") from exc

    tokens = extract_tokens_from_bytes(content)

    # Reduce the template to plain text for pattern scanning.
    text_content = ""
    try:
        if ver.mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
            # Imported lazily so a missing python-docx only degrades analysis.
            from docx import Document

            # NOTE(review): only body paragraphs are read here; text inside
            # tables is not part of the analyzed content.
            doc = Document(io.BytesIO(content))
            text_content = "\n".join(paragraph.text for paragraph in doc.paragraphs)
        else:
            text_content = content.decode('utf-8', errors='ignore')
    except Exception as e:
        logger.warning(f"Could not extract text content for analysis: {e}")
        # Fall back to a lossy decode. str() on bytes would produce the
        # "b'...'" repr with escaped newlines, which corrupts both pattern
        # matching and line numbers.
        if isinstance(content, (bytes, bytearray)):
            text_content = content.decode('utf-8', errors='ignore')
        else:
            text_content = str(content)

    # Pattern constants are imported at call time, after the template has
    # been loaded, so lookup errors above are reported first.
    from app.services.template_merge import (
        FORMATTED_TOKEN_PATTERN,
        CONDITIONAL_START_PATTERN,
        LOOP_START_PATTERN,
        FUNCTION_PATTERN,
    )

    def line_of(offset: int) -> int:
        # 1-based line number of a character offset, counted without copying
        # the text prefix for every match.
        return text_content.count('\n', 0, offset) + 1

    # "{{ name | spec }}" style variables.
    formatted_variables = [
        f"{match.group(1).strip()} | {match.group(2).strip()}"
        for match in FORMATTED_TOKEN_PATTERN.finditer(text_content)
    ]

    # Conditional blocks, with a word count as a simple complexity measure.
    conditional_blocks = []
    for start_match in CONDITIONAL_START_PATTERN.finditer(text_content):
        condition = start_match.group(1).strip()
        conditional_blocks.append({
            "condition": condition,
            "line_start": line_of(start_match.start()),
            "complexity": len(condition.split()),
        })

    # Loop blocks: loop variable and the collection it iterates.
    loop_blocks = [
        {
            "variable": start_match.group(1).strip(),
            "collection": start_match.group(2).strip(),
            "line_start": line_of(start_match.start()),
        }
        for start_match in LOOP_START_PATTERN.finditer(text_content)
    ]

    # Function calls rendered as "name(args)".
    function_calls = [
        f"{match.group(1).strip()}({match.group(2).strip()})"
        for match in FUNCTION_PATTERN.finditer(text_content)
    ]

    # Weighted sum: loops weigh most, plain tokens least.
    complexity_score = (
        len(tokens) * 1
        + len(formatted_variables) * 2
        + len(conditional_blocks) * 3
        + len(loop_blocks) * 4
        + len(function_calls) * 2
    )

    recommendations = []
    if len(conditional_blocks) > 5:
        recommendations.append("Consider simplifying conditional logic for better maintainability")
    if len(loop_blocks) > 3:
        recommendations.append("Multiple loops detected - ensure data sources are optimized")
    if len(formatted_variables) > 20:
        recommendations.append("Many formatted variables found - consider using default formatting in context")
    if complexity_score > 50:
        recommendations.append("High complexity template - consider breaking into smaller templates")
    if not (conditional_blocks or loop_blocks or formatted_variables or function_calls):
        recommendations.append("Template uses basic features only - consider leveraging advanced features for better documents")

    return TemplateAnalysisResponse(
        variables=tokens,
        formatted_variables=formatted_variables,
        conditional_blocks=conditional_blocks,
        loop_blocks=loop_blocks,
        function_calls=function_calls,
        complexity_score=complexity_score,
        recommendations=recommendations,
    )
|
|
|
|
|
|
@router.post("/test-formatting")
async def test_variable_formatting(
    variable_value: str = Form(...),
    format_spec: str = Form(...),
    current_user: User = Depends(get_current_user),
):
    """Apply a format spec to a single value and report the outcome.

    Lets a caller try a format spec without generating a document. Both form
    fields are echoed back. On success the response carries
    ``formatted_result``; if ``apply_variable_formatting`` raises, the
    response carries the exception text under ``error`` instead. ``success``
    tells the two cases apart.
    """
    # Fields common to both outcomes, in the order they appear in the reply.
    echoed = {
        "input_value": variable_value,
        "format_spec": format_spec,
    }

    try:
        formatted = apply_variable_formatting(variable_value, format_spec)
    except Exception as exc:
        # Formatting problems are reported in the payload, not as an HTTP error.
        return {**echoed, "error": str(exc), "success": False}

    return {**echoed, "formatted_result": formatted, "success": True}
|
|
|
|
|
|
@router.get("/formatting-help")
async def get_formatting_help(
    current_user: User = Depends(get_current_user),
):
    """Return static help text for variable formatting and template syntax.

    The response is a fixed dictionary with two sections:
    ``formatting_options`` (one entry per formatter, each with a description,
    a syntax or options summary and worked examples) and ``template_syntax``
    (one sample snippet per template construct). ``current_user`` is only
    declared as a dependency; the content does not vary by user.
    """

    def sample(value, spec, rendered):
        # Every worked example shares this three-key shape.
        return {"input": value, "format": spec, "output": rendered}

    currency_help = {
        "description": "Format as currency",
        "syntax": "currency[:symbol][:decimal_places]",
        "examples": [
            sample("1234.56", "currency", "$1,234.56"),
            sample("1234.56", "currency:€", "€1,234.56"),
            sample("1234.56", "currency:$:0", "$1,235"),
        ],
    }

    date_help = {
        "description": "Format dates",
        "syntax": "date[:format_string]",
        "examples": [
            sample("2023-12-25", "date", "December 25, 2023"),
            sample("2023-12-25", "date:%m/%d/%Y", "12/25/2023"),
            sample("2023-12-25", "date:%B %d", "December 25"),
        ],
    }

    number_help = {
        "description": "Format numbers",
        "syntax": "number[:decimal_places][:thousands_sep]",
        "examples": [
            sample("1234.5678", "number", "1,234.57"),
            sample("1234.5678", "number:1", "1,234.6"),
            sample("1234.5678", "number:2: ", "1 234.57"),
        ],
    }

    percentage_help = {
        "description": "Format as percentage",
        "syntax": "percentage[:decimal_places]",
        "examples": [
            sample("0.1234", "percentage", "0.1%"),
            sample("12.34", "percentage:2", "12.34%"),
        ],
    }

    phone_help = {
        "description": "Format phone numbers",
        "syntax": "phone[:format_type]",
        "examples": [
            sample("1234567890", "phone", "(123) 456-7890"),
            sample("11234567890", "phone:us", "1-(123) 456-7890"),
        ],
    }

    text_transform_help = {
        "description": "Text transformations",
        "options": {
            "upper": "Convert to UPPERCASE",
            "lower": "Convert to lowercase",
            "title": "Convert To Title Case",
        },
        "examples": [
            sample("hello world", "upper", "HELLO WORLD"),
            sample("HELLO WORLD", "lower", "hello world"),
            sample("hello world", "title", "Hello World"),
        ],
    }

    utility_help = {
        "description": "Utility functions",
        "options": {
            "truncate[:length][:suffix]": "Truncate text to specified length",
            "default[:default_value]": "Use default if empty/null",
        },
        "examples": [
            sample("This is a very long text", "truncate:10", "This is..."),
            sample("", "default:N/A", "N/A"),
        ],
    }

    syntax_samples = {
        "basic_variables": "{{ variable_name }}",
        "formatted_variables": "{{ variable_name | format_spec }}",
        "conditionals": "{% if condition %} content {% else %} other content {% endif %}",
        "loops": "{% for item in items %} content with {{item}} {% endfor %}",
        "functions": "{{ function_name(arg1, arg2) }}",
    }

    # Section order is part of the payload, so assemble it explicitly.
    return {
        "formatting_options": {
            "currency": currency_help,
            "date": date_help,
            "number": number_help,
            "percentage": percentage_help,
            "phone": phone_help,
            "text_transforms": text_transform_help,
            "utility": utility_help,
        },
        "template_syntax": syntax_samples,
    }
|