Fix upload detection for model class names and add States/Printers/Setup import

- Enhanced get_import_type_from_filename() to recognize model class names (LegacyFile, FilesR, etc.) in addition to legacy CSV names (a sketch of the detection logic follows below)
- Added import functions for States, Printers, and Setup reference tables
- Updated VALID_IMPORT_TYPES and IMPORT_ORDER to include new tables
- Updated admin panel table counts to display new reference tables
- Created UPLOAD_FIX.md documentation explaining the changes and how to handle existing unknown files

This fixes the issue where files uploaded with model class names (e.g., LegacyFile.csv) were being categorized as 'unknown' instead of being properly detected.
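
The gist of the detection change, as a minimal sketch — the mapping contents, the `legacy_file`/`files_r` type names, and the `VALID_IMPORT_TYPES`/`IMPORT_ORDER` values shown here are illustrative assumptions, not the committed code:

```python
from pathlib import Path

# Illustrative subset only: the committed VALID_IMPORT_TYPES and IMPORT_ORDER
# also cover the other legacy tables (Rolodex, Qdros, Pensions, ...).
VALID_IMPORT_TYPES = {"states", "printers", "setup", "legacy_file", "files_r"}
IMPORT_ORDER = ["states", "printers", "setup"]  # reference tables before core data

# Both legacy CSV stems (STATES.csv) and model class stems (States.csv,
# LegacyFile.csv, FilesR.csv) resolve to the same import type.
FILENAME_TO_TYPE = {
    "states": "states",
    "printers": "printers",
    "setup": "setup",
    "legacyfile": "legacy_file",  # hypothetical type name, for illustration
    "filesr": "files_r",          # hypothetical type name, for illustration
}

def get_import_type_from_filename(filename: str) -> str:
    """Map an uploaded file name to an import type, or 'unknown'."""
    stem = Path(filename).stem.lower()
    import_type = FILENAME_TO_TYPE.get(stem, "unknown")
    return import_type if import_type in VALID_IMPORT_TYPES else "unknown"
```

With a lookup along these lines, an upload named LegacyFile.csv resolves to a known import type instead of falling through to 'unknown'.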
HotSwapp
2025-10-13 09:08:06 -05:00
parent e6a78221e6
commit 69f1043be3
3 changed files with 380 additions and 21 deletions


@@ -20,7 +20,7 @@ from .models import (
    Footers, FileStat, Employee, GroupLkup, FileType,
    Qdros, PlanInfo, Pensions, PensionMarriage, PensionDeath,
    PensionSchedule, PensionSeparate, PensionResults,
-   RolexV, FVarLkup, RVarLkup
+   RolexV, FVarLkup, RVarLkup, States, Printers, Setup
)

logger = structlog.get_logger(__name__)
@@ -660,6 +660,221 @@ def import_rvarlkup(db: Session, file_path: str) -> Dict[str, Any]:
    return result


def import_states(db: Session, file_path: str) -> Dict[str, Any]:
    """Import STATES.csv → States model with upsert logic."""
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                abrev = clean_string(row.get('Abrev'))
                if not abrev:
                    continue
                # Check if record already exists
                existing = db.query(States).filter(States.abrev == abrev).first()
                if existing:
                    # Update existing record
                    existing.st = clean_string(row.get('St'))
                    result['updated'] += 1
                else:
                    # Insert new record
                    record = States(
                        abrev=abrev,
                        st=clean_string(row.get('St'))
                    )
                    db.add(record)
                    result['inserted'] += 1
                result['success'] += 1
                # Commit in batches for performance
                if result['success'] % BATCH_SIZE == 0:
                    db.commit()
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                db.rollback()
        # Commit any remaining changes
        db.commit()
        f.close()
        logger.info("import_states_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_states_failed", error=str(e))
    return result

def import_printers(db: Session, file_path: str) -> Dict[str, Any]:
    """Import PRINTERS.csv → Printers model with upsert logic."""
    result = {'success': 0, 'errors': [], 'total_rows': 0, 'updated': 0, 'inserted': 0}
    try:
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                number_str = clean_string(row.get('Number'))
                if not number_str:
                    continue
                try:
                    number = int(number_str)
                except ValueError:
                    result['errors'].append(f"Row {row_num}: Invalid Number '{number_str}'")
                    continue
                # Check if record already exists
                existing = db.query(Printers).filter(Printers.number == number).first()
                if existing:
                    # Update existing record
                    existing.name = clean_string(row.get('Name'))
                    existing.port = clean_string(row.get('Port'))
                    existing.page_break = clean_string(row.get('Page_Break'))
                    existing.setup_st = clean_string(row.get('Setup_St'))
                    existing.phone_book = clean_string(row.get('Phone_Book'))
                    existing.rolodex_info = clean_string(row.get('Rolodex_Info'))
                    existing.envelope = clean_string(row.get('Envelope'))
                    existing.file_cabinet = clean_string(row.get('File_Cabinet'))
                    existing.accounts = clean_string(row.get('Accounts'))
                    existing.statements = clean_string(row.get('Statements'))
                    existing.calendar = clean_string(row.get('Calendar'))
                    existing.reset_st = clean_string(row.get('Reset_St'))
                    existing.b_underline = clean_string(row.get('B_Underline'))
                    existing.e_underline = clean_string(row.get('E_Underline'))
                    existing.b_bold = clean_string(row.get('B_Bold'))
                    existing.e_bold = clean_string(row.get('E_Bold'))
                    result['updated'] += 1
                else:
                    # Insert new record
                    record = Printers(
                        number=number,
                        name=clean_string(row.get('Name')),
                        port=clean_string(row.get('Port')),
                        page_break=clean_string(row.get('Page_Break')),
                        setup_st=clean_string(row.get('Setup_St')),
                        phone_book=clean_string(row.get('Phone_Book')),
                        rolodex_info=clean_string(row.get('Rolodex_Info')),
                        envelope=clean_string(row.get('Envelope')),
                        file_cabinet=clean_string(row.get('File_Cabinet')),
                        accounts=clean_string(row.get('Accounts')),
                        statements=clean_string(row.get('Statements')),
                        calendar=clean_string(row.get('Calendar')),
                        reset_st=clean_string(row.get('Reset_St')),
                        b_underline=clean_string(row.get('B_Underline')),
                        e_underline=clean_string(row.get('E_Underline')),
                        b_bold=clean_string(row.get('B_Bold')),
                        e_bold=clean_string(row.get('E_Bold'))
                    )
                    db.add(record)
                    result['inserted'] += 1
                result['success'] += 1
                # Commit in batches for performance
                if result['success'] % BATCH_SIZE == 0:
                    db.commit()
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
                db.rollback()
        # Commit any remaining changes
        db.commit()
        f.close()
        logger.info("import_printers_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_printers_failed", error=str(e))
    return result

def import_setup(db: Session, file_path: str) -> Dict[str, Any]:
    """Import SETUP.csv → Setup model (clears and re-inserts)."""
    result = {'success': 0, 'errors': [], 'total_rows': 0}
    try:
        # Clear existing setup records (typically only one row in legacy system)
        db.query(Setup).delete()
        db.commit()
        f, encoding = open_text_with_fallbacks(file_path)
        reader = csv.DictReader(f)
        batch = []
        for row_num, row in enumerate(reader, start=2):
            result['total_rows'] += 1
            try:
                # Parse default_printer as integer if present
                default_printer = None
                default_printer_str = clean_string(row.get('Default_Printer'))
                if default_printer_str:
                    try:
                        default_printer = int(default_printer_str)
                    except ValueError:
                        result['errors'].append(f"Row {row_num}: Invalid Default_Printer '{default_printer_str}'")
                record = Setup(
                    appl_title=clean_string(row.get('Appl_Title')),
                    l_head1=clean_string(row.get('L_Head1')),
                    l_head2=clean_string(row.get('L_Head2')),
                    l_head3=clean_string(row.get('L_Head3')),
                    l_head4=clean_string(row.get('L_Head4')),
                    l_head5=clean_string(row.get('L_Head5')),
                    l_head6=clean_string(row.get('L_Head6')),
                    l_head7=clean_string(row.get('L_Head7')),
                    l_head8=clean_string(row.get('L_Head8')),
                    l_head9=clean_string(row.get('L_Head9')),
                    l_head10=clean_string(row.get('L_Head10')),
                    default_printer=default_printer
                )
                batch.append(record)
                if len(batch) >= BATCH_SIZE:
                    db.bulk_save_objects(batch)
                    db.commit()
                    result['success'] += len(batch)
                    batch = []
            except Exception as e:
                result['errors'].append(f"Row {row_num}: {str(e)}")
        if batch:
            db.bulk_save_objects(batch)
            db.commit()
            result['success'] += len(batch)
        f.close()
        logger.info("import_setup_complete", **result)
    except Exception as e:
        db.rollback()
        result['errors'].append(f"Fatal error: {str(e)}")
        logger.error("import_setup_failed", error=str(e))
    return result

# ============================================================================
# Core Data Table Imports
# ============================================================================