""" Document Workflow Management API This API provides comprehensive workflow automation management including: - Workflow creation and configuration - Event logging and processing - Execution monitoring and control - Template management for common workflows """ from __future__ import annotations from typing import List, Optional, Dict, Any, Union from fastapi import APIRouter, Depends, HTTPException, status, Query, Body from sqlalchemy.orm import Session, joinedload from sqlalchemy import func, or_, and_, desc from pydantic import BaseModel, Field from datetime import datetime, date, timedelta import json from app.database.base import get_db from app.auth.security import get_current_user from app.models.user import User from app.models.document_workflows import ( DocumentWorkflow, WorkflowAction, WorkflowExecution, EventLog, WorkflowTemplate, WorkflowTriggerType, WorkflowActionType, ExecutionStatus, WorkflowStatus ) from app.services.workflow_engine import EventProcessor, WorkflowExecutor from app.services.query_utils import paginate_with_total router = APIRouter() # Pydantic schemas for API class WorkflowCreate(BaseModel): name: str = Field(..., max_length=200) description: Optional[str] = None trigger_type: WorkflowTriggerType trigger_conditions: Optional[Dict[str, Any]] = None delay_minutes: int = Field(0, ge=0) max_retries: int = Field(3, ge=0, le=10) retry_delay_minutes: int = Field(30, ge=1) timeout_minutes: int = Field(60, ge=1) file_type_filter: Optional[List[str]] = None status_filter: Optional[List[str]] = None attorney_filter: Optional[List[str]] = None client_filter: Optional[List[str]] = None schedule_cron: Optional[str] = None schedule_timezone: str = "UTC" priority: int = Field(5, ge=1, le=10) category: Optional[str] = None tags: Optional[List[str]] = None class WorkflowUpdate(BaseModel): name: Optional[str] = None description: Optional[str] = None status: Optional[WorkflowStatus] = None trigger_conditions: Optional[Dict[str, Any]] = None delay_minutes: Optional[int] = None max_retries: Optional[int] = None retry_delay_minutes: Optional[int] = None timeout_minutes: Optional[int] = None file_type_filter: Optional[List[str]] = None status_filter: Optional[List[str]] = None attorney_filter: Optional[List[str]] = None client_filter: Optional[List[str]] = None schedule_cron: Optional[str] = None schedule_timezone: Optional[str] = None priority: Optional[int] = None category: Optional[str] = None tags: Optional[List[str]] = None class WorkflowActionCreate(BaseModel): action_type: WorkflowActionType action_order: int = Field(1, ge=1) action_name: Optional[str] = None parameters: Optional[Dict[str, Any]] = None template_id: Optional[int] = None output_format: str = "DOCX" custom_filename_template: Optional[str] = None email_template_id: Optional[int] = None email_recipients: Optional[List[str]] = None email_subject_template: Optional[str] = None condition: Optional[Dict[str, Any]] = None continue_on_failure: bool = False class WorkflowActionUpdate(BaseModel): action_type: Optional[WorkflowActionType] = None action_order: Optional[int] = None action_name: Optional[str] = None parameters: Optional[Dict[str, Any]] = None template_id: Optional[int] = None output_format: Optional[str] = None custom_filename_template: Optional[str] = None email_template_id: Optional[int] = None email_recipients: Optional[List[str]] = None email_subject_template: Optional[str] = None condition: Optional[Dict[str, Any]] = None continue_on_failure: Optional[bool] = None class WorkflowResponse(BaseModel): id: 


class WorkflowUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    status: Optional[WorkflowStatus] = None
    trigger_conditions: Optional[Dict[str, Any]] = None
    delay_minutes: Optional[int] = None
    max_retries: Optional[int] = None
    retry_delay_minutes: Optional[int] = None
    timeout_minutes: Optional[int] = None
    file_type_filter: Optional[List[str]] = None
    status_filter: Optional[List[str]] = None
    attorney_filter: Optional[List[str]] = None
    client_filter: Optional[List[str]] = None
    schedule_cron: Optional[str] = None
    schedule_timezone: Optional[str] = None
    priority: Optional[int] = None
    category: Optional[str] = None
    tags: Optional[List[str]] = None


class WorkflowActionCreate(BaseModel):
    action_type: WorkflowActionType
    action_order: int = Field(1, ge=1)
    action_name: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    template_id: Optional[int] = None
    output_format: str = "DOCX"
    custom_filename_template: Optional[str] = None
    email_template_id: Optional[int] = None
    email_recipients: Optional[List[str]] = None
    email_subject_template: Optional[str] = None
    condition: Optional[Dict[str, Any]] = None
    continue_on_failure: bool = False


class WorkflowActionUpdate(BaseModel):
    action_type: Optional[WorkflowActionType] = None
    action_order: Optional[int] = None
    action_name: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    template_id: Optional[int] = None
    output_format: Optional[str] = None
    custom_filename_template: Optional[str] = None
    email_template_id: Optional[int] = None
    email_recipients: Optional[List[str]] = None
    email_subject_template: Optional[str] = None
    condition: Optional[Dict[str, Any]] = None
    continue_on_failure: Optional[bool] = None


class WorkflowResponse(BaseModel):
    id: int
    name: str
    description: Optional[str]
    status: WorkflowStatus
    trigger_type: WorkflowTriggerType
    trigger_conditions: Optional[Dict[str, Any]]
    delay_minutes: int
    max_retries: int
    priority: int
    category: Optional[str]
    tags: Optional[List[str]]
    created_by: Optional[str]
    created_at: datetime
    updated_at: datetime
    last_triggered_at: Optional[datetime]
    execution_count: int
    success_count: int
    failure_count: int

    class Config:
        from_attributes = True


class WorkflowActionResponse(BaseModel):
    id: int
    workflow_id: int
    action_type: WorkflowActionType
    action_order: int
    action_name: Optional[str]
    parameters: Optional[Dict[str, Any]]
    template_id: Optional[int]
    output_format: str
    condition: Optional[Dict[str, Any]]
    continue_on_failure: bool

    class Config:
        from_attributes = True


class WorkflowExecutionResponse(BaseModel):
    id: int
    workflow_id: int
    triggered_by_event_id: Optional[str]
    triggered_by_event_type: Optional[str]
    context_file_no: Optional[str]
    context_client_id: Optional[str]
    status: ExecutionStatus
    started_at: Optional[datetime]
    completed_at: Optional[datetime]
    execution_duration_seconds: Optional[int]
    retry_count: int
    error_message: Optional[str]
    generated_documents: Optional[List[Dict[str, Any]]]

    class Config:
        from_attributes = True


class EventLogCreate(BaseModel):
    event_type: str
    event_source: str
    file_no: Optional[str] = None
    client_id: Optional[str] = None
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None
    event_data: Optional[Dict[str, Any]] = None
    previous_state: Optional[Dict[str, Any]] = None
    new_state: Optional[Dict[str, Any]] = None


class EventLogResponse(BaseModel):
    id: int
    event_id: str
    event_type: str
    event_source: str
    file_no: Optional[str]
    client_id: Optional[str]
    resource_type: Optional[str]
    resource_id: Optional[str]
    event_data: Optional[Dict[str, Any]]
    processed: bool
    triggered_workflows: Optional[List[int]]
    occurred_at: datetime

    class Config:
        from_attributes = True


class WorkflowTestRequest(BaseModel):
    event_type: str
    event_data: Optional[Dict[str, Any]] = None
    file_no: Optional[str] = None
    client_id: Optional[str] = None


class WorkflowStatsResponse(BaseModel):
    total_workflows: int
    active_workflows: int
    total_executions: int
    successful_executions: int
    failed_executions: int
    pending_executions: int
    workflows_by_trigger_type: Dict[str, int]
    executions_by_day: List[Dict[str, Any]]


# Workflow CRUD endpoints
@router.post("/workflows/", response_model=WorkflowResponse)
async def create_workflow(
    workflow_data: WorkflowCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create a new document workflow"""
    # Check for duplicate names
    existing = db.query(DocumentWorkflow).filter(
        DocumentWorkflow.name == workflow_data.name
    ).first()
    if existing:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Workflow with name '{workflow_data.name}' already exists"
        )

    # Validate cron expression if provided
    if workflow_data.schedule_cron:
        try:
            from croniter import croniter
            croniter(workflow_data.schedule_cron)
        except Exception:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid cron expression"
            )

    # Create workflow
    workflow = DocumentWorkflow(
        name=workflow_data.name,
        description=workflow_data.description,
        trigger_type=workflow_data.trigger_type,
        trigger_conditions=workflow_data.trigger_conditions,
        delay_minutes=workflow_data.delay_minutes,
        max_retries=workflow_data.max_retries,
        retry_delay_minutes=workflow_data.retry_delay_minutes,
        timeout_minutes=workflow_data.timeout_minutes,
        file_type_filter=workflow_data.file_type_filter,
        status_filter=workflow_data.status_filter,
        attorney_filter=workflow_data.attorney_filter,
        client_filter=workflow_data.client_filter,
        schedule_cron=workflow_data.schedule_cron,
        schedule_timezone=workflow_data.schedule_timezone,
        priority=workflow_data.priority,
        category=workflow_data.category,
        tags=workflow_data.tags,
        created_by=current_user.username,
        status=WorkflowStatus.ACTIVE
    )
    db.add(workflow)
    db.commit()
    db.refresh(workflow)

    return workflow
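

# Example of a schedule_cron value accepted by the croniter check in
# create_workflow above (standard 5-field cron syntax; the particular
# schedule shown is illustrative only):
#
#   "0 9 * * 1-5"   -> every weekday at 09:00 in schedule_timezone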


@router.get("/workflows/", response_model=List[WorkflowResponse])
async def list_workflows(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    status: Optional[WorkflowStatus] = Query(None),
    trigger_type: Optional[WorkflowTriggerType] = Query(None),
    category: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List workflows with filtering options"""
    query = db.query(DocumentWorkflow)

    if status:
        query = query.filter(DocumentWorkflow.status == status)
    if trigger_type:
        query = query.filter(DocumentWorkflow.trigger_type == trigger_type)
    if category:
        query = query.filter(DocumentWorkflow.category == category)
    if search:
        search_filter = f"%{search}%"
        query = query.filter(
            or_(
                DocumentWorkflow.name.ilike(search_filter),
                DocumentWorkflow.description.ilike(search_filter)
            )
        )

    query = query.order_by(DocumentWorkflow.priority.desc(), DocumentWorkflow.name)
    workflows, _ = paginate_with_total(query, skip, limit, False)
    return workflows


@router.get("/workflows/{workflow_id}", response_model=WorkflowResponse)
async def get_workflow(
    workflow_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get a specific workflow by ID"""
    workflow = db.query(DocumentWorkflow).filter(
        DocumentWorkflow.id == workflow_id
    ).first()
    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Workflow not found"
        )
    return workflow


@router.put("/workflows/{workflow_id}", response_model=WorkflowResponse)
async def update_workflow(
    workflow_id: int,
    workflow_data: WorkflowUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Update a workflow"""
    workflow = db.query(DocumentWorkflow).filter(
        DocumentWorkflow.id == workflow_id
    ).first()
    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Workflow not found"
        )

    # Update fields that are provided
    update_data = workflow_data.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(workflow, field, value)

    db.commit()
    db.refresh(workflow)
    return workflow


@router.delete("/workflows/{workflow_id}")
async def delete_workflow(
    workflow_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete a workflow (soft delete by setting status to archived)"""
    workflow = db.query(DocumentWorkflow).filter(
        DocumentWorkflow.id == workflow_id
    ).first()
    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Workflow not found"
        )

    # Soft delete
    workflow.status = WorkflowStatus.ARCHIVED
    db.commit()
    return {"message": "Workflow archived successfully"}


# Workflow Actions endpoints
@router.post("/workflows/{workflow_id}/actions", response_model=WorkflowActionResponse)
async def create_workflow_action(
    workflow_id: int,
    action_data: WorkflowActionCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create a new action for a workflow"""
    workflow = db.query(DocumentWorkflow).filter(
        DocumentWorkflow.id == workflow_id
    ).first()
    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Workflow not found"
        )

    action = WorkflowAction(
        workflow_id=workflow_id,
        action_type=action_data.action_type,
        action_order=action_data.action_order,
        action_name=action_data.action_name,
        parameters=action_data.parameters,
        template_id=action_data.template_id,
        output_format=action_data.output_format,
        custom_filename_template=action_data.custom_filename_template,
        email_template_id=action_data.email_template_id,
        email_recipients=action_data.email_recipients,
        email_subject_template=action_data.email_subject_template,
        condition=action_data.condition,
        continue_on_failure=action_data.continue_on_failure
    )
    db.add(action)
    db.commit()
    db.refresh(action)
    return action
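

# Example action payloads for POST /workflows/{workflow_id}/actions, showing how
# action_order chains a document-generation step before an email step. The
# action_type values and the {file_no} placeholder are assumptions; the real
# vocabulary comes from WorkflowActionType and the workflow engine:
#
#   {"action_type": "generate_document", "action_order": 1,
#    "template_id": 12, "output_format": "PDF"}
#
#   {"action_type": "send_email", "action_order": 2,
#    "email_recipients": ["attorney@example.com"],
#    "email_subject_template": "Document ready for {file_no}",
#    "continue_on_failure": true}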


@router.get("/workflows/{workflow_id}/actions", response_model=List[WorkflowActionResponse])
async def list_workflow_actions(
    workflow_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List actions for a workflow"""
    actions = db.query(WorkflowAction).filter(
        WorkflowAction.workflow_id == workflow_id
    ).order_by(WorkflowAction.action_order, WorkflowAction.id).all()
    return actions


@router.put("/workflows/{workflow_id}/actions/{action_id}", response_model=WorkflowActionResponse)
async def update_workflow_action(
    workflow_id: int,
    action_id: int,
    action_data: WorkflowActionUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Update a workflow action"""
    action = db.query(WorkflowAction).filter(
        WorkflowAction.id == action_id,
        WorkflowAction.workflow_id == workflow_id
    ).first()
    if not action:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Action not found"
        )

    # Update fields that are provided
    update_data = action_data.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(action, field, value)

    db.commit()
    db.refresh(action)
    return action


@router.delete("/workflows/{workflow_id}/actions/{action_id}")
async def delete_workflow_action(
    workflow_id: int,
    action_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete a workflow action"""
    action = db.query(WorkflowAction).filter(
        WorkflowAction.id == action_id,
        WorkflowAction.workflow_id == workflow_id
    ).first()
    if not action:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Action not found"
        )

    db.delete(action)
    db.commit()
    return {"message": "Action deleted successfully"}


# Event Management endpoints
@router.post("/events/", response_model=dict)
async def log_event(
    event_data: EventLogCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Log a system event that may trigger workflows"""
    processor = EventProcessor(db)
    event_id = await processor.log_event(
        event_type=event_data.event_type,
        event_source=event_data.event_source,
        file_no=event_data.file_no,
        client_id=event_data.client_id,
        user_id=current_user.id,
        resource_type=event_data.resource_type,
        resource_id=event_data.resource_id,
        event_data=event_data.event_data,
        previous_state=event_data.previous_state,
        new_state=event_data.new_state
    )
    return {"event_id": event_id, "message": "Event logged successfully"}
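

# Example request body for POST /events/ (values are illustrative; the
# event_type/event_source vocabulary is whatever EventProcessor expects):
#
#   {
#       "event_type": "file_status_changed",
#       "event_source": "case_management",
#       "file_no": "2024-00123",
#       "previous_state": {"status": "OPEN"},
#       "new_state": {"status": "CLOSED"}
#   }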


@router.get("/events/", response_model=List[EventLogResponse])
async def list_events(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    event_type: Optional[str] = Query(None),
    file_no: Optional[str] = Query(None),
    processed: Optional[bool] = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List system events"""
    query = db.query(EventLog)

    if event_type:
        query = query.filter(EventLog.event_type == event_type)
    if file_no:
        query = query.filter(EventLog.file_no == file_no)
    if processed is not None:
        query = query.filter(EventLog.processed == processed)

    query = query.order_by(desc(EventLog.occurred_at))
    events, _ = paginate_with_total(query, skip, limit, False)
    return events


# Execution Management endpoints
@router.get("/executions/", response_model=List[WorkflowExecutionResponse])
async def list_executions(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=200),
    workflow_id: Optional[int] = Query(None),
    status: Optional[ExecutionStatus] = Query(None),
    file_no: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """List workflow executions"""
    query = db.query(WorkflowExecution)

    if workflow_id:
        query = query.filter(WorkflowExecution.workflow_id == workflow_id)
    if status:
        query = query.filter(WorkflowExecution.status == status)
    if file_no:
        query = query.filter(WorkflowExecution.context_file_no == file_no)

    query = query.order_by(desc(WorkflowExecution.started_at))
    executions, _ = paginate_with_total(query, skip, limit, False)
    return executions


@router.get("/executions/{execution_id}", response_model=WorkflowExecutionResponse)
async def get_execution(
    execution_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get details of a specific execution"""
    execution = db.query(WorkflowExecution).filter(
        WorkflowExecution.id == execution_id
    ).first()
    if not execution:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Execution not found"
        )
    return execution


@router.post("/executions/{execution_id}/retry")
async def retry_execution(
    execution_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Retry a failed workflow execution"""
    execution = db.query(WorkflowExecution).filter(
        WorkflowExecution.id == execution_id
    ).first()
    if not execution:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Execution not found"
        )

    if execution.status not in [ExecutionStatus.FAILED, ExecutionStatus.RETRYING]:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Only failed or retrying executions can be retried"
        )

    # Reset execution for retry
    execution.status = ExecutionStatus.PENDING
    execution.error_message = None
    execution.next_retry_at = None
    execution.retry_count += 1
    db.commit()

    # Execute the workflow
    executor = WorkflowExecutor(db)
    success = await executor.execute_workflow(execution_id)

    return {"message": "Execution retried", "success": success}


# Testing and Management endpoints
@router.post("/workflows/{workflow_id}/test")
async def test_workflow(
    workflow_id: int,
    test_request: WorkflowTestRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Test a workflow with simulated event data"""
    workflow = db.query(DocumentWorkflow).filter(
        DocumentWorkflow.id == workflow_id
    ).first()
    if not workflow:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Workflow not found"
        )

    # Create a test event
    processor = EventProcessor(db)
    event_id = await processor.log_event(
        event_type=test_request.event_type,
        event_source="workflow_test",
        file_no=test_request.file_no,
        client_id=test_request.client_id,
        user_id=current_user.id,
        event_data=test_request.event_data or {}
    )

    return {"message": "Test event logged", "event_id": event_id}
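

# Example request body for POST /workflows/{workflow_id}/test. This endpoint only
# logs a synthetic event via EventProcessor; whether the workflow actually fires
# depends on its trigger_type and trigger_conditions. Values are illustrative:
#
#   {
#       "event_type": "file_status_changed",
#       "file_no": "2024-00123",
#       "event_data": {"new_status": "CLOSED"}
#   }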
"Test event logged", "event_id": event_id} @router.get("/stats", response_model=WorkflowStatsResponse) async def get_workflow_stats( days: int = Query(30, ge=1, le=365), db: Session = Depends(get_db), current_user: User = Depends(get_current_user) ): """Get workflow system statistics""" # Basic counts total_workflows = db.query(func.count(DocumentWorkflow.id)).scalar() active_workflows = db.query(func.count(DocumentWorkflow.id)).filter( DocumentWorkflow.status == WorkflowStatus.ACTIVE ).scalar() total_executions = db.query(func.count(WorkflowExecution.id)).scalar() successful_executions = db.query(func.count(WorkflowExecution.id)).filter( WorkflowExecution.status == ExecutionStatus.COMPLETED ).scalar() failed_executions = db.query(func.count(WorkflowExecution.id)).filter( WorkflowExecution.status == ExecutionStatus.FAILED ).scalar() pending_executions = db.query(func.count(WorkflowExecution.id)).filter( WorkflowExecution.status.in_([ExecutionStatus.PENDING, ExecutionStatus.RUNNING]) ).scalar() # Workflows by trigger type trigger_stats = db.query( DocumentWorkflow.trigger_type, func.count(DocumentWorkflow.id) ).group_by(DocumentWorkflow.trigger_type).all() workflows_by_trigger_type = { trigger.value: count for trigger, count in trigger_stats } # Executions by day (for the chart) cutoff_date = datetime.now() - timedelta(days=days) daily_stats = db.query( func.date(WorkflowExecution.started_at).label('date'), func.count(WorkflowExecution.id).label('count'), func.sum(func.case((WorkflowExecution.status == ExecutionStatus.COMPLETED, 1), else_=0)).label('successful'), func.sum(func.case((WorkflowExecution.status == ExecutionStatus.FAILED, 1), else_=0)).label('failed') ).filter( WorkflowExecution.started_at >= cutoff_date ).group_by(func.date(WorkflowExecution.started_at)).all() executions_by_day = [ { 'date': row.date.isoformat() if row.date else None, 'total': row.count, 'successful': row.successful or 0, 'failed': row.failed or 0 } for row in daily_stats ] return WorkflowStatsResponse( total_workflows=total_workflows or 0, active_workflows=active_workflows or 0, total_executions=total_executions or 0, successful_executions=successful_executions or 0, failed_executions=failed_executions or 0, pending_executions=pending_executions or 0, workflows_by_trigger_type=workflows_by_trigger_type, executions_by_day=executions_by_day )