feat(admin): add mapping workflow for unknown CSVs

- New POST /admin/map-files to reclassify unknown files to a chosen import type
- Centralize VALID_IMPORT_TYPES and pass to admin template
- UI: dropdown + 'Map Selected' button in Unknown card
- JS: mapSelectedFiles() posts selection and reloads on success
- Keeps UUID suffix, prevents traversal, logs actions
This commit is contained in:
HotSwapp
2025-10-08 13:22:34 -05:00
parent dc1c10f44b
commit c23e8d0b8a
2 changed files with 163 additions and 13 deletions

View File

@@ -236,6 +236,19 @@ app.add_middleware(SessionMiddleware, secret_key=SECRET_KEY)
# Mount static files directory
app.mount("/static", StaticFiles(directory="static"), name="static")
# Canonical list of valid legacy import types used across the admin features
VALID_IMPORT_TYPES: List[str] = [
# Reference tables
'trnstype', 'trnslkup', 'footers', 'filestat', 'employee',
'gruplkup', 'filetype', 'fvarlkup', 'rvarlkup',
# Core data tables
'rolodex', 'phone', 'rolex_v', 'files', 'files_r', 'files_v',
'filenots', 'ledger', 'deposits', 'payments',
# Specialized tables
'planinfo', 'qdros', 'pensions', 'pension_marriage',
'pension_death', 'pension_schedule', 'pension_separate', 'pension_results',
]
def get_import_type_from_filename(filename: str) -> str:
"""
@@ -314,6 +327,95 @@ def get_import_type_from_filename(filename: str) -> str:
raise ValueError(f"Unknown file type for filename: {filename}")
@app.post("/admin/map-files")
async def admin_map_unknown_files(
request: Request,
db: Session = Depends(get_db)
):
"""
Map selected unknown files to a chosen import type by renaming with the
appropriate prefix used through the system (e.g., files are stored as
"{import_type}_{uuid}.csv").
Expects JSON body: { "target_type": str, "filenames": [str, ...] }
"""
# Auth check
user = get_current_user_from_session(request.session)
if not user:
raise HTTPException(status_code=401, detail="Unauthorized")
try:
payload = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON body")
target_type = (payload.get("target_type") or "").strip()
filenames = payload.get("filenames") or []
if target_type not in VALID_IMPORT_TYPES:
raise HTTPException(status_code=400, detail="Invalid target import type")
if not isinstance(filenames, list) or not filenames:
raise HTTPException(status_code=400, detail="No filenames provided")
import_dir = "data-import"
if not os.path.isdir(import_dir):
raise HTTPException(status_code=400, detail="Import directory not found")
mapped: List[Dict[str, str]] = []
errors: List[Dict[str, str]] = []
for name in filenames:
# Basic traversal protection
if not isinstance(name, str) or any(x in name for x in ["..", "/", "\\"]):
errors.append({"filename": name, "error": "Invalid filename"})
continue
old_path = os.path.join(import_dir, name)
if not os.path.isfile(old_path):
errors.append({"filename": name, "error": "File not found"})
continue
# Determine new name: keep UUID suffix/extension from existing stored filename
try:
_, ext = os.path.splitext(name)
# If name already follows pattern <type>_<uuid>.<ext>, keep suffix
suffix = name
if "_" in name:
parts = name.split("_", 1)
if len(parts) == 2:
suffix = parts[1]
new_name = f"{target_type}_{suffix}"
new_path = os.path.join(import_dir, new_name)
# Avoid overwriting: if exists, add short random suffix
if os.path.exists(new_path):
rand = uuid.uuid4().hex[:6]
base_no_ext, _ = os.path.splitext(new_name)
new_name = f"{base_no_ext}-{rand}{ext}"
new_path = os.path.join(import_dir, new_name)
os.replace(old_path, new_path)
logger.info(
"admin_map_file",
old_filename=name,
new_filename=new_name,
target_type=target_type,
username=user.username,
)
mapped.append({"old": name, "new": new_name})
except Exception as e:
logger.error(
"admin_map_file_error",
filename=name,
target_type=target_type,
error=str(e),
)
errors.append({"filename": name, "error": str(e)})
return {"mapped": mapped, "errors": errors}
def validate_csv_headers(headers: List[str], expected_fields: Dict[str, str]) -> Dict[str, Any]:
"""
Validate CSV headers against expected model fields.
@@ -1626,18 +1728,7 @@ async def admin_import_data(
return RedirectResponse(url="/login", status_code=302)
# Validate data type
valid_types = [
# Reference tables
'trnstype', 'trnslkup', 'footers', 'filestat', 'employee',
'gruplkup', 'filetype', 'fvarlkup', 'rvarlkup',
# Core data tables
'rolodex', 'phone', 'rolex_v', 'files', 'files_r', 'files_v',
'filenots', 'ledger', 'deposits', 'payments',
# Specialized tables
'planinfo', 'qdros', 'pensions', 'pension_marriage',
'pension_death', 'pension_schedule', 'pension_separate', 'pension_results'
]
if data_type not in valid_types:
if data_type not in VALID_IMPORT_TYPES:
return templates.TemplateResponse("admin.html", {
"request": request,
"user": user,
@@ -1964,7 +2055,8 @@ async def admin_panel(request: Request, db: Session = Depends(get_db)):
"recent_imports": recent_imports,
"available_files": available_files,
"table_counts": table_counts,
"files_by_type": files_by_type
"files_by_type": files_by_type,
"valid_import_types": VALID_IMPORT_TYPES
})