Files
delphi-database/app/services/deadline_calendar.py
HotSwapp bac8cc4bd5 changes
2025-08-18 20:20:04 -05:00

698 lines
27 KiB
Python

"""
Deadline calendar integration service
Provides calendar views and scheduling utilities for deadlines
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, date, timezone, timedelta
from calendar import monthrange, weekday
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, func, or_, desc
from app.models import (
Deadline, CourtCalendar, User, Employee,
DeadlineType, DeadlinePriority, DeadlineStatus
)
from app.utils.logging import app_logger
logger = app_logger
class DeadlineCalendarService:
"""Service for deadline calendar views and scheduling"""
def __init__(self, db: Session):
self.db = db
def get_monthly_calendar(
self,
year: int,
month: int,
user_id: Optional[int] = None,
employee_id: Optional[str] = None,
show_completed: bool = False
) -> Dict[str, Any]:
"""Get monthly calendar view with deadlines"""
# Get first and last day of month
first_day = date(year, month, 1)
last_day = date(year, month, monthrange(year, month)[1])
# Get first Monday of calendar view (may be in previous month)
first_monday = first_day - timedelta(days=first_day.weekday())
# Get last Sunday of calendar view (may be in next month)
last_sunday = last_day + timedelta(days=(6 - last_day.weekday()))
# Build query for deadlines in the calendar period
query = self.db.query(Deadline).filter(
Deadline.deadline_date.between(first_monday, last_sunday)
)
if not show_completed:
query = query.filter(Deadline.status != DeadlineStatus.COMPLETED)
if user_id:
query = query.filter(Deadline.assigned_to_user_id == user_id)
if employee_id:
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
deadlines = query.options(
joinedload(Deadline.file),
joinedload(Deadline.client),
joinedload(Deadline.assigned_to_user),
joinedload(Deadline.assigned_to_employee)
).order_by(
Deadline.deadline_date.asc(),
Deadline.deadline_time.asc(),
Deadline.priority.desc()
).all()
# Build calendar grid (6 weeks x 7 days)
calendar_weeks = []
current_date = first_monday
for week in range(6):
week_days = []
for day in range(7):
day_date = current_date + timedelta(days=day)
# Get deadlines for this day
day_deadlines = [
d for d in deadlines if d.deadline_date == day_date
]
# Format deadline data
formatted_deadlines = []
for deadline in day_deadlines:
formatted_deadlines.append({
"id": deadline.id,
"title": deadline.title,
"deadline_time": deadline.deadline_time.strftime("%H:%M") if deadline.deadline_time else None,
"priority": deadline.priority.value,
"deadline_type": deadline.deadline_type.value,
"status": deadline.status.value,
"file_no": deadline.file_no,
"client_name": self._get_client_name(deadline),
"court_name": deadline.court_name,
"is_overdue": deadline.is_overdue,
"is_court_date": deadline.deadline_type == DeadlineType.COURT_HEARING
})
week_days.append({
"date": day_date,
"day_number": day_date.day,
"is_current_month": day_date.month == month,
"is_today": day_date == date.today(),
"is_weekend": day_date.weekday() >= 5,
"deadlines": formatted_deadlines,
"deadline_count": len(formatted_deadlines),
"has_overdue": any(d["is_overdue"] for d in formatted_deadlines),
"has_court_date": any(d["is_court_date"] for d in formatted_deadlines),
"max_priority": self._get_max_priority(day_deadlines)
})
calendar_weeks.append({
"week_start": current_date,
"days": week_days
})
current_date += timedelta(days=7)
# Calculate summary statistics
month_deadlines = [d for d in deadlines if d.deadline_date.month == month]
return {
"year": year,
"month": month,
"month_name": first_day.strftime("%B"),
"calendar_period": {
"start_date": first_monday,
"end_date": last_sunday
},
"summary": {
"total_deadlines": len(month_deadlines),
"overdue": len([d for d in month_deadlines if d.is_overdue]),
"pending": len([d for d in month_deadlines if d.status == DeadlineStatus.PENDING]),
"completed": len([d for d in month_deadlines if d.status == DeadlineStatus.COMPLETED]),
"court_dates": len([d for d in month_deadlines if d.deadline_type == DeadlineType.COURT_HEARING])
},
"weeks": calendar_weeks
}
def get_weekly_calendar(
self,
year: int,
week: int,
user_id: Optional[int] = None,
employee_id: Optional[str] = None,
show_completed: bool = False
) -> Dict[str, Any]:
"""Get weekly calendar view with detailed scheduling"""
# Calculate the Monday of the specified week
jan_1 = date(year, 1, 1)
jan_1_weekday = jan_1.weekday()
# Find the Monday of week 1
days_to_monday = -jan_1_weekday if jan_1_weekday == 0 else 7 - jan_1_weekday
first_monday = jan_1 + timedelta(days=days_to_monday)
# Calculate the target week's Monday
week_monday = first_monday + timedelta(weeks=week - 1)
week_sunday = week_monday + timedelta(days=6)
# Build query for deadlines in the week
query = self.db.query(Deadline).filter(
Deadline.deadline_date.between(week_monday, week_sunday)
)
if not show_completed:
query = query.filter(Deadline.status != DeadlineStatus.COMPLETED)
if user_id:
query = query.filter(Deadline.assigned_to_user_id == user_id)
if employee_id:
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
deadlines = query.options(
joinedload(Deadline.file),
joinedload(Deadline.client),
joinedload(Deadline.assigned_to_user),
joinedload(Deadline.assigned_to_employee)
).order_by(
Deadline.deadline_date.asc(),
Deadline.deadline_time.asc(),
Deadline.priority.desc()
).all()
# Build daily schedule
week_days = []
for day_offset in range(7):
day_date = week_monday + timedelta(days=day_offset)
# Get deadlines for this day
day_deadlines = [d for d in deadlines if d.deadline_date == day_date]
# Group deadlines by time
timed_deadlines = []
all_day_deadlines = []
for deadline in day_deadlines:
deadline_data = {
"id": deadline.id,
"title": deadline.title,
"deadline_time": deadline.deadline_time,
"priority": deadline.priority.value,
"deadline_type": deadline.deadline_type.value,
"status": deadline.status.value,
"file_no": deadline.file_no,
"client_name": self._get_client_name(deadline),
"assigned_to": self._get_assigned_to(deadline),
"court_name": deadline.court_name,
"case_number": deadline.case_number,
"description": deadline.description,
"is_overdue": deadline.is_overdue,
"estimated_duration": self._get_estimated_duration(deadline)
}
if deadline.deadline_time:
timed_deadlines.append(deadline_data)
else:
all_day_deadlines.append(deadline_data)
# Sort timed deadlines by time
timed_deadlines.sort(key=lambda x: x["deadline_time"])
week_days.append({
"date": day_date,
"day_name": day_date.strftime("%A"),
"day_short": day_date.strftime("%a"),
"is_today": day_date == date.today(),
"is_weekend": day_date.weekday() >= 5,
"timed_deadlines": timed_deadlines,
"all_day_deadlines": all_day_deadlines,
"total_deadlines": len(day_deadlines),
"has_court_dates": any(d.deadline_type == DeadlineType.COURT_HEARING for d in day_deadlines)
})
return {
"year": year,
"week": week,
"week_period": {
"start_date": week_monday,
"end_date": week_sunday
},
"summary": {
"total_deadlines": len(deadlines),
"timed_deadlines": len([d for d in deadlines if d.deadline_time]),
"all_day_deadlines": len([d for d in deadlines if not d.deadline_time]),
"court_dates": len([d for d in deadlines if d.deadline_type == DeadlineType.COURT_HEARING])
},
"days": week_days
}
def get_daily_schedule(
self,
target_date: date,
user_id: Optional[int] = None,
employee_id: Optional[str] = None,
show_completed: bool = False
) -> Dict[str, Any]:
"""Get detailed daily schedule with time slots"""
# Build query for deadlines on the target date
query = self.db.query(Deadline).filter(
Deadline.deadline_date == target_date
)
if not show_completed:
query = query.filter(Deadline.status != DeadlineStatus.COMPLETED)
if user_id:
query = query.filter(Deadline.assigned_to_user_id == user_id)
if employee_id:
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
deadlines = query.options(
joinedload(Deadline.file),
joinedload(Deadline.client),
joinedload(Deadline.assigned_to_user),
joinedload(Deadline.assigned_to_employee)
).order_by(
Deadline.deadline_time.asc(),
Deadline.priority.desc()
).all()
# Create time slots (30-minute intervals from 8 AM to 6 PM)
time_slots = []
start_hour = 8
end_hour = 18
for hour in range(start_hour, end_hour):
for minute in [0, 30]:
slot_time = datetime.combine(target_date, datetime.min.time().replace(hour=hour, minute=minute))
# Find deadlines in this time slot
slot_deadlines = []
for deadline in deadlines:
if deadline.deadline_time:
deadline_time = deadline.deadline_time.replace(tzinfo=None)
# Check if deadline falls within this 30-minute slot
if (slot_time <= deadline_time < slot_time + timedelta(minutes=30)):
slot_deadlines.append({
"id": deadline.id,
"title": deadline.title,
"deadline_time": deadline.deadline_time,
"priority": deadline.priority.value,
"deadline_type": deadline.deadline_type.value,
"status": deadline.status.value,
"file_no": deadline.file_no,
"client_name": self._get_client_name(deadline),
"court_name": deadline.court_name,
"case_number": deadline.case_number,
"description": deadline.description,
"estimated_duration": self._get_estimated_duration(deadline)
})
time_slots.append({
"time": slot_time.strftime("%H:%M"),
"datetime": slot_time,
"deadlines": slot_deadlines,
"is_busy": len(slot_deadlines) > 0
})
# Get all-day deadlines
all_day_deadlines = []
for deadline in deadlines:
if not deadline.deadline_time:
all_day_deadlines.append({
"id": deadline.id,
"title": deadline.title,
"priority": deadline.priority.value,
"deadline_type": deadline.deadline_type.value,
"status": deadline.status.value,
"file_no": deadline.file_no,
"client_name": self._get_client_name(deadline),
"description": deadline.description
})
return {
"date": target_date,
"day_name": target_date.strftime("%A, %B %d, %Y"),
"is_today": target_date == date.today(),
"summary": {
"total_deadlines": len(deadlines),
"timed_deadlines": len([d for d in deadlines if d.deadline_time]),
"all_day_deadlines": len(all_day_deadlines),
"court_dates": len([d for d in deadlines if d.deadline_type == DeadlineType.COURT_HEARING]),
"overdue": len([d for d in deadlines if d.is_overdue])
},
"all_day_deadlines": all_day_deadlines,
"time_slots": time_slots,
"business_hours": {
"start": f"{start_hour:02d}:00",
"end": f"{end_hour:02d}:00"
}
}
def find_available_slots(
self,
start_date: date,
end_date: date,
duration_minutes: int = 60,
user_id: Optional[int] = None,
employee_id: Optional[str] = None,
business_hours_only: bool = True
) -> List[Dict[str, Any]]:
"""Find available time slots for scheduling new deadlines"""
# Get existing deadlines in the period
query = self.db.query(Deadline).filter(
Deadline.deadline_date.between(start_date, end_date),
Deadline.status == DeadlineStatus.PENDING,
Deadline.deadline_time.isnot(None)
)
if user_id:
query = query.filter(Deadline.assigned_to_user_id == user_id)
if employee_id:
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
existing_deadlines = query.all()
# Define business hours
if business_hours_only:
start_hour, end_hour = 8, 18
else:
start_hour, end_hour = 0, 24
available_slots = []
current_date = start_date
while current_date <= end_date:
# Skip weekends if business hours only
if business_hours_only and current_date.weekday() >= 5:
current_date += timedelta(days=1)
continue
# Get deadlines for this day
day_deadlines = [
d for d in existing_deadlines
if d.deadline_date == current_date
]
# Sort by time
day_deadlines.sort(key=lambda d: d.deadline_time)
# Find gaps between deadlines
for hour in range(start_hour, end_hour):
for minute in range(0, 60, 30): # 30-minute intervals
slot_start = datetime.combine(
current_date,
datetime.min.time().replace(hour=hour, minute=minute)
)
slot_end = slot_start + timedelta(minutes=duration_minutes)
# Check if this slot conflicts with existing deadlines
is_available = True
for deadline in day_deadlines:
deadline_start = deadline.deadline_time.replace(tzinfo=None)
deadline_end = deadline_start + timedelta(
minutes=self._get_estimated_duration(deadline)
)
# Check for overlap
if not (slot_end <= deadline_start or slot_start >= deadline_end):
is_available = False
break
if is_available:
available_slots.append({
"start_datetime": slot_start,
"end_datetime": slot_end,
"date": current_date,
"start_time": slot_start.strftime("%H:%M"),
"end_time": slot_end.strftime("%H:%M"),
"duration_minutes": duration_minutes,
"day_name": current_date.strftime("%A")
})
current_date += timedelta(days=1)
return available_slots[:50] # Limit to first 50 slots
def get_conflict_analysis(
self,
proposed_datetime: datetime,
duration_minutes: int = 60,
user_id: Optional[int] = None,
employee_id: Optional[str] = None
) -> Dict[str, Any]:
"""Analyze potential conflicts for a proposed deadline time"""
proposed_date = proposed_datetime.date()
proposed_end = proposed_datetime + timedelta(minutes=duration_minutes)
# Get existing deadlines on the same day
query = self.db.query(Deadline).filter(
Deadline.deadline_date == proposed_date,
Deadline.status == DeadlineStatus.PENDING,
Deadline.deadline_time.isnot(None)
)
if user_id:
query = query.filter(Deadline.assigned_to_user_id == user_id)
if employee_id:
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
existing_deadlines = query.options(
joinedload(Deadline.file),
joinedload(Deadline.client)
).all()
conflicts = []
nearby_deadlines = []
for deadline in existing_deadlines:
deadline_start = deadline.deadline_time.replace(tzinfo=None)
deadline_end = deadline_start + timedelta(
minutes=self._get_estimated_duration(deadline)
)
# Check for direct overlap
if not (proposed_end <= deadline_start or proposed_datetime >= deadline_end):
conflicts.append({
"id": deadline.id,
"title": deadline.title,
"start_time": deadline_start,
"end_time": deadline_end,
"conflict_type": "overlap",
"file_no": deadline.file_no,
"client_name": self._get_client_name(deadline)
})
# Check for nearby deadlines (within 30 minutes)
elif (abs((proposed_datetime - deadline_start).total_seconds()) <= 1800 or
abs((proposed_end - deadline_end).total_seconds()) <= 1800):
nearby_deadlines.append({
"id": deadline.id,
"title": deadline.title,
"start_time": deadline_start,
"end_time": deadline_end,
"file_no": deadline.file_no,
"client_name": self._get_client_name(deadline),
"minutes_gap": min(
abs((proposed_datetime - deadline_end).total_seconds() / 60),
abs((deadline_start - proposed_end).total_seconds() / 60)
)
})
return {
"proposed_datetime": proposed_datetime,
"proposed_end": proposed_end,
"duration_minutes": duration_minutes,
"has_conflicts": len(conflicts) > 0,
"conflicts": conflicts,
"nearby_deadlines": nearby_deadlines,
"recommendation": self._get_scheduling_recommendation(
conflicts, nearby_deadlines, proposed_datetime
)
}
# Private helper methods
def _get_client_name(self, deadline: Deadline) -> Optional[str]:
"""Get formatted client name from deadline"""
if deadline.client:
return f"{deadline.client.first or ''} {deadline.client.last or ''}".strip()
elif deadline.file and deadline.file.owner:
return f"{deadline.file.owner.first or ''} {deadline.file.owner.last or ''}".strip()
return None
def _get_assigned_to(self, deadline: Deadline) -> Optional[str]:
"""Get assigned person name from deadline"""
if deadline.assigned_to_user:
return deadline.assigned_to_user.username
elif deadline.assigned_to_employee:
employee = deadline.assigned_to_employee
return f"{employee.first_name or ''} {employee.last_name or ''}".strip()
return None
def _get_max_priority(self, deadlines: List[Deadline]) -> str:
"""Get the highest priority from a list of deadlines"""
if not deadlines:
return "none"
priority_order = {
DeadlinePriority.CRITICAL: 4,
DeadlinePriority.HIGH: 3,
DeadlinePriority.MEDIUM: 2,
DeadlinePriority.LOW: 1
}
max_priority = max(deadlines, key=lambda d: priority_order.get(d.priority, 0))
return max_priority.priority.value
def _get_estimated_duration(self, deadline: Deadline) -> int:
"""Get estimated duration in minutes for a deadline type"""
# Default durations by deadline type
duration_map = {
DeadlineType.COURT_HEARING: 120, # 2 hours
DeadlineType.COURT_FILING: 30, # 30 minutes
DeadlineType.CLIENT_MEETING: 60, # 1 hour
DeadlineType.DISCOVERY: 30, # 30 minutes
DeadlineType.ADMINISTRATIVE: 30, # 30 minutes
DeadlineType.INTERNAL: 60, # 1 hour
DeadlineType.CONTRACT: 30, # 30 minutes
DeadlineType.STATUTE_OF_LIMITATIONS: 30, # 30 minutes
DeadlineType.OTHER: 60 # 1 hour default
}
return duration_map.get(deadline.deadline_type, 60)
def _get_scheduling_recommendation(
self,
conflicts: List[Dict],
nearby_deadlines: List[Dict],
proposed_datetime: datetime
) -> str:
"""Get scheduling recommendation based on conflicts"""
if conflicts:
return "CONFLICT - Choose a different time slot"
if nearby_deadlines:
min_gap = min(d["minutes_gap"] for d in nearby_deadlines)
if min_gap < 15:
return "CAUTION - Very tight schedule, consider more buffer time"
elif min_gap < 30:
return "ACCEPTABLE - Close to other deadlines but manageable"
return "OPTIMAL - No conflicts detected"
class CalendarExportService:
"""Service for exporting deadlines to external calendar formats"""
def __init__(self, db: Session):
self.db = db
def export_to_ical(
self,
start_date: date,
end_date: date,
user_id: Optional[int] = None,
employee_id: Optional[str] = None,
deadline_types: Optional[List[DeadlineType]] = None
) -> str:
"""Export deadlines to iCalendar format"""
# Get deadlines for export
query = self.db.query(Deadline).filter(
Deadline.deadline_date.between(start_date, end_date),
Deadline.status == DeadlineStatus.PENDING
)
if user_id:
query = query.filter(Deadline.assigned_to_user_id == user_id)
if employee_id:
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
if deadline_types:
query = query.filter(Deadline.deadline_type.in_(deadline_types))
deadlines = query.options(
joinedload(Deadline.file),
joinedload(Deadline.client)
).order_by(Deadline.deadline_date.asc()).all()
# Build iCal content
ical_lines = [
"BEGIN:VCALENDAR",
"VERSION:2.0",
"PRODID:-//Delphi Consulting//Deadline Manager//EN",
"CALSCALE:GREGORIAN",
"METHOD:PUBLISH"
]
for deadline in deadlines:
# Format datetime for iCal
if deadline.deadline_time:
dtstart = deadline.deadline_time.strftime("%Y%m%dT%H%M%S")
dtend = (deadline.deadline_time + timedelta(hours=1)).strftime("%Y%m%dT%H%M%S")
else:
dtstart = deadline.deadline_date.strftime("%Y%m%d")
dtend = dtstart
ical_lines.extend([
"BEGIN:VEVENT",
f"DTSTART;VALUE=DATE:{dtstart}",
f"DTEND;VALUE=DATE:{dtend}"
])
if deadline.deadline_time:
ical_lines.extend([
"BEGIN:VEVENT",
f"DTSTART:{dtstart}",
f"DTEND:{dtend}"
])
# Add event details
ical_lines.extend([
f"UID:deadline-{deadline.id}@delphi-consulting.com",
f"SUMMARY:{deadline.title}",
f"DESCRIPTION:{deadline.description or ''}",
f"PRIORITY:{self._get_ical_priority(deadline.priority)}",
f"CATEGORIES:{deadline.deadline_type.value.upper()}",
f"STATUS:CONFIRMED",
f"DTSTAMP:{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}",
"END:VEVENT"
])
ical_lines.append("END:VCALENDAR")
return "\r\n".join(ical_lines)
def _get_ical_priority(self, priority: DeadlinePriority) -> str:
"""Convert deadline priority to iCal priority"""
priority_map = {
DeadlinePriority.CRITICAL: "1", # High
DeadlinePriority.HIGH: "3", # Medium-High
DeadlinePriority.MEDIUM: "5", # Medium
DeadlinePriority.LOW: "7" # Low
}
return priority_map.get(priority, "5")