""" Advanced Template Processing Engine Enhanced features: - Rich variable resolution with formatting options - Conditional content blocks (IF/ENDIF sections) - Loop functionality for data tables (FOR/ENDFOR sections) - Advanced variable substitution with built-in functions - PDF generation support - Template function library - Resolve variables from explicit context, FormVariable, ReportVariable - Built-in variables (dates) - Render DOCX using docxtpl when mime_type is docx; otherwise return bytes as-is - Return unresolved tokens list """ from __future__ import annotations import io import re import warnings import subprocess import tempfile import os from datetime import date, datetime from typing import Any, Dict, List, Tuple, Optional, Union from decimal import Decimal, InvalidOperation from sqlalchemy.orm import Session from app.models.additional import FormVariable, ReportVariable from app.core.logging import get_logger logger = get_logger("template_merge") try: with warnings.catch_warnings(): warnings.simplefilter("ignore", category=UserWarning) from docxtpl import DocxTemplate DOCXTPL_AVAILABLE = True except Exception: DOCXTPL_AVAILABLE = False # Enhanced token patterns for different template features TOKEN_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_\.]+)\s*\}\}") FORMATTED_TOKEN_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_\.]+)\s*\|\s*([^}]+)\s*\}\}") CONDITIONAL_START_PATTERN = re.compile(r"\{\%\s*if\s+([^%]+)\s*\%\}") CONDITIONAL_ELSE_PATTERN = re.compile(r"\{\%\s*else\s*\%\}") CONDITIONAL_END_PATTERN = re.compile(r"\{\%\s*endif\s*\%\}") LOOP_START_PATTERN = re.compile(r"\{\%\s*for\s+(\w+)\s+in\s+([^%]+)\s*\%\}") LOOP_END_PATTERN = re.compile(r"\{\%\s*endfor\s*\%\}") FUNCTION_PATTERN = re.compile(r"\{\{\s*(\w+)\s*\(\s*([^)]*)\s*\)\s*\}\}") def extract_tokens_from_bytes(content: bytes) -> List[str]: # Prefer docxtpl-based extraction for DOCX if available if DOCXTPL_AVAILABLE: try: buf = io.BytesIO(content) tpl = DocxTemplate(buf) # jinja2 analysis for undeclared template variables vars_set = tpl.get_undeclared_template_variables({}) return sorted({str(v) for v in vars_set}) except Exception: pass # Fallback: naive regex over decoded text try: text = content.decode("utf-8", errors="ignore") except Exception: text = "" return sorted({m.group(1) for m in TOKEN_PATTERN.finditer(text)}) class TemplateFunctions: """ Built-in template functions available in document templates """ @staticmethod def format_currency(value: Any, symbol: str = "$", decimal_places: int = 2) -> str: """Format a number as currency""" try: num_value = float(value) if value is not None else 0.0 return f"{symbol}{num_value:,.{decimal_places}f}" except (ValueError, TypeError): return f"{symbol}0.00" @staticmethod def format_date(value: Any, format_str: str = "%B %d, %Y") -> str: """Format a date with a custom format string""" if value is None: return "" try: if isinstance(value, str): from dateutil.parser import parse value = parse(value).date() elif isinstance(value, datetime): value = value.date() if isinstance(value, date): return value.strftime(format_str) return str(value) except Exception: return str(value) @staticmethod def format_number(value: Any, decimal_places: int = 2, thousands_sep: str = ",") -> str: """Format a number with specified decimal places and thousands separator""" try: num_value = float(value) if value is not None else 0.0 if thousands_sep == ",": return f"{num_value:,.{decimal_places}f}" else: formatted = f"{num_value:.{decimal_places}f}" if thousands_sep: # Simple thousands separator replacement parts = formatted.split(".") parts[0] = parts[0][::-1] # Reverse parts[0] = thousands_sep.join([parts[0][i:i+3] for i in range(0, len(parts[0]), 3)]) parts[0] = parts[0][::-1] # Reverse back formatted = ".".join(parts) return formatted except (ValueError, TypeError): return "0.00" @staticmethod def format_percentage(value: Any, decimal_places: int = 1) -> str: """Format a number as a percentage""" try: num_value = float(value) if value is not None else 0.0 return f"{num_value:.{decimal_places}f}%" except (ValueError, TypeError): return "0.0%" @staticmethod def format_phone(value: Any, format_type: str = "us") -> str: """Format a phone number""" if not value: return "" # Remove all non-digit characters digits = re.sub(r'\D', '', str(value)) if format_type.lower() == "us" and len(digits) == 10: return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}" elif format_type.lower() == "us" and len(digits) == 11 and digits[0] == "1": return f"1-({digits[1:4]}) {digits[4:7]}-{digits[7:]}" return str(value) @staticmethod def uppercase(value: Any) -> str: """Convert text to uppercase""" return str(value).upper() if value is not None else "" @staticmethod def lowercase(value: Any) -> str: """Convert text to lowercase""" return str(value).lower() if value is not None else "" @staticmethod def titlecase(value: Any) -> str: """Convert text to title case""" return str(value).title() if value is not None else "" @staticmethod def truncate(value: Any, length: int = 50, suffix: str = "...") -> str: """Truncate text to a specified length""" text = str(value) if value is not None else "" if len(text) <= length: return text return text[:length - len(suffix)] + suffix @staticmethod def default(value: Any, default_value: str = "") -> str: """Return default value if the input is empty/null""" if value is None or str(value).strip() == "": return default_value return str(value) @staticmethod def join(items: List[Any], separator: str = ", ") -> str: """Join a list of items with a separator""" if not isinstance(items, (list, tuple)): return str(items) if items is not None else "" return separator.join(str(item) for item in items if item is not None) @staticmethod def length(value: Any) -> int: """Get the length of a string or list""" if value is None: return 0 if isinstance(value, (list, tuple, dict)): return len(value) return len(str(value)) @staticmethod def math_add(a: Any, b: Any) -> float: """Add two numbers""" try: return float(a or 0) + float(b or 0) except (ValueError, TypeError): return 0.0 @staticmethod def math_subtract(a: Any, b: Any) -> float: """Subtract two numbers""" try: return float(a or 0) - float(b or 0) except (ValueError, TypeError): return 0.0 @staticmethod def math_multiply(a: Any, b: Any) -> float: """Multiply two numbers""" try: return float(a or 0) * float(b or 0) except (ValueError, TypeError): return 0.0 @staticmethod def math_divide(a: Any, b: Any) -> float: """Divide two numbers""" try: divisor = float(b or 0) if divisor == 0: return 0.0 return float(a or 0) / divisor except (ValueError, TypeError): return 0.0 def apply_variable_formatting(value: Any, format_spec: str) -> str: """ Apply formatting to a variable value based on format specification Format specifications: - currency[:symbol][:decimal_places] - Format as currency - date[:format_string] - Format as date - number[:decimal_places][:thousands_sep] - Format as number - percentage[:decimal_places] - Format as percentage - phone[:format_type] - Format as phone number - upper - Convert to uppercase - lower - Convert to lowercase - title - Convert to title case - truncate[:length][:suffix] - Truncate text - default[:default_value] - Use default if empty """ if not format_spec: return str(value) if value is not None else "" parts = format_spec.split(":") format_type = parts[0].lower() try: if format_type == "currency": symbol = parts[1] if len(parts) > 1 else "$" decimal_places = int(parts[2]) if len(parts) > 2 else 2 return TemplateFunctions.format_currency(value, symbol, decimal_places) elif format_type == "date": format_str = parts[1] if len(parts) > 1 else "%B %d, %Y" return TemplateFunctions.format_date(value, format_str) elif format_type == "number": decimal_places = int(parts[1]) if len(parts) > 1 else 2 thousands_sep = parts[2] if len(parts) > 2 else "," return TemplateFunctions.format_number(value, decimal_places, thousands_sep) elif format_type == "percentage": decimal_places = int(parts[1]) if len(parts) > 1 else 1 return TemplateFunctions.format_percentage(value, decimal_places) elif format_type == "phone": format_type_spec = parts[1] if len(parts) > 1 else "us" return TemplateFunctions.format_phone(value, format_type_spec) elif format_type == "upper": return TemplateFunctions.uppercase(value) elif format_type == "lower": return TemplateFunctions.lowercase(value) elif format_type == "title": return TemplateFunctions.titlecase(value) elif format_type == "truncate": length = int(parts[1]) if len(parts) > 1 else 50 suffix = parts[2] if len(parts) > 2 else "..." return TemplateFunctions.truncate(value, length, suffix) elif format_type == "default": default_value = parts[1] if len(parts) > 1 else "" return TemplateFunctions.default(value, default_value) else: logger.warning(f"Unknown format type: {format_type}") return str(value) if value is not None else "" except Exception as e: logger.error(f"Error applying format '{format_spec}' to value '{value}': {e}") return str(value) if value is not None else "" def build_context(payload_context: Dict[str, Any], context_type: str = "global", context_id: str = "default") -> Dict[str, Any]: # Built-ins with enhanced date/time functions today = date.today() now = datetime.utcnow() builtins = { "TODAY": today.strftime("%B %d, %Y"), "TODAY_ISO": today.isoformat(), "TODAY_SHORT": today.strftime("%m/%d/%Y"), "TODAY_YEAR": str(today.year), "TODAY_MONTH": str(today.month), "TODAY_DAY": str(today.day), "NOW": now.isoformat() + "Z", "NOW_TIME": now.strftime("%I:%M %p"), "NOW_TIMESTAMP": str(int(now.timestamp())), # Context identifiers for enhanced variable processing "_context_type": context_type, "_context_id": context_id, # Template functions "format_currency": TemplateFunctions.format_currency, "format_date": TemplateFunctions.format_date, "format_number": TemplateFunctions.format_number, "format_percentage": TemplateFunctions.format_percentage, "format_phone": TemplateFunctions.format_phone, "uppercase": TemplateFunctions.uppercase, "lowercase": TemplateFunctions.lowercase, "titlecase": TemplateFunctions.titlecase, "truncate": TemplateFunctions.truncate, "default": TemplateFunctions.default, "join": TemplateFunctions.join, "length": TemplateFunctions.length, "math_add": TemplateFunctions.math_add, "math_subtract": TemplateFunctions.math_subtract, "math_multiply": TemplateFunctions.math_multiply, "math_divide": TemplateFunctions.math_divide, } merged = {**builtins} # Normalize keys to support both FOO and foo for k, v in payload_context.items(): merged[k] = v if isinstance(k, str): merged.setdefault(k.upper(), v) return merged def _safe_lookup_variable(db: Session, identifier: str) -> Any: # 1) FormVariable fv = db.query(FormVariable).filter(FormVariable.identifier == identifier, FormVariable.active == 1).first() if fv: # MVP: use static response if present; otherwise treat as unresolved if fv.response is not None: return fv.response return None # 2) ReportVariable rv = db.query(ReportVariable).filter(ReportVariable.identifier == identifier, ReportVariable.active == 1).first() if rv: # MVP: no evaluation yet; unresolved return None return None def resolve_tokens(db: Session, tokens: List[str], context: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]: resolved: Dict[str, Any] = {} unresolved: List[str] = [] # Try enhanced variable processor first for advanced features try: from app.services.advanced_variables import VariableProcessor processor = VariableProcessor(db) # Extract context information for enhanced processing context_type = context.get('_context_type', 'global') context_id = context.get('_context_id', 'default') # Remove internal context markers from the context clean_context = {k: v for k, v in context.items() if not k.startswith('_')} enhanced_resolved, enhanced_unresolved = processor.resolve_variables( variables=tokens, context_type=context_type, context_id=context_id, base_context=clean_context ) resolved.update(enhanced_resolved) unresolved.extend(enhanced_unresolved) # Remove successfully resolved tokens from further processing tokens = [tok for tok in tokens if tok not in enhanced_resolved] except ImportError: # Enhanced variables not available, fall back to legacy processing pass except Exception as e: # Log error but continue with legacy processing import logging logging.warning(f"Enhanced variable processing failed: {e}") # Fallback to legacy variable resolution for remaining tokens for tok in tokens: # Order: payload context (case-insensitive via upper) -> FormVariable -> ReportVariable value = context.get(tok) if value is None: value = context.get(tok.upper()) if value is None: value = _safe_lookup_variable(db, tok) if value is None: if tok not in unresolved: # Avoid duplicates from enhanced processing unresolved.append(tok) else: resolved[tok] = value return resolved, unresolved def process_conditional_sections(content: str, context: Dict[str, Any]) -> str: """ Process conditional sections in template content Syntax: {% if condition %} content to include if condition is true {% else %} content to include if condition is false (optional) {% endif %} """ result = content # Find all conditional blocks while True: start_match = CONDITIONAL_START_PATTERN.search(result) if not start_match: break # Find corresponding endif start_pos = start_match.end() endif_match = CONDITIONAL_END_PATTERN.search(result, start_pos) if not endif_match: logger.warning("Found {% if %} without matching {% endif %}") break # Find optional else clause else_match = CONDITIONAL_ELSE_PATTERN.search(result, start_pos, endif_match.start()) condition = start_match.group(1).strip() # Extract content blocks if else_match: if_content = result[start_pos:else_match.start()] else_content = result[else_match.end():endif_match.start()] else: if_content = result[start_pos:endif_match.start()] else_content = "" # Evaluate condition try: condition_result = evaluate_condition(condition, context) selected_content = if_content if condition_result else else_content except Exception as e: logger.error(f"Error evaluating condition '{condition}': {e}") selected_content = else_content # Default to else content on error # Replace the entire conditional block with the selected content result = result[:start_match.start()] + selected_content + result[endif_match.end():] return result def process_loop_sections(content: str, context: Dict[str, Any]) -> str: """ Process loop sections in template content Syntax: {% for item in items %} Content to repeat for each item. Use {{item.property}} to access item data. {% endfor %} """ result = content # Find all loop blocks while True: start_match = LOOP_START_PATTERN.search(result) if not start_match: break # Find corresponding endfor start_pos = start_match.end() endfor_match = LOOP_END_PATTERN.search(result, start_pos) if not endfor_match: logger.warning("Found {% for %} without matching {% endfor %}") break loop_var = start_match.group(1).strip() collection_expr = start_match.group(2).strip() loop_content = result[start_pos:endfor_match.start()] # Get the collection from context try: collection = evaluate_expression(collection_expr, context) if not isinstance(collection, (list, tuple)): logger.warning(f"Loop collection '{collection_expr}' is not iterable") collection = [] except Exception as e: logger.error(f"Error evaluating loop collection '{collection_expr}': {e}") collection = [] # Generate content for each item repeated_content = "" for i, item in enumerate(collection): # Create item context item_context = context.copy() item_context[loop_var] = item item_context[f"{loop_var}_index"] = i item_context[f"{loop_var}_index0"] = i # 0-based index item_context[f"{loop_var}_first"] = (i == 0) item_context[f"{loop_var}_last"] = (i == len(collection) - 1) item_context[f"{loop_var}_length"] = len(collection) # Process the loop content with item context item_content = process_template_content(loop_content, item_context) repeated_content += item_content # Replace the entire loop block with the repeated content result = result[:start_match.start()] + repeated_content + result[endfor_match.end():] return result def process_formatted_variables(content: str, context: Dict[str, Any]) -> Tuple[str, List[str]]: """ Process variables with formatting in template content Syntax: {{ variable_name | format_spec }} """ result = content unresolved = [] # Find all formatted variables for match in FORMATTED_TOKEN_PATTERN.finditer(content): var_name = match.group(1).strip() format_spec = match.group(2).strip() full_token = match.group(0) # Get variable value value = context.get(var_name) if value is None: value = context.get(var_name.upper()) if value is not None: # Apply formatting formatted_value = apply_variable_formatting(value, format_spec) result = result.replace(full_token, formatted_value) else: unresolved.append(var_name) return result, unresolved def process_template_functions(content: str, context: Dict[str, Any]) -> Tuple[str, List[str]]: """ Process template function calls Syntax: {{ function_name(arg1, arg2, ...) }} """ result = content unresolved = [] for match in FUNCTION_PATTERN.finditer(content): func_name = match.group(1).strip() args_str = match.group(2).strip() full_token = match.group(0) # Get function from context func = context.get(func_name) if func and callable(func): try: # Parse arguments args = [] if args_str: # Simple argument parsing (supports strings, numbers, variables) arg_parts = [arg.strip() for arg in args_str.split(',')] for arg in arg_parts: if arg.startswith('"') and arg.endswith('"'): # String literal args.append(arg[1:-1]) elif arg.startswith("'") and arg.endswith("'"): # String literal args.append(arg[1:-1]) elif arg.replace('.', '').replace('-', '').isdigit(): # Number literal args.append(float(arg) if '.' in arg else int(arg)) else: # Variable reference var_value = context.get(arg, context.get(arg.upper(), arg)) args.append(var_value) # Call function func_result = func(*args) result = result.replace(full_token, str(func_result)) except Exception as e: logger.error(f"Error calling function '{func_name}': {e}") unresolved.append(f"{func_name}()") else: unresolved.append(f"{func_name}()") return result, unresolved def evaluate_condition(condition: str, context: Dict[str, Any]) -> bool: """ Evaluate a conditional expression safely """ try: # Replace variables in condition for var_name, value in context.items(): if var_name.startswith('_'): # Skip internal variables continue condition = condition.replace(var_name, repr(value)) # Safe evaluation with limited builtins safe_context = { '__builtins__': {}, 'True': True, 'False': False, 'None': None, } return bool(eval(condition, safe_context)) except Exception as e: logger.error(f"Error evaluating condition '{condition}': {e}") return False def evaluate_expression(expression: str, context: Dict[str, Any]) -> Any: """ Evaluate an expression safely """ try: # Check if it's a simple variable reference if expression in context: return context[expression] if expression.upper() in context: return context[expression.upper()] # Try as a more complex expression safe_context = { '__builtins__': {}, **context } return eval(expression, safe_context) except Exception as e: logger.error(f"Error evaluating expression '{expression}': {e}") return None def process_template_content(content: str, context: Dict[str, Any]) -> str: """ Process template content with all advanced features """ # 1. Process conditional sections content = process_conditional_sections(content, context) # 2. Process loop sections content = process_loop_sections(content, context) # 3. Process formatted variables content, _ = process_formatted_variables(content, context) # 4. Process template functions content, _ = process_template_functions(content, context) return content def convert_docx_to_pdf(docx_bytes: bytes) -> Optional[bytes]: """ Convert DOCX to PDF using LibreOffice headless mode """ try: with tempfile.TemporaryDirectory() as temp_dir: # Save DOCX to temp file docx_path = os.path.join(temp_dir, "document.docx") with open(docx_path, "wb") as f: f.write(docx_bytes) # Convert to PDF using LibreOffice cmd = [ "libreoffice", "--headless", "--convert-to", "pdf", "--outdir", temp_dir, docx_path ] result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) if result.returncode == 0: pdf_path = os.path.join(temp_dir, "document.pdf") if os.path.exists(pdf_path): with open(pdf_path, "rb") as f: return f.read() else: logger.error(f"LibreOffice conversion failed: {result.stderr}") except subprocess.TimeoutExpired: logger.error("LibreOffice conversion timed out") except FileNotFoundError: logger.warning("LibreOffice not found. PDF conversion not available.") except Exception as e: logger.error(f"Error converting DOCX to PDF: {e}") return None def render_docx(docx_bytes: bytes, context: Dict[str, Any]) -> bytes: if not DOCXTPL_AVAILABLE: # Return original bytes if docxtpl is not installed return docx_bytes try: # Write to BytesIO for docxtpl in_buffer = io.BytesIO(docx_bytes) tpl = DocxTemplate(in_buffer) # Enhanced context with template functions enhanced_context = context.copy() # Render the template tpl.render(enhanced_context) # Save to output buffer out_buffer = io.BytesIO() tpl.save(out_buffer) return out_buffer.getvalue() except Exception as e: logger.error(f"Error rendering DOCX template: {e}") return docx_bytes