# delphi-database-v2/app/database.py
"""
Database configuration and connection management for Delphi Database application.
This module handles SQLAlchemy engine creation, session management, and provides
database connection utilities for the FastAPI application.
"""
import os
from typing import Generator

from sqlalchemy import create_engine, inspect, text
from sqlalchemy.orm import sessionmaker, Session
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
# Database configuration from environment variables
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "sqlite:///./delphi.db"  # Default to SQLite for development
)
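# The SQLite default above is intended for local development only. In other
# environments the URL is expected to come from the environment / a .env file,
# e.g. (hypothetical credentials; any SQLAlchemy-supported URL works):
#
#     DATABASE_URL=postgresql://delphi:change-me@localhost:5432/delphi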
# Create SQLAlchemy engine
engine = create_engine(
    DATABASE_URL,
    # SQLite connections refuse cross-thread use by default; check_same_thread=False
    # lets FastAPI's threadpool share them. Other backends need no extra connect args.
    connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {},
    pool_pre_ping=True,  # Verify connections before reuse
    echo=False  # Set to True for SQL query logging in development
)
# Create session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Import Base from models for SQLAlchemy 1.x compatibility
from .models import Base
def get_db() -> Generator[Session, None, None]:
    """
    Dependency function that provides a database session.

    Yields a database session and ensures it's properly closed after use.
    Used as a FastAPI dependency in route handlers.

    Yields:
        Session: SQLAlchemy database session

    Example:
        @app.get("/items/")
        async def read_items(db: Session = Depends(get_db)):
            return db.query(Item).all()
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def create_tables() -> None:
    """
    Create all database tables defined in SQLAlchemy models.

    This function should be called during application startup to ensure
    all tables exist in the database.
    """
    Base.metadata.create_all(bind=engine)

    # Lightweight migration: ensure ledger-specific columns exist on transactions
    try:
        inspector = inspect(engine)
        columns = {col['name'] for col in inspector.get_columns('transactions')}
        migration_alters = []
        # Map of column name to SQL for SQLite ALTER TABLE ADD COLUMN
        required_columns_sql = {
            'item_no': 'ALTER TABLE transactions ADD COLUMN item_no INTEGER',
            'employee_number': 'ALTER TABLE transactions ADD COLUMN employee_number VARCHAR(20)',
            't_code': 'ALTER TABLE transactions ADD COLUMN t_code VARCHAR(10)',
            't_type_l': 'ALTER TABLE transactions ADD COLUMN t_type_l VARCHAR(1)',
            'quantity': 'ALTER TABLE transactions ADD COLUMN quantity FLOAT',
            'rate': 'ALTER TABLE transactions ADD COLUMN rate FLOAT',
            'billed': 'ALTER TABLE transactions ADD COLUMN billed VARCHAR(1)'
        }
        for col_name, ddl in required_columns_sql.items():
            if col_name not in columns:
                migration_alters.append(ddl)
        if migration_alters:
            with engine.begin() as conn:
                for ddl in migration_alters:
                    conn.execute(text(ddl))
    except Exception as e:
        # Log but do not fail startup; migrations are best-effort for SQLite
        try:
            from .logging_config import setup_logging
            import structlog
            setup_logging()
            _logger = structlog.get_logger(__name__)
            _logger.warning("sqlite_migration_failed", error=str(e))
        except Exception:
            pass

    # Lightweight migration: ensure new client columns exist (SQLite safe)
    try:
        inspector = inspect(engine)
        client_cols = {col['name'] for col in inspector.get_columns('clients')}
        client_required_sql = {
            'prefix': 'ALTER TABLE clients ADD COLUMN prefix VARCHAR(20)',
            'middle_name': 'ALTER TABLE clients ADD COLUMN middle_name VARCHAR(50)',
            'suffix': 'ALTER TABLE clients ADD COLUMN suffix VARCHAR(20)',
            'title': 'ALTER TABLE clients ADD COLUMN title VARCHAR(100)',
            'group': 'ALTER TABLE clients ADD COLUMN "group" VARCHAR(50)',
            'email': 'ALTER TABLE clients ADD COLUMN email VARCHAR(255)',
            'dob': 'ALTER TABLE clients ADD COLUMN dob DATE',
            'ssn': 'ALTER TABLE clients ADD COLUMN ssn VARCHAR(20)',
            'legal_status': 'ALTER TABLE clients ADD COLUMN legal_status VARCHAR(50)',
            'memo': 'ALTER TABLE clients ADD COLUMN memo TEXT'
        }
        client_alters = []
        for col_name, ddl in client_required_sql.items():
            if col_name not in client_cols:
                client_alters.append(ddl)
        if client_alters:
            with engine.begin() as conn:
                for ddl in client_alters:
                    conn.execute(text(ddl))
    except Exception as e:
        try:
            from .logging_config import setup_logging
            import structlog
            setup_logging()
            _logger = structlog.get_logger(__name__)
            _logger.warning("sqlite_migration_clients_failed", error=str(e))
        except Exception:
            pass

    # Seed default admin user after creating tables
    try:
        from .auth import seed_admin_user
        seed_admin_user()
    except ImportError:
        # Handle case where auth module isn't available yet during initial import
        pass

    # Create helpful SQLite indexes for rolodex sorting if they do not exist
    try:
        if "sqlite" in DATABASE_URL:
            index_ddls = [
                # Name sort: NULLS LAST emulation (null-check terms first, then values)
                "CREATE INDEX IF NOT EXISTS ix_clients_name_sort ON clients((last_name IS NULL), last_name, (first_name IS NULL), first_name)",
                # Company/address/city/state/zip
                "CREATE INDEX IF NOT EXISTS ix_clients_company_sort ON clients((company IS NULL), company)",
                "CREATE INDEX IF NOT EXISTS ix_clients_address_sort ON clients((address IS NULL), address)",
                "CREATE INDEX IF NOT EXISTS ix_clients_city_sort ON clients((city IS NULL), city)",
                "CREATE INDEX IF NOT EXISTS ix_clients_state_sort ON clients((state IS NULL), state)",
                "CREATE INDEX IF NOT EXISTS ix_clients_zip_sort ON clients((zip_code IS NULL), zip_code)",
                # Updated sort via COALESCE(updated_at, created_at)
                "CREATE INDEX IF NOT EXISTS ix_clients_updated_sort ON clients(COALESCE(updated_at, created_at))",
                # Phone MIN(phone_number) correlated subquery helper
                "CREATE INDEX IF NOT EXISTS ix_phones_client_phone ON phones(client_id, phone_number)",
            ]
            with engine.begin() as conn:
                for ddl in index_ddls:
                    conn.execute(text(ddl))
    except Exception as e:
        try:
            from .logging_config import setup_logging
            import structlog
            setup_logging()
            _logger = structlog.get_logger(__name__)
            _logger.warning("sqlite_index_creation_failed", error=str(e))
        except Exception:
            pass

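# A minimal sketch of how this module is typically wired into the application
# at startup (hypothetical main.py; the real entry point may differ):
#
#     from fastapi import FastAPI
#     from .database import create_tables, get_db
#
#     app = FastAPI()
#
#     @app.on_event("startup")
#     def on_startup() -> None:
#         create_tables()  # creates tables and runs the best-effort SQLite migrations
#
#     # Route handlers then take `db: Session = Depends(get_db)` as shown above.
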
def get_database_url() -> str:
    """
    Get the current database URL (with sensitive info masked).

    Returns:
        str: Database URL with password masked for logging
    """
    if "sqlite" in DATABASE_URL:
        return DATABASE_URL
    # For PostgreSQL/MySQL, mask the password portion of user:password@host
    if "@" in DATABASE_URL:
        credentials = DATABASE_URL.split("@")[0]
        if "://" in credentials:
            auth = credentials.split("://")[1]
            if ":" in auth:
                _user, password = auth.split(":", 1)
                return DATABASE_URL.replace(f":{password}@", ":****@", 1)
    return "****://****:****@****/****"