698 lines
27 KiB
Python
698 lines
27 KiB
Python
"""
|
|
Deadline calendar integration service
|
|
Provides calendar views and scheduling utilities for deadlines
|
|
"""
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from datetime import datetime, date, timezone, timedelta
|
|
from calendar import monthrange, weekday
|
|
from sqlalchemy.orm import Session, joinedload
|
|
from sqlalchemy import and_, func, or_, desc
|
|
|
|
from app.models import (
|
|
Deadline, CourtCalendar, User, Employee,
|
|
DeadlineType, DeadlinePriority, DeadlineStatus
|
|
)
|
|
from app.utils.logging import app_logger
|
|
|
|
logger = app_logger
|
|
|
|
|
|
class DeadlineCalendarService:
|
|
"""Service for deadline calendar views and scheduling"""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def get_monthly_calendar(
|
|
self,
|
|
year: int,
|
|
month: int,
|
|
user_id: Optional[int] = None,
|
|
employee_id: Optional[str] = None,
|
|
show_completed: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""Get monthly calendar view with deadlines"""
|
|
|
|
# Get first and last day of month
|
|
first_day = date(year, month, 1)
|
|
last_day = date(year, month, monthrange(year, month)[1])
|
|
|
|
# Get first Monday of calendar view (may be in previous month)
|
|
first_monday = first_day - timedelta(days=first_day.weekday())
|
|
|
|
# Get last Sunday of calendar view (may be in next month)
|
|
last_sunday = last_day + timedelta(days=(6 - last_day.weekday()))
|
|
|
|
# Build query for deadlines in the calendar period
|
|
query = self.db.query(Deadline).filter(
|
|
Deadline.deadline_date.between(first_monday, last_sunday)
|
|
)
|
|
|
|
if not show_completed:
|
|
query = query.filter(Deadline.status != DeadlineStatus.COMPLETED)
|
|
|
|
if user_id:
|
|
query = query.filter(Deadline.assigned_to_user_id == user_id)
|
|
|
|
if employee_id:
|
|
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
|
|
|
|
deadlines = query.options(
|
|
joinedload(Deadline.file),
|
|
joinedload(Deadline.client),
|
|
joinedload(Deadline.assigned_to_user),
|
|
joinedload(Deadline.assigned_to_employee)
|
|
).order_by(
|
|
Deadline.deadline_date.asc(),
|
|
Deadline.deadline_time.asc(),
|
|
Deadline.priority.desc()
|
|
).all()
|
|
|
|
# Build calendar grid (6 weeks x 7 days)
|
|
calendar_weeks = []
|
|
current_date = first_monday
|
|
|
|
for week in range(6):
|
|
week_days = []
|
|
|
|
for day in range(7):
|
|
day_date = current_date + timedelta(days=day)
|
|
|
|
# Get deadlines for this day
|
|
day_deadlines = [
|
|
d for d in deadlines if d.deadline_date == day_date
|
|
]
|
|
|
|
# Format deadline data
|
|
formatted_deadlines = []
|
|
for deadline in day_deadlines:
|
|
formatted_deadlines.append({
|
|
"id": deadline.id,
|
|
"title": deadline.title,
|
|
"deadline_time": deadline.deadline_time.strftime("%H:%M") if deadline.deadline_time else None,
|
|
"priority": deadline.priority.value,
|
|
"deadline_type": deadline.deadline_type.value,
|
|
"status": deadline.status.value,
|
|
"file_no": deadline.file_no,
|
|
"client_name": self._get_client_name(deadline),
|
|
"court_name": deadline.court_name,
|
|
"is_overdue": deadline.is_overdue,
|
|
"is_court_date": deadline.deadline_type == DeadlineType.COURT_HEARING
|
|
})
|
|
|
|
week_days.append({
|
|
"date": day_date,
|
|
"day_number": day_date.day,
|
|
"is_current_month": day_date.month == month,
|
|
"is_today": day_date == date.today(),
|
|
"is_weekend": day_date.weekday() >= 5,
|
|
"deadlines": formatted_deadlines,
|
|
"deadline_count": len(formatted_deadlines),
|
|
"has_overdue": any(d["is_overdue"] for d in formatted_deadlines),
|
|
"has_court_date": any(d["is_court_date"] for d in formatted_deadlines),
|
|
"max_priority": self._get_max_priority(day_deadlines)
|
|
})
|
|
|
|
calendar_weeks.append({
|
|
"week_start": current_date,
|
|
"days": week_days
|
|
})
|
|
|
|
current_date += timedelta(days=7)
|
|
|
|
# Calculate summary statistics
|
|
month_deadlines = [d for d in deadlines if d.deadline_date.month == month]
|
|
|
|
return {
|
|
"year": year,
|
|
"month": month,
|
|
"month_name": first_day.strftime("%B"),
|
|
"calendar_period": {
|
|
"start_date": first_monday,
|
|
"end_date": last_sunday
|
|
},
|
|
"summary": {
|
|
"total_deadlines": len(month_deadlines),
|
|
"overdue": len([d for d in month_deadlines if d.is_overdue]),
|
|
"pending": len([d for d in month_deadlines if d.status == DeadlineStatus.PENDING]),
|
|
"completed": len([d for d in month_deadlines if d.status == DeadlineStatus.COMPLETED]),
|
|
"court_dates": len([d for d in month_deadlines if d.deadline_type == DeadlineType.COURT_HEARING])
|
|
},
|
|
"weeks": calendar_weeks
|
|
}
|
|
|
|
def get_weekly_calendar(
|
|
self,
|
|
year: int,
|
|
week: int,
|
|
user_id: Optional[int] = None,
|
|
employee_id: Optional[str] = None,
|
|
show_completed: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""Get weekly calendar view with detailed scheduling"""
|
|
|
|
# Calculate the Monday of the specified week
|
|
jan_1 = date(year, 1, 1)
|
|
jan_1_weekday = jan_1.weekday()
|
|
|
|
# Find the Monday of week 1
|
|
days_to_monday = -jan_1_weekday if jan_1_weekday == 0 else 7 - jan_1_weekday
|
|
first_monday = jan_1 + timedelta(days=days_to_monday)
|
|
|
|
# Calculate the target week's Monday
|
|
week_monday = first_monday + timedelta(weeks=week - 1)
|
|
week_sunday = week_monday + timedelta(days=6)
|
|
|
|
# Build query for deadlines in the week
|
|
query = self.db.query(Deadline).filter(
|
|
Deadline.deadline_date.between(week_monday, week_sunday)
|
|
)
|
|
|
|
if not show_completed:
|
|
query = query.filter(Deadline.status != DeadlineStatus.COMPLETED)
|
|
|
|
if user_id:
|
|
query = query.filter(Deadline.assigned_to_user_id == user_id)
|
|
|
|
if employee_id:
|
|
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
|
|
|
|
deadlines = query.options(
|
|
joinedload(Deadline.file),
|
|
joinedload(Deadline.client),
|
|
joinedload(Deadline.assigned_to_user),
|
|
joinedload(Deadline.assigned_to_employee)
|
|
).order_by(
|
|
Deadline.deadline_date.asc(),
|
|
Deadline.deadline_time.asc(),
|
|
Deadline.priority.desc()
|
|
).all()
|
|
|
|
# Build daily schedule
|
|
week_days = []
|
|
|
|
for day_offset in range(7):
|
|
day_date = week_monday + timedelta(days=day_offset)
|
|
|
|
# Get deadlines for this day
|
|
day_deadlines = [d for d in deadlines if d.deadline_date == day_date]
|
|
|
|
# Group deadlines by time
|
|
timed_deadlines = []
|
|
all_day_deadlines = []
|
|
|
|
for deadline in day_deadlines:
|
|
deadline_data = {
|
|
"id": deadline.id,
|
|
"title": deadline.title,
|
|
"deadline_time": deadline.deadline_time,
|
|
"priority": deadline.priority.value,
|
|
"deadline_type": deadline.deadline_type.value,
|
|
"status": deadline.status.value,
|
|
"file_no": deadline.file_no,
|
|
"client_name": self._get_client_name(deadline),
|
|
"assigned_to": self._get_assigned_to(deadline),
|
|
"court_name": deadline.court_name,
|
|
"case_number": deadline.case_number,
|
|
"description": deadline.description,
|
|
"is_overdue": deadline.is_overdue,
|
|
"estimated_duration": self._get_estimated_duration(deadline)
|
|
}
|
|
|
|
if deadline.deadline_time:
|
|
timed_deadlines.append(deadline_data)
|
|
else:
|
|
all_day_deadlines.append(deadline_data)
|
|
|
|
# Sort timed deadlines by time
|
|
timed_deadlines.sort(key=lambda x: x["deadline_time"])
|
|
|
|
week_days.append({
|
|
"date": day_date,
|
|
"day_name": day_date.strftime("%A"),
|
|
"day_short": day_date.strftime("%a"),
|
|
"is_today": day_date == date.today(),
|
|
"is_weekend": day_date.weekday() >= 5,
|
|
"timed_deadlines": timed_deadlines,
|
|
"all_day_deadlines": all_day_deadlines,
|
|
"total_deadlines": len(day_deadlines),
|
|
"has_court_dates": any(d.deadline_type == DeadlineType.COURT_HEARING for d in day_deadlines)
|
|
})
|
|
|
|
return {
|
|
"year": year,
|
|
"week": week,
|
|
"week_period": {
|
|
"start_date": week_monday,
|
|
"end_date": week_sunday
|
|
},
|
|
"summary": {
|
|
"total_deadlines": len(deadlines),
|
|
"timed_deadlines": len([d for d in deadlines if d.deadline_time]),
|
|
"all_day_deadlines": len([d for d in deadlines if not d.deadline_time]),
|
|
"court_dates": len([d for d in deadlines if d.deadline_type == DeadlineType.COURT_HEARING])
|
|
},
|
|
"days": week_days
|
|
}
|
|
|
|
def get_daily_schedule(
|
|
self,
|
|
target_date: date,
|
|
user_id: Optional[int] = None,
|
|
employee_id: Optional[str] = None,
|
|
show_completed: bool = False
|
|
) -> Dict[str, Any]:
|
|
"""Get detailed daily schedule with time slots"""
|
|
|
|
# Build query for deadlines on the target date
|
|
query = self.db.query(Deadline).filter(
|
|
Deadline.deadline_date == target_date
|
|
)
|
|
|
|
if not show_completed:
|
|
query = query.filter(Deadline.status != DeadlineStatus.COMPLETED)
|
|
|
|
if user_id:
|
|
query = query.filter(Deadline.assigned_to_user_id == user_id)
|
|
|
|
if employee_id:
|
|
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
|
|
|
|
deadlines = query.options(
|
|
joinedload(Deadline.file),
|
|
joinedload(Deadline.client),
|
|
joinedload(Deadline.assigned_to_user),
|
|
joinedload(Deadline.assigned_to_employee)
|
|
).order_by(
|
|
Deadline.deadline_time.asc(),
|
|
Deadline.priority.desc()
|
|
).all()
|
|
|
|
# Create time slots (30-minute intervals from 8 AM to 6 PM)
|
|
time_slots = []
|
|
start_hour = 8
|
|
end_hour = 18
|
|
|
|
for hour in range(start_hour, end_hour):
|
|
for minute in [0, 30]:
|
|
slot_time = datetime.combine(target_date, datetime.min.time().replace(hour=hour, minute=minute))
|
|
|
|
# Find deadlines in this time slot
|
|
slot_deadlines = []
|
|
for deadline in deadlines:
|
|
if deadline.deadline_time:
|
|
deadline_time = deadline.deadline_time.replace(tzinfo=None)
|
|
|
|
# Check if deadline falls within this 30-minute slot
|
|
if (slot_time <= deadline_time < slot_time + timedelta(minutes=30)):
|
|
slot_deadlines.append({
|
|
"id": deadline.id,
|
|
"title": deadline.title,
|
|
"deadline_time": deadline.deadline_time,
|
|
"priority": deadline.priority.value,
|
|
"deadline_type": deadline.deadline_type.value,
|
|
"status": deadline.status.value,
|
|
"file_no": deadline.file_no,
|
|
"client_name": self._get_client_name(deadline),
|
|
"court_name": deadline.court_name,
|
|
"case_number": deadline.case_number,
|
|
"description": deadline.description,
|
|
"estimated_duration": self._get_estimated_duration(deadline)
|
|
})
|
|
|
|
time_slots.append({
|
|
"time": slot_time.strftime("%H:%M"),
|
|
"datetime": slot_time,
|
|
"deadlines": slot_deadlines,
|
|
"is_busy": len(slot_deadlines) > 0
|
|
})
|
|
|
|
# Get all-day deadlines
|
|
all_day_deadlines = []
|
|
for deadline in deadlines:
|
|
if not deadline.deadline_time:
|
|
all_day_deadlines.append({
|
|
"id": deadline.id,
|
|
"title": deadline.title,
|
|
"priority": deadline.priority.value,
|
|
"deadline_type": deadline.deadline_type.value,
|
|
"status": deadline.status.value,
|
|
"file_no": deadline.file_no,
|
|
"client_name": self._get_client_name(deadline),
|
|
"description": deadline.description
|
|
})
|
|
|
|
return {
|
|
"date": target_date,
|
|
"day_name": target_date.strftime("%A, %B %d, %Y"),
|
|
"is_today": target_date == date.today(),
|
|
"summary": {
|
|
"total_deadlines": len(deadlines),
|
|
"timed_deadlines": len([d for d in deadlines if d.deadline_time]),
|
|
"all_day_deadlines": len(all_day_deadlines),
|
|
"court_dates": len([d for d in deadlines if d.deadline_type == DeadlineType.COURT_HEARING]),
|
|
"overdue": len([d for d in deadlines if d.is_overdue])
|
|
},
|
|
"all_day_deadlines": all_day_deadlines,
|
|
"time_slots": time_slots,
|
|
"business_hours": {
|
|
"start": f"{start_hour:02d}:00",
|
|
"end": f"{end_hour:02d}:00"
|
|
}
|
|
}
|
|
|
|
def find_available_slots(
|
|
self,
|
|
start_date: date,
|
|
end_date: date,
|
|
duration_minutes: int = 60,
|
|
user_id: Optional[int] = None,
|
|
employee_id: Optional[str] = None,
|
|
business_hours_only: bool = True
|
|
) -> List[Dict[str, Any]]:
|
|
"""Find available time slots for scheduling new deadlines"""
|
|
|
|
# Get existing deadlines in the period
|
|
query = self.db.query(Deadline).filter(
|
|
Deadline.deadline_date.between(start_date, end_date),
|
|
Deadline.status == DeadlineStatus.PENDING,
|
|
Deadline.deadline_time.isnot(None)
|
|
)
|
|
|
|
if user_id:
|
|
query = query.filter(Deadline.assigned_to_user_id == user_id)
|
|
|
|
if employee_id:
|
|
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
|
|
|
|
existing_deadlines = query.all()
|
|
|
|
# Define business hours
|
|
if business_hours_only:
|
|
start_hour, end_hour = 8, 18
|
|
else:
|
|
start_hour, end_hour = 0, 24
|
|
|
|
available_slots = []
|
|
current_date = start_date
|
|
|
|
while current_date <= end_date:
|
|
# Skip weekends if business hours only
|
|
if business_hours_only and current_date.weekday() >= 5:
|
|
current_date += timedelta(days=1)
|
|
continue
|
|
|
|
# Get deadlines for this day
|
|
day_deadlines = [
|
|
d for d in existing_deadlines
|
|
if d.deadline_date == current_date
|
|
]
|
|
|
|
# Sort by time
|
|
day_deadlines.sort(key=lambda d: d.deadline_time)
|
|
|
|
# Find gaps between deadlines
|
|
for hour in range(start_hour, end_hour):
|
|
for minute in range(0, 60, 30): # 30-minute intervals
|
|
slot_start = datetime.combine(
|
|
current_date,
|
|
datetime.min.time().replace(hour=hour, minute=minute)
|
|
)
|
|
slot_end = slot_start + timedelta(minutes=duration_minutes)
|
|
|
|
# Check if this slot conflicts with existing deadlines
|
|
is_available = True
|
|
for deadline in day_deadlines:
|
|
deadline_start = deadline.deadline_time.replace(tzinfo=None)
|
|
deadline_end = deadline_start + timedelta(
|
|
minutes=self._get_estimated_duration(deadline)
|
|
)
|
|
|
|
# Check for overlap
|
|
if not (slot_end <= deadline_start or slot_start >= deadline_end):
|
|
is_available = False
|
|
break
|
|
|
|
if is_available:
|
|
available_slots.append({
|
|
"start_datetime": slot_start,
|
|
"end_datetime": slot_end,
|
|
"date": current_date,
|
|
"start_time": slot_start.strftime("%H:%M"),
|
|
"end_time": slot_end.strftime("%H:%M"),
|
|
"duration_minutes": duration_minutes,
|
|
"day_name": current_date.strftime("%A")
|
|
})
|
|
|
|
current_date += timedelta(days=1)
|
|
|
|
return available_slots[:50] # Limit to first 50 slots
|
|
|
|
def get_conflict_analysis(
|
|
self,
|
|
proposed_datetime: datetime,
|
|
duration_minutes: int = 60,
|
|
user_id: Optional[int] = None,
|
|
employee_id: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""Analyze potential conflicts for a proposed deadline time"""
|
|
|
|
proposed_date = proposed_datetime.date()
|
|
proposed_end = proposed_datetime + timedelta(minutes=duration_minutes)
|
|
|
|
# Get existing deadlines on the same day
|
|
query = self.db.query(Deadline).filter(
|
|
Deadline.deadline_date == proposed_date,
|
|
Deadline.status == DeadlineStatus.PENDING,
|
|
Deadline.deadline_time.isnot(None)
|
|
)
|
|
|
|
if user_id:
|
|
query = query.filter(Deadline.assigned_to_user_id == user_id)
|
|
|
|
if employee_id:
|
|
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
|
|
|
|
existing_deadlines = query.options(
|
|
joinedload(Deadline.file),
|
|
joinedload(Deadline.client)
|
|
).all()
|
|
|
|
conflicts = []
|
|
nearby_deadlines = []
|
|
|
|
for deadline in existing_deadlines:
|
|
deadline_start = deadline.deadline_time.replace(tzinfo=None)
|
|
deadline_end = deadline_start + timedelta(
|
|
minutes=self._get_estimated_duration(deadline)
|
|
)
|
|
|
|
# Check for direct overlap
|
|
if not (proposed_end <= deadline_start or proposed_datetime >= deadline_end):
|
|
conflicts.append({
|
|
"id": deadline.id,
|
|
"title": deadline.title,
|
|
"start_time": deadline_start,
|
|
"end_time": deadline_end,
|
|
"conflict_type": "overlap",
|
|
"file_no": deadline.file_no,
|
|
"client_name": self._get_client_name(deadline)
|
|
})
|
|
|
|
# Check for nearby deadlines (within 30 minutes)
|
|
elif (abs((proposed_datetime - deadline_start).total_seconds()) <= 1800 or
|
|
abs((proposed_end - deadline_end).total_seconds()) <= 1800):
|
|
nearby_deadlines.append({
|
|
"id": deadline.id,
|
|
"title": deadline.title,
|
|
"start_time": deadline_start,
|
|
"end_time": deadline_end,
|
|
"file_no": deadline.file_no,
|
|
"client_name": self._get_client_name(deadline),
|
|
"minutes_gap": min(
|
|
abs((proposed_datetime - deadline_end).total_seconds() / 60),
|
|
abs((deadline_start - proposed_end).total_seconds() / 60)
|
|
)
|
|
})
|
|
|
|
return {
|
|
"proposed_datetime": proposed_datetime,
|
|
"proposed_end": proposed_end,
|
|
"duration_minutes": duration_minutes,
|
|
"has_conflicts": len(conflicts) > 0,
|
|
"conflicts": conflicts,
|
|
"nearby_deadlines": nearby_deadlines,
|
|
"recommendation": self._get_scheduling_recommendation(
|
|
conflicts, nearby_deadlines, proposed_datetime
|
|
)
|
|
}
|
|
|
|
# Private helper methods
|
|
|
|
def _get_client_name(self, deadline: Deadline) -> Optional[str]:
|
|
"""Get formatted client name from deadline"""
|
|
|
|
if deadline.client:
|
|
return f"{deadline.client.first or ''} {deadline.client.last or ''}".strip()
|
|
elif deadline.file and deadline.file.owner:
|
|
return f"{deadline.file.owner.first or ''} {deadline.file.owner.last or ''}".strip()
|
|
return None
|
|
|
|
def _get_assigned_to(self, deadline: Deadline) -> Optional[str]:
|
|
"""Get assigned person name from deadline"""
|
|
|
|
if deadline.assigned_to_user:
|
|
return deadline.assigned_to_user.username
|
|
elif deadline.assigned_to_employee:
|
|
employee = deadline.assigned_to_employee
|
|
return f"{employee.first_name or ''} {employee.last_name or ''}".strip()
|
|
return None
|
|
|
|
def _get_max_priority(self, deadlines: List[Deadline]) -> str:
|
|
"""Get the highest priority from a list of deadlines"""
|
|
|
|
if not deadlines:
|
|
return "none"
|
|
|
|
priority_order = {
|
|
DeadlinePriority.CRITICAL: 4,
|
|
DeadlinePriority.HIGH: 3,
|
|
DeadlinePriority.MEDIUM: 2,
|
|
DeadlinePriority.LOW: 1
|
|
}
|
|
|
|
max_priority = max(deadlines, key=lambda d: priority_order.get(d.priority, 0))
|
|
return max_priority.priority.value
|
|
|
|
def _get_estimated_duration(self, deadline: Deadline) -> int:
|
|
"""Get estimated duration in minutes for a deadline type"""
|
|
|
|
# Default durations by deadline type
|
|
duration_map = {
|
|
DeadlineType.COURT_HEARING: 120, # 2 hours
|
|
DeadlineType.COURT_FILING: 30, # 30 minutes
|
|
DeadlineType.CLIENT_MEETING: 60, # 1 hour
|
|
DeadlineType.DISCOVERY: 30, # 30 minutes
|
|
DeadlineType.ADMINISTRATIVE: 30, # 30 minutes
|
|
DeadlineType.INTERNAL: 60, # 1 hour
|
|
DeadlineType.CONTRACT: 30, # 30 minutes
|
|
DeadlineType.STATUTE_OF_LIMITATIONS: 30, # 30 minutes
|
|
DeadlineType.OTHER: 60 # 1 hour default
|
|
}
|
|
|
|
return duration_map.get(deadline.deadline_type, 60)
|
|
|
|
def _get_scheduling_recommendation(
|
|
self,
|
|
conflicts: List[Dict],
|
|
nearby_deadlines: List[Dict],
|
|
proposed_datetime: datetime
|
|
) -> str:
|
|
"""Get scheduling recommendation based on conflicts"""
|
|
|
|
if conflicts:
|
|
return "CONFLICT - Choose a different time slot"
|
|
|
|
if nearby_deadlines:
|
|
min_gap = min(d["minutes_gap"] for d in nearby_deadlines)
|
|
if min_gap < 15:
|
|
return "CAUTION - Very tight schedule, consider more buffer time"
|
|
elif min_gap < 30:
|
|
return "ACCEPTABLE - Close to other deadlines but manageable"
|
|
|
|
return "OPTIMAL - No conflicts detected"
|
|
|
|
|
|
class CalendarExportService:
|
|
"""Service for exporting deadlines to external calendar formats"""
|
|
|
|
def __init__(self, db: Session):
|
|
self.db = db
|
|
|
|
def export_to_ical(
|
|
self,
|
|
start_date: date,
|
|
end_date: date,
|
|
user_id: Optional[int] = None,
|
|
employee_id: Optional[str] = None,
|
|
deadline_types: Optional[List[DeadlineType]] = None
|
|
) -> str:
|
|
"""Export deadlines to iCalendar format"""
|
|
|
|
# Get deadlines for export
|
|
query = self.db.query(Deadline).filter(
|
|
Deadline.deadline_date.between(start_date, end_date),
|
|
Deadline.status == DeadlineStatus.PENDING
|
|
)
|
|
|
|
if user_id:
|
|
query = query.filter(Deadline.assigned_to_user_id == user_id)
|
|
|
|
if employee_id:
|
|
query = query.filter(Deadline.assigned_to_employee_id == employee_id)
|
|
|
|
if deadline_types:
|
|
query = query.filter(Deadline.deadline_type.in_(deadline_types))
|
|
|
|
deadlines = query.options(
|
|
joinedload(Deadline.file),
|
|
joinedload(Deadline.client)
|
|
).order_by(Deadline.deadline_date.asc()).all()
|
|
|
|
# Build iCal content
|
|
ical_lines = [
|
|
"BEGIN:VCALENDAR",
|
|
"VERSION:2.0",
|
|
"PRODID:-//Delphi Consulting//Deadline Manager//EN",
|
|
"CALSCALE:GREGORIAN",
|
|
"METHOD:PUBLISH"
|
|
]
|
|
|
|
for deadline in deadlines:
|
|
# Format datetime for iCal
|
|
if deadline.deadline_time:
|
|
dtstart = deadline.deadline_time.strftime("%Y%m%dT%H%M%S")
|
|
dtend = (deadline.deadline_time + timedelta(hours=1)).strftime("%Y%m%dT%H%M%S")
|
|
else:
|
|
dtstart = deadline.deadline_date.strftime("%Y%m%d")
|
|
dtend = dtstart
|
|
ical_lines.extend([
|
|
"BEGIN:VEVENT",
|
|
f"DTSTART;VALUE=DATE:{dtstart}",
|
|
f"DTEND;VALUE=DATE:{dtend}"
|
|
])
|
|
|
|
if deadline.deadline_time:
|
|
ical_lines.extend([
|
|
"BEGIN:VEVENT",
|
|
f"DTSTART:{dtstart}",
|
|
f"DTEND:{dtend}"
|
|
])
|
|
|
|
# Add event details
|
|
ical_lines.extend([
|
|
f"UID:deadline-{deadline.id}@delphi-consulting.com",
|
|
f"SUMMARY:{deadline.title}",
|
|
f"DESCRIPTION:{deadline.description or ''}",
|
|
f"PRIORITY:{self._get_ical_priority(deadline.priority)}",
|
|
f"CATEGORIES:{deadline.deadline_type.value.upper()}",
|
|
f"STATUS:CONFIRMED",
|
|
f"DTSTAMP:{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}",
|
|
"END:VEVENT"
|
|
])
|
|
|
|
ical_lines.append("END:VCALENDAR")
|
|
|
|
return "\r\n".join(ical_lines)
|
|
|
|
def _get_ical_priority(self, priority: DeadlinePriority) -> str:
|
|
"""Convert deadline priority to iCal priority"""
|
|
|
|
priority_map = {
|
|
DeadlinePriority.CRITICAL: "1", # High
|
|
DeadlinePriority.HIGH: "3", # Medium-High
|
|
DeadlinePriority.MEDIUM: "5", # Medium
|
|
DeadlinePriority.LOW: "7" # Low
|
|
}
|
|
|
|
return priority_map.get(priority, "5") |