#!/usr/bin/env python3
"""Smoke tests for the example document workflows.

Exercises the "Auto Settlement Letter" and "Deadline Reminder" workflows and
the deadline notification service against the database returned by
``get_db()``. Each test prints its progress and returns ``True`` on success;
the process exit code is 0 only when every test passes.

NOTE(review): these tests write to whatever database ``get_db()`` points at
(event logs, and possibly a new ``Deadline`` row) -- run them against a
disposable environment.
"""

import asyncio
import os
import sys
from contextlib import closing
from datetime import date, timedelta

# Add the project root to Python path so the ``app`` package can be imported
# when this script is run directly.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from sqlalchemy.orm import Session

from app.database.base import get_db
from app.models.files import File
from app.models.deadlines import (
    Deadline,
    DeadlinePriority,
    DeadlineStatus,
    DeadlineType,
)
from app.models.document_workflows import (
    DocumentWorkflow,
    ExecutionStatus,
    WorkflowExecution,
)
from app.services.workflow_integration import (
    log_deadline_approaching_sync,
    log_file_status_change_sync,
)
from app.services.deadline_notifications import DeadlineNotificationService

# Lead time, in days, used both to pick/create the test deadline and as the
# ``days_until_deadline`` value reported to the workflow engine, so the two
# can never disagree.
_DEADLINE_LEAD_DAYS = 5


def _find_workflow(db: Session, name: str):
    """Look up a workflow by exact name and print whether it was found.

    Returns the ``DocumentWorkflow`` row, or ``None`` when no workflow with
    that name exists.
    """
    workflow = (
        db.query(DocumentWorkflow)
        .filter(DocumentWorkflow.name == name)
        .first()
    )
    if workflow is None:
        print(f"❌ {name} workflow not found. Please run setup script first.")
    else:
        print(f"✅ Found workflow: {workflow.name} (ID: {workflow.id})")
    return workflow


def _count_executions(db: Session, workflow_id) -> int:
    """Return how many ``WorkflowExecution`` rows exist for *workflow_id*."""
    return (
        db.query(WorkflowExecution)
        .filter(WorkflowExecution.workflow_id == workflow_id)
        .count()
    )


def _report_new_execution(
    db: Session,
    workflow_id,
    initial_count: int,
    detail_label: str,
    detail_attr: str,
) -> bool:
    """Check that the execution count grew and print the newest execution.

    *detail_label* / *detail_attr* select the one test-specific line of the
    report (e.g. ``"File No"`` / ``"context_file_no"``).

    Returns ``True`` when at least one new execution was recorded since
    *initial_count* was taken, ``False`` otherwise.
    """
    new_count = _count_executions(db, workflow_id)
    if new_count <= initial_count:
        print("❌ Workflow execution was not triggered")
        return False

    print(f"✅ Workflow execution triggered! New execution count: {new_count}")

    # Highest id == most recently inserted execution for this workflow.
    latest_execution = (
        db.query(WorkflowExecution)
        .filter(WorkflowExecution.workflow_id == workflow_id)
        .order_by(WorkflowExecution.id.desc())
        .first()
    )
    print("📋 Latest execution details:")
    print(f" - Execution ID: {latest_execution.id}")
    print(f" - Status: {latest_execution.status.value}")
    print(f" - {detail_label}: {getattr(latest_execution, detail_attr)}")
    print(f" - Event Type: {latest_execution.triggered_by_event_type}")
    return True


def test_settlement_workflow() -> bool:
    """Test the Auto Settlement Letter workflow.

    Logs a status change to ``CLOSED`` for the first non-closed file and
    checks that a new execution of the workflow was recorded.

    Returns ``True`` if an execution was triggered; ``False`` if the workflow
    or a suitable file is missing, nothing was triggered, or an error occurred.
    """
    print("\n🧪 Testing Auto Settlement Letter Workflow")
    print("-" * 50)

    # closing() guarantees db.close() on every exit path.
    with closing(next(get_db())) as db:
        try:
            workflow = _find_workflow(db, "Auto Settlement Letter")
            if workflow is None:
                return False

            # Find a test file to close.
            # NOTE(review): this picks a real open file and fires a CLOSED
            # event for it; presumably the File row itself is not updated
            # here -- confirm against log_file_status_change_sync.
            test_file = db.query(File).filter(File.status != "CLOSED").first()
            if not test_file:
                print("❌ No open files found to test with. Please add a file first.")
                return False
            print(
                f"✅ Found test file: {test_file.file_no} "
                f"(current status: {test_file.status})"
            )

            initial_count = _count_executions(db, workflow.id)
            print(f"📊 Initial execution count: {initial_count}")

            # Trigger the workflow by logging a status change to CLOSED.
            print(f"🔄 Changing file {test_file.file_no} status to CLOSED...")
            log_file_status_change_sync(
                db=db,
                file_no=test_file.file_no,
                old_status=test_file.status,
                new_status="CLOSED",
                user_id=1,  # Assuming admin user ID 1
                notes="Test closure for workflow testing",
            )

            return _report_new_execution(
                db, workflow.id, initial_count, "File No", "context_file_no"
            )
        except Exception as e:
            # Top-level boundary of a diagnostic script: report and fail the
            # test instead of aborting the remaining tests.
            print(f"❌ Error testing settlement workflow: {e}")
            return False


def test_deadline_workflow() -> bool:
    """Test the Deadline Reminder workflow.

    Finds (or creates) a pending deadline due in ``_DEADLINE_LEAD_DAYS`` days,
    logs a "deadline approaching" event for it, and checks that a new
    execution of the workflow was recorded.

    Returns ``True`` if an execution was triggered; ``False`` if the workflow
    is missing, nothing was triggered, or an error occurred.
    """
    print("\n🧪 Testing Deadline Reminder Workflow")
    print("-" * 50)

    with closing(next(get_db())) as db:
        try:
            workflow = _find_workflow(db, "Deadline Reminder")
            if workflow is None:
                return False

            # Find or create a test deadline that's approaching.
            approaching_date = date.today() + timedelta(days=_DEADLINE_LEAD_DAYS)
            test_deadline = (
                db.query(Deadline)
                .filter(
                    Deadline.status == DeadlineStatus.PENDING,
                    Deadline.deadline_date == approaching_date,
                )
                .first()
            )

            if not test_deadline:
                # NOTE(review): this row is committed and never removed; the
                # file_no / client_id values are placeholders and presumably
                # need not reference existing records -- confirm.
                test_deadline = Deadline(
                    title="Test Deadline for Workflow",
                    description="Test deadline created for workflow testing",
                    deadline_date=approaching_date,
                    status=DeadlineStatus.PENDING,
                    priority=DeadlinePriority.HIGH,
                    deadline_type=DeadlineType.OTHER,
                    file_no="TEST-001",
                    client_id="TEST-CLIENT",
                    created_by_user_id=1,
                )
                db.add(test_deadline)
                db.commit()
                db.refresh(test_deadline)  # load the generated primary key
                print(
                    f"✅ Created test deadline: {test_deadline.title} "
                    f"(ID: {test_deadline.id})"
                )
            else:
                print(
                    f"✅ Found test deadline: {test_deadline.title} "
                    f"(ID: {test_deadline.id})"
                )

            initial_count = _count_executions(db, workflow.id)
            print(f"📊 Initial execution count: {initial_count}")

            # Trigger the workflow by logging a deadline approaching event.
            print(
                "🔄 Triggering deadline approaching event for deadline "
                f"{test_deadline.id}..."
            )
            log_deadline_approaching_sync(
                db=db,
                deadline_id=test_deadline.id,
                file_no=test_deadline.file_no,
                client_id=test_deadline.client_id,
                days_until_deadline=_DEADLINE_LEAD_DAYS,
                # NOTE(review): always "other", even when an existing deadline
                # of a different type was found above -- confirm intended.
                deadline_type="other",
            )

            return _report_new_execution(
                db,
                workflow.id,
                initial_count,
                "Resource ID",
                "triggered_by_event_id",
            )
        except Exception as e:
            print(f"❌ Error testing deadline workflow: {e}")
            return False


def test_deadline_notification_service() -> bool:
    """Test the enhanced deadline notification service.

    Runs the workflow-event check and the daily reminder processing and
    prints their results.

    Returns ``True`` when both calls complete and the expected result keys
    are present; ``False`` if any step raises.
    """
    print("\n🧪 Testing Enhanced Deadline Notification Service")
    print("-" * 50)

    with closing(next(get_db())) as db:
        try:
            service = DeadlineNotificationService(db)

            # Test the workflow event triggering.
            events_triggered = service.check_approaching_deadlines_for_workflows()
            print(
                "✅ Deadline notification service triggered "
                f"{events_triggered} workflow events"
            )

            # Test the daily reminder processing.
            results = service.process_daily_reminders()
            print("📊 Daily reminder processing results:")
            print(f" - Total reminders: {results['total_reminders']}")
            print(f" - Sent successfully: {results['sent_successfully']}")
            print(f" - Failed: {results['failed']}")
            print(
                " - Workflow events triggered: "
                f"{results['workflow_events_triggered']}"
            )
            return True
        except Exception as e:
            print(f"❌ Error testing deadline notification service: {e}")
            return False


def main() -> bool:
    """Run all workflow tests and print a summary.

    Every test is run regardless of earlier failures.

    Returns ``True`` only when all tests passed.
    """
    print("🧪 Testing Example Workflows for Delphi Database")
    print("=" * 60)

    tests = (
        test_settlement_workflow,
        test_deadline_workflow,
        test_deadline_notification_service,
    )
    total_tests = len(tests)
    # The generator calls each test in order; none is skipped on failure.
    success_count = sum(1 for run_test in tests if run_test())

    print("\n" + "=" * 60)
    print(f"🎯 Test Results: {success_count}/{total_tests} tests passed")

    all_passed = success_count == total_tests
    if all_passed:
        print("✅ All workflow tests passed successfully!")
        print("\n🚀 Your workflows are ready for production use!")
    else:
        print("❌ Some tests failed. Please check the errors above.")
        print("\n🔧 Troubleshooting tips:")
        print("1. Make sure the workflows were created with setup_example_workflows.py")
        print("2. Check database connections and permissions")
        print("3. Verify workflow configurations in the database")

    return all_passed


if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)