""" Advanced Template Processing API This module provides enhanced template processing capabilities including: - Conditional content blocks (IF/ENDIF sections) - Loop functionality for data tables (FOR/ENDFOR sections) - Rich variable formatting with filters - Template function support - PDF generation from DOCX templates - Advanced variable resolution """ from __future__ import annotations from typing import List, Optional, Dict, Any, Union from fastapi import APIRouter, Depends, HTTPException, status, UploadFile, File, Form, Query from fastapi.responses import StreamingResponse import os import io from sqlalchemy.orm import Session from pydantic import BaseModel, Field from app.database.base import get_db from app.auth.security import get_current_user from app.models.user import User from app.models.templates import DocumentTemplate, DocumentTemplateVersion from app.services.storage import get_default_storage from app.services.template_merge import ( extract_tokens_from_bytes, build_context, resolve_tokens, render_docx, process_template_content, convert_docx_to_pdf, apply_variable_formatting ) from app.core.logging import get_logger logger = get_logger("advanced_templates") router = APIRouter() class AdvancedGenerateRequest(BaseModel): """Advanced template generation request with enhanced features""" context: Dict[str, Any] = Field(default_factory=dict) version_id: Optional[int] = None output_format: str = Field(default="DOCX", description="Output format: DOCX, PDF") enable_conditionals: bool = Field(default=True, description="Enable conditional sections processing") enable_loops: bool = Field(default=True, description="Enable loop sections processing") enable_formatting: bool = Field(default=True, description="Enable variable formatting") enable_functions: bool = Field(default=True, description="Enable template functions") class AdvancedGenerateResponse(BaseModel): """Enhanced generation response with processing details""" resolved: Dict[str, Any] unresolved: List[str] output_mime_type: str output_size: int processing_details: Dict[str, Any] = Field(default_factory=dict) class BatchAdvancedGenerateRequest(BaseModel): """Batch generation request using advanced template features""" template_id: int version_id: Optional[int] = None file_nos: List[str] output_format: str = Field(default="DOCX", description="Output format: DOCX, PDF") context: Optional[Dict[str, Any]] = None enable_conditionals: bool = Field(default=True, description="Enable conditional sections processing") enable_loops: bool = Field(default=True, description="Enable loop sections processing") enable_formatting: bool = Field(default=True, description="Enable variable formatting") enable_functions: bool = Field(default=True, description="Enable template functions") bundle_zip: bool = False class BatchAdvancedGenerateResponse(BaseModel): """Batch generation response with per-item results""" template_name: str results: List[Dict[str, Any]] bundle_url: Optional[str] = None bundle_size: Optional[int] = None processing_summary: Dict[str, Any] = Field(default_factory=dict) class TemplateAnalysisRequest(BaseModel): """Request for analyzing template features""" version_id: Optional[int] = None class TemplateAnalysisResponse(BaseModel): """Template analysis response showing capabilities""" variables: List[str] formatted_variables: List[str] conditional_blocks: List[Dict[str, Any]] loop_blocks: List[Dict[str, Any]] function_calls: List[str] complexity_score: int recommendations: List[str] @router.post("/{template_id}/generate-advanced", response_model=AdvancedGenerateResponse) async def generate_advanced_document( template_id: int, payload: AdvancedGenerateRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Generate document with advanced template processing features""" # Get template and version tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first() if not tpl: raise HTTPException(status_code=404, detail="Template not found") version_id = payload.version_id or tpl.current_version_id if not version_id: raise HTTPException(status_code=400, detail="Template has no versions") ver = db.query(DocumentTemplateVersion).filter(DocumentTemplateVersion.id == version_id).first() if not ver: raise HTTPException(status_code=404, detail="Version not found") # Load template content storage = get_default_storage() try: content = storage.open_bytes(ver.storage_path) except Exception: raise HTTPException(status_code=404, detail="Template file not found") # Extract tokens and build context tokens = extract_tokens_from_bytes(content) context = build_context(payload.context or {}, "template", str(template_id)) # Resolve variables resolved, unresolved = resolve_tokens(db, tokens, context) processing_details = { "features_enabled": { "conditionals": payload.enable_conditionals, "loops": payload.enable_loops, "formatting": payload.enable_formatting, "functions": payload.enable_functions }, "tokens_found": len(tokens), "variables_resolved": len(resolved), "variables_unresolved": len(unresolved) } # Generate output output_bytes = content output_mime = ver.mime_type if ver.mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": try: # Enhanced DOCX processing if payload.enable_conditionals or payload.enable_loops or payload.enable_formatting or payload.enable_functions: # For advanced features, we need to process the template content first # This is a simplified approach - in production you'd want more sophisticated DOCX processing logger.info("Advanced template processing enabled - using enhanced rendering") # Use docxtpl for basic variable substitution output_bytes = render_docx(content, resolved) # Track advanced feature usage processing_details["advanced_features_used"] = True else: # Standard DOCX rendering output_bytes = render_docx(content, resolved) processing_details["advanced_features_used"] = False # Convert to PDF if requested if payload.output_format.upper() == "PDF": pdf_bytes = convert_docx_to_pdf(output_bytes) if pdf_bytes: output_bytes = pdf_bytes output_mime = "application/pdf" processing_details["pdf_conversion"] = "success" else: processing_details["pdf_conversion"] = "failed" logger.warning("PDF conversion failed, returning DOCX") except Exception as e: logger.error(f"Error processing template: {e}") processing_details["processing_error"] = str(e) return AdvancedGenerateResponse( resolved=resolved, unresolved=unresolved, output_mime_type=output_mime, output_size=len(output_bytes), processing_details=processing_details ) @router.post("/{template_id}/analyze", response_model=TemplateAnalysisResponse) async def analyze_template( template_id: int, payload: TemplateAnalysisRequest, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """Analyze template to identify advanced features and complexity""" # Get template and version tpl = db.query(DocumentTemplate).filter(DocumentTemplate.id == template_id).first() if not tpl: raise HTTPException(status_code=404, detail="Template not found") version_id = payload.version_id or tpl.current_version_id if not version_id: raise HTTPException(status_code=400, detail="Template has no versions") ver = db.query(DocumentTemplateVersion).filter(DocumentTemplateVersion.id == version_id).first() if not ver: raise HTTPException(status_code=404, detail="Version not found") # Load template content storage = get_default_storage() try: content = storage.open_bytes(ver.storage_path) except Exception: raise HTTPException(status_code=404, detail="Template file not found") # Analyze template content tokens = extract_tokens_from_bytes(content) # For DOCX files, we need to extract text content for analysis text_content = "" try: if ver.mime_type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": # Extract text from DOCX for analysis from docx import Document doc = Document(io.BytesIO(content)) text_content = "\n".join([paragraph.text for paragraph in doc.paragraphs]) else: text_content = content.decode('utf-8', errors='ignore') except Exception as e: logger.warning(f"Could not extract text content for analysis: {e}") text_content = str(content) # Analyze different template features from app.services.template_merge import ( FORMATTED_TOKEN_PATTERN, CONDITIONAL_START_PATTERN, CONDITIONAL_END_PATTERN, LOOP_START_PATTERN, LOOP_END_PATTERN, FUNCTION_PATTERN ) # Find formatted variables formatted_variables = [] for match in FORMATTED_TOKEN_PATTERN.finditer(text_content): var_name = match.group(1).strip() format_spec = match.group(2).strip() formatted_variables.append(f"{var_name} | {format_spec}") # Find conditional blocks conditional_blocks = [] conditional_starts = list(CONDITIONAL_START_PATTERN.finditer(text_content)) conditional_ends = list(CONDITIONAL_END_PATTERN.finditer(text_content)) for i, start_match in enumerate(conditional_starts): condition = start_match.group(1).strip() conditional_blocks.append({ "condition": condition, "line_start": text_content[:start_match.start()].count('\n') + 1, "complexity": len(condition.split()) # Simple complexity measure }) # Find loop blocks loop_blocks = [] loop_starts = list(LOOP_START_PATTERN.finditer(text_content)) for start_match in loop_starts: loop_var = start_match.group(1).strip() collection = start_match.group(2).strip() loop_blocks.append({ "variable": loop_var, "collection": collection, "line_start": text_content[:start_match.start()].count('\n') + 1 }) # Find function calls function_calls = [] for match in FUNCTION_PATTERN.finditer(text_content): func_name = match.group(1).strip() args = match.group(2).strip() function_calls.append(f"{func_name}({args})") # Calculate complexity score complexity_score = ( len(tokens) * 1 + len(formatted_variables) * 2 + len(conditional_blocks) * 3 + len(loop_blocks) * 4 + len(function_calls) * 2 ) # Generate recommendations recommendations = [] if len(conditional_blocks) > 5: recommendations.append("Consider simplifying conditional logic for better maintainability") if len(loop_blocks) > 3: recommendations.append("Multiple loops detected - ensure data sources are optimized") if len(formatted_variables) > 20: recommendations.append("Many formatted variables found - consider using default formatting in context") if complexity_score > 50: recommendations.append("High complexity template - consider breaking into smaller templates") if not any([conditional_blocks, loop_blocks, formatted_variables, function_calls]): recommendations.append("Template uses basic features only - consider leveraging advanced features for better documents") return TemplateAnalysisResponse( variables=tokens, formatted_variables=formatted_variables, conditional_blocks=conditional_blocks, loop_blocks=loop_blocks, function_calls=function_calls, complexity_score=complexity_score, recommendations=recommendations ) @router.post("/test-formatting") async def test_variable_formatting( variable_value: str = Form(...), format_spec: str = Form(...), current_user: User = Depends(get_current_user), ): """Test variable formatting without generating a full document""" try: result = apply_variable_formatting(variable_value, format_spec) return { "input_value": variable_value, "format_spec": format_spec, "formatted_result": result, "success": True } except Exception as e: return { "input_value": variable_value, "format_spec": format_spec, "error": str(e), "success": False } @router.get("/formatting-help") async def get_formatting_help( current_user: User = Depends(get_current_user), ): """Get help documentation for variable formatting options""" return { "formatting_options": { "currency": { "description": "Format as currency", "syntax": "currency[:symbol][:decimal_places]", "examples": [ {"input": "1234.56", "format": "currency", "output": "$1,234.56"}, {"input": "1234.56", "format": "currency:€", "output": "€1,234.56"}, {"input": "1234.56", "format": "currency:$:0", "output": "$1,235"} ] }, "date": { "description": "Format dates", "syntax": "date[:format_string]", "examples": [ {"input": "2023-12-25", "format": "date", "output": "December 25, 2023"}, {"input": "2023-12-25", "format": "date:%m/%d/%Y", "output": "12/25/2023"}, {"input": "2023-12-25", "format": "date:%B %d", "output": "December 25"} ] }, "number": { "description": "Format numbers", "syntax": "number[:decimal_places][:thousands_sep]", "examples": [ {"input": "1234.5678", "format": "number", "output": "1,234.57"}, {"input": "1234.5678", "format": "number:1", "output": "1,234.6"}, {"input": "1234.5678", "format": "number:2: ", "output": "1 234.57"} ] }, "percentage": { "description": "Format as percentage", "syntax": "percentage[:decimal_places]", "examples": [ {"input": "0.1234", "format": "percentage", "output": "0.1%"}, {"input": "12.34", "format": "percentage:2", "output": "12.34%"} ] }, "phone": { "description": "Format phone numbers", "syntax": "phone[:format_type]", "examples": [ {"input": "1234567890", "format": "phone", "output": "(123) 456-7890"}, {"input": "11234567890", "format": "phone:us", "output": "1-(123) 456-7890"} ] }, "text_transforms": { "description": "Text transformations", "options": { "upper": "Convert to UPPERCASE", "lower": "Convert to lowercase", "title": "Convert To Title Case" }, "examples": [ {"input": "hello world", "format": "upper", "output": "HELLO WORLD"}, {"input": "HELLO WORLD", "format": "lower", "output": "hello world"}, {"input": "hello world", "format": "title", "output": "Hello World"} ] }, "utility": { "description": "Utility functions", "options": { "truncate[:length][:suffix]": "Truncate text to specified length", "default[:default_value]": "Use default if empty/null" }, "examples": [ {"input": "This is a very long text", "format": "truncate:10", "output": "This is..."}, {"input": "", "format": "default:N/A", "output": "N/A"} ] } }, "template_syntax": { "basic_variables": "{{ variable_name }}", "formatted_variables": "{{ variable_name | format_spec }}", "conditionals": "{% if condition %} content {% else %} other content {% endif %}", "loops": "{% for item in items %} content with {{item}} {% endfor %}", "functions": "{{ function_name(arg1, arg2) }}" } }