271 lines
9.7 KiB
Python
271 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script for the example workflows
|
|
This script tests both the Auto Settlement Letter and Deadline Reminder workflows
|
|
"""
|
|
import asyncio
|
|
import sys
|
|
import os
|
|
from datetime import date, timedelta
|
|
|
|
# Add the project root to Python path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from sqlalchemy.orm import Session
|
|
from app.database.base import get_db
|
|
from app.models.files import File
|
|
from app.models.deadlines import Deadline, DeadlineStatus, DeadlinePriority
|
|
from app.models.document_workflows import DocumentWorkflow, WorkflowExecution, ExecutionStatus
|
|
from app.services.workflow_integration import log_file_status_change_sync, log_deadline_approaching_sync
|
|
from app.services.deadline_notifications import DeadlineNotificationService
|
|
|
|
|
|
def test_settlement_workflow():
|
|
"""Test the Auto Settlement Letter workflow"""
|
|
print("\n🧪 Testing Auto Settlement Letter Workflow")
|
|
print("-" * 50)
|
|
|
|
db = next(get_db())
|
|
|
|
try:
|
|
# Find the settlement workflow
|
|
workflow = db.query(DocumentWorkflow).filter(
|
|
DocumentWorkflow.name == "Auto Settlement Letter"
|
|
).first()
|
|
|
|
if not workflow:
|
|
print("❌ Auto Settlement Letter workflow not found. Please run setup script first.")
|
|
return False
|
|
|
|
print(f"✅ Found workflow: {workflow.name} (ID: {workflow.id})")
|
|
|
|
# Find a test file to close (or create one)
|
|
test_file = db.query(File).filter(
|
|
File.status != "CLOSED"
|
|
).first()
|
|
|
|
if not test_file:
|
|
print("❌ No open files found to test with. Please add a file first.")
|
|
return False
|
|
|
|
print(f"✅ Found test file: {test_file.file_no} (current status: {test_file.status})")
|
|
|
|
# Get initial execution count
|
|
initial_count = db.query(WorkflowExecution).filter(
|
|
WorkflowExecution.workflow_id == workflow.id
|
|
).count()
|
|
|
|
print(f"📊 Initial execution count: {initial_count}")
|
|
|
|
# Trigger the workflow by changing file status to CLOSED
|
|
print(f"🔄 Changing file {test_file.file_no} status to CLOSED...")
|
|
|
|
log_file_status_change_sync(
|
|
db=db,
|
|
file_no=test_file.file_no,
|
|
old_status=test_file.status,
|
|
new_status="CLOSED",
|
|
user_id=1, # Assuming admin user ID 1
|
|
notes="Test closure for workflow testing"
|
|
)
|
|
|
|
# Check if workflow execution was created
|
|
new_count = db.query(WorkflowExecution).filter(
|
|
WorkflowExecution.workflow_id == workflow.id
|
|
).count()
|
|
|
|
if new_count > initial_count:
|
|
print(f"✅ Workflow execution triggered! New execution count: {new_count}")
|
|
|
|
# Get the latest execution
|
|
latest_execution = db.query(WorkflowExecution).filter(
|
|
WorkflowExecution.workflow_id == workflow.id
|
|
).order_by(WorkflowExecution.id.desc()).first()
|
|
|
|
print(f"📋 Latest execution details:")
|
|
print(f" - Execution ID: {latest_execution.id}")
|
|
print(f" - Status: {latest_execution.status.value}")
|
|
print(f" - File No: {latest_execution.context_file_no}")
|
|
print(f" - Event Type: {latest_execution.triggered_by_event_type}")
|
|
|
|
return True
|
|
else:
|
|
print("❌ Workflow execution was not triggered")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error testing settlement workflow: {str(e)}")
|
|
return False
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def test_deadline_workflow():
|
|
"""Test the Deadline Reminder workflow"""
|
|
print("\n🧪 Testing Deadline Reminder Workflow")
|
|
print("-" * 50)
|
|
|
|
db = next(get_db())
|
|
|
|
try:
|
|
# Find the deadline reminder workflow
|
|
workflow = db.query(DocumentWorkflow).filter(
|
|
DocumentWorkflow.name == "Deadline Reminder"
|
|
).first()
|
|
|
|
if not workflow:
|
|
print("❌ Deadline Reminder workflow not found. Please run setup script first.")
|
|
return False
|
|
|
|
print(f"✅ Found workflow: {workflow.name} (ID: {workflow.id})")
|
|
|
|
# Find or create a test deadline that's approaching
|
|
approaching_date = date.today() + timedelta(days=5) # 5 days from now
|
|
|
|
test_deadline = db.query(Deadline).filter(
|
|
Deadline.status == DeadlineStatus.PENDING,
|
|
Deadline.deadline_date == approaching_date
|
|
).first()
|
|
|
|
if not test_deadline:
|
|
# Create a test deadline
|
|
from app.models.deadlines import DeadlineType
|
|
test_deadline = Deadline(
|
|
title="Test Deadline for Workflow",
|
|
description="Test deadline created for workflow testing",
|
|
deadline_date=approaching_date,
|
|
status=DeadlineStatus.PENDING,
|
|
priority=DeadlinePriority.HIGH,
|
|
deadline_type=DeadlineType.OTHER,
|
|
file_no="TEST-001",
|
|
client_id="TEST-CLIENT",
|
|
created_by_user_id=1
|
|
)
|
|
db.add(test_deadline)
|
|
db.commit()
|
|
db.refresh(test_deadline)
|
|
print(f"✅ Created test deadline: {test_deadline.title} (ID: {test_deadline.id})")
|
|
else:
|
|
print(f"✅ Found test deadline: {test_deadline.title} (ID: {test_deadline.id})")
|
|
|
|
# Get initial execution count
|
|
initial_count = db.query(WorkflowExecution).filter(
|
|
WorkflowExecution.workflow_id == workflow.id
|
|
).count()
|
|
|
|
print(f"📊 Initial execution count: {initial_count}")
|
|
|
|
# Trigger the workflow by logging a deadline approaching event
|
|
print(f"🔄 Triggering deadline approaching event for deadline {test_deadline.id}...")
|
|
|
|
log_deadline_approaching_sync(
|
|
db=db,
|
|
deadline_id=test_deadline.id,
|
|
file_no=test_deadline.file_no,
|
|
client_id=test_deadline.client_id,
|
|
days_until_deadline=5,
|
|
deadline_type="other"
|
|
)
|
|
|
|
# Check if workflow execution was created
|
|
new_count = db.query(WorkflowExecution).filter(
|
|
WorkflowExecution.workflow_id == workflow.id
|
|
).count()
|
|
|
|
if new_count > initial_count:
|
|
print(f"✅ Workflow execution triggered! New execution count: {new_count}")
|
|
|
|
# Get the latest execution
|
|
latest_execution = db.query(WorkflowExecution).filter(
|
|
WorkflowExecution.workflow_id == workflow.id
|
|
).order_by(WorkflowExecution.id.desc()).first()
|
|
|
|
print(f"📋 Latest execution details:")
|
|
print(f" - Execution ID: {latest_execution.id}")
|
|
print(f" - Status: {latest_execution.status.value}")
|
|
print(f" - Resource ID: {latest_execution.triggered_by_event_id}")
|
|
print(f" - Event Type: {latest_execution.triggered_by_event_type}")
|
|
|
|
return True
|
|
else:
|
|
print("❌ Workflow execution was not triggered")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error testing deadline workflow: {str(e)}")
|
|
return False
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def test_deadline_notification_service():
|
|
"""Test the enhanced deadline notification service"""
|
|
print("\n🧪 Testing Enhanced Deadline Notification Service")
|
|
print("-" * 50)
|
|
|
|
db = next(get_db())
|
|
|
|
try:
|
|
service = DeadlineNotificationService(db)
|
|
|
|
# Test the workflow event triggering
|
|
events_triggered = service.check_approaching_deadlines_for_workflows()
|
|
print(f"✅ Deadline notification service triggered {events_triggered} workflow events")
|
|
|
|
# Test the daily reminder processing
|
|
results = service.process_daily_reminders()
|
|
print(f"📊 Daily reminder processing results:")
|
|
print(f" - Total reminders: {results['total_reminders']}")
|
|
print(f" - Sent successfully: {results['sent_successfully']}")
|
|
print(f" - Failed: {results['failed']}")
|
|
print(f" - Workflow events triggered: {results['workflow_events_triggered']}")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ Error testing deadline notification service: {str(e)}")
|
|
return False
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def main():
|
|
"""Run all workflow tests"""
|
|
print("🧪 Testing Example Workflows for Delphi Database")
|
|
print("=" * 60)
|
|
|
|
success_count = 0
|
|
total_tests = 3
|
|
|
|
# Test 1: Settlement Letter Workflow
|
|
if test_settlement_workflow():
|
|
success_count += 1
|
|
|
|
# Test 2: Deadline Reminder Workflow
|
|
if test_deadline_workflow():
|
|
success_count += 1
|
|
|
|
# Test 3: Deadline Notification Service
|
|
if test_deadline_notification_service():
|
|
success_count += 1
|
|
|
|
print("\n" + "=" * 60)
|
|
print(f"🎯 Test Results: {success_count}/{total_tests} tests passed")
|
|
|
|
if success_count == total_tests:
|
|
print("✅ All workflow tests passed successfully!")
|
|
print("\n🚀 Your workflows are ready for production use!")
|
|
else:
|
|
print("❌ Some tests failed. Please check the errors above.")
|
|
print("\n🔧 Troubleshooting tips:")
|
|
print("1. Make sure the workflows were created with setup_example_workflows.py")
|
|
print("2. Check database connections and permissions")
|
|
print("3. Verify workflow configurations in the database")
|
|
|
|
return success_count == total_tests
|
|
|
|
|
|
if __name__ == "__main__":
|
|
success = main()
|
|
sys.exit(0 if success else 1)
|