This commit is contained in:
HotSwapp
2025-08-16 10:05:42 -05:00
parent 0347284556
commit ae4484381f
15 changed files with 3966 additions and 77 deletions

539
app/services/billing.py Normal file
View File

@@ -0,0 +1,539 @@
"""
Billing statement generation service
Handles statement creation, template rendering, and PDF generation
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import date, datetime, timedelta
from decimal import Decimal
from jinja2 import Template, Environment, BaseLoader
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, func
from app.models import (
BillingStatement, StatementTemplate, BillingStatementItem,
File, Ledger, Rolodex, StatementStatus
)
from app.utils.logging import app_logger
logger = app_logger
class StatementGenerationError(Exception):
"""Exception raised when statement generation fails"""
pass
class BillingStatementService:
"""Service for generating and managing billing statements"""
def __init__(self, db: Session):
self.db = db
self.jinja_env = Environment(loader=BaseLoader())
def generate_statement_number(self) -> str:
"""Generate unique statement number"""
today = date.today()
prefix = f"STMT-{today.strftime('%Y%m')}"
# Find highest number for this month
last_stmt = self.db.query(BillingStatement).filter(
BillingStatement.statement_number.like(f"{prefix}%")
).order_by(BillingStatement.statement_number.desc()).first()
if last_stmt:
try:
last_num = int(last_stmt.statement_number.split('-')[-1])
next_num = last_num + 1
except (ValueError, IndexError):
next_num = 1
else:
next_num = 1
return f"{prefix}-{next_num:04d}"
def get_unbilled_transactions(
self,
file_no: str,
period_start: date = None,
period_end: date = None
) -> List[Ledger]:
"""Get unbilled transactions for a file within date range"""
query = self.db.query(Ledger).filter(
Ledger.file_no == file_no,
Ledger.billed == "N"
)
if period_start:
query = query.filter(Ledger.date >= period_start)
if period_end:
query = query.filter(Ledger.date <= period_end)
return query.order_by(Ledger.date).all()
def calculate_statement_totals(self, transactions: List[Ledger]) -> Dict[str, float]:
"""Calculate financial totals for statement"""
totals = {
'fees': 0.0,
'costs': 0.0,
'payments': 0.0,
'trust_deposits': 0.0,
'trust_transfers': 0.0,
'adjustments': 0.0,
'current_charges': 0.0,
'net_charges': 0.0
}
for txn in transactions:
amount = float(txn.amount or 0.0)
# Categorize by transaction type
if txn.t_type in ['1', '2']: # Fees (hourly and flat)
totals['fees'] += amount
totals['current_charges'] += amount
elif txn.t_type == '3': # Costs/disbursements
totals['costs'] += amount
totals['current_charges'] += amount
elif txn.t_type == '4': # Payments
totals['payments'] += amount
totals['net_charges'] -= amount
elif txn.t_type == '5': # Trust deposits
totals['trust_deposits'] += amount
# Add more categorization as needed
totals['net_charges'] = totals['current_charges'] - totals['payments']
return totals
def get_previous_balance(self, file_no: str, period_start: date) -> float:
"""Calculate previous balance before statement period"""
# Sum all transactions before period start
result = self.db.query(func.sum(Ledger.amount)).filter(
Ledger.file_no == file_no,
Ledger.date < period_start
).scalar()
return float(result or 0.0)
def get_trust_balance(self, file_no: str) -> float:
"""Get current trust account balance for file"""
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
return float(file_obj.trust_bal or 0.0) if file_obj else 0.0
def create_statement(
self,
file_no: str,
period_start: date,
period_end: date,
template_id: Optional[int] = None,
custom_footer: Optional[str] = None,
created_by: Optional[str] = None
) -> BillingStatement:
"""Create a new billing statement for a file"""
# Get file and customer info
file_obj = self.db.query(File).options(
joinedload(File.owner)
).filter(File.file_no == file_no).first()
if not file_obj:
raise StatementGenerationError(f"File {file_no} not found")
# Get unbilled transactions
transactions = self.get_unbilled_transactions(file_no, period_start, period_end)
if not transactions:
raise StatementGenerationError(f"No unbilled transactions found for file {file_no}")
# Calculate totals
totals = self.calculate_statement_totals(transactions)
previous_balance = self.get_previous_balance(file_no, period_start)
trust_balance = self.get_trust_balance(file_no)
# Calculate total due
total_due = previous_balance + totals['current_charges'] - totals['payments']
# Get or create default template
if not template_id:
template = self.db.query(StatementTemplate).filter(
StatementTemplate.is_default == True,
StatementTemplate.is_active == True
).first()
if template:
template_id = template.id
# Create statement
statement = BillingStatement(
statement_number=self.generate_statement_number(),
file_no=file_no,
customer_id=file_obj.owner.id if file_obj.owner else None,
period_start=period_start,
period_end=period_end,
statement_date=date.today(),
due_date=date.today() + timedelta(days=30),
previous_balance=previous_balance,
current_charges=totals['current_charges'],
payments_credits=totals['payments'],
total_due=total_due,
trust_balance=trust_balance,
template_id=template_id,
billed_transaction_count=len(transactions),
custom_footer=custom_footer,
created_by=created_by,
status=StatementStatus.DRAFT
)
self.db.add(statement)
self.db.flush() # Get the statement ID
# Create statement items
for txn in transactions:
item = BillingStatementItem(
statement_id=statement.id,
ledger_id=txn.id,
date=txn.date,
description=txn.note or f"{txn.t_code} - {txn.empl_num}",
quantity=float(txn.quantity or 0.0),
rate=float(txn.rate or 0.0),
amount=float(txn.amount or 0.0),
item_category=self._categorize_transaction(txn)
)
self.db.add(item)
self.db.commit()
self.db.refresh(statement)
logger.info(f"Created statement {statement.statement_number} for file {file_no}")
return statement
def _categorize_transaction(self, txn: Ledger) -> str:
"""Categorize transaction for statement display"""
if txn.t_type in ['1', '2']:
return 'fees'
elif txn.t_type == '3':
return 'costs'
elif txn.t_type == '4':
return 'payments'
elif txn.t_type == '5':
return 'trust'
else:
return 'other'
def generate_statement_html(self, statement_id: int) -> str:
"""Generate HTML content for a statement"""
statement = self.db.query(BillingStatement).options(
joinedload(BillingStatement.file).joinedload(File.owner),
joinedload(BillingStatement.customer),
joinedload(BillingStatement.template),
joinedload(BillingStatement.statement_items)
).filter(BillingStatement.id == statement_id).first()
if not statement:
raise StatementGenerationError(f"Statement {statement_id} not found")
# Prepare template context
context = self._prepare_template_context(statement)
# Get template or use default
template_content = self._get_statement_template(statement)
# Render template
template = self.jinja_env.from_string(template_content)
html_content = template.render(**context)
# Save generated HTML
statement.html_content = html_content
self.db.commit()
return html_content
def _prepare_template_context(self, statement: BillingStatement) -> Dict[str, Any]:
"""Prepare context data for template rendering"""
# Group statement items by category
items_by_category = {}
for item in statement.statement_items:
category = item.item_category or 'other'
if category not in items_by_category:
items_by_category[category] = []
items_by_category[category].append(item)
return {
'statement': statement,
'file': statement.file,
'customer': statement.customer or statement.file.owner,
'items_by_category': items_by_category,
'total_fees': sum(item.amount for item in items_by_category.get('fees', [])),
'total_costs': sum(item.amount for item in items_by_category.get('costs', [])),
'total_payments': sum(item.amount for item in items_by_category.get('payments', [])),
'generation_date': datetime.now(),
'custom_footer': statement.custom_footer
}
def _get_statement_template(self, statement: BillingStatement) -> str:
"""Get template content for statement"""
if statement.template and statement.template.is_active:
# Use custom template
header = statement.template.header_template or ""
footer = statement.template.footer_template or ""
css = statement.template.css_styles or ""
return self._build_complete_template(header, footer, css)
else:
# Use default template
return self._get_default_template()
def _build_complete_template(self, header: str, footer: str, css: str) -> str:
"""Build complete HTML template from components"""
# Use regular string formatting to avoid f-string conflicts with Jinja2
template = """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Billing Statement - {{ statement.statement_number }}</title>
<style>
%(css)s
body { font-family: Arial, sans-serif; margin: 40px; }
.statement-header { margin-bottom: 30px; }
.statement-details { margin-bottom: 20px; }
.statement-items { margin-bottom: 30px; }
.statement-footer { margin-top: 30px; }
table { width: 100%%; border-collapse: collapse; }
th, td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
.amount { text-align: right; }
.total-row { font-weight: bold; border-top: 2px solid #000; }
</style>
</head>
<body>
<div class="statement-header">
%(header)s
</div>
<div class="statement-content">
<!-- Default statement content will be inserted here -->
{{ self.default_content() }}
</div>
<div class="statement-footer">
%(footer)s
{%% if custom_footer %%}
<div class="custom-footer">{{ custom_footer }}</div>
{%% endif %%}
</div>
</body>
</html>
""" % {"css": css, "header": header, "footer": footer}
return template
def _get_default_template(self) -> str:
"""Get default statement template"""
return """
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Billing Statement - {{ statement.statement_number }}</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.header { text-align: center; margin-bottom: 30px; }
.client-info { margin-bottom: 20px; }
.statement-details { margin-bottom: 20px; }
.items-table { width: 100%; border-collapse: collapse; margin-bottom: 30px; }
.items-table th, .items-table td { padding: 8px; text-align: left; border-bottom: 1px solid #ddd; }
.amount { text-align: right; }
.total-row { font-weight: bold; border-top: 2px solid #000; }
.summary { margin-top: 20px; }
.footer { margin-top: 40px; font-size: 12px; color: #666; }
</style>
</head>
<body>
<div class="header">
<h1>BILLING STATEMENT</h1>
<h2>{{ statement.statement_number }}</h2>
</div>
<div class="client-info">
<strong>Bill To:</strong><br>
{{ customer.first }} {{ customer.last }}<br>
{% if customer.a1 %}{{ customer.a1 }}<br>{% endif %}
{% if customer.a2 %}{{ customer.a2 }}<br>{% endif %}
{% if customer.city %}{{ customer.city }}, {{ customer.abrev }} {{ customer.zip }}{% endif %}
</div>
<div class="statement-details">
<table>
<tr><td><strong>File Number:</strong></td><td>{{ statement.file_no }}</td></tr>
<tr><td><strong>Statement Date:</strong></td><td>{{ statement.statement_date.strftime('%m/%d/%Y') }}</td></tr>
<tr><td><strong>Period:</strong></td><td>{{ statement.period_start.strftime('%m/%d/%Y') }} - {{ statement.period_end.strftime('%m/%d/%Y') }}</td></tr>
<tr><td><strong>Due Date:</strong></td><td>{{ statement.due_date.strftime('%m/%d/%Y') if statement.due_date else 'Upon Receipt' }}</td></tr>
</table>
</div>
<!-- Fees Section -->
{% if items_by_category.fees %}
<h3>Professional Services</h3>
<table class="items-table">
<thead>
<tr>
<th>Date</th>
<th>Description</th>
<th>Quantity</th>
<th>Rate</th>
<th class="amount">Amount</th>
</tr>
</thead>
<tbody>
{% for item in items_by_category.fees %}
<tr>
<td>{{ item.date.strftime('%m/%d/%Y') }}</td>
<td>{{ item.description }}</td>
<td>{{ "%.2f"|format(item.quantity) if item.quantity else '' }}</td>
<td>{{ "$%.2f"|format(item.rate) if item.rate else '' }}</td>
<td class="amount">${{ "%.2f"|format(item.amount) }}</td>
</tr>
{% endfor %}
<tr class="total-row">
<td colspan="4">Total Professional Services</td>
<td class="amount">${{ "%.2f"|format(total_fees) }}</td>
</tr>
</tbody>
</table>
{% endif %}
<!-- Costs Section -->
{% if items_by_category.costs %}
<h3>Costs and Disbursements</h3>
<table class="items-table">
<thead>
<tr>
<th>Date</th>
<th>Description</th>
<th class="amount">Amount</th>
</tr>
</thead>
<tbody>
{% for item in items_by_category.costs %}
<tr>
<td>{{ item.date.strftime('%m/%d/%Y') }}</td>
<td>{{ item.description }}</td>
<td class="amount">${{ "%.2f"|format(item.amount) }}</td>
</tr>
{% endfor %}
<tr class="total-row">
<td colspan="2">Total Costs and Disbursements</td>
<td class="amount">${{ "%.2f"|format(total_costs) }}</td>
</tr>
</tbody>
</table>
{% endif %}
<!-- Payments Section -->
{% if items_by_category.payments %}
<h3>Payments and Credits</h3>
<table class="items-table">
<thead>
<tr>
<th>Date</th>
<th>Description</th>
<th class="amount">Amount</th>
</tr>
</thead>
<tbody>
{% for item in items_by_category.payments %}
<tr>
<td>{{ item.date.strftime('%m/%d/%Y') }}</td>
<td>{{ item.description }}</td>
<td class="amount">${{ "%.2f"|format(item.amount) }}</td>
</tr>
{% endfor %}
<tr class="total-row">
<td colspan="2">Total Payments and Credits</td>
<td class="amount">${{ "%.2f"|format(total_payments) }}</td>
</tr>
</tbody>
</table>
{% endif %}
<!-- Summary -->
<div class="summary">
<table class="items-table">
<tr>
<td><strong>Previous Balance:</strong></td>
<td class="amount">${{ "%.2f"|format(statement.previous_balance) }}</td>
</tr>
<tr>
<td><strong>Current Charges:</strong></td>
<td class="amount">${{ "%.2f"|format(statement.current_charges) }}</td>
</tr>
<tr>
<td><strong>Payments/Credits:</strong></td>
<td class="amount">${{ "%.2f"|format(statement.payments_credits) }}</td>
</tr>
<tr class="total-row">
<td><strong>TOTAL DUE:</strong></td>
<td class="amount"><strong>${{ "%.2f"|format(statement.total_due) }}</strong></td>
</tr>
</table>
</div>
{% if statement.trust_balance > 0 %}
<div class="trust-info">
<p><strong>Trust Account Balance:</strong> ${{ "%.2f"|format(statement.trust_balance) }}</p>
</div>
{% endif %}
<div class="footer">
<p>Thank you for your business. Please remit payment by the due date.</p>
{% if custom_footer %}
<p>{{ custom_footer }}</p>
{% endif %}
<p><em>Generated on {{ generation_date.strftime('%m/%d/%Y at %I:%M %p') }}</em></p>
</div>
</body>
</html>
"""
def approve_statement(self, statement_id: int, approved_by: str) -> BillingStatement:
"""Approve a statement and mark transactions as billed"""
statement = self.db.query(BillingStatement).filter(
BillingStatement.id == statement_id
).first()
if not statement:
raise StatementGenerationError(f"Statement {statement_id} not found")
if statement.status != StatementStatus.DRAFT:
raise StatementGenerationError(f"Only draft statements can be approved")
# Mark statement as approved
statement.status = StatementStatus.APPROVED
statement.approved_by = approved_by
statement.approved_at = datetime.now()
# Mark all related transactions as billed
ledger_ids = [item.ledger_id for item in statement.statement_items]
self.db.query(Ledger).filter(
Ledger.id.in_(ledger_ids)
).update({Ledger.billed: "Y"}, synchronize_session=False)
self.db.commit()
logger.info(f"Approved statement {statement.statement_number} by {approved_by}")
return statement
def mark_statement_sent(self, statement_id: int, sent_by: str) -> BillingStatement:
"""Mark statement as sent"""
statement = self.db.query(BillingStatement).filter(
BillingStatement.id == statement_id
).first()
if not statement:
raise StatementGenerationError(f"Statement {statement_id} not found")
statement.status = StatementStatus.SENT
statement.sent_by = sent_by
statement.sent_at = datetime.now()
self.db.commit()
logger.info(f"Marked statement {statement.statement_number} as sent by {sent_by}")
return statement

View File

@@ -0,0 +1,651 @@
"""
Enhanced file management service
Handles file closure, status workflows, transfers, and archival
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, date, timezone
from decimal import Decimal
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, func, or_, desc
from app.models import (
File, Ledger, FileStatus, FileType, Rolodex, Employee,
BillingStatement, Timer, TimeEntry, User, FileStatusHistory,
FileTransferHistory, FileArchiveInfo
)
from app.utils.logging import app_logger
logger = app_logger
class FileManagementError(Exception):
"""Exception raised when file management operations fail"""
pass
class FileStatusWorkflow:
"""Define valid file status transitions and business rules"""
# Define valid status transitions
VALID_TRANSITIONS = {
"NEW": ["ACTIVE", "INACTIVE", "FOLLOW_UP"],
"ACTIVE": ["INACTIVE", "FOLLOW_UP", "PENDING_CLOSURE", "ARCHIVED"],
"INACTIVE": ["ACTIVE", "FOLLOW_UP", "PENDING_CLOSURE", "ARCHIVED"],
"FOLLOW_UP": ["ACTIVE", "INACTIVE", "PENDING_CLOSURE", "ARCHIVED"],
"PENDING_CLOSURE": ["ACTIVE", "INACTIVE", "CLOSED"],
"CLOSED": ["ARCHIVED", "ACTIVE"], # Allow reopening
"ARCHIVED": [] # Final state - no transitions
}
# Statuses that require special validation
CLOSURE_STATUSES = {"PENDING_CLOSURE", "CLOSED"}
FINAL_STATUSES = {"ARCHIVED"}
ACTIVE_STATUSES = {"NEW", "ACTIVE", "FOLLOW_UP", "PENDING_CLOSURE"}
@classmethod
def can_transition(cls, from_status: str, to_status: str) -> bool:
"""Check if status transition is valid"""
return to_status in cls.VALID_TRANSITIONS.get(from_status, [])
@classmethod
def get_valid_transitions(cls, from_status: str) -> List[str]:
"""Get list of valid status transitions from current status"""
return cls.VALID_TRANSITIONS.get(from_status, [])
@classmethod
def requires_closure_validation(cls, status: str) -> bool:
"""Check if status requires closure validation"""
return status in cls.CLOSURE_STATUSES
@classmethod
def is_active_status(cls, status: str) -> bool:
"""Check if status indicates an active file"""
return status in cls.ACTIVE_STATUSES
class FileManagementService:
"""Service for advanced file management operations"""
def __init__(self, db: Session):
self.db = db
self.workflow = FileStatusWorkflow()
def change_file_status(
self,
file_no: str,
new_status: str,
user_id: int,
notes: Optional[str] = None,
validate_transition: bool = True
) -> File:
"""Change file status with workflow validation"""
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise FileManagementError(f"File {file_no} not found")
current_status = file_obj.status
# Validate status transition
if validate_transition and not self.workflow.can_transition(current_status, new_status):
valid_transitions = self.workflow.get_valid_transitions(current_status)
raise FileManagementError(
f"Invalid status transition from '{current_status}' to '{new_status}'. "
f"Valid transitions: {valid_transitions}"
)
# Special validation for closure statuses
if self.workflow.requires_closure_validation(new_status):
self._validate_file_closure(file_obj)
# Update file status
old_status = file_obj.status
file_obj.status = new_status
# Set closure date if closing
if new_status == "CLOSED" and not file_obj.closed:
file_obj.closed = date.today()
elif new_status != "CLOSED" and file_obj.closed:
# Clear closure date if reopening
file_obj.closed = None
# Create status history record
self._create_status_history(file_no, old_status, new_status, user_id, notes)
self.db.commit()
self.db.refresh(file_obj)
logger.info(f"Changed file {file_no} status from '{old_status}' to '{new_status}' by user {user_id}")
return file_obj
def close_file(
self,
file_no: str,
user_id: int,
force_close: bool = False,
final_payment_amount: Optional[float] = None,
closing_notes: Optional[str] = None
) -> Dict[str, Any]:
"""Close a file with automated closure process"""
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise FileManagementError(f"File {file_no} not found")
if file_obj.status == "CLOSED":
raise FileManagementError("File is already closed")
closure_summary = {
"file_no": file_no,
"closure_date": date.today(),
"actions_taken": [],
"warnings": [],
"final_balance": 0.0,
"trust_balance": file_obj.trust_bal or 0.0
}
try:
# Step 1: Validate closure readiness
validation_result = self._validate_file_closure(file_obj, force_close)
closure_summary["warnings"].extend(validation_result.get("warnings", []))
if validation_result.get("blocking_issues") and not force_close:
raise FileManagementError(
f"Cannot close file: {'; '.join(validation_result['blocking_issues'])}"
)
# Step 2: Handle outstanding balances
outstanding_balance = self._calculate_outstanding_balance(file_obj)
closure_summary["final_balance"] = outstanding_balance
if outstanding_balance > 0 and final_payment_amount:
# Create payment entry to close outstanding balance
payment_entry = self._create_final_payment_entry(
file_obj, final_payment_amount, user_id
)
closure_summary["actions_taken"].append(
f"Created final payment entry: ${final_payment_amount:.2f}"
)
outstanding_balance -= final_payment_amount
# Step 3: Stop any active timers
active_timers = self._stop_active_timers(file_no, user_id)
if active_timers:
closure_summary["actions_taken"].append(
f"Stopped {len(active_timers)} active timer(s)"
)
# Step 4: Mark unbilled time entries as non-billable if any
unbilled_entries = self._handle_unbilled_time_entries(file_no, user_id)
if unbilled_entries:
closure_summary["actions_taken"].append(
f"Marked {len(unbilled_entries)} time entries as non-billable"
)
# Step 5: Update file status
file_obj.status = "CLOSED"
file_obj.closed = date.today()
# Step 6: Create closure history record
self._create_status_history(
file_no,
file_obj.status,
"CLOSED",
user_id,
closing_notes or "File closed via automated closure process"
)
self.db.commit()
closure_summary["actions_taken"].append("File status updated to CLOSED")
logger.info(f"Successfully closed file {file_no} by user {user_id}")
return closure_summary
except Exception as e:
self.db.rollback()
logger.error(f"Failed to close file {file_no}: {str(e)}")
raise FileManagementError(f"File closure failed: {str(e)}")
def reopen_file(
self,
file_no: str,
user_id: int,
new_status: str = "ACTIVE",
notes: Optional[str] = None
) -> File:
"""Reopen a closed file"""
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise FileManagementError(f"File {file_no} not found")
if file_obj.status != "CLOSED":
raise FileManagementError("Only closed files can be reopened")
if file_obj.status == "ARCHIVED":
raise FileManagementError("Archived files cannot be reopened")
# Validate new status
if not self.workflow.can_transition("CLOSED", new_status):
valid_transitions = self.workflow.get_valid_transitions("CLOSED")
raise FileManagementError(
f"Invalid reopening status '{new_status}'. Valid options: {valid_transitions}"
)
# Update file
old_status = file_obj.status
file_obj.status = new_status
file_obj.closed = None # Clear closure date
# Create status history
self._create_status_history(
file_no,
old_status,
new_status,
user_id,
notes or "File reopened"
)
self.db.commit()
self.db.refresh(file_obj)
logger.info(f"Reopened file {file_no} to status '{new_status}' by user {user_id}")
return file_obj
def transfer_file(
self,
file_no: str,
new_attorney_id: str,
user_id: int,
transfer_reason: Optional[str] = None
) -> File:
"""Transfer file to a different attorney"""
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise FileManagementError(f"File {file_no} not found")
# Validate new attorney exists
new_attorney = self.db.query(Employee).filter(Employee.empl_num == new_attorney_id).first()
if not new_attorney:
raise FileManagementError(f"Attorney {new_attorney_id} not found")
if not new_attorney.active:
raise FileManagementError(f"Attorney {new_attorney_id} is not active")
old_attorney = file_obj.empl_num
if old_attorney == new_attorney_id:
raise FileManagementError("File is already assigned to this attorney")
# Update file assignment
file_obj.empl_num = new_attorney_id
# Update hourly rate if attorney has a default rate
if new_attorney.rate_per_hour and new_attorney.rate_per_hour > 0:
file_obj.rate_per_hour = new_attorney.rate_per_hour
# Create transfer history record
self._create_transfer_history(
file_no,
old_attorney,
new_attorney_id,
user_id,
transfer_reason
)
self.db.commit()
self.db.refresh(file_obj)
logger.info(f"Transferred file {file_no} from {old_attorney} to {new_attorney_id} by user {user_id}")
return file_obj
def archive_file(
self,
file_no: str,
user_id: int,
archive_location: Optional[str] = None,
notes: Optional[str] = None
) -> File:
"""Archive a closed file"""
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise FileManagementError(f"File {file_no} not found")
if file_obj.status != "CLOSED":
raise FileManagementError("Only closed files can be archived")
# Check for any recent activity (within last 30 days)
recent_activity = self._check_recent_activity(file_no, days=30)
if recent_activity:
raise FileManagementError(
f"File has recent activity and cannot be archived: {recent_activity}"
)
# Update file status
old_status = file_obj.status
file_obj.status = "ARCHIVED"
# Create archive history record
archive_notes = notes or f"File archived to location: {archive_location or 'Standard archive'}"
self._create_status_history(file_no, old_status, "ARCHIVED", user_id, archive_notes)
self.db.commit()
self.db.refresh(file_obj)
logger.info(f"Archived file {file_no} by user {user_id}")
return file_obj
def get_file_status_history(self, file_no: str) -> List[Dict[str, Any]]:
"""Get status change history for a file"""
history = self.db.query(FileStatusHistory).filter(
FileStatusHistory.file_no == file_no
).options(
joinedload(FileStatusHistory.changed_by)
).order_by(FileStatusHistory.change_date.desc()).all()
return [
{
"id": h.id,
"old_status": h.old_status,
"new_status": h.new_status,
"change_date": h.change_date,
"changed_by": h.changed_by_name or (h.changed_by.username if h.changed_by else "System"),
"notes": h.notes,
"system_generated": h.system_generated
}
for h in history
]
def get_files_by_status(
self,
status: str,
attorney_id: Optional[str] = None,
limit: int = 100
) -> List[File]:
"""Get files by status with optional attorney filter"""
query = self.db.query(File).filter(File.status == status)
if attorney_id:
query = query.filter(File.empl_num == attorney_id)
return query.options(
joinedload(File.owner)
).order_by(File.opened.desc()).limit(limit).all()
def get_closure_candidates(self, days_inactive: int = 90) -> List[Dict[str, Any]]:
"""Get files that are candidates for closure"""
cutoff_date = datetime.now(timezone.utc).date() - datetime.timedelta(days=days_inactive)
# Files with no recent ledger activity
files_query = self.db.query(File).filter(
File.status.in_(["ACTIVE", "FOLLOW_UP"]),
File.opened < cutoff_date
)
candidates = []
for file_obj in files_query.all():
# Check for recent ledger activity
recent_activity = self.db.query(Ledger).filter(
Ledger.file_no == file_obj.file_no,
Ledger.date >= cutoff_date
).first()
if not recent_activity:
outstanding_balance = self._calculate_outstanding_balance(file_obj)
candidates.append({
"file_no": file_obj.file_no,
"client_name": f"{file_obj.owner.first or ''} {file_obj.owner.last}".strip() if file_obj.owner else "Unknown",
"attorney": file_obj.empl_num,
"opened_date": file_obj.opened,
"last_activity": None, # Could be enhanced to find actual last activity
"outstanding_balance": outstanding_balance,
"status": file_obj.status
})
return sorted(candidates, key=lambda x: x["opened_date"])
def bulk_status_update(
self,
file_numbers: List[str],
new_status: str,
user_id: int,
notes: Optional[str] = None
) -> Dict[str, Any]:
"""Update status for multiple files"""
results = {
"successful": [],
"failed": [],
"total": len(file_numbers)
}
for file_no in file_numbers:
try:
self.change_file_status(file_no, new_status, user_id, notes)
results["successful"].append(file_no)
except Exception as e:
results["failed"].append({
"file_no": file_no,
"error": str(e)
})
logger.info(f"Bulk status update: {len(results['successful'])} successful, {len(results['failed'])} failed")
return results
# Private helper methods
def _validate_file_closure(self, file_obj: File, force: bool = False) -> Dict[str, Any]:
"""Validate if file is ready for closure"""
validation_result = {
"can_close": True,
"blocking_issues": [],
"warnings": []
}
# Check for active timers
active_timers = self.db.query(Timer).filter(
Timer.file_no == file_obj.file_no,
Timer.status.in_(["running", "paused"])
).count()
if active_timers > 0:
validation_result["warnings"].append(f"{active_timers} active timer(s) will be stopped")
# Check for unbilled time entries
unbilled_entries = self.db.query(TimeEntry).filter(
TimeEntry.file_no == file_obj.file_no,
TimeEntry.billed == False,
TimeEntry.is_billable == True
).count()
if unbilled_entries > 0:
validation_result["warnings"].append(f"{unbilled_entries} unbilled time entries exist")
# Check for outstanding balance
outstanding_balance = self._calculate_outstanding_balance(file_obj)
if outstanding_balance > 0:
validation_result["warnings"].append(f"Outstanding balance: ${outstanding_balance:.2f}")
# Check for pending billing statements
pending_statements = self.db.query(BillingStatement).filter(
BillingStatement.file_no == file_obj.file_no,
BillingStatement.status.in_(["draft", "pending_approval"])
).count()
if pending_statements > 0:
validation_result["blocking_issues"].append(f"{pending_statements} pending billing statement(s)")
validation_result["can_close"] = False
return validation_result
def _calculate_outstanding_balance(self, file_obj: File) -> float:
"""Calculate outstanding balance for a file"""
# Sum all charges minus payments
charges = self.db.query(func.sum(Ledger.amount)).filter(
Ledger.file_no == file_obj.file_no,
Ledger.t_type.in_(["1", "2", "3", "4"]) # Fee and cost types
).scalar() or 0.0
payments = self.db.query(func.sum(Ledger.amount)).filter(
Ledger.file_no == file_obj.file_no,
Ledger.t_type == "5" # Payment type
).scalar() or 0.0
return max(0.0, charges - payments)
def _stop_active_timers(self, file_no: str, user_id: int) -> List[Timer]:
"""Stop all active timers for a file"""
from app.services.timers import TimerService
active_timers = self.db.query(Timer).filter(
Timer.file_no == file_no,
Timer.status.in_(["running", "paused"])
).all()
timer_service = TimerService(self.db)
stopped_timers = []
for timer in active_timers:
try:
timer_service.stop_timer(timer.id, timer.user_id)
stopped_timers.append(timer)
except Exception as e:
logger.warning(f"Failed to stop timer {timer.id}: {str(e)}")
return stopped_timers
def _handle_unbilled_time_entries(self, file_no: str, user_id: int) -> List[TimeEntry]:
"""Handle unbilled time entries during file closure"""
unbilled_entries = self.db.query(TimeEntry).filter(
TimeEntry.file_no == file_no,
TimeEntry.billed == False,
TimeEntry.is_billable == True
).all()
# Mark as non-billable to prevent future billing
for entry in unbilled_entries:
entry.is_billable = False
entry.notes = (entry.notes or "") + " [Marked non-billable during file closure]"
return unbilled_entries
def _create_final_payment_entry(
self,
file_obj: File,
amount: float,
user_id: int
) -> Ledger:
"""Create a final payment entry to close outstanding balance"""
# Get next item number
max_item = self.db.query(func.max(Ledger.item_no)).filter(
Ledger.file_no == file_obj.file_no
).scalar() or 0
payment_entry = Ledger(
file_no=file_obj.file_no,
item_no=max_item + 1,
date=date.today(),
t_code="FINAL",
t_type="5", # Payment type
empl_num=f"user_{user_id}",
amount=amount,
billed="Y", # Mark as billed
note="Final payment - file closure"
)
self.db.add(payment_entry)
return payment_entry
def _check_recent_activity(self, file_no: str, days: int = 30) -> Optional[str]:
"""Check for recent activity that would prevent archival"""
cutoff_date = datetime.now(timezone.utc).date() - datetime.timedelta(days=days)
# Check for recent ledger entries
recent_ledger = self.db.query(Ledger).filter(
Ledger.file_no == file_no,
Ledger.date >= cutoff_date
).first()
if recent_ledger:
return f"Recent ledger activity on {recent_ledger.date}"
# Check for recent billing statements
recent_statement = self.db.query(BillingStatement).filter(
BillingStatement.file_no == file_no,
BillingStatement.created_at >= cutoff_date
).first()
if recent_statement:
return f"Recent billing statement on {recent_statement.created_at.date()}"
return None
def _create_status_history(
self,
file_no: str,
old_status: str,
new_status: str,
user_id: int,
notes: Optional[str] = None
):
"""Create a status history record"""
# Get user info for caching
user = self.db.query(User).filter(User.id == user_id).first()
user_name = user.username if user else f"user_{user_id}"
history_record = FileStatusHistory(
file_no=file_no,
old_status=old_status,
new_status=new_status,
changed_by_user_id=user_id,
changed_by_name=user_name,
notes=notes,
system_generated=False
)
self.db.add(history_record)
logger.info(
f"File {file_no} status changed: {old_status} -> {new_status} by {user_name}"
+ (f" - {notes}" if notes else "")
)
def _create_transfer_history(
self,
file_no: str,
old_attorney: str,
new_attorney: str,
user_id: int,
reason: Optional[str] = None
):
"""Create a transfer history record"""
# Get user info for caching
user = self.db.query(User).filter(User.id == user_id).first()
user_name = user.username if user else f"user_{user_id}"
# Get current file for rate info
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
old_rate = file_obj.rate_per_hour if file_obj else None
# Get new attorney rate
new_attorney_obj = self.db.query(Employee).filter(Employee.empl_num == new_attorney).first()
new_rate = new_attorney_obj.rate_per_hour if new_attorney_obj else None
transfer_record = FileTransferHistory(
file_no=file_no,
old_attorney_id=old_attorney,
new_attorney_id=new_attorney,
authorized_by_user_id=user_id,
authorized_by_name=user_name,
reason=reason,
old_hourly_rate=old_rate,
new_hourly_rate=new_rate
)
self.db.add(transfer_record)
logger.info(
f"File {file_no} transferred: {old_attorney} -> {new_attorney} by {user_name}"
+ (f" - {reason}" if reason else "")
)

530
app/services/timers.py Normal file
View File

@@ -0,0 +1,530 @@
"""
Timer service for time tracking functionality
Handles timer start/stop/pause operations and time entry creation
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timezone, timedelta
from decimal import Decimal
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, func, or_
from app.models import (
Timer, TimeEntry, TimerSession, TimerTemplate, TimerStatus, TimerType,
User, File, Ledger, Rolodex
)
from app.utils.logging import app_logger
logger = app_logger
class TimerServiceError(Exception):
"""Exception raised when timer operations fail"""
pass
class TimerService:
"""Service for managing timers and time tracking"""
def __init__(self, db: Session):
self.db = db
def create_timer(
self,
user_id: int,
title: str,
description: Optional[str] = None,
file_no: Optional[str] = None,
customer_id: Optional[str] = None,
timer_type: TimerType = TimerType.BILLABLE,
hourly_rate: Optional[float] = None,
task_category: Optional[str] = None,
template_id: Optional[int] = None
) -> Timer:
"""Create a new timer"""
# Validate user exists
user = self.db.query(User).filter(User.id == user_id).first()
if not user:
raise TimerServiceError(f"User {user_id} not found")
# Validate file exists if provided
if file_no:
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise TimerServiceError(f"File {file_no} not found")
# Use file's hourly rate if not specified
if not hourly_rate and file_obj.rate_per_hour:
hourly_rate = file_obj.rate_per_hour
# Validate customer exists if provided
if customer_id:
customer = self.db.query(Rolodex).filter(Rolodex.id == customer_id).first()
if not customer:
raise TimerServiceError(f"Customer {customer_id} not found")
# Apply template if provided
if template_id:
template = self.db.query(TimerTemplate).filter(TimerTemplate.id == template_id).first()
if template:
if not title:
title = template.title_template
if not description:
description = template.description_template
if timer_type == TimerType.BILLABLE: # Only override if default
timer_type = template.timer_type
if not hourly_rate and template.default_rate:
hourly_rate = template.default_rate
if not task_category:
task_category = template.task_category
# Update template usage count
template.usage_count += 1
timer = Timer(
user_id=user_id,
file_no=file_no,
customer_id=customer_id,
title=title,
description=description,
timer_type=timer_type,
hourly_rate=hourly_rate,
task_category=task_category,
is_billable=(timer_type == TimerType.BILLABLE),
status=TimerStatus.STOPPED
)
self.db.add(timer)
self.db.commit()
self.db.refresh(timer)
logger.info(f"Created timer {timer.id} for user {user_id}: {title}")
return timer
def start_timer(self, timer_id: int, user_id: int) -> Timer:
"""Start a timer"""
timer = self._get_user_timer(timer_id, user_id)
if timer.status == TimerStatus.RUNNING:
raise TimerServiceError("Timer is already running")
now = datetime.now(timezone.utc)
# Stop any other running timers for this user
self._stop_other_timers(user_id, timer_id)
# Update timer status
timer.status = TimerStatus.RUNNING
timer.last_started_at = now
if not timer.started_at:
timer.started_at = now
# Create session record
session = TimerSession(
timer_id=timer.id,
started_at=now
)
self.db.add(session)
self.db.commit()
self.db.refresh(timer)
logger.info(f"Started timer {timer.id} for user {user_id}")
return timer
def pause_timer(self, timer_id: int, user_id: int) -> Timer:
"""Pause a running timer"""
timer = self._get_user_timer(timer_id, user_id)
if timer.status != TimerStatus.RUNNING:
raise TimerServiceError("Timer is not running")
now = datetime.now(timezone.utc)
# Calculate session time and add to total
if timer.last_started_at:
session_seconds = int((now - timer.last_started_at).total_seconds())
timer.total_seconds += session_seconds
# Update timer status
timer.status = TimerStatus.PAUSED
timer.last_paused_at = now
# Update current session
current_session = self.db.query(TimerSession).filter(
TimerSession.timer_id == timer.id,
TimerSession.ended_at.is_(None)
).order_by(TimerSession.started_at.desc()).first()
if current_session:
current_session.ended_at = now
current_session.duration_seconds = int((now - current_session.started_at).total_seconds())
self.db.commit()
self.db.refresh(timer)
logger.info(f"Paused timer {timer.id} for user {user_id}")
return timer
def stop_timer(self, timer_id: int, user_id: int) -> Timer:
"""Stop a timer completely"""
timer = self._get_user_timer(timer_id, user_id)
if timer.status == TimerStatus.STOPPED:
raise TimerServiceError("Timer is already stopped")
now = datetime.now(timezone.utc)
# If running, calculate final session time
if timer.status == TimerStatus.RUNNING and timer.last_started_at:
session_seconds = int((now - timer.last_started_at).total_seconds())
timer.total_seconds += session_seconds
# Update timer status
timer.status = TimerStatus.STOPPED
timer.stopped_at = now
# Update current session
current_session = self.db.query(TimerSession).filter(
TimerSession.timer_id == timer.id,
TimerSession.ended_at.is_(None)
).order_by(TimerSession.started_at.desc()).first()
if current_session:
current_session.ended_at = now
current_session.duration_seconds = int((now - current_session.started_at).total_seconds())
self.db.commit()
self.db.refresh(timer)
logger.info(f"Stopped timer {timer.id} for user {user_id}, total time: {timer.total_hours:.2f} hours")
return timer
def resume_timer(self, timer_id: int, user_id: int) -> Timer:
"""Resume a paused timer"""
timer = self._get_user_timer(timer_id, user_id)
if timer.status != TimerStatus.PAUSED:
raise TimerServiceError("Timer is not paused")
# Stop any other running timers for this user
self._stop_other_timers(user_id, timer_id)
return self.start_timer(timer_id, user_id)
def delete_timer(self, timer_id: int, user_id: int) -> bool:
"""Delete a timer (only if stopped)"""
timer = self._get_user_timer(timer_id, user_id)
if timer.status != TimerStatus.STOPPED:
raise TimerServiceError("Can only delete stopped timers")
# Check if timer has associated time entries
entry_count = self.db.query(TimeEntry).filter(TimeEntry.timer_id == timer_id).count()
if entry_count > 0:
raise TimerServiceError(f"Cannot delete timer: {entry_count} time entries are linked to this timer")
self.db.delete(timer)
self.db.commit()
logger.info(f"Deleted timer {timer_id} for user {user_id}")
return True
def get_active_timers(self, user_id: int) -> List[Timer]:
"""Get all active (running or paused) timers for a user"""
return self.db.query(Timer).filter(
Timer.user_id == user_id,
Timer.status.in_([TimerStatus.RUNNING, TimerStatus.PAUSED])
).options(
joinedload(Timer.file),
joinedload(Timer.customer)
).all()
def get_user_timers(
self,
user_id: int,
status_filter: Optional[TimerStatus] = None,
file_no: Optional[str] = None,
limit: int = 50
) -> List[Timer]:
"""Get timers for a user with optional filtering"""
query = self.db.query(Timer).filter(Timer.user_id == user_id)
if status_filter:
query = query.filter(Timer.status == status_filter)
if file_no:
query = query.filter(Timer.file_no == file_no)
return query.options(
joinedload(Timer.file),
joinedload(Timer.customer)
).order_by(Timer.updated_at.desc()).limit(limit).all()
def create_time_entry_from_timer(
self,
timer_id: int,
user_id: int,
title: Optional[str] = None,
description: Optional[str] = None,
hours_override: Optional[float] = None,
entry_date: Optional[datetime] = None
) -> TimeEntry:
"""Create a time entry from a completed timer"""
timer = self._get_user_timer(timer_id, user_id)
if timer.status != TimerStatus.STOPPED:
raise TimerServiceError("Timer must be stopped to create time entry")
if timer.total_seconds == 0:
raise TimerServiceError("Timer has no recorded time")
# Use timer details or overrides
entry_title = title or timer.title
entry_description = description or timer.description
entry_hours = hours_override or timer.total_hours
entry_date = entry_date or timer.stopped_at or datetime.now(timezone.utc)
time_entry = TimeEntry(
timer_id=timer.id,
user_id=user_id,
file_no=timer.file_no,
customer_id=timer.customer_id,
title=entry_title,
description=entry_description,
entry_type=timer.timer_type,
hours=entry_hours,
entry_date=entry_date,
hourly_rate=timer.hourly_rate,
is_billable=timer.is_billable,
task_category=timer.task_category,
created_by=f"user_{user_id}"
)
self.db.add(time_entry)
self.db.commit()
self.db.refresh(time_entry)
logger.info(f"Created time entry {time_entry.id} from timer {timer_id}: {entry_hours:.2f} hours")
return time_entry
def create_manual_time_entry(
self,
user_id: int,
title: str,
hours: float,
entry_date: datetime,
description: Optional[str] = None,
file_no: Optional[str] = None,
customer_id: Optional[str] = None,
hourly_rate: Optional[float] = None,
entry_type: TimerType = TimerType.BILLABLE,
task_category: Optional[str] = None
) -> TimeEntry:
"""Create a manual time entry (not from a timer)"""
# Validate user
user = self.db.query(User).filter(User.id == user_id).first()
if not user:
raise TimerServiceError(f"User {user_id} not found")
# Validate file if provided
if file_no:
file_obj = self.db.query(File).filter(File.file_no == file_no).first()
if not file_obj:
raise TimerServiceError(f"File {file_no} not found")
# Use file's rate if not specified
if not hourly_rate and file_obj.rate_per_hour:
hourly_rate = file_obj.rate_per_hour
# Validate customer if provided
if customer_id:
customer = self.db.query(Rolodex).filter(Rolodex.id == customer_id).first()
if not customer:
raise TimerServiceError(f"Customer {customer_id} not found")
time_entry = TimeEntry(
user_id=user_id,
file_no=file_no,
customer_id=customer_id,
title=title,
description=description,
entry_type=entry_type,
hours=hours,
entry_date=entry_date,
hourly_rate=hourly_rate,
is_billable=(entry_type == TimerType.BILLABLE),
task_category=task_category,
created_by=f"user_{user_id}"
)
self.db.add(time_entry)
self.db.commit()
self.db.refresh(time_entry)
logger.info(f"Created manual time entry {time_entry.id} for user {user_id}: {hours:.2f} hours")
return time_entry
def convert_time_entry_to_ledger(
self,
time_entry_id: int,
user_id: int,
transaction_code: str = "TIME",
notes: Optional[str] = None
) -> Ledger:
"""Convert a time entry to a billable ledger transaction"""
time_entry = self.db.query(TimeEntry).filter(
TimeEntry.id == time_entry_id,
TimeEntry.user_id == user_id
).first()
if not time_entry:
raise TimerServiceError(f"Time entry {time_entry_id} not found")
if time_entry.billed:
raise TimerServiceError("Time entry has already been billed")
if not time_entry.is_billable:
raise TimerServiceError("Time entry is not billable")
if not time_entry.file_no:
raise TimerServiceError("Time entry must have a file assignment for billing")
if not time_entry.hourly_rate or time_entry.hourly_rate <= 0:
raise TimerServiceError("Time entry must have a valid hourly rate for billing")
# Get next item number for this file
max_item = self.db.query(func.max(Ledger.item_no)).filter(
Ledger.file_no == time_entry.file_no
).scalar() or 0
# Calculate amount
amount = time_entry.hours * time_entry.hourly_rate
# Create ledger entry
ledger_entry = Ledger(
file_no=time_entry.file_no,
item_no=max_item + 1,
date=time_entry.entry_date.date() if hasattr(time_entry.entry_date, 'date') else time_entry.entry_date,
t_code=transaction_code,
t_type="1", # Type 1 = hourly fees
empl_num=f"user_{user_id}",
quantity=time_entry.hours,
rate=time_entry.hourly_rate,
amount=amount,
billed="N", # Will be marked as billed when statement is approved
note=notes or time_entry.description or time_entry.title
)
# Link time entry to ledger entry
time_entry.ledger_id = ledger_entry.id
time_entry.billed = True
self.db.add(ledger_entry)
self.db.commit()
self.db.refresh(ledger_entry)
# Update the time entry with the ledger ID
time_entry.ledger_id = ledger_entry.id
self.db.commit()
logger.info(f"Converted time entry {time_entry_id} to ledger entry {ledger_entry.id}: ${amount:.2f}")
return ledger_entry
def update_timer_total(self, timer_id: int) -> Timer:
"""Recalculate timer total from sessions (for data consistency)"""
timer = self.db.query(Timer).filter(Timer.id == timer_id).first()
if not timer:
raise TimerServiceError(f"Timer {timer_id} not found")
# Calculate total from completed sessions
total_seconds = self.db.query(func.sum(TimerSession.duration_seconds)).filter(
TimerSession.timer_id == timer_id,
TimerSession.ended_at.isnot(None)
).scalar() or 0
# Add current running session if applicable
if timer.status == TimerStatus.RUNNING:
total_seconds += timer.get_current_session_seconds()
timer.total_seconds = total_seconds
self.db.commit()
return timer
def _get_user_timer(self, timer_id: int, user_id: int) -> Timer:
"""Get timer and verify ownership"""
timer = self.db.query(Timer).filter(
Timer.id == timer_id,
Timer.user_id == user_id
).first()
if not timer:
raise TimerServiceError(f"Timer {timer_id} not found or access denied")
return timer
def _stop_other_timers(self, user_id: int, exclude_timer_id: int):
"""Stop all other running timers for a user"""
running_timers = self.db.query(Timer).filter(
Timer.user_id == user_id,
Timer.status == TimerStatus.RUNNING,
Timer.id != exclude_timer_id
).all()
for timer in running_timers:
try:
self.pause_timer(timer.id, user_id)
logger.info(f"Auto-paused timer {timer.id} when starting timer {exclude_timer_id}")
except Exception as e:
logger.warning(f"Failed to auto-pause timer {timer.id}: {str(e)}")
def get_timer_statistics(self, user_id: int, days: int = 30) -> Dict[str, Any]:
"""Get timer statistics for a user over the last N days"""
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
# Total time tracked
total_seconds = self.db.query(func.sum(Timer.total_seconds)).filter(
Timer.user_id == user_id,
Timer.created_at >= cutoff_date
).scalar() or 0
# Total billable time
billable_seconds = self.db.query(func.sum(Timer.total_seconds)).filter(
Timer.user_id == user_id,
Timer.is_billable == True,
Timer.created_at >= cutoff_date
).scalar() or 0
# Number of active timers
active_count = self.db.query(Timer).filter(
Timer.user_id == user_id,
Timer.status.in_([TimerStatus.RUNNING, TimerStatus.PAUSED])
).count()
# Number of time entries created
entries_count = self.db.query(TimeEntry).filter(
TimeEntry.user_id == user_id,
TimeEntry.created_at >= cutoff_date
).count()
# Entries converted to billing
billed_entries = self.db.query(TimeEntry).filter(
TimeEntry.user_id == user_id,
TimeEntry.billed == True,
TimeEntry.created_at >= cutoff_date
).count()
return {
"period_days": days,
"total_hours": total_seconds / 3600.0,
"billable_hours": billable_seconds / 3600.0,
"non_billable_hours": (total_seconds - billable_seconds) / 3600.0,
"active_timers": active_count,
"time_entries_created": entries_count,
"time_entries_billed": billed_entries,
"billable_percentage": (billable_seconds / total_seconds * 100) if total_seconds > 0 else 0
}