QDRO links appear in rolodex_view.html case rows and case.html header when QDRO data exists, matching legacy flows.
171 lines
6.0 KiB
Python
171 lines
6.0 KiB
Python
"""
|
|
Database configuration and connection management for Delphi Database application.
|
|
|
|
This module handles SQLAlchemy engine creation, session management, and provides
|
|
database connection utilities for the FastAPI application.
|
|
"""
|
|
|
|
import os
|
|
from typing import Generator
|
|
|
|
from sqlalchemy import create_engine
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, Session
|
|
from dotenv import load_dotenv
|
|
from sqlalchemy import inspect, text
|
|
|
|
# Load environment variables from .env file
load_dotenv()

# Database configuration from environment variables
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "sqlite:///./delphi.db",  # Default to SQLite for development
)

# Create SQLAlchemy engine.
# check_same_thread is a sqlite3-only connect argument, so it is passed only
# when the URL's scheme selects SQLite. The scheme prefix is tested (rather
# than a substring match over the whole URL) so that a non-SQLite URL that
# merely contains "sqlite" in its user, host or database name does not
# receive an argument its driver would reject.
engine = create_engine(
    DATABASE_URL,
    connect_args=(
        {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
    ),
    pool_pre_ping=True,  # Verify connections before reuse
    echo=False,  # Set to True for SQL query logging in development
)

# Create session factory; sessions neither autocommit nor autoflush
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
|
|
|
# Import Base from models for SQLAlchemy 1.x compatibility
|
|
from .models import Base
|
|
|
|
|
|
def get_db() -> Generator[Session, None, None]:
    """
    Yield a database session and close it once the caller is done.

    A new session is created from ``SessionLocal`` on every call. The
    session is closed in a ``finally`` clause, so it is released whether
    the consumer finishes normally or raises.
    Intended for use as a FastAPI dependency in route handlers.

    Yields:
        Session: SQLAlchemy database session

    Example:
        @app.get("/items/")
        async def read_items(db: Session = Depends(get_db)):
            return db.query(Item).all()
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        # Always release the session, even if the consumer raised
        session.close()
|
|
|
|
|
|
def _log_migration_warning(event: str, error: Exception) -> None:
    """Report a failed best-effort migration as a warning; never raises."""
    try:
        from .logging_config import setup_logging
        import structlog

        setup_logging()
        structlog.get_logger(__name__).warning(event, error=str(error))
    except Exception:
        # Structured logging could not be set up. Fall back to the standard
        # library logger so the migration failure is not lost silently.
        import logging

        logging.getLogger(__name__).warning("%s: %s", event, error)


def _add_missing_columns(table_name: str, column_ddl: dict, failure_event: str) -> None:
    """
    Run the ALTER TABLE statement for each column missing from a table.

    Args:
        table_name: Table whose existing columns are inspected.
        column_ddl: Maps a column name to the ``ALTER TABLE ... ADD COLUMN``
            statement that creates it.
        failure_event: Event name logged as a warning if anything fails.

    Any exception is logged and swallowed: these migrations are best-effort
    and must not prevent application startup.
    """
    try:
        existing = {col['name'] for col in inspect(engine).get_columns(table_name)}
        pending = [ddl for name, ddl in column_ddl.items() if name not in existing]
        if pending:
            # All pending statements for one table share a single transaction
            with engine.begin() as conn:
                for ddl in pending:
                    conn.execute(text(ddl))
    except Exception as exc:
        _log_migration_warning(failure_event, exc)


def create_tables() -> None:
    """
    Create all database tables defined in SQLAlchemy models.

    This function should be called during application startup to ensure
    all tables exist in the database. After creating the tables it:

    1. Adds any missing ledger-specific columns to ``transactions``.
    2. Adds any missing newer columns to ``clients``.
    3. Seeds the default admin user, if the auth module can be imported.

    Steps 1 and 2 are lightweight, best-effort migrations: failures are
    logged as warnings and do not abort startup.
    """
    Base.metadata.create_all(bind=engine)

    # Lightweight migration: ensure ledger-specific columns exist on transactions
    _add_missing_columns(
        'transactions',
        {
            'item_no': 'ALTER TABLE transactions ADD COLUMN item_no INTEGER',
            'employee_number': 'ALTER TABLE transactions ADD COLUMN employee_number VARCHAR(20)',
            't_code': 'ALTER TABLE transactions ADD COLUMN t_code VARCHAR(10)',
            't_type_l': 'ALTER TABLE transactions ADD COLUMN t_type_l VARCHAR(1)',
            'quantity': 'ALTER TABLE transactions ADD COLUMN quantity FLOAT',
            'rate': 'ALTER TABLE transactions ADD COLUMN rate FLOAT',
            'billed': 'ALTER TABLE transactions ADD COLUMN billed VARCHAR(1)',
        },
        "sqlite_migration_failed",
    )

    # Lightweight migration: ensure new client columns exist (SQLite safe)
    _add_missing_columns(
        'clients',
        {
            'prefix': 'ALTER TABLE clients ADD COLUMN prefix VARCHAR(20)',
            'middle_name': 'ALTER TABLE clients ADD COLUMN middle_name VARCHAR(50)',
            'suffix': 'ALTER TABLE clients ADD COLUMN suffix VARCHAR(20)',
            'title': 'ALTER TABLE clients ADD COLUMN title VARCHAR(100)',
            # "group" is an SQL keyword, hence the quoted identifier
            'group': 'ALTER TABLE clients ADD COLUMN "group" VARCHAR(50)',
            'email': 'ALTER TABLE clients ADD COLUMN email VARCHAR(255)',
            'dob': 'ALTER TABLE clients ADD COLUMN dob DATE',
            'ssn': 'ALTER TABLE clients ADD COLUMN ssn VARCHAR(20)',
            'legal_status': 'ALTER TABLE clients ADD COLUMN legal_status VARCHAR(50)',
            'memo': 'ALTER TABLE clients ADD COLUMN memo TEXT',
        },
        "sqlite_migration_clients_failed",
    )

    # Seed default admin user after creating tables
    try:
        from .auth import seed_admin_user

        seed_admin_user()
    except ImportError:
        # Handle case where auth module isn't available yet during initial import
        pass
|
|
|
|
|
|
def get_database_url() -> str:
    """
    Get the current database URL (with sensitive info masked).

    SQLite URLs are returned unchanged. For any other URL of the form
    ``scheme://user:password@host...`` only the password is replaced with
    ``****``; the scheme, username, host and the rest of the URL are kept.
    If the URL has no ``user:password@`` section in its authority part, a
    fully masked placeholder is returned instead of the URL.

    Returns:
        str: Database URL with password masked for logging
    """
    fully_masked = "****://****:****@****/****"

    # Test the scheme prefix, not a substring: a non-SQLite URL that merely
    # contains "sqlite" (e.g. in the database name) must still be masked.
    if DATABASE_URL.startswith("sqlite"):
        return DATABASE_URL

    scheme, scheme_sep, remainder = DATABASE_URL.partition("://")
    if not scheme_sep:
        return fully_masked

    # The authority (user:password@host:port) ends at the first "/" or "?".
    cut_points = [pos for pos in (remainder.find("/"), remainder.find("?")) if pos != -1]
    authority_end = min(cut_points, default=len(remainder))
    authority, tail = remainder[:authority_end], remainder[authority_end:]

    # Split at the last "@" so the whole credential section is on the left,
    # then at the first ":" so everything after the username is treated as
    # the password and dropped.
    credentials, at_sign, host_port = authority.rpartition("@")
    username, colon, _password = credentials.partition(":")
    if not at_sign or not colon:
        return fully_masked

    return f"{scheme}://{username}:****@{host_port}{tail}"
|