Files
delphi-database/app/services/workflow_engine.py
HotSwapp bac8cc4bd5 changes
2025-08-18 20:20:04 -05:00

793 lines
30 KiB
Python

"""
Document Workflow Execution Engine
This service handles:
- Event detection and processing
- Workflow matching and triggering
- Automated document generation
- Action execution and error handling
- Schedule management for time-based workflows
"""
from __future__ import annotations
import json
import uuid
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional, Tuple
import logging
from croniter import croniter
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func
from app.models.document_workflows import (
DocumentWorkflow, WorkflowAction, WorkflowExecution, EventLog,
WorkflowTriggerType, WorkflowActionType, ExecutionStatus, WorkflowStatus
)
from app.models.files import File
from app.models.deadlines import Deadline
from app.models.templates import DocumentTemplate
from app.models.user import User
from app.services.advanced_variables import VariableProcessor
from app.services.template_merge import build_context, resolve_tokens, render_docx
from app.services.storage import get_default_storage
from app.core.logging import get_logger
from app.services.document_notifications import notify_processing, notify_completed, notify_failed
logger = get_logger("workflow_engine")
class WorkflowEngineError(Exception):
    """Base exception for workflow engine errors."""
class WorkflowExecutionError(Exception):
    """Exception for workflow execution failures."""
class EventProcessor:
    """
    Processes system events and triggers appropriate workflows.

    Flow: an event is persisted via log_event(), then matched against every
    ACTIVE DocumentWorkflow whose trigger_type equals the event type.
    Workflows that also pass the filter and condition checks are triggered,
    each creating a WorkflowExecution that runs immediately or after the
    workflow's configured delay.
    """

    def __init__(self, db: Session):
        # SQLAlchemy session shared with the caller; commits/rollbacks issued
        # here are visible on the caller's session too.
        self.db = db

    async def log_event(
        self,
        event_type: str,
        event_source: str,
        file_no: Optional[str] = None,
        client_id: Optional[str] = None,
        user_id: Optional[int] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        event_data: Optional[Dict[str, Any]] = None,
        previous_state: Optional[Dict[str, Any]] = None,
        new_state: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Log a system event that may trigger workflows.

        The EventLog row is committed before workflow matching runs, so the
        event is durably recorded even if downstream processing fails.

        Returns:
            Event ID for tracking (a freshly generated UUID4 string)
        """
        event_id = str(uuid.uuid4())
        event_log = EventLog(
            event_id=event_id,
            event_type=event_type,
            event_source=event_source,
            file_no=file_no,
            client_id=client_id,
            user_id=user_id,
            resource_type=resource_type,
            resource_id=resource_id,
            event_data=event_data or {},
            previous_state=previous_state,
            new_state=new_state,
            occurred_at=datetime.now(timezone.utc)
        )
        self.db.add(event_log)
        self.db.commit()
        # Process the event asynchronously to find matching workflows
        await self._process_event(event_log)
        return event_id

    async def _process_event(self, event: EventLog) -> None:
        """
        Process an event to find and trigger matching workflows.

        Always marks the event as processed; on failure the error text is
        stored in event.processing_errors instead of being re-raised.
        """
        try:
            triggered_workflows = []
            # Find workflows that match this event type
            matching_workflows = self.db.query(DocumentWorkflow).filter(
                DocumentWorkflow.status == WorkflowStatus.ACTIVE
            ).all()
            # Filter workflows by trigger type (enum value comparison).
            # Done in Python because trigger_type is an enum whose .value is
            # compared against the free-form event_type string.
            filtered_workflows = []
            for workflow in matching_workflows:
                if workflow.trigger_type.value == event.event_type:
                    filtered_workflows.append(workflow)
            matching_workflows = filtered_workflows
            for workflow in matching_workflows:
                if await self._should_trigger_workflow(workflow, event):
                    execution_id = await self._trigger_workflow(workflow, event)
                    if execution_id:
                        triggered_workflows.append(workflow.id)
            # Update event log with triggered workflows
            event.triggered_workflows = triggered_workflows
            event.processed = True
            event.processed_at = datetime.now(timezone.utc)
            self.db.commit()
            logger.info(f"Event {event.event_id} processed, triggered {len(triggered_workflows)} workflows")
        except Exception as e:
            logger.error(f"Error processing event {event.event_id}: {str(e)}")
            event.processing_errors = [str(e)]
            event.processed = True
            event.processed_at = datetime.now(timezone.utc)
            self.db.commit()

    async def _should_trigger_workflow(self, workflow: DocumentWorkflow, event: EventLog) -> bool:
        """
        Check if a workflow should be triggered for the given event.

        Applies the workflow's file-type/status/attorney/client filters and
        then any structured trigger_conditions.  Any evaluation error is
        treated as "do not trigger".
        """
        try:
            # Check basic filters
            # NOTE(review): the File row is queried up to three separate
            # times below — presumably fine at current volumes; confirm.
            if workflow.file_type_filter and event.file_no:
                file_obj = self.db.query(File).filter(File.file_no == event.file_no).first()
                if file_obj and file_obj.file_type not in workflow.file_type_filter:
                    return False
            if workflow.status_filter and event.file_no:
                file_obj = self.db.query(File).filter(File.file_no == event.file_no).first()
                if file_obj and file_obj.status not in workflow.status_filter:
                    return False
            if workflow.attorney_filter and event.file_no:
                file_obj = self.db.query(File).filter(File.file_no == event.file_no).first()
                if file_obj and file_obj.empl_num not in workflow.attorney_filter:
                    return False
            if workflow.client_filter and event.client_id:
                if event.client_id not in workflow.client_filter:
                    return False
            # Check trigger conditions
            if workflow.trigger_conditions:
                return self._evaluate_trigger_conditions(workflow.trigger_conditions, event)
            return True
        except Exception as e:
            logger.warning(f"Error evaluating workflow {workflow.id} for event {event.event_id}: {str(e)}")
            return False

    def _evaluate_trigger_conditions(self, conditions: Dict[str, Any], event: EventLog) -> bool:
        """
        Evaluate complex trigger conditions against an event.

        Supports two condition shapes:
          - {'type': 'simple', 'field', 'operator', 'value'} where 'field'
            is a direct event attribute ('event_type', 'file_no',
            'client_id') or a dotted path into event_data / new_state /
            previous_state.
          - {'type': 'compound', 'operator': 'and'|'or'|'not',
             'conditions': [...]} evaluated recursively ('not' negates only
            the first sub-condition).

        Any evaluation error yields False.
        """
        try:
            condition_type = conditions.get('type', 'simple')
            if condition_type == 'simple':
                field = conditions.get('field')
                operator = conditions.get('operator', 'equals')
                expected_value = conditions.get('value')
                # Get actual value from event
                actual_value = None
                # NOTE(review): a missing/None 'field' raises AttributeError
                # on .startswith below, which the broad except maps to False.
                if field == 'event_type':
                    actual_value = event.event_type
                elif field == 'file_no':
                    actual_value = event.file_no
                elif field == 'client_id':
                    actual_value = event.client_id
                elif field.startswith('data.'):
                    # Extract from event_data
                    data_key = field[5:]  # Remove 'data.' prefix
                    actual_value = event.event_data.get(data_key) if event.event_data else None
                elif field.startswith('new_state.'):
                    # Extract from new_state
                    state_key = field[10:]  # Remove 'new_state.' prefix
                    actual_value = event.new_state.get(state_key) if event.new_state else None
                elif field.startswith('previous_state.'):
                    # Extract from previous_state
                    state_key = field[15:]  # Remove 'previous_state.' prefix
                    actual_value = event.previous_state.get(state_key) if event.previous_state else None
                # Evaluate condition
                return self._evaluate_simple_condition(actual_value, operator, expected_value)
            elif condition_type == 'compound':
                operator = conditions.get('operator', 'and')
                sub_conditions = conditions.get('conditions', [])
                if operator == 'and':
                    return all(self._evaluate_trigger_conditions(cond, event) for cond in sub_conditions)
                elif operator == 'or':
                    return any(self._evaluate_trigger_conditions(cond, event) for cond in sub_conditions)
                elif operator == 'not':
                    return not self._evaluate_trigger_conditions(sub_conditions[0], event) if sub_conditions else False
            return False
        except Exception:
            return False

    def _evaluate_simple_condition(self, actual_value: Any, operator: str, expected_value: Any) -> bool:
        """
        Evaluate a simple condition.

        String operators coerce both sides with str(); numeric operators
        coerce with float() and treat None as 0.  Unknown operators,
        coercion failures and unexpected errors all return False.
        """
        try:
            if operator == 'equals':
                return actual_value == expected_value
            elif operator == 'not_equals':
                return actual_value != expected_value
            elif operator == 'contains':
                return str(expected_value) in str(actual_value) if actual_value else False
            elif operator == 'starts_with':
                return str(actual_value).startswith(str(expected_value)) if actual_value else False
            elif operator == 'ends_with':
                return str(actual_value).endswith(str(expected_value)) if actual_value else False
            elif operator == 'is_empty':
                return actual_value is None or str(actual_value).strip() == ''
            elif operator == 'is_not_empty':
                return actual_value is not None and str(actual_value).strip() != ''
            elif operator == 'in':
                return actual_value in expected_value if isinstance(expected_value, list) else False
            elif operator == 'not_in':
                # A non-list expected_value is treated as vacuously true
                return actual_value not in expected_value if isinstance(expected_value, list) else True
            # Numeric comparisons
            elif operator in ['greater_than', 'less_than', 'greater_equal', 'less_equal']:
                try:
                    actual_num = float(actual_value) if actual_value is not None else 0
                    expected_num = float(expected_value) if expected_value is not None else 0
                    if operator == 'greater_than':
                        return actual_num > expected_num
                    elif operator == 'less_than':
                        return actual_num < expected_num
                    elif operator == 'greater_equal':
                        return actual_num >= expected_num
                    elif operator == 'less_equal':
                        return actual_num <= expected_num
                except (ValueError, TypeError):
                    return False
            return False
        except Exception:
            return False

    async def _trigger_workflow(self, workflow: DocumentWorkflow, event: EventLog) -> Optional[int]:
        """
        Trigger a workflow execution.

        Creates a WorkflowExecution row, bumps the workflow's trigger
        statistics, then either schedules a delayed run or executes
        immediately.  On any error the session is rolled back.

        Returns:
            Workflow execution ID if successful, None if failed
        """
        try:
            execution = WorkflowExecution(
                workflow_id=workflow.id,
                triggered_by_event_id=event.event_id,
                triggered_by_event_type=event.event_type,
                context_file_no=event.file_no,
                context_client_id=event.client_id,
                context_user_id=event.user_id,
                trigger_data=event.event_data,
                status=ExecutionStatus.PENDING
            )
            self.db.add(execution)
            self.db.flush()  # Get the ID
            # Update workflow statistics
            workflow.execution_count += 1
            workflow.last_triggered_at = datetime.now(timezone.utc)
            self.db.commit()
            # Execute the workflow (possibly with delay)
            if workflow.delay_minutes > 0:
                # Schedule delayed execution
                await _schedule_delayed_execution(execution.id, workflow.delay_minutes)
            else:
                # Execute immediately
                # NOTE(review): confirm _execute_workflow does not close this
                # shared session — _process_event commits on self.db after
                # this call returns.
                await _execute_workflow(execution.id, self.db)
            return execution.id
        except Exception as e:
            logger.error(f"Error triggering workflow {workflow.id}: {str(e)}")
            self.db.rollback()
            return None
class WorkflowExecutor:
    """
    Executes individual workflow instances.

    Loads a WorkflowExecution, builds a template-variable context from the
    related file/client/user rows, then runs the workflow's actions in
    order.  Most concrete action types below are currently placeholders.
    """

    def __init__(self, db: Session):
        self.db = db
        # NOTE(review): variable_processor is initialized but not referenced
        # in this class yet — presumably for future variable resolution.
        self.variable_processor = VariableProcessor(db)

    async def execute_workflow(self, execution_id: int) -> bool:
        """
        Execute a workflow execution.

        Transitions the execution PENDING -> RUNNING -> COMPLETED (or
        FAILED/RETRYING), records per-action results and timing, and
        updates the parent workflow's success/failure counters.

        Returns:
            True if successful, False if failed
        """
        execution = self.db.query(WorkflowExecution).filter(
            WorkflowExecution.id == execution_id
        ).first()
        if not execution:
            logger.error(f"Workflow execution {execution_id} not found")
            return False
        workflow = execution.workflow
        if not workflow:
            logger.error(f"Workflow for execution {execution_id} not found")
            return False
        try:
            # Update execution status
            execution.status = ExecutionStatus.RUNNING
            execution.started_at = datetime.now(timezone.utc)
            self.db.commit()
            logger.info(f"Starting workflow execution {execution_id} for workflow '{workflow.name}'")
            # Build execution context
            context = await self._build_execution_context(execution)
            execution.execution_context = context
            # Execute actions in order
            action_results = []
            actions = sorted(workflow.actions, key=lambda a: a.action_order)
            for action in actions:
                if await self._should_execute_action(action, context):
                    result = await self._execute_action(action, context, execution)
                    action_results.append(result)
                    # Abort the run unless the action opted into continue_on_failure
                    if not result.get('success', False) and not action.continue_on_failure:
                        raise WorkflowExecutionError(f"Action {action.id} failed: {result.get('error', 'Unknown error')}")
                else:
                    action_results.append({
                        'action_id': action.id,
                        'skipped': True,
                        'reason': 'Condition not met'
                    })
            # Update execution with results
            execution.action_results = action_results
            execution.status = ExecutionStatus.COMPLETED
            execution.completed_at = datetime.now(timezone.utc)
            execution.execution_duration_seconds = int(
                (execution.completed_at - execution.started_at).total_seconds()
            )
            # Update workflow statistics
            workflow.success_count += 1
            self.db.commit()
            logger.info(f"Workflow execution {execution_id} completed successfully")
            return True
        except Exception as e:
            # Handle execution failure
            error_message = str(e)
            logger.error(f"Workflow execution {execution_id} failed: {error_message}")
            execution.status = ExecutionStatus.FAILED
            execution.error_message = error_message
            execution.completed_at = datetime.now(timezone.utc)
            if execution.started_at:
                execution.execution_duration_seconds = int(
                    (execution.completed_at - execution.started_at).total_seconds()
                )
            # Update workflow statistics
            workflow.failure_count += 1
            # Check if we should retry; RETRYING overrides FAILED when
            # retries remain (an external scheduler must pick these up).
            if execution.retry_count < workflow.max_retries:
                execution.retry_count += 1
                execution.next_retry_at = datetime.now(timezone.utc) + timedelta(
                    minutes=workflow.retry_delay_minutes
                )
                execution.status = ExecutionStatus.RETRYING
                logger.info(f"Scheduling retry {execution.retry_count} for execution {execution_id}")
            self.db.commit()
            return False

    async def _build_execution_context(self, execution: WorkflowExecution) -> Dict[str, Any]:
        """
        Build context for workflow execution.

        Starts from execution/trigger metadata (lower-case keys) and layers
        in file, client and user details under upper-case keys that match
        template variable names.
        """
        context = {
            'execution_id': execution.id,
            'workflow_id': execution.workflow_id,
            'event_id': execution.triggered_by_event_id,
            'event_type': execution.triggered_by_event_type,
            'trigger_data': execution.trigger_data or {},
        }
        # Add file context if available
        if execution.context_file_no:
            file_obj = self.db.query(File).filter(
                File.file_no == execution.context_file_no
            ).first()
            if file_obj:
                context.update({
                    'FILE_NO': file_obj.file_no,
                    # NOTE(review): CLIENT_ID is set to the File row's id,
                    # not a client identifier — confirm this is intended.
                    'CLIENT_ID': file_obj.id,
                    'FILE_TYPE': file_obj.file_type,
                    'FILE_STATUS': file_obj.status,
                    'ATTORNEY': file_obj.empl_num,
                    'MATTER': file_obj.regarding or '',
                    'OPENED_DATE': file_obj.opened.isoformat() if file_obj.opened else '',
                    'CLOSED_DATE': file_obj.closed.isoformat() if file_obj.closed else '',
                    'HOURLY_RATE': str(file_obj.rate_per_hour),
                })
                # Add client information
                if file_obj.owner:
                    context.update({
                        'CLIENT_FIRST': file_obj.owner.first or '',
                        'CLIENT_LAST': file_obj.owner.last or '',
                        'CLIENT_FULL': f"{file_obj.owner.first or ''} {file_obj.owner.last or ''}".strip(),
                        'CLIENT_COMPANY': file_obj.owner.company or '',
                        'CLIENT_EMAIL': file_obj.owner.email or '',
                        'CLIENT_PHONE': file_obj.owner.phone or '',
                    })
        # Add user context if available
        if execution.context_user_id:
            user = self.db.query(User).filter(User.id == execution.context_user_id).first()
            if user:
                context.update({
                    'USER_ID': str(user.id),
                    'USERNAME': user.username,
                    'USER_EMAIL': user.email or '',
                })
        return context

    async def _should_execute_action(self, action: WorkflowAction, context: Dict[str, Any]) -> bool:
        """
        Check if an action should be executed based on its conditions.

        Actions without a condition always run.  Evaluation errors default
        to running the action rather than skipping it.
        """
        if not action.condition:
            return True
        try:
            # Use the same condition evaluation logic as trigger conditions
            processor = EventProcessor(self.db)
            # Create a mock event for condition evaluation; the execution
            # context plays the role of new_state so conditions can address
            # context keys via 'new_state.<KEY>'.
            mock_event = type('MockEvent', (), {
                'event_data': context.get('trigger_data', {}),
                'new_state': context,
                'previous_state': {},
                'event_type': context.get('event_type'),
                'file_no': context.get('FILE_NO'),
                'client_id': context.get('CLIENT_ID'),
            })()
            return processor._evaluate_trigger_conditions(action.condition, mock_event)
        except Exception as e:
            logger.warning(f"Error evaluating action condition for action {action.id}: {str(e)}")
            return True  # Default to executing the action

    async def _execute_action(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute a specific workflow action.

        Dispatches on action_type.  Every branch returns a result dict with
        at least 'action_id' and 'success' keys; errors are captured in the
        result rather than raised to the caller.
        """
        try:
            if action.action_type == WorkflowActionType.GENERATE_DOCUMENT:
                return await self._execute_document_generation(action, context, execution)
            elif action.action_type == WorkflowActionType.SEND_EMAIL:
                return await self._execute_send_email(action, context, execution)
            elif action.action_type == WorkflowActionType.CREATE_DEADLINE:
                return await self._execute_create_deadline(action, context, execution)
            elif action.action_type == WorkflowActionType.UPDATE_FILE_STATUS:
                return await self._execute_update_file_status(action, context, execution)
            elif action.action_type == WorkflowActionType.CREATE_LEDGER_ENTRY:
                return await self._execute_create_ledger_entry(action, context, execution)
            elif action.action_type == WorkflowActionType.SEND_NOTIFICATION:
                return await self._execute_send_notification(action, context, execution)
            elif action.action_type == WorkflowActionType.EXECUTE_CUSTOM:
                return await self._execute_custom_action(action, context, execution)
            else:
                return {
                    'action_id': action.id,
                    'success': False,
                    'error': f'Unknown action type: {action.action_type.value}'
                }
        except Exception as e:
            logger.error(f"Error executing action {action.id}: {str(e)}")
            return {
                'action_id': action.id,
                'success': False,
                'error': str(e)
            }

    async def _execute_document_generation(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute document generation action.

        Validates the template and file context, emits processing /
        completed / failed notifications, and records the result on
        execution.generated_documents.  The actual document rendering is
        still a placeholder (see inline note below).
        """
        if not action.template_id:
            return {
                'action_id': action.id,
                'success': False,
                'error': 'No template specified'
            }
        template = self.db.query(DocumentTemplate).filter(
            DocumentTemplate.id == action.template_id
        ).first()
        if not template or not template.current_version_id:
            return {
                'action_id': action.id,
                'success': False,
                'error': 'Template not found or has no current version'
            }
        try:
            # Get file number for notifications
            file_no = context.get('FILE_NO')
            if not file_no:
                return {
                    'action_id': action.id,
                    'success': False,
                    'error': 'No file number available for document generation'
                }
            # Notify processing started
            try:
                await notify_processing(
                    file_no=file_no,
                    data={
                        'action_id': action.id,
                        'workflow_id': execution.workflow_id,
                        'template_id': action.template_id,
                        'template_name': template.name,
                        'execution_id': execution.id
                    }
                )
            except Exception:
                # Don't fail workflow if notification fails
                pass
            # Generate the document using the template system
            # NOTE(review): these imports and enhanced_context are currently
            # unused — the real generation call has not been wired in yet.
            from app.api.documents import generate_batch_documents
            from app.models.documents import BatchGenerateRequest
            # Prepare the request
            file_nos = [file_no]
            # Use the enhanced context for variable resolution
            enhanced_context = build_context(
                context,
                context_type="file" if context.get('FILE_NO') else "global",
                context_id=context.get('FILE_NO', 'default')
            )
            # Here we would integrate with the document generation system
            # For now, return a placeholder result
            result = {
                'action_id': action.id,
                'success': True,
                'template_id': action.template_id,
                'template_name': template.name,
                'generated_for_files': file_nos,
                'output_format': action.output_format,
                'generated_at': datetime.now(timezone.utc).isoformat()
            }
            # Notify successful completion
            try:
                await notify_completed(
                    file_no=file_no,
                    data={
                        'action_id': action.id,
                        'workflow_id': execution.workflow_id,
                        'template_id': action.template_id,
                        'template_name': template.name,
                        'execution_id': execution.id,
                        'output_format': action.output_format,
                        'generated_at': result['generated_at']
                    }
                )
            except Exception:
                # Don't fail workflow if notification fails
                pass
            # Update execution with generated documents
            # NOTE(review): in-place append to a JSON column may not be
            # flagged as dirty by the ORM — verify changes are persisted.
            if not execution.generated_documents:
                execution.generated_documents = []
            execution.generated_documents.append(result)
            return result
        except Exception as e:
            # Notify failure
            try:
                await notify_failed(
                    file_no=file_no,
                    data={
                        'action_id': action.id,
                        'workflow_id': execution.workflow_id,
                        'template_id': action.template_id,
                        'template_name': template.name if 'template' in locals() else 'Unknown',
                        'execution_id': execution.id,
                        'error': str(e)
                    }
                )
            except Exception:
                # Don't fail workflow if notification fails
                pass
            return {
                'action_id': action.id,
                'success': False,
                'error': f'Document generation failed: {str(e)}'
            }

    async def _execute_send_email(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute send email action.

        Placeholder: reports success without sending anything yet.
        """
        # Placeholder for email sending functionality
        return {
            'action_id': action.id,
            'success': True,
            'email_sent': True,
            'recipients': action.email_recipients or [],
            'subject': action.email_subject_template or 'Automated notification'
        }

    async def _execute_create_deadline(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute create deadline action.

        Placeholder: reports success without creating a deadline.
        """
        # Placeholder for deadline creation functionality
        return {
            'action_id': action.id,
            'success': True,
            'deadline_created': True
        }

    async def _execute_update_file_status(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute update file status action.

        Placeholder: reports success without updating any file.
        """
        # Placeholder for file status update functionality
        return {
            'action_id': action.id,
            'success': True,
            'file_status_updated': True
        }

    async def _execute_create_ledger_entry(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute create ledger entry action.

        Placeholder: reports success without writing a ledger entry.
        """
        # Placeholder for ledger entry creation functionality
        return {
            'action_id': action.id,
            'success': True,
            'ledger_entry_created': True
        }

    async def _execute_send_notification(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute send notification action.

        Placeholder: reports success without sending a notification.
        """
        # Placeholder for notification sending functionality
        return {
            'action_id': action.id,
            'success': True,
            'notification_sent': True
        }

    async def _execute_custom_action(
        self,
        action: WorkflowAction,
        context: Dict[str, Any],
        execution: WorkflowExecution
    ) -> Dict[str, Any]:
        """
        Execute custom action.

        Placeholder: reports success without running anything.
        """
        # Placeholder for custom action execution
        return {
            'action_id': action.id,
            'success': True,
            'custom_action_executed': True
        }
# Helper functions for integration
async def _execute_workflow(execution_id: int, db: Optional[Session] = None) -> bool:
    """Execute a workflow (to be called asynchronously).

    Args:
        execution_id: Primary key of the WorkflowExecution to run.
        db: Optional SQLAlchemy session.  When omitted, a session is
            obtained from the app's session factory and closed on exit.
            A caller-supplied session is left open.

    Returns:
        True if the execution completed successfully, False otherwise.

    Bug fix: the previous version closed *any* session in its ``finally``
    block, including one borrowed from the caller.  EventProcessor passes
    ``self.db`` here and then commits on it afterwards, which failed on a
    closed session.  Now only sessions opened by this function are closed.
    """
    from app.database.base import get_db
    owns_session = db is None
    if owns_session:
        db = next(get_db())
    try:
        executor = WorkflowExecutor(db)
        return await executor.execute_workflow(execution_id)
    except Exception as e:
        logger.error(f"Error executing workflow {execution_id}: {str(e)}")
        return False
    finally:
        # Never close a borrowed session — the caller still needs it.
        if owns_session:
            db.close()
async def _schedule_delayed_execution(execution_id: int, delay_minutes: int):
"""Schedule delayed workflow execution"""
# This would be implemented with a proper scheduler in production
pass