749 lines
26 KiB
Python
749 lines
26 KiB
Python
"""
|
|
Advanced Template Processing Engine
|
|
|
|
Enhanced features:
|
|
- Rich variable resolution with formatting options
|
|
- Conditional content blocks (IF/ENDIF sections)
|
|
- Loop functionality for data tables (FOR/ENDFOR sections)
|
|
- Advanced variable substitution with built-in functions
|
|
- PDF generation support
|
|
- Template function library
|
|
- Resolve variables from explicit context, FormVariable, ReportVariable
|
|
- Built-in variables (dates)
|
|
- Render DOCX using docxtpl when mime_type is docx; otherwise return bytes as-is
|
|
- Return unresolved tokens list
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import io
|
|
import re
|
|
import warnings
|
|
import subprocess
|
|
import tempfile
|
|
import os
|
|
from datetime import date, datetime
|
|
from typing import Any, Dict, List, Tuple, Optional, Union
|
|
from decimal import Decimal, InvalidOperation
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.models.additional import FormVariable, ReportVariable
|
|
from app.core.logging import get_logger
|
|
|
|
logger = get_logger("template_merge")
|
|
|
|
try:
|
|
with warnings.catch_warnings():
|
|
warnings.simplefilter("ignore", category=UserWarning)
|
|
from docxtpl import DocxTemplate
|
|
DOCXTPL_AVAILABLE = True
|
|
except Exception:
|
|
DOCXTPL_AVAILABLE = False
|
|
|
|
|
|
# Enhanced token patterns for different template features
|
|
TOKEN_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_\.]+)\s*\}\}")
|
|
FORMATTED_TOKEN_PATTERN = re.compile(r"\{\{\s*([a-zA-Z0-9_\.]+)\s*\|\s*([^}]+)\s*\}\}")
|
|
CONDITIONAL_START_PATTERN = re.compile(r"\{\%\s*if\s+([^%]+)\s*\%\}")
|
|
CONDITIONAL_ELSE_PATTERN = re.compile(r"\{\%\s*else\s*\%\}")
|
|
CONDITIONAL_END_PATTERN = re.compile(r"\{\%\s*endif\s*\%\}")
|
|
LOOP_START_PATTERN = re.compile(r"\{\%\s*for\s+(\w+)\s+in\s+([^%]+)\s*\%\}")
|
|
LOOP_END_PATTERN = re.compile(r"\{\%\s*endfor\s*\%\}")
|
|
FUNCTION_PATTERN = re.compile(r"\{\{\s*(\w+)\s*\(\s*([^)]*)\s*\)\s*\}\}")
|
|
|
|
|
|
def extract_tokens_from_bytes(content: bytes) -> List[str]:
|
|
# Prefer docxtpl-based extraction for DOCX if available
|
|
if DOCXTPL_AVAILABLE:
|
|
try:
|
|
buf = io.BytesIO(content)
|
|
tpl = DocxTemplate(buf)
|
|
# jinja2 analysis for undeclared template variables
|
|
vars_set = tpl.get_undeclared_template_variables({})
|
|
return sorted({str(v) for v in vars_set})
|
|
except Exception:
|
|
pass
|
|
# Fallback: naive regex over decoded text
|
|
try:
|
|
text = content.decode("utf-8", errors="ignore")
|
|
except Exception:
|
|
text = ""
|
|
return sorted({m.group(1) for m in TOKEN_PATTERN.finditer(text)})
|
|
|
|
|
|
class TemplateFunctions:
|
|
"""
|
|
Built-in template functions available in document templates
|
|
"""
|
|
|
|
@staticmethod
|
|
def format_currency(value: Any, symbol: str = "$", decimal_places: int = 2) -> str:
|
|
"""Format a number as currency"""
|
|
try:
|
|
num_value = float(value) if value is not None else 0.0
|
|
return f"{symbol}{num_value:,.{decimal_places}f}"
|
|
except (ValueError, TypeError):
|
|
return f"{symbol}0.00"
|
|
|
|
@staticmethod
|
|
def format_date(value: Any, format_str: str = "%B %d, %Y") -> str:
|
|
"""Format a date with a custom format string"""
|
|
if value is None:
|
|
return ""
|
|
try:
|
|
if isinstance(value, str):
|
|
from dateutil.parser import parse
|
|
value = parse(value).date()
|
|
elif isinstance(value, datetime):
|
|
value = value.date()
|
|
|
|
if isinstance(value, date):
|
|
return value.strftime(format_str)
|
|
return str(value)
|
|
except Exception:
|
|
return str(value)
|
|
|
|
@staticmethod
|
|
def format_number(value: Any, decimal_places: int = 2, thousands_sep: str = ",") -> str:
|
|
"""Format a number with specified decimal places and thousands separator"""
|
|
try:
|
|
num_value = float(value) if value is not None else 0.0
|
|
if thousands_sep == ",":
|
|
return f"{num_value:,.{decimal_places}f}"
|
|
else:
|
|
formatted = f"{num_value:.{decimal_places}f}"
|
|
if thousands_sep:
|
|
# Simple thousands separator replacement
|
|
parts = formatted.split(".")
|
|
parts[0] = parts[0][::-1] # Reverse
|
|
parts[0] = thousands_sep.join([parts[0][i:i+3] for i in range(0, len(parts[0]), 3)])
|
|
parts[0] = parts[0][::-1] # Reverse back
|
|
formatted = ".".join(parts)
|
|
return formatted
|
|
except (ValueError, TypeError):
|
|
return "0.00"
|
|
|
|
@staticmethod
|
|
def format_percentage(value: Any, decimal_places: int = 1) -> str:
|
|
"""Format a number as a percentage"""
|
|
try:
|
|
num_value = float(value) if value is not None else 0.0
|
|
return f"{num_value:.{decimal_places}f}%"
|
|
except (ValueError, TypeError):
|
|
return "0.0%"
|
|
|
|
@staticmethod
|
|
def format_phone(value: Any, format_type: str = "us") -> str:
|
|
"""Format a phone number"""
|
|
if not value:
|
|
return ""
|
|
|
|
# Remove all non-digit characters
|
|
digits = re.sub(r'\D', '', str(value))
|
|
|
|
if format_type.lower() == "us" and len(digits) == 10:
|
|
return f"({digits[:3]}) {digits[3:6]}-{digits[6:]}"
|
|
elif format_type.lower() == "us" and len(digits) == 11 and digits[0] == "1":
|
|
return f"1-({digits[1:4]}) {digits[4:7]}-{digits[7:]}"
|
|
|
|
return str(value)
|
|
|
|
@staticmethod
|
|
def uppercase(value: Any) -> str:
|
|
"""Convert text to uppercase"""
|
|
return str(value).upper() if value is not None else ""
|
|
|
|
@staticmethod
|
|
def lowercase(value: Any) -> str:
|
|
"""Convert text to lowercase"""
|
|
return str(value).lower() if value is not None else ""
|
|
|
|
@staticmethod
|
|
def titlecase(value: Any) -> str:
|
|
"""Convert text to title case"""
|
|
return str(value).title() if value is not None else ""
|
|
|
|
@staticmethod
|
|
def truncate(value: Any, length: int = 50, suffix: str = "...") -> str:
|
|
"""Truncate text to a specified length"""
|
|
text = str(value) if value is not None else ""
|
|
if len(text) <= length:
|
|
return text
|
|
return text[:length - len(suffix)] + suffix
|
|
|
|
@staticmethod
|
|
def default(value: Any, default_value: str = "") -> str:
|
|
"""Return default value if the input is empty/null"""
|
|
if value is None or str(value).strip() == "":
|
|
return default_value
|
|
return str(value)
|
|
|
|
@staticmethod
|
|
def join(items: List[Any], separator: str = ", ") -> str:
|
|
"""Join a list of items with a separator"""
|
|
if not isinstance(items, (list, tuple)):
|
|
return str(items) if items is not None else ""
|
|
return separator.join(str(item) for item in items if item is not None)
|
|
|
|
@staticmethod
|
|
def length(value: Any) -> int:
|
|
"""Get the length of a string or list"""
|
|
if value is None:
|
|
return 0
|
|
if isinstance(value, (list, tuple, dict)):
|
|
return len(value)
|
|
return len(str(value))
|
|
|
|
@staticmethod
|
|
def math_add(a: Any, b: Any) -> float:
|
|
"""Add two numbers"""
|
|
try:
|
|
return float(a or 0) + float(b or 0)
|
|
except (ValueError, TypeError):
|
|
return 0.0
|
|
|
|
@staticmethod
|
|
def math_subtract(a: Any, b: Any) -> float:
|
|
"""Subtract two numbers"""
|
|
try:
|
|
return float(a or 0) - float(b or 0)
|
|
except (ValueError, TypeError):
|
|
return 0.0
|
|
|
|
@staticmethod
|
|
def math_multiply(a: Any, b: Any) -> float:
|
|
"""Multiply two numbers"""
|
|
try:
|
|
return float(a or 0) * float(b or 0)
|
|
except (ValueError, TypeError):
|
|
return 0.0
|
|
|
|
@staticmethod
|
|
def math_divide(a: Any, b: Any) -> float:
|
|
"""Divide two numbers"""
|
|
try:
|
|
divisor = float(b or 0)
|
|
if divisor == 0:
|
|
return 0.0
|
|
return float(a or 0) / divisor
|
|
except (ValueError, TypeError):
|
|
return 0.0
|
|
|
|
|
|
def apply_variable_formatting(value: Any, format_spec: str) -> str:
|
|
"""
|
|
Apply formatting to a variable value based on format specification
|
|
|
|
Format specifications:
|
|
- currency[:symbol][:decimal_places] - Format as currency
|
|
- date[:format_string] - Format as date
|
|
- number[:decimal_places][:thousands_sep] - Format as number
|
|
- percentage[:decimal_places] - Format as percentage
|
|
- phone[:format_type] - Format as phone number
|
|
- upper - Convert to uppercase
|
|
- lower - Convert to lowercase
|
|
- title - Convert to title case
|
|
- truncate[:length][:suffix] - Truncate text
|
|
- default[:default_value] - Use default if empty
|
|
"""
|
|
if not format_spec:
|
|
return str(value) if value is not None else ""
|
|
|
|
parts = format_spec.split(":")
|
|
format_type = parts[0].lower()
|
|
|
|
try:
|
|
if format_type == "currency":
|
|
symbol = parts[1] if len(parts) > 1 else "$"
|
|
decimal_places = int(parts[2]) if len(parts) > 2 else 2
|
|
return TemplateFunctions.format_currency(value, symbol, decimal_places)
|
|
|
|
elif format_type == "date":
|
|
format_str = parts[1] if len(parts) > 1 else "%B %d, %Y"
|
|
return TemplateFunctions.format_date(value, format_str)
|
|
|
|
elif format_type == "number":
|
|
decimal_places = int(parts[1]) if len(parts) > 1 else 2
|
|
thousands_sep = parts[2] if len(parts) > 2 else ","
|
|
return TemplateFunctions.format_number(value, decimal_places, thousands_sep)
|
|
|
|
elif format_type == "percentage":
|
|
decimal_places = int(parts[1]) if len(parts) > 1 else 1
|
|
return TemplateFunctions.format_percentage(value, decimal_places)
|
|
|
|
elif format_type == "phone":
|
|
format_type_spec = parts[1] if len(parts) > 1 else "us"
|
|
return TemplateFunctions.format_phone(value, format_type_spec)
|
|
|
|
elif format_type == "upper":
|
|
return TemplateFunctions.uppercase(value)
|
|
|
|
elif format_type == "lower":
|
|
return TemplateFunctions.lowercase(value)
|
|
|
|
elif format_type == "title":
|
|
return TemplateFunctions.titlecase(value)
|
|
|
|
elif format_type == "truncate":
|
|
length = int(parts[1]) if len(parts) > 1 else 50
|
|
suffix = parts[2] if len(parts) > 2 else "..."
|
|
return TemplateFunctions.truncate(value, length, suffix)
|
|
|
|
elif format_type == "default":
|
|
default_value = parts[1] if len(parts) > 1 else ""
|
|
return TemplateFunctions.default(value, default_value)
|
|
|
|
else:
|
|
logger.warning(f"Unknown format type: {format_type}")
|
|
return str(value) if value is not None else ""
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error applying format '{format_spec}' to value '{value}': {e}")
|
|
return str(value) if value is not None else ""
|
|
|
|
|
|
def build_context(payload_context: Dict[str, Any], context_type: str = "global", context_id: str = "default") -> Dict[str, Any]:
|
|
# Built-ins with enhanced date/time functions
|
|
today = date.today()
|
|
now = datetime.utcnow()
|
|
builtins = {
|
|
"TODAY": today.strftime("%B %d, %Y"),
|
|
"TODAY_ISO": today.isoformat(),
|
|
"TODAY_SHORT": today.strftime("%m/%d/%Y"),
|
|
"TODAY_YEAR": str(today.year),
|
|
"TODAY_MONTH": str(today.month),
|
|
"TODAY_DAY": str(today.day),
|
|
"NOW": now.isoformat() + "Z",
|
|
"NOW_TIME": now.strftime("%I:%M %p"),
|
|
"NOW_TIMESTAMP": str(int(now.timestamp())),
|
|
# Context identifiers for enhanced variable processing
|
|
"_context_type": context_type,
|
|
"_context_id": context_id,
|
|
|
|
# Template functions
|
|
"format_currency": TemplateFunctions.format_currency,
|
|
"format_date": TemplateFunctions.format_date,
|
|
"format_number": TemplateFunctions.format_number,
|
|
"format_percentage": TemplateFunctions.format_percentage,
|
|
"format_phone": TemplateFunctions.format_phone,
|
|
"uppercase": TemplateFunctions.uppercase,
|
|
"lowercase": TemplateFunctions.lowercase,
|
|
"titlecase": TemplateFunctions.titlecase,
|
|
"truncate": TemplateFunctions.truncate,
|
|
"default": TemplateFunctions.default,
|
|
"join": TemplateFunctions.join,
|
|
"length": TemplateFunctions.length,
|
|
"math_add": TemplateFunctions.math_add,
|
|
"math_subtract": TemplateFunctions.math_subtract,
|
|
"math_multiply": TemplateFunctions.math_multiply,
|
|
"math_divide": TemplateFunctions.math_divide,
|
|
}
|
|
merged = {**builtins}
|
|
|
|
# Normalize keys to support both FOO and foo
|
|
for k, v in payload_context.items():
|
|
merged[k] = v
|
|
if isinstance(k, str):
|
|
merged.setdefault(k.upper(), v)
|
|
|
|
return merged
|
|
|
|
|
|
def _safe_lookup_variable(db: Session, identifier: str) -> Any:
|
|
# 1) FormVariable
|
|
fv = db.query(FormVariable).filter(FormVariable.identifier == identifier, FormVariable.active == 1).first()
|
|
if fv:
|
|
# MVP: use static response if present; otherwise treat as unresolved
|
|
if fv.response is not None:
|
|
return fv.response
|
|
return None
|
|
# 2) ReportVariable
|
|
rv = db.query(ReportVariable).filter(ReportVariable.identifier == identifier, ReportVariable.active == 1).first()
|
|
if rv:
|
|
# MVP: no evaluation yet; unresolved
|
|
return None
|
|
return None
|
|
|
|
|
|
def resolve_tokens(db: Session, tokens: List[str], context: Dict[str, Any]) -> Tuple[Dict[str, Any], List[str]]:
|
|
resolved: Dict[str, Any] = {}
|
|
unresolved: List[str] = []
|
|
|
|
# Try enhanced variable processor first for advanced features
|
|
try:
|
|
from app.services.advanced_variables import VariableProcessor
|
|
processor = VariableProcessor(db)
|
|
|
|
# Extract context information for enhanced processing
|
|
context_type = context.get('_context_type', 'global')
|
|
context_id = context.get('_context_id', 'default')
|
|
|
|
# Remove internal context markers from the context
|
|
clean_context = {k: v for k, v in context.items() if not k.startswith('_')}
|
|
|
|
enhanced_resolved, enhanced_unresolved = processor.resolve_variables(
|
|
variables=tokens,
|
|
context_type=context_type,
|
|
context_id=context_id,
|
|
base_context=clean_context
|
|
)
|
|
|
|
resolved.update(enhanced_resolved)
|
|
unresolved.extend(enhanced_unresolved)
|
|
|
|
# Remove successfully resolved tokens from further processing
|
|
tokens = [tok for tok in tokens if tok not in enhanced_resolved]
|
|
|
|
except ImportError:
|
|
# Enhanced variables not available, fall back to legacy processing
|
|
pass
|
|
except Exception as e:
|
|
# Log error but continue with legacy processing
|
|
import logging
|
|
logging.warning(f"Enhanced variable processing failed: {e}")
|
|
|
|
# Fallback to legacy variable resolution for remaining tokens
|
|
for tok in tokens:
|
|
# Order: payload context (case-insensitive via upper) -> FormVariable -> ReportVariable
|
|
value = context.get(tok)
|
|
if value is None:
|
|
value = context.get(tok.upper())
|
|
if value is None:
|
|
value = _safe_lookup_variable(db, tok)
|
|
if value is None:
|
|
if tok not in unresolved: # Avoid duplicates from enhanced processing
|
|
unresolved.append(tok)
|
|
else:
|
|
resolved[tok] = value
|
|
|
|
return resolved, unresolved
|
|
|
|
|
|
def process_conditional_sections(content: str, context: Dict[str, Any]) -> str:
|
|
"""
|
|
Process conditional sections in template content
|
|
|
|
Syntax:
|
|
{% if condition %}
|
|
content to include if condition is true
|
|
{% else %}
|
|
content to include if condition is false (optional)
|
|
{% endif %}
|
|
"""
|
|
result = content
|
|
|
|
# Find all conditional blocks
|
|
while True:
|
|
start_match = CONDITIONAL_START_PATTERN.search(result)
|
|
if not start_match:
|
|
break
|
|
|
|
# Find corresponding endif
|
|
start_pos = start_match.end()
|
|
endif_match = CONDITIONAL_END_PATTERN.search(result, start_pos)
|
|
if not endif_match:
|
|
logger.warning("Found {% if %} without matching {% endif %}")
|
|
break
|
|
|
|
# Find optional else clause
|
|
else_match = CONDITIONAL_ELSE_PATTERN.search(result, start_pos, endif_match.start())
|
|
|
|
condition = start_match.group(1).strip()
|
|
|
|
# Extract content blocks
|
|
if else_match:
|
|
if_content = result[start_pos:else_match.start()]
|
|
else_content = result[else_match.end():endif_match.start()]
|
|
else:
|
|
if_content = result[start_pos:endif_match.start()]
|
|
else_content = ""
|
|
|
|
# Evaluate condition
|
|
try:
|
|
condition_result = evaluate_condition(condition, context)
|
|
selected_content = if_content if condition_result else else_content
|
|
except Exception as e:
|
|
logger.error(f"Error evaluating condition '{condition}': {e}")
|
|
selected_content = else_content # Default to else content on error
|
|
|
|
# Replace the entire conditional block with the selected content
|
|
result = result[:start_match.start()] + selected_content + result[endif_match.end():]
|
|
|
|
return result
|
|
|
|
|
|
def process_loop_sections(content: str, context: Dict[str, Any]) -> str:
|
|
"""
|
|
Process loop sections in template content
|
|
|
|
Syntax:
|
|
{% for item in items %}
|
|
Content to repeat for each item. Use {{item.property}} to access item data.
|
|
{% endfor %}
|
|
"""
|
|
result = content
|
|
|
|
# Find all loop blocks
|
|
while True:
|
|
start_match = LOOP_START_PATTERN.search(result)
|
|
if not start_match:
|
|
break
|
|
|
|
# Find corresponding endfor
|
|
start_pos = start_match.end()
|
|
endfor_match = LOOP_END_PATTERN.search(result, start_pos)
|
|
if not endfor_match:
|
|
logger.warning("Found {% for %} without matching {% endfor %}")
|
|
break
|
|
|
|
loop_var = start_match.group(1).strip()
|
|
collection_expr = start_match.group(2).strip()
|
|
loop_content = result[start_pos:endfor_match.start()]
|
|
|
|
# Get the collection from context
|
|
try:
|
|
collection = evaluate_expression(collection_expr, context)
|
|
if not isinstance(collection, (list, tuple)):
|
|
logger.warning(f"Loop collection '{collection_expr}' is not iterable")
|
|
collection = []
|
|
except Exception as e:
|
|
logger.error(f"Error evaluating loop collection '{collection_expr}': {e}")
|
|
collection = []
|
|
|
|
# Generate content for each item
|
|
repeated_content = ""
|
|
for i, item in enumerate(collection):
|
|
# Create item context
|
|
item_context = context.copy()
|
|
item_context[loop_var] = item
|
|
item_context[f"{loop_var}_index"] = i
|
|
item_context[f"{loop_var}_index0"] = i # 0-based index
|
|
item_context[f"{loop_var}_first"] = (i == 0)
|
|
item_context[f"{loop_var}_last"] = (i == len(collection) - 1)
|
|
item_context[f"{loop_var}_length"] = len(collection)
|
|
|
|
# Process the loop content with item context
|
|
item_content = process_template_content(loop_content, item_context)
|
|
repeated_content += item_content
|
|
|
|
# Replace the entire loop block with the repeated content
|
|
result = result[:start_match.start()] + repeated_content + result[endfor_match.end():]
|
|
|
|
return result
|
|
|
|
|
|
def process_formatted_variables(content: str, context: Dict[str, Any]) -> Tuple[str, List[str]]:
|
|
"""
|
|
Process variables with formatting in template content
|
|
|
|
Syntax: {{ variable_name | format_spec }}
|
|
"""
|
|
result = content
|
|
unresolved = []
|
|
|
|
# Find all formatted variables
|
|
for match in FORMATTED_TOKEN_PATTERN.finditer(content):
|
|
var_name = match.group(1).strip()
|
|
format_spec = match.group(2).strip()
|
|
full_token = match.group(0)
|
|
|
|
# Get variable value
|
|
value = context.get(var_name)
|
|
if value is None:
|
|
value = context.get(var_name.upper())
|
|
|
|
if value is not None:
|
|
# Apply formatting
|
|
formatted_value = apply_variable_formatting(value, format_spec)
|
|
result = result.replace(full_token, formatted_value)
|
|
else:
|
|
unresolved.append(var_name)
|
|
|
|
return result, unresolved
|
|
|
|
|
|
def process_template_functions(content: str, context: Dict[str, Any]) -> Tuple[str, List[str]]:
|
|
"""
|
|
Process template function calls
|
|
|
|
Syntax: {{ function_name(arg1, arg2, ...) }}
|
|
"""
|
|
result = content
|
|
unresolved = []
|
|
|
|
for match in FUNCTION_PATTERN.finditer(content):
|
|
func_name = match.group(1).strip()
|
|
args_str = match.group(2).strip()
|
|
full_token = match.group(0)
|
|
|
|
# Get function from context
|
|
func = context.get(func_name)
|
|
if func and callable(func):
|
|
try:
|
|
# Parse arguments
|
|
args = []
|
|
if args_str:
|
|
# Simple argument parsing (supports strings, numbers, variables)
|
|
arg_parts = [arg.strip() for arg in args_str.split(',')]
|
|
for arg in arg_parts:
|
|
if arg.startswith('"') and arg.endswith('"'):
|
|
# String literal
|
|
args.append(arg[1:-1])
|
|
elif arg.startswith("'") and arg.endswith("'"):
|
|
# String literal
|
|
args.append(arg[1:-1])
|
|
elif arg.replace('.', '').replace('-', '').isdigit():
|
|
# Number literal
|
|
args.append(float(arg) if '.' in arg else int(arg))
|
|
else:
|
|
# Variable reference
|
|
var_value = context.get(arg, context.get(arg.upper(), arg))
|
|
args.append(var_value)
|
|
|
|
# Call function
|
|
func_result = func(*args)
|
|
result = result.replace(full_token, str(func_result))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error calling function '{func_name}': {e}")
|
|
unresolved.append(f"{func_name}()")
|
|
else:
|
|
unresolved.append(f"{func_name}()")
|
|
|
|
return result, unresolved
|
|
|
|
|
|
def evaluate_condition(condition: str, context: Dict[str, Any]) -> bool:
|
|
"""
|
|
Evaluate a conditional expression safely
|
|
"""
|
|
try:
|
|
# Replace variables in condition
|
|
for var_name, value in context.items():
|
|
if var_name.startswith('_'): # Skip internal variables
|
|
continue
|
|
condition = condition.replace(var_name, repr(value))
|
|
|
|
# Safe evaluation with limited builtins
|
|
safe_context = {
|
|
'__builtins__': {},
|
|
'True': True,
|
|
'False': False,
|
|
'None': None,
|
|
}
|
|
|
|
return bool(eval(condition, safe_context))
|
|
except Exception as e:
|
|
logger.error(f"Error evaluating condition '{condition}': {e}")
|
|
return False
|
|
|
|
|
|
def evaluate_expression(expression: str, context: Dict[str, Any]) -> Any:
|
|
"""
|
|
Evaluate an expression safely
|
|
"""
|
|
try:
|
|
# Check if it's a simple variable reference
|
|
if expression in context:
|
|
return context[expression]
|
|
if expression.upper() in context:
|
|
return context[expression.upper()]
|
|
|
|
# Try as a more complex expression
|
|
safe_context = {
|
|
'__builtins__': {},
|
|
**context
|
|
}
|
|
|
|
return eval(expression, safe_context)
|
|
except Exception as e:
|
|
logger.error(f"Error evaluating expression '{expression}': {e}")
|
|
return None
|
|
|
|
|
|
def process_template_content(content: str, context: Dict[str, Any]) -> str:
|
|
"""
|
|
Process template content with all advanced features
|
|
"""
|
|
# 1. Process conditional sections
|
|
content = process_conditional_sections(content, context)
|
|
|
|
# 2. Process loop sections
|
|
content = process_loop_sections(content, context)
|
|
|
|
# 3. Process formatted variables
|
|
content, _ = process_formatted_variables(content, context)
|
|
|
|
# 4. Process template functions
|
|
content, _ = process_template_functions(content, context)
|
|
|
|
return content
|
|
|
|
|
|
def convert_docx_to_pdf(docx_bytes: bytes) -> Optional[bytes]:
|
|
"""
|
|
Convert DOCX to PDF using LibreOffice headless mode
|
|
"""
|
|
try:
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# Save DOCX to temp file
|
|
docx_path = os.path.join(temp_dir, "document.docx")
|
|
with open(docx_path, "wb") as f:
|
|
f.write(docx_bytes)
|
|
|
|
# Convert to PDF using LibreOffice
|
|
cmd = [
|
|
"libreoffice",
|
|
"--headless",
|
|
"--convert-to", "pdf",
|
|
"--outdir", temp_dir,
|
|
docx_path
|
|
]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
|
|
|
if result.returncode == 0:
|
|
pdf_path = os.path.join(temp_dir, "document.pdf")
|
|
if os.path.exists(pdf_path):
|
|
with open(pdf_path, "rb") as f:
|
|
return f.read()
|
|
else:
|
|
logger.error(f"LibreOffice conversion failed: {result.stderr}")
|
|
|
|
except subprocess.TimeoutExpired:
|
|
logger.error("LibreOffice conversion timed out")
|
|
except FileNotFoundError:
|
|
logger.warning("LibreOffice not found. PDF conversion not available.")
|
|
except Exception as e:
|
|
logger.error(f"Error converting DOCX to PDF: {e}")
|
|
|
|
return None
|
|
|
|
|
|
def render_docx(docx_bytes: bytes, context: Dict[str, Any]) -> bytes:
|
|
if not DOCXTPL_AVAILABLE:
|
|
# Return original bytes if docxtpl is not installed
|
|
return docx_bytes
|
|
|
|
try:
|
|
# Write to BytesIO for docxtpl
|
|
in_buffer = io.BytesIO(docx_bytes)
|
|
tpl = DocxTemplate(in_buffer)
|
|
|
|
# Enhanced context with template functions
|
|
enhanced_context = context.copy()
|
|
|
|
# Render the template
|
|
tpl.render(enhanced_context)
|
|
|
|
# Save to output buffer
|
|
out_buffer = io.BytesIO()
|
|
tpl.save(out_buffer)
|
|
return out_buffer.getvalue()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error rendering DOCX template: {e}")
|
|
return docx_bytes
|
|
|
|
|