- Changed encoding fallback order to prioritize iso-8859-1/latin-1 over cp1252
- Increased the encoding probe from 1 KB to 10 KB to catch decode issues deeper in files
- Added proper file handle cleanup on encoding failures
- Resolves 'charmap codec can't decode byte 0x9d' error in rolodex import
- Tested successfully with a rolodex file containing 52,100 rows
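For context, a minimal sketch of the fallback strategy these notes describe. The helper name `open_csv_with_fallback`, the probe-size default, and the exact encoding list are assumptions for illustration, not the actual `app.main` implementation:

```python
def open_csv_with_fallback(path, probe_chars=10 * 1024):
    # iso-8859-1 (latin-1) maps all 256 byte values, including 0x9d, which
    # cp1252's charmap rejects -- trying it ahead of cp1252 is what avoids
    # the "charmap codec can't decode byte 0x9d" error.
    for encoding in ("utf-8-sig", "iso-8859-1", "cp1252"):
        f = open(path, "r", encoding=encoding, newline="")
        try:
            # Probe ~10 KB rather than 1 KB so bad bytes deeper in the
            # file fail fast here instead of mid-import.
            f.read(probe_chars)
            f.seek(0)
            return f, encoding
        except UnicodeDecodeError:
            f.close()  # release the handle before trying the next encoding
    raise ValueError(f"could not decode {path} with any configured encoding")
```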
import json
from datetime import datetime

import pytest

from app.main import run_auto_import_for_upload, ORDER_INDEX


class DummyImportLog:
    # Minimal stand-in to satisfy attribute assignments; not used by SQLAlchemy
    def __init__(self, **kwargs):
        self.import_type = kwargs.get("import_type")
        self.file_name = kwargs.get("file_name")
        self.file_path = kwargs.get("file_path")
        self.status = kwargs.get("status", "pending")
        self.total_rows = 0
        self.success_count = 0
        self.error_count = 0
        self.error_details = "[]"
        self.completed_at = None


class DummyDB:
    """Very small in-memory stub for the DB session interactions used by the helper."""

    def __init__(self, import_results_by_type):
        self.import_results_by_type = import_results_by_type
        self.logs = []

    # SQLAlchemy-like API surface used by the helper
    def add(self, obj):
        # Record created logs
        self.logs.append(obj)

    def commit(self):
        # No-op for tests
        return None


# Patch the process function in the module namespace so the helper's lookup
# resolves the fake instead of the real CSV importer
@pytest.fixture(autouse=True)
def patch_process_csv_import(monkeypatch):
    from app import main as main_mod

    def fake_process_csv_import(db, import_type, file_path):
        # Return test-configured results from DummyDB
        return db.import_results_by_type.get(import_type, {"success": 0, "errors": []})

    monkeypatch.setattr(main_mod, "process_csv_import", fake_process_csv_import)
    # Replace the ImportLog class with the dummy for isolation (no DB)
    monkeypatch.setattr(main_mod, "ImportLog", DummyImportLog)


def sorted_by_order(items):
    # Unknown types sort last via the large sentinel; ties break on filename
    return sorted(items, key=lambda x: (ORDER_INDEX.get(x["import_type"], 1_000_000), x.get("filename", "")))


def test_auto_import_sorts_and_runs_in_order():
    uploaded = [
        {"filename": "B.csv", "stored_filename": "files_b.csv", "file_path": "/tmp/b.csv", "import_type": "files"},
        {"filename": "A.csv", "stored_filename": "trnstype_a.csv", "file_path": "/tmp/a.csv", "import_type": "trnstype"},
        {"filename": "C.csv", "stored_filename": "payments_c.csv", "file_path": "/tmp/c.csv", "import_type": "payments"},
    ]

    # Make all imports succeed
    db = DummyDB({
        "trnstype": {"success": 1, "errors": [], "total_rows": 10},
        "files": {"success": 1, "errors": [], "total_rows": 20},
        "payments": {"success": 1, "errors": [], "total_rows": 5},
    })

    result = run_auto_import_for_upload(db, uploaded)

    # Files should be processed in IMPORT_ORDER
    filenames_in_result = [f["stored_filename"] for f in result["files"]]
    expected_order = [i["stored_filename"] for i in sorted_by_order(uploaded)]
    assert filenames_in_result == expected_order
    assert result["stopped"] is False
    assert result["stopped_on"] is None
    assert result["skipped_unknowns"] == []


def test_auto_import_skips_unknown_and_reports():
    uploaded = [
        {"filename": "Unknown.csv", "stored_filename": "unknown_1.csv", "file_path": "/tmp/u1.csv", "import_type": "unknown"},
        {"filename": "Rolodex.csv", "stored_filename": "rolodex_2.csv", "file_path": "/tmp/r2.csv", "import_type": "rolodex"},
    ]

    db = DummyDB({"rolodex": {"success": 1, "errors": [], "total_rows": 1}})
    result = run_auto_import_for_upload(db, uploaded)

    # Only the known type is processed
    assert [f["stored_filename"] for f in result["files"]] == ["rolodex_2.csv"]
    # The unknown type is skipped and listed
    assert result["skipped_unknowns"] == [{"filename": "Unknown.csv", "stored_filename": "unknown_1.csv"}]


def test_auto_import_stops_on_first_error_and_sets_stopped_on():
    uploaded = [
        {"filename": "A.csv", "stored_filename": "trnstype_a.csv", "file_path": "/tmp/a.csv", "import_type": "trnstype"},
        {"filename": "B.csv", "stored_filename": "files_b.csv", "file_path": "/tmp/b.csv", "import_type": "files"},
        {"filename": "C.csv", "stored_filename": "payments_c.csv", "file_path": "/tmp/c.csv", "import_type": "payments"},
    ]

    # First succeeds, second returns errors -> the run should stop before the third
    db = DummyDB({
        "trnstype": {"success": 1, "errors": [], "total_rows": 10},
        "files": {"success": 0, "errors": ["bad row"], "total_rows": 2},
        "payments": {"success": 1, "errors": [], "total_rows": 5},
    })

    result = run_auto_import_for_upload(db, uploaded)

    processed = [f["stored_filename"] for f in result["files"]]
    # The run order starts with trnstype, then files; payments should not run
    assert processed == ["trnstype_a.csv", "files_b.csv"]
    assert result["stopped"] is True
    assert result["stopped_on"] == "files_b.csv"
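Read together, the assertions pin down the result shape these tests assume from `run_auto_import_for_upload`. A sketch inferred from the tests above, not the actual construction in `app.main`:

```python
example_result = {
    # Per-file outcomes, ordered by ORDER_INDEX (trnstype before files
    # before payments in these tests)
    "files": [{"stored_filename": "trnstype_a.csv"}],
    # True when a file's import reported errors and the run halted early
    "stopped": False,
    # stored_filename of the file the run stopped on, else None
    "stopped_on": None,
    # One entry per upload whose import_type was not recognized
    "skipped_unknowns": [{"filename": "Unknown.csv", "stored_filename": "unknown_1.csv"}],
}
```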