Files
2025-08-14 19:16:28 -05:00

249 lines
7.7 KiB
Python

"""
SQLite Full-Text Search (FTS5) helpers.
Creates and maintains FTS virtual tables and triggers to keep them in sync
with their content tables. Designed to be called at app startup.
"""
from typing import Optional
from sqlalchemy.engine import Engine
from sqlalchemy import text
def _execute_ignore_errors(engine: Engine, sql: str) -> None:
"""Execute SQL, ignoring operational errors (e.g., when FTS5 is unavailable)."""
from sqlalchemy.exc import OperationalError
with engine.begin() as conn:
try:
conn.execute(text(sql))
except OperationalError:
# Likely FTS5 extension not available in this SQLite build
pass
def ensure_rolodex_fts(engine: Engine) -> None:
"""Ensure the `rolodex_fts` virtual table and triggers exist and are populated.
This uses content=rolodex so the FTS table shadows the base table and is kept
in sync via triggers.
"""
# Create virtual table (if FTS5 is available)
_create_table = """
CREATE VIRTUAL TABLE IF NOT EXISTS rolodex_fts USING fts5(
id,
first,
last,
city,
email,
memo,
content='rolodex',
content_rowid='rowid'
);
"""
_execute_ignore_errors(engine, _create_table)
# Triggers to keep FTS in sync
_triggers = [
"""
CREATE TRIGGER IF NOT EXISTS rolodex_ai AFTER INSERT ON rolodex BEGIN
INSERT INTO rolodex_fts(rowid, id, first, last, city, email, memo)
VALUES (new.rowid, new.id, new.first, new.last, new.city, new.email, new.memo);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS rolodex_ad AFTER DELETE ON rolodex BEGIN
INSERT INTO rolodex_fts(rolodex_fts, rowid, id, first, last, city, email, memo)
VALUES ('delete', old.rowid, old.id, old.first, old.last, old.city, old.email, old.memo);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS rolodex_au AFTER UPDATE ON rolodex BEGIN
INSERT INTO rolodex_fts(rolodex_fts, rowid, id, first, last, city, email, memo)
VALUES ('delete', old.rowid, old.id, old.first, old.last, old.city, old.email, old.memo);
INSERT INTO rolodex_fts(rowid, id, first, last, city, email, memo)
VALUES (new.rowid, new.id, new.first, new.last, new.city, new.email, new.memo);
END;
""",
]
for trig in _triggers:
_execute_ignore_errors(engine, trig)
# Backfill if the FTS table exists but is empty
with engine.begin() as conn:
try:
count_fts = conn.execute(text("SELECT count(*) FROM rolodex_fts")).scalar() # type: ignore
if count_fts == 0:
# Populate from existing rolodex rows
conn.execute(text(
"""
INSERT INTO rolodex_fts(rowid, id, first, last, city, email, memo)
SELECT rowid, id, first, last, city, email, memo FROM rolodex;
"""
))
except Exception:
# If FTS table doesn't exist or any error occurs, ignore silently
pass
def ensure_files_fts(engine: Engine) -> None:
"""Ensure the `files_fts` virtual table and triggers exist and are populated."""
_create_table = """
CREATE VIRTUAL TABLE IF NOT EXISTS files_fts USING fts5(
file_no,
id,
regarding,
file_type,
memo,
content='files',
content_rowid='rowid'
);
"""
_execute_ignore_errors(engine, _create_table)
_triggers = [
"""
CREATE TRIGGER IF NOT EXISTS files_ai AFTER INSERT ON files BEGIN
INSERT INTO files_fts(rowid, file_no, id, regarding, file_type, memo)
VALUES (new.rowid, new.file_no, new.id, new.regarding, new.file_type, new.memo);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS files_ad AFTER DELETE ON files BEGIN
INSERT INTO files_fts(files_fts, rowid, file_no, id, regarding, file_type, memo)
VALUES ('delete', old.rowid, old.file_no, old.id, old.regarding, old.file_type, old.memo);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS files_au AFTER UPDATE ON files BEGIN
INSERT INTO files_fts(files_fts, rowid, file_no, id, regarding, file_type, memo)
VALUES ('delete', old.rowid, old.file_no, old.id, old.regarding, old.file_type, old.memo);
INSERT INTO files_fts(rowid, file_no, id, regarding, file_type, memo)
VALUES (new.rowid, new.file_no, new.id, new.regarding, new.file_type, new.memo);
END;
""",
]
for trig in _triggers:
_execute_ignore_errors(engine, trig)
with engine.begin() as conn:
try:
count_fts = conn.execute(text("SELECT count(*) FROM files_fts")).scalar() # type: ignore
if count_fts == 0:
conn.execute(text(
"""
INSERT INTO files_fts(rowid, file_no, id, regarding, file_type, memo)
SELECT rowid, file_no, id, regarding, file_type, memo FROM files;
"""
))
except Exception:
pass
def ensure_ledger_fts(engine: Engine) -> None:
"""Ensure the `ledger_fts` virtual table and triggers exist and are populated."""
_create_table = """
CREATE VIRTUAL TABLE IF NOT EXISTS ledger_fts USING fts5(
file_no,
t_code,
note,
empl_num,
content='ledger',
content_rowid='rowid'
);
"""
_execute_ignore_errors(engine, _create_table)
_triggers = [
"""
CREATE TRIGGER IF NOT EXISTS ledger_ai AFTER INSERT ON ledger BEGIN
INSERT INTO ledger_fts(rowid, file_no, t_code, note, empl_num)
VALUES (new.rowid, new.file_no, new.t_code, new.note, new.empl_num);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS ledger_ad AFTER DELETE ON ledger BEGIN
INSERT INTO ledger_fts(ledger_fts, rowid, file_no, t_code, note, empl_num)
VALUES ('delete', old.rowid, old.file_no, old.t_code, old.note, old.empl_num);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS ledger_au AFTER UPDATE ON ledger BEGIN
INSERT INTO ledger_fts(ledger_fts, rowid, file_no, t_code, note, empl_num)
VALUES ('delete', old.rowid, old.file_no, old.t_code, old.note, old.empl_num);
INSERT INTO ledger_fts(rowid, file_no, t_code, note, empl_num)
VALUES (new.rowid, new.file_no, new.t_code, new.note, new.empl_num);
END;
""",
]
for trig in _triggers:
_execute_ignore_errors(engine, trig)
with engine.begin() as conn:
try:
count_fts = conn.execute(text("SELECT count(*) FROM ledger_fts")).scalar() # type: ignore
if count_fts == 0:
conn.execute(text(
"""
INSERT INTO ledger_fts(rowid, file_no, t_code, note, empl_num)
SELECT rowid, file_no, t_code, note, empl_num FROM ledger;
"""
))
except Exception:
pass
def ensure_qdros_fts(engine: Engine) -> None:
"""Ensure the `qdros_fts` virtual table and triggers exist and are populated."""
_create_table = """
CREATE VIRTUAL TABLE IF NOT EXISTS qdros_fts USING fts5(
file_no,
form_name,
pet,
res,
case_number,
content='qdros',
content_rowid='rowid'
);
"""
_execute_ignore_errors(engine, _create_table)
_triggers = [
"""
CREATE TRIGGER IF NOT EXISTS qdros_ai AFTER INSERT ON qdros BEGIN
INSERT INTO qdros_fts(rowid, file_no, form_name, pet, res, case_number)
VALUES (new.rowid, new.file_no, new.form_name, new.pet, new.res, new.case_number);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS qdros_ad AFTER DELETE ON qdros BEGIN
INSERT INTO qdros_fts(qdros_fts, rowid, file_no, form_name, pet, res, case_number)
VALUES ('delete', old.rowid, old.file_no, old.form_name, old.pet, old.res, old.case_number);
END;
""",
"""
CREATE TRIGGER IF NOT EXISTS qdros_au AFTER UPDATE ON qdros BEGIN
INSERT INTO qdros_fts(qdros_fts, rowid, file_no, form_name, pet, res, case_number)
VALUES ('delete', old.rowid, old.file_no, old.form_name, old.pet, old.res, old.case_number);
INSERT INTO qdros_fts(rowid, file_no, form_name, pet, res, case_number)
VALUES (new.rowid, new.file_no, new.form_name, new.pet, new.res, new.case_number);
END;
""",
]
for trig in _triggers:
_execute_ignore_errors(engine, trig)
with engine.begin() as conn:
try:
count_fts = conn.execute(text("SELECT count(*) FROM qdros_fts")).scalar() # type: ignore
if count_fts == 0:
conn.execute(text(
"""
INSERT INTO qdros_fts(rowid, file_no, form_name, pet, res, case_number)
SELECT rowid, file_no, form_name, pet, res, case_number FROM qdros;
"""
))
except Exception:
pass