File Cabinet MVP: case detail with inline Ledger CRUD

- Extend Transaction with ledger fields (item_no, employee_number, t_code, t_type_l, quantity, rate, billed)
- Startup SQLite migration to add missing columns on transactions
- Ledger create/update/delete endpoints with validations and auto-compute Amount = Quantity × Rate
- Uniqueness: ensure (transaction_date, item_no) per case by auto-incrementing
- Compute case totals (billed/unbilled/overall) and display in case view
- Update case.html for master-detail ledger UI; add client-side auto-compute JS
- Enhance import_ledger_data to populate extended fields
- Close/Reopen actions retained; case detail sorting by date/item
- Auth: switch to pbkdf2_sha256 default (bcrypt fallback) and seed admin robustness

Tested in Docker: health OK, login OK, import ROLODEX/FILES OK, ledger create persisted and totals displayed.
This commit is contained in:
HotSwapp
2025-10-07 09:26:58 -05:00
parent f9c3b3cc9c
commit 950d261eb4
13 changed files with 496 additions and 19 deletions

View File

@@ -22,7 +22,7 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import or_, and_
from sqlalchemy import or_, and_, func as sa_func
from dotenv import load_dotenv
from starlette.middleware.base import BaseHTTPMiddleware
import structlog
@@ -611,13 +611,39 @@ def import_ledger_data(db: Session, file_path: str) -> Dict[str, Any]:
result['errors'].append(f"Row {row_num}: Invalid amount")
continue
tx_date = parse_date(row.get('Date', ''))
item_no = parse_int(row.get('Item_No', '') or '')
# ensure unique item_no per date by increment
# temp session-less check via while loop
desired_item_no = item_no if item_no is not None else 1
while True:
exists = (
db.query(Transaction)
.filter(
Transaction.case_id == case.id,
Transaction.transaction_date == tx_date,
Transaction.item_no == desired_item_no,
)
.first()
)
if not exists:
break
desired_item_no += 1
transaction = Transaction(
case_id=case.id,
transaction_date=parse_date(row.get('Date', '')),
transaction_type=row.get('T_Type', '').strip() or None,
transaction_date=tx_date,
transaction_type=(row.get('T_Type', '').strip() or None),
t_type_l=(row.get('T_Type_L', '').strip().upper() or None),
amount=amount,
description=row.get('Note', '').strip() or None,
reference=row.get('Item_No', '').strip() or None
description=(row.get('Note', '').strip() or None),
reference=(row.get('Item_No', '').strip() or None),
item_no=desired_item_no,
employee_number=(row.get('Empl_Num', '').strip() or None),
t_code=(row.get('T_Code', '').strip().upper() or None),
quantity=parse_float(row.get('Quantity', '')),
rate=parse_float(row.get('Rate', '')),
billed=((row.get('Billed', '') or '').strip().upper() or None),
)
db.add(transaction)
@@ -791,6 +817,271 @@ def process_csv_import(db: Session, import_type: str, file_path: str) -> Dict[st
return import_func(db, file_path)
# ------------------------------
# Ledger CRUD and helpers
# ------------------------------
def validate_ledger_fields(
*,
transaction_date: Optional[str],
t_code: Optional[str],
employee_number: Optional[str],
quantity: Optional[str],
rate: Optional[str],
amount: Optional[str],
billed: Optional[str],
) -> tuple[list[str], dict[str, Any]]:
"""Validate incoming ledger form fields and return (errors, parsed_values)."""
errors: list[str] = []
parsed: dict[str, Any] = {}
# Date
tx_dt = parse_date(transaction_date or "") if transaction_date is not None else None
if tx_dt is None:
errors.append("Date is required and must be valid")
else:
parsed["transaction_date"] = tx_dt
# T_Code
if t_code is None or not t_code.strip():
errors.append("T_Code is required")
else:
parsed["t_code"] = t_code.strip().upper()
# Employee number
if employee_number is None or not employee_number.strip():
errors.append("Empl_Num is required")
else:
parsed["employee_number"] = employee_number.strip()
# Quantity, Rate, Amount
qty = parse_float(quantity or "") if quantity is not None else None
rt = parse_float(rate or "") if rate is not None else None
amt = parse_float(amount or "") if amount is not None else None
if qty is not None:
parsed["quantity"] = qty
if rt is not None:
parsed["rate"] = rt
# Auto-compute amount if missing but quantity and rate present
if amt is None and qty is not None and rt is not None:
amt = round(qty * rt, 2)
if amt is None:
errors.append("Amount is required or derivable from Quantity × Rate")
else:
parsed["amount"] = amt
# Billed flag
billed_flag = (billed or "").strip().upper() if billed is not None else ""
if billed_flag not in ("Y", "N"):
errors.append("Billed must be 'Y' or 'N'")
else:
parsed["billed"] = billed_flag
return errors, parsed
def next_unique_item_no(db: Session, case_id: int, tx_date: datetime, desired_item_no: Optional[int]) -> int:
"""Ensure (transaction_date, item_no) uniqueness per case by incrementing if needed."""
# Start at provided item_no or at 1 if missing
item_no = int(desired_item_no) if desired_item_no is not None else 1
while True:
exists = (
db.query(Transaction)
.filter(
Transaction.case_id == case_id,
Transaction.transaction_date == tx_date,
Transaction.item_no == item_no,
)
.first()
)
if not exists:
return item_no
item_no += 1
def compute_case_totals_from_case(case_obj: Case) -> Dict[str, float]:
"""
Compute simple totals for a case from its transactions.
Returns billed, unbilled, and total sums. Amounts are treated as positive;
future enhancement could apply sign based on t_type_l.
"""
billed_total = 0.0
unbilled_total = 0.0
overall_total = 0.0
for t in (case_obj.transactions or []):
amt = float(t.amount) if t.amount is not None else 0.0
overall_total += amt
if (t.billed or '').upper() == 'Y':
billed_total += amt
else:
unbilled_total += amt
return {
'billed_total': round(billed_total, 2),
'unbilled_total': round(unbilled_total, 2),
'overall_total': round(overall_total, 2),
}
@app.post("/case/{case_id}/ledger")
async def ledger_create(
request: Request,
case_id: int,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
# Validate
errors, parsed = validate_ledger_fields(
transaction_date=form.get("transaction_date"),
t_code=form.get("t_code"),
employee_number=form.get("employee_number"),
quantity=form.get("quantity"),
rate=form.get("rate"),
amount=form.get("amount"),
billed=form.get("billed"),
)
if errors:
request.session["case_update_errors"] = errors
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Ensure case exists
case_obj = db.query(Case).filter(Case.id == case_id).first()
if not case_obj:
request.session["case_update_errors"] = ["Case not found"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
# Assign optional fields
t_type = (form.get("transaction_type") or "").strip() or None
t_type_l = (form.get("t_type_l") or "").strip().upper() or None
reference = (form.get("reference") or "").strip() or None
desc = (form.get("description") or "").strip() or None
desired_item_no = parse_int(form.get("item_no") or "")
item_no = next_unique_item_no(db, case_id, parsed["transaction_date"], desired_item_no)
try:
tx = Transaction(
case_id=case_id,
transaction_date=parsed["transaction_date"],
transaction_type=t_type,
t_type_l=t_type_l,
amount=parsed["amount"],
description=desc,
reference=reference,
item_no=item_no,
employee_number=parsed["employee_number"],
t_code=parsed["t_code"],
quantity=parsed.get("quantity"),
rate=parsed.get("rate"),
billed=parsed["billed"],
)
db.add(tx)
db.commit()
logger.info("ledger_create", case_id=case_id, transaction_id=tx.id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("ledger_create_failed", case_id=case_id, error=str(e))
request.session["case_update_errors"] = ["Failed to create ledger entry"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/ledger/{tx_id}")
async def ledger_update(
request: Request,
case_id: int,
tx_id: int,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
tx = db.query(Transaction).filter(Transaction.id == tx_id, Transaction.case_id == case_id).first()
if not tx:
request.session["case_update_errors"] = ["Ledger entry not found"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
errors, parsed = validate_ledger_fields(
transaction_date=form.get("transaction_date"),
t_code=form.get("t_code"),
employee_number=form.get("employee_number"),
quantity=form.get("quantity"),
rate=form.get("rate"),
amount=form.get("amount"),
billed=form.get("billed"),
)
if errors:
request.session["case_update_errors"] = errors
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
try:
tx.transaction_date = parsed["transaction_date"]
# Ensure uniqueness of (date, item_no)
desired_item_no = parse_int(form.get("item_no") or "") or tx.item_no
tx.item_no = next_unique_item_no(db, case_id, parsed["transaction_date"], desired_item_no)
tx.t_code = parsed["t_code"]
tx.employee_number = parsed["employee_number"]
tx.quantity = parsed.get("quantity")
tx.rate = parsed.get("rate")
tx.amount = parsed["amount"]
tx.billed = parsed["billed"]
tx.transaction_type = (form.get("transaction_type") or "").strip() or None
tx.t_type_l = (form.get("t_type_l") or "").strip().upper() or None
tx.reference = (form.get("reference") or "").strip() or None
tx.description = (form.get("description") or "").strip() or None
db.commit()
logger.info("ledger_update", case_id=case_id, transaction_id=tx.id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("ledger_update_failed", case_id=case_id, tx_id=tx_id, error=str(e))
request.session["case_update_errors"] = ["Failed to update ledger entry"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.post("/case/{case_id}/ledger/{tx_id}/delete")
async def ledger_delete(
request: Request,
case_id: int,
tx_id: int,
db: Session = Depends(get_db),
):
user = get_current_user_from_session(request.session)
if not user:
return RedirectResponse(url="/login", status_code=302)
tx = db.query(Transaction).filter(Transaction.id == tx_id, Transaction.case_id == case_id).first()
if not tx:
request.session["case_update_errors"] = ["Ledger entry not found"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
try:
db.delete(tx)
db.commit()
logger.info("ledger_delete", case_id=case_id, transaction_id=tx_id)
return RedirectResponse(url=f"/case/{case_id}?saved=1", status_code=302)
except Exception as e:
db.rollback()
logger.error("ledger_delete_failed", case_id=case_id, tx_id=tx_id, error=str(e))
request.session["case_update_errors"] = ["Failed to delete ledger entry"]
return RedirectResponse(url=f"/case/{case_id}", status_code=302)
@app.get("/")
async def root():
"""
@@ -1277,6 +1568,18 @@ async def case_detail(
# Get any errors from session and clear them
errors = request.session.pop("case_update_errors", None)
# Sort transactions by date then item_no for stable display
sorted_transactions = sorted(
case_obj.transactions or [],
key=lambda t: (
t.transaction_date or datetime.min,
t.item_no or 0,
)
)
case_obj.transactions = sorted_transactions
totals = compute_case_totals_from_case(case_obj)
return templates.TemplateResponse(
"case.html",
{
@@ -1285,6 +1588,7 @@ async def case_detail(
"case": case_obj,
"saved": saved,
"errors": errors or [],
"totals": totals,
},
)