"""
Billing statement generation service
Handles statement creation, template rendering, and PDF generation
"""
from typing import List, Dict, Any, Optional, Tuple
from datetime import date, datetime, timedelta
from decimal import Decimal
from jinja2 import Template, Environment, BaseLoader
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_, func
from app.models import (
BillingStatement, StatementTemplate, BillingStatementItem,
File, Ledger, Rolodex, StatementStatus
)
from app.utils.logging import app_logger
logger = app_logger
class StatementGenerationError(Exception):
    """Error raised by the billing statement service when an operation cannot proceed.

    The service raises it for a missing file or statement, for a file with no
    unbilled transactions, and for approving a statement that is not a draft.
    """
class BillingStatementService:
    """Service for generating and managing billing statements.

    Each instance carries the SQLAlchemy session used for all queries and
    commits, plus a Jinja2 environment used to render statement templates.
    """

    def __init__(self, db: Session) -> None:
        """Bind the service to a database session.

        Args:
            db: Session used for every query, flush and commit issued by the
                service.
        """
        # Templates are compiled from strings (see generate_statement_html),
        # so the bare BaseLoader is only a placeholder loader.
        # NOTE(review): autoescape is left at the Jinja2 default here; confirm
        # that values rendered into statement HTML are escaped as intended.
        self.jinja_env = Environment(loader=BaseLoader())
        self.db = db
def generate_statement_number(self) -> str:
    """Return the next statement number for the current month.

    Numbers have the form ``STMT-YYYYMM-NNNN``. Among stored statements whose
    number starts with this month's prefix, the one that sorts highest (as a
    string, descending) supplies the previous sequence value.

    Returns:
        The prefix followed by the next sequence value, zero-padded to four
        digits. The sequence is 1 when no statement matches or when the
        trailing segment of the highest number is not an integer.
    """
    month_prefix = f"STMT-{date.today().strftime('%Y%m')}"

    latest = (
        self.db.query(BillingStatement)
        .filter(BillingStatement.statement_number.like(f"{month_prefix}%"))
        .order_by(BillingStatement.statement_number.desc())
        .first()
    )

    sequence = 1
    if latest:
        try:
            sequence = int(latest.statement_number.split('-')[-1]) + 1
        except (ValueError, IndexError):
            # Unparseable suffix: fall back to the start of the sequence.
            sequence = 1

    return f"{month_prefix}-{sequence:04d}"
def get_unbilled_transactions(
    self,
    file_no: str,
    period_start: Optional[date] = None,
    period_end: Optional[date] = None
) -> List[Ledger]:
    """Return a file's unbilled ledger rows, oldest first.

    Args:
        file_no: File whose ledger rows are queried.
        period_start: When given, only rows dated on or after it are returned.
        period_end: When given, only rows dated on or before it are returned.

    Returns:
        Ledger rows with ``billed == "N"`` for the file, ordered by date.
    """
    criteria = [Ledger.file_no == file_no, Ledger.billed == "N"]
    if period_start:
        criteria.append(Ledger.date >= period_start)
    if period_end:
        criteria.append(Ledger.date <= period_end)

    # Multiple criteria passed to one filter() call are AND-ed together.
    return self.db.query(Ledger).filter(*criteria).order_by(Ledger.date).all()
def calculate_statement_totals(self, transactions: List[Ledger]) -> Dict[str, float]:
    """Calculate financial totals for a statement.

    Amounts are accumulated as ``Decimal`` and converted to ``float`` only in
    the returned mapping, so adding many cent values does not pick up binary
    floating-point drift (0.1 + 0.2 totals 0.3, not 0.30000000000000004).

    Args:
        transactions: Ledger rows to total. Each row is read for ``amount``
            and ``t_type``. A missing or zero amount counts as 0.

    Returns:
        Mapping with the keys ``fees``, ``costs``, ``payments``,
        ``trust_deposits``, ``trust_transfers``, ``adjustments``,
        ``current_charges`` (fees plus costs) and ``net_charges``
        (``current_charges`` minus ``payments``). ``trust_transfers`` and
        ``adjustments`` are never incremented here and stay 0.0. Rows with
        an unrecognised ``t_type`` are ignored.

    Raises:
        ValueError: If an amount cannot be converted with ``float()``.
    """
    zero = Decimal("0")
    sums = {
        'fees': zero,
        'costs': zero,
        'payments': zero,
        'trust_deposits': zero,
        'trust_transfers': zero,
        'adjustments': zero,
        'current_charges': zero,
        'net_charges': zero,
    }

    for txn in transactions:
        raw = txn.amount
        if not raw:
            amount = zero
        elif isinstance(raw, Decimal):
            amount = raw
        else:
            # Go through float() so bad input still fails with ValueError,
            # then through str() so 0.1 becomes Decimal("0.1"), not the
            # full binary expansion.
            amount = Decimal(str(float(raw)))

        # Categorize by transaction type
        if txn.t_type in ('1', '2'):  # Fees (hourly and flat)
            sums['fees'] += amount
            sums['current_charges'] += amount
        elif txn.t_type == '3':  # Costs/disbursements
            sums['costs'] += amount
            sums['current_charges'] += amount
        elif txn.t_type == '4':  # Payments
            sums['payments'] += amount
        elif txn.t_type == '5':  # Trust deposits
            sums['trust_deposits'] += amount

    # Derived once at the end; no running value is needed.
    sums['net_charges'] = sums['current_charges'] - sums['payments']
    return {key: float(value) for key, value in sums.items()}
def get_previous_balance(self, file_no: str, period_start: date) -> float:
    """Calculate the balance carried into a statement period.

    Ledger rows for the file dated before ``period_start`` are summed per
    transaction type, with the same sign rules as
    ``calculate_statement_totals``: fees (types '1' and '2') and costs
    (type '3') add to the balance, and payments (type '4') reduce it. Trust
    deposits (type '5') and any other type are left out. Summing every
    amount with the same sign would make an earlier payment or trust
    deposit increase the amount owed.

    NOTE(review): this assumes payment amounts are stored as positive
    values, which is how the other methods in this service treat them.
    Confirm against the ledger data.

    Args:
        file_no: File whose ledger is examined.
        period_start: First day of the statement period. Rows dated on or
            after it are excluded.

    Returns:
        Charges minus payments before the period, or 0.0 when there are no
        such rows.
    """
    subtotals = (
        self.db.query(Ledger.t_type, func.sum(Ledger.amount))
        .filter(
            Ledger.file_no == file_no,
            Ledger.date < period_start
        )
        .group_by(Ledger.t_type)
        .all()
    )

    balance = Decimal("0")
    for t_type, subtotal in subtotals:
        if not subtotal:
            continue
        value = subtotal if isinstance(subtotal, Decimal) else Decimal(str(subtotal))
        if t_type in ('1', '2', '3'):  # Fees and costs are charges
            balance += value
        elif t_type == '4':  # Payments reduce what is owed
            balance -= value
    return float(balance)
def get_trust_balance(self, file_no: str) -> float:
    """Return the trust balance recorded on a file.

    Args:
        file_no: File to look up.

    Returns:
        The file's ``trust_bal`` as a float. 0.0 when the file does not
        exist or its ``trust_bal`` is empty.
    """
    matter = self.db.query(File).filter(File.file_no == file_no).first()
    if not matter:
        return 0.0
    return float(matter.trust_bal or 0.0)
def create_statement(
    self,
    file_no: str,
    period_start: date,
    period_end: date,
    template_id: Optional[int] = None,
    custom_footer: Optional[str] = None,
    created_by: Optional[str] = None
) -> BillingStatement:
    """Create a new draft billing statement for a file.

    Collects the file's unbilled ledger rows for the period, computes the
    totals, then stores one ``BillingStatement`` and one
    ``BillingStatementItem`` per ledger row in a single commit. The ledger
    rows are not marked as billed here; ``approve_statement`` does that.

    Args:
        file_no: File to bill.
        period_start: First day of the billing period.
        period_end: Last day of the billing period.
        template_id: Template to attach. When falsy, the active default
            template is used if one exists, otherwise none is attached.
        custom_footer: Optional footer text stored on the statement.
        created_by: Optional identifier of the creating user.

    Returns:
        The persisted statement, refreshed from the database.

    Raises:
        StatementGenerationError: If the file does not exist or has no
            unbilled transactions in the period.
        Exception: Any error raised while numbering or persisting the
            statement is re-raised after the session is rolled back.
    """
    # Get file and customer info
    file_obj = (
        self.db.query(File)
        .options(joinedload(File.owner))
        .filter(File.file_no == file_no)
        .first()
    )
    if not file_obj:
        raise StatementGenerationError(f"File {file_no} not found")

    # Get unbilled transactions
    transactions = self.get_unbilled_transactions(file_no, period_start, period_end)
    if not transactions:
        raise StatementGenerationError(f"No unbilled transactions found for file {file_no}")

    # Calculate totals
    totals = self.calculate_statement_totals(transactions)
    previous_balance = self.get_previous_balance(file_no, period_start)
    trust_balance = self.get_trust_balance(file_no)
    total_due = previous_balance + totals['current_charges'] - totals['payments']

    # Fall back to the active default template when none was requested
    if not template_id:
        template = self.db.query(StatementTemplate).filter(
            StatementTemplate.is_default == True,  # noqa: E712 (SQL expression)
            StatementTemplate.is_active == True  # noqa: E712 (SQL expression)
        ).first()
        if template:
            template_id = template.id

    # Read the clock once so statement_date and due_date cannot straddle midnight.
    today = date.today()

    try:
        statement = BillingStatement(
            statement_number=self.generate_statement_number(),
            file_no=file_no,
            customer_id=file_obj.owner.id if file_obj.owner else None,
            period_start=period_start,
            period_end=period_end,
            statement_date=today,
            due_date=today + timedelta(days=30),
            previous_balance=previous_balance,
            current_charges=totals['current_charges'],
            payments_credits=totals['payments'],
            total_due=total_due,
            trust_balance=trust_balance,
            template_id=template_id,
            billed_transaction_count=len(transactions),
            custom_footer=custom_footer,
            created_by=created_by,
            status=StatementStatus.DRAFT
        )
        self.db.add(statement)
        self.db.flush()  # Assigns statement.id, needed by the items below

        self.db.add_all([
            BillingStatementItem(
                statement_id=statement.id,
                ledger_id=txn.id,
                date=txn.date,
                description=txn.note or f"{txn.t_code} - {txn.empl_num}",
                quantity=float(txn.quantity or 0.0),
                rate=float(txn.rate or 0.0),
                amount=float(txn.amount or 0.0),
                item_category=self._categorize_transaction(txn)
            )
            for txn in transactions
        ])
        self.db.commit()
    except Exception:
        # A failed flush/commit leaves the session unusable until it is
        # rolled back; clean up and let the caller see the original error.
        self.db.rollback()
        raise

    self.db.refresh(statement)
    logger.info(f"Created statement {statement.statement_number} for file {file_no}")
    return statement
def _categorize_transaction(self, txn: Ledger) -> str:
    """Map a ledger row's ``t_type`` to its statement display category.

    Args:
        txn: Ledger row; only ``t_type`` is read.

    Returns:
        ``'fees'`` for types '1' and '2', ``'costs'`` for '3',
        ``'payments'`` for '4', ``'trust'`` for '5', otherwise ``'other'``.
    """
    category_table = (
        (('1', '2'), 'fees'),
        (('3',), 'costs'),
        (('4',), 'payments'),
        (('5',), 'trust'),
    )
    for type_codes, label in category_table:
        if txn.t_type in type_codes:
            return label
    return 'other'
def generate_statement_html(self, statement_id: int) -> str:
    """Render a stored statement to HTML and save the result on it.

    The statement is loaded together with its file (and the file's owner),
    customer, template and items. The rendered text is written to
    ``statement.html_content`` and committed.

    Args:
        statement_id: Primary key of the statement to render.

    Returns:
        The rendered HTML.

    Raises:
        StatementGenerationError: If no statement has that id.
    """
    eager_loads = (
        joinedload(BillingStatement.file).joinedload(File.owner),
        joinedload(BillingStatement.customer),
        joinedload(BillingStatement.template),
        joinedload(BillingStatement.statement_items),
    )
    statement = (
        self.db.query(BillingStatement)
        .options(*eager_loads)
        .filter(BillingStatement.id == statement_id)
        .first()
    )
    if not statement:
        raise StatementGenerationError(f"Statement {statement_id} not found")

    # Build the render inputs: context first, then the template source.
    context = self._prepare_template_context(statement)
    template_source = self._get_statement_template(statement)

    rendered = self.jinja_env.from_string(template_source).render(**context)

    # Persist the rendered output alongside the statement.
    statement.html_content = rendered
    self.db.commit()
    return rendered
def _prepare_template_context(self, statement: BillingStatement) -> Dict[str, Any]:
    """Build the variables handed to the statement template.

    Args:
        statement: Statement being rendered. Its ``statement_items``,
            ``file``, ``customer`` and ``custom_footer`` are read.

    Returns:
        Mapping with ``statement``, ``file``, ``customer`` (the statement's
        customer, else the file's owner), ``items_by_category`` (items
        grouped by ``item_category``, with a missing category filed under
        ``'other'``), the ``total_fees`` / ``total_costs`` /
        ``total_payments`` sums, ``generation_date`` (current local time)
        and ``custom_footer``.
    """
    grouped = {}
    for entry in statement.statement_items:
        grouped.setdefault(entry.item_category or 'other', []).append(entry)

    def subtotal(category):
        # Sum of item amounts in one category; 0 when the category is absent.
        return sum(entry.amount for entry in grouped.get(category, []))

    return {
        'statement': statement,
        'file': statement.file,
        'customer': statement.customer or statement.file.owner,
        'items_by_category': grouped,
        'total_fees': subtotal('fees'),
        'total_costs': subtotal('costs'),
        'total_payments': subtotal('payments'),
        'generation_date': datetime.now(),
        'custom_footer': statement.custom_footer
    }
def _get_statement_template(self, statement: BillingStatement) -> str:
    """Choose the template source used to render a statement.

    Args:
        statement: Statement whose ``template`` relationship is inspected.

    Returns:
        The built-in default template when the statement has no template or
        its template is inactive. Otherwise the template assembled from the
        custom header, footer and CSS, with missing parts passed as empty
        strings.
    """
    custom = statement.template
    if not (custom and custom.is_active):
        return self._get_default_template()

    return self._build_complete_template(
        custom.header_template or "",
        custom.footer_template or "",
        custom.css_styles or "",
    )
def _build_complete_template(self, header: str, footer: str, css: str) -> str:
    """Build complete HTML template from components.

    Args:
        header: Custom header template text.
        footer: Custom footer template text.
        css: Custom CSS text.

    Returns:
        The template literal below after ``%``-formatting with a mapping of
        ``css``, ``header`` and ``footer``.

    NOTE(review): the literal contains no ``%(css)s``, ``%(header)s`` or
    ``%(footer)s`` placeholders, so the three arguments are not inserted
    into the returned text. Presumably the placeholder markup was lost;
    confirm against the intended template.

    NOTE(review): ``{{ self.default_content() }}`` looks like a reference
    to a Jinja2 block named ``default_content``, but no such block is
    defined in this string. Verify that custom templates render.
    """
    # Use regular string formatting to avoid f-string conflicts with Jinja2
    template = """
Billing Statement - {{ statement.statement_number }}
{{ self.default_content() }}
""" % {"css": css, "header": header, "footer": footer}
    return template
def _get_default_template(self) -> str:
    """Return the built-in Jinja2 source for a statement.

    The template reads these context names: ``statement``, ``customer``,
    ``items_by_category``, ``total_fees``, ``total_costs`` and
    ``total_payments``. The fees, costs and payments sections, and the trust
    balance line, are each wrapped in a condition and appear only when
    there is something to show.
    """
    default_markup = """
Billing Statement - {{ statement.statement_number }}
Bill To:
{{ customer.first }} {{ customer.last }}
{% if customer.a1 %}{{ customer.a1 }}
{% endif %}
{% if customer.a2 %}{{ customer.a2 }}
{% endif %}
{% if customer.city %}{{ customer.city }}, {{ customer.abrev }} {{ customer.zip }}{% endif %}
| File Number: | {{ statement.file_no }} |
| Statement Date: | {{ statement.statement_date.strftime('%m/%d/%Y') }} |
| Period: | {{ statement.period_start.strftime('%m/%d/%Y') }} - {{ statement.period_end.strftime('%m/%d/%Y') }} |
| Due Date: | {{ statement.due_date.strftime('%m/%d/%Y') if statement.due_date else 'Upon Receipt' }} |
{% if items_by_category.fees %}
Professional Services
| Date |
Description |
Quantity |
Rate |
Amount |
{% for item in items_by_category.fees %}
| {{ item.date.strftime('%m/%d/%Y') }} |
{{ item.description }} |
{{ "%.2f"|format(item.quantity) if item.quantity else '' }} |
{{ "$%.2f"|format(item.rate) if item.rate else '' }} |
${{ "%.2f"|format(item.amount) }} |
{% endfor %}
| Total Professional Services |
${{ "%.2f"|format(total_fees) }} |
{% endif %}
{% if items_by_category.costs %}
Costs and Disbursements
| Date |
Description |
Amount |
{% for item in items_by_category.costs %}
| {{ item.date.strftime('%m/%d/%Y') }} |
{{ item.description }} |
${{ "%.2f"|format(item.amount) }} |
{% endfor %}
| Total Costs and Disbursements |
${{ "%.2f"|format(total_costs) }} |
{% endif %}
{% if items_by_category.payments %}
Payments and Credits
| Date |
Description |
Amount |
{% for item in items_by_category.payments %}
| {{ item.date.strftime('%m/%d/%Y') }} |
{{ item.description }} |
${{ "%.2f"|format(item.amount) }} |
{% endfor %}
| Total Payments and Credits |
${{ "%.2f"|format(total_payments) }} |
{% endif %}
| Previous Balance: |
${{ "%.2f"|format(statement.previous_balance) }} |
| Current Charges: |
${{ "%.2f"|format(statement.current_charges) }} |
| Payments/Credits: |
${{ "%.2f"|format(statement.payments_credits) }} |
| TOTAL DUE: |
${{ "%.2f"|format(statement.total_due) }} |
{% if statement.trust_balance > 0 %}
Trust Account Balance: ${{ "%.2f"|format(statement.trust_balance) }}
{% endif %}
"""
    return default_markup
def approve_statement(self, statement_id: int, approved_by: str) -> BillingStatement:
    """Approve a draft statement and flag its ledger rows as billed.

    Args:
        statement_id: Primary key of the statement to approve.
        approved_by: Identifier recorded as the approver.

    Returns:
        The statement, with status APPROVED and the approver and approval
        time (current local time) recorded.

    Raises:
        StatementGenerationError: If the statement does not exist or its
            status is not DRAFT.
    """
    statement = (
        self.db.query(BillingStatement)
        .filter(BillingStatement.id == statement_id)
        .first()
    )
    if not statement:
        raise StatementGenerationError(f"Statement {statement_id} not found")
    if statement.status != StatementStatus.DRAFT:
        raise StatementGenerationError("Only draft statements can be approved")

    # Record the approval on the statement itself.
    statement.status = StatementStatus.APPROVED
    statement.approved_by = approved_by
    statement.approved_at = datetime.now()

    # Set billed = "Y" on every ledger row referenced by the statement's
    # items, using one bulk UPDATE.
    billed_ledger_ids = [entry.ledger_id for entry in statement.statement_items]
    ledger_rows = self.db.query(Ledger).filter(Ledger.id.in_(billed_ledger_ids))
    ledger_rows.update({Ledger.billed: "Y"}, synchronize_session=False)

    self.db.commit()
    logger.info(f"Approved statement {statement.statement_number} by {approved_by}")
    return statement
def mark_statement_sent(self, statement_id: int, sent_by: str) -> BillingStatement:
    """Record that a statement has been sent.

    The statement's current status is not checked before it is set to SENT.

    Args:
        statement_id: Primary key of the statement.
        sent_by: Identifier recorded as the sender.

    Returns:
        The statement, with status SENT and the sender and send time
        (current local time) recorded.

    Raises:
        StatementGenerationError: If no statement has that id.
    """
    statement = (
        self.db.query(BillingStatement)
        .filter(BillingStatement.id == statement_id)
        .first()
    )
    if not statement:
        raise StatementGenerationError(f"Statement {statement_id} not found")

    statement.status = StatementStatus.SENT
    statement.sent_by = sent_by
    statement.sent_at = datetime.now()
    self.db.commit()

    logger.info(f"Marked statement {statement.statement_number} as sent by {sent_by}")
    return statement