Files
delphi-database/scripts/test_workflows.py
HotSwapp bac8cc4bd5 changes
2025-08-18 20:20:04 -05:00

271 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Test script for the example workflows
This script tests both the Auto Settlement Letter and Deadline Reminder workflows
"""
import asyncio
import sys
import os
from datetime import date, timedelta
# Add the project root to Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from sqlalchemy.orm import Session
from app.database.base import get_db
from app.models.files import File
from app.models.deadlines import Deadline, DeadlineStatus, DeadlinePriority
from app.models.document_workflows import DocumentWorkflow, WorkflowExecution, ExecutionStatus
from app.services.workflow_integration import log_file_status_change_sync, log_deadline_approaching_sync
from app.services.deadline_notifications import DeadlineNotificationService
def test_settlement_workflow():
"""Test the Auto Settlement Letter workflow"""
print("\n🧪 Testing Auto Settlement Letter Workflow")
print("-" * 50)
db = next(get_db())
try:
# Find the settlement workflow
workflow = db.query(DocumentWorkflow).filter(
DocumentWorkflow.name == "Auto Settlement Letter"
).first()
if not workflow:
print("❌ Auto Settlement Letter workflow not found. Please run setup script first.")
return False
print(f"✅ Found workflow: {workflow.name} (ID: {workflow.id})")
# Find a test file to close (or create one)
test_file = db.query(File).filter(
File.status != "CLOSED"
).first()
if not test_file:
print("❌ No open files found to test with. Please add a file first.")
return False
print(f"✅ Found test file: {test_file.file_no} (current status: {test_file.status})")
# Get initial execution count
initial_count = db.query(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow.id
).count()
print(f"📊 Initial execution count: {initial_count}")
# Trigger the workflow by changing file status to CLOSED
print(f"🔄 Changing file {test_file.file_no} status to CLOSED...")
log_file_status_change_sync(
db=db,
file_no=test_file.file_no,
old_status=test_file.status,
new_status="CLOSED",
user_id=1, # Assuming admin user ID 1
notes="Test closure for workflow testing"
)
# Check if workflow execution was created
new_count = db.query(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow.id
).count()
if new_count > initial_count:
print(f"✅ Workflow execution triggered! New execution count: {new_count}")
# Get the latest execution
latest_execution = db.query(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow.id
).order_by(WorkflowExecution.id.desc()).first()
print(f"📋 Latest execution details:")
print(f" - Execution ID: {latest_execution.id}")
print(f" - Status: {latest_execution.status.value}")
print(f" - File No: {latest_execution.context_file_no}")
print(f" - Event Type: {latest_execution.triggered_by_event_type}")
return True
else:
print("❌ Workflow execution was not triggered")
return False
except Exception as e:
print(f"❌ Error testing settlement workflow: {str(e)}")
return False
finally:
db.close()
def test_deadline_workflow():
"""Test the Deadline Reminder workflow"""
print("\n🧪 Testing Deadline Reminder Workflow")
print("-" * 50)
db = next(get_db())
try:
# Find the deadline reminder workflow
workflow = db.query(DocumentWorkflow).filter(
DocumentWorkflow.name == "Deadline Reminder"
).first()
if not workflow:
print("❌ Deadline Reminder workflow not found. Please run setup script first.")
return False
print(f"✅ Found workflow: {workflow.name} (ID: {workflow.id})")
# Find or create a test deadline that's approaching
approaching_date = date.today() + timedelta(days=5) # 5 days from now
test_deadline = db.query(Deadline).filter(
Deadline.status == DeadlineStatus.PENDING,
Deadline.deadline_date == approaching_date
).first()
if not test_deadline:
# Create a test deadline
from app.models.deadlines import DeadlineType
test_deadline = Deadline(
title="Test Deadline for Workflow",
description="Test deadline created for workflow testing",
deadline_date=approaching_date,
status=DeadlineStatus.PENDING,
priority=DeadlinePriority.HIGH,
deadline_type=DeadlineType.OTHER,
file_no="TEST-001",
client_id="TEST-CLIENT",
created_by_user_id=1
)
db.add(test_deadline)
db.commit()
db.refresh(test_deadline)
print(f"✅ Created test deadline: {test_deadline.title} (ID: {test_deadline.id})")
else:
print(f"✅ Found test deadline: {test_deadline.title} (ID: {test_deadline.id})")
# Get initial execution count
initial_count = db.query(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow.id
).count()
print(f"📊 Initial execution count: {initial_count}")
# Trigger the workflow by logging a deadline approaching event
print(f"🔄 Triggering deadline approaching event for deadline {test_deadline.id}...")
log_deadline_approaching_sync(
db=db,
deadline_id=test_deadline.id,
file_no=test_deadline.file_no,
client_id=test_deadline.client_id,
days_until_deadline=5,
deadline_type="other"
)
# Check if workflow execution was created
new_count = db.query(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow.id
).count()
if new_count > initial_count:
print(f"✅ Workflow execution triggered! New execution count: {new_count}")
# Get the latest execution
latest_execution = db.query(WorkflowExecution).filter(
WorkflowExecution.workflow_id == workflow.id
).order_by(WorkflowExecution.id.desc()).first()
print(f"📋 Latest execution details:")
print(f" - Execution ID: {latest_execution.id}")
print(f" - Status: {latest_execution.status.value}")
print(f" - Resource ID: {latest_execution.triggered_by_event_id}")
print(f" - Event Type: {latest_execution.triggered_by_event_type}")
return True
else:
print("❌ Workflow execution was not triggered")
return False
except Exception as e:
print(f"❌ Error testing deadline workflow: {str(e)}")
return False
finally:
db.close()
def test_deadline_notification_service():
"""Test the enhanced deadline notification service"""
print("\n🧪 Testing Enhanced Deadline Notification Service")
print("-" * 50)
db = next(get_db())
try:
service = DeadlineNotificationService(db)
# Test the workflow event triggering
events_triggered = service.check_approaching_deadlines_for_workflows()
print(f"✅ Deadline notification service triggered {events_triggered} workflow events")
# Test the daily reminder processing
results = service.process_daily_reminders()
print(f"📊 Daily reminder processing results:")
print(f" - Total reminders: {results['total_reminders']}")
print(f" - Sent successfully: {results['sent_successfully']}")
print(f" - Failed: {results['failed']}")
print(f" - Workflow events triggered: {results['workflow_events_triggered']}")
return True
except Exception as e:
print(f"❌ Error testing deadline notification service: {str(e)}")
return False
finally:
db.close()
def main():
"""Run all workflow tests"""
print("🧪 Testing Example Workflows for Delphi Database")
print("=" * 60)
success_count = 0
total_tests = 3
# Test 1: Settlement Letter Workflow
if test_settlement_workflow():
success_count += 1
# Test 2: Deadline Reminder Workflow
if test_deadline_workflow():
success_count += 1
# Test 3: Deadline Notification Service
if test_deadline_notification_service():
success_count += 1
print("\n" + "=" * 60)
print(f"🎯 Test Results: {success_count}/{total_tests} tests passed")
if success_count == total_tests:
print("✅ All workflow tests passed successfully!")
print("\n🚀 Your workflows are ready for production use!")
else:
print("❌ Some tests failed. Please check the errors above.")
print("\n🔧 Troubleshooting tips:")
print("1. Make sure the workflows were created with setup_example_workflows.py")
print("2. Check database connections and permissions")
print("3. Verify workflow configurations in the database")
return success_count == total_tests
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)