Files
delphi-database/app/import_export/generic_importer.py
HotSwapp c68e36e6c6 Fix SQL reserved keyword issue in generic CSV importer
- Add SQL_RESERVED_KEYWORDS list containing SQLite reserved keywords
- Add _quote_column_name() method to properly quote reserved keywords in SQL queries
- Update INSERT statement generation to quote column names that are reserved keywords
- This fixes the 'group' column import issue where SQL syntax error occurred
2025-10-01 08:07:30 -05:00

447 lines
22 KiB
Python

"""
Generic CSV Importer - handles any CSV structure dynamically
"""
import csv
import io
import logging
import re
from typing import Dict, Any, List, Optional
from datetime import datetime
from sqlalchemy import text, Column, String, Integer, Text, MetaData, Table, create_engine, Date
from sqlalchemy.orm import Session
from sqlalchemy.exc import SQLAlchemyError
from .base import BaseCSVImporter, ImportResult
logger = logging.getLogger(__name__)

# Upper-case SQLite keywords. A column whose upper-cased name appears here is
# wrapped in double quotes before being placed into generated SQL.
# The set follows SQLite's published keyword list, including the entries that
# were previously missing here (EXCLUSIVE, MATERIALIZED, OTHERS, RANGE,
# RETURNING).
SQL_RESERVED_KEYWORDS = {
    'ABORT', 'ACTION', 'ADD', 'AFTER', 'ALL', 'ALTER', 'ALWAYS', 'ANALYZE',
    'AND', 'AS', 'ASC', 'ATTACH', 'AUTOINCREMENT',
    'BEFORE', 'BEGIN', 'BETWEEN', 'BY',
    'CASCADE', 'CASE', 'CAST', 'CHECK', 'COLLATE', 'COLUMN', 'COMMIT',
    'CONFLICT', 'CONSTRAINT', 'CREATE', 'CROSS', 'CURRENT', 'CURRENT_DATE',
    'CURRENT_TIME', 'CURRENT_TIMESTAMP',
    'DATABASE', 'DEFAULT', 'DEFERRABLE', 'DEFERRED', 'DELETE', 'DESC',
    'DETACH', 'DISTINCT', 'DO', 'DROP',
    'EACH', 'ELSE', 'END', 'ESCAPE', 'EXCEPT', 'EXCLUDE', 'EXCLUSIVE',
    'EXISTS', 'EXPLAIN',
    'FAIL', 'FILTER', 'FIRST', 'FOLLOWING', 'FOR', 'FOREIGN', 'FROM', 'FULL',
    'GENERATED', 'GLOB', 'GROUP', 'GROUPS',
    'HAVING',
    'IF', 'IGNORE', 'IMMEDIATE', 'IN', 'INDEX', 'INDEXED', 'INITIALLY',
    'INNER', 'INSERT', 'INSTEAD', 'INTERSECT', 'INTO', 'IS', 'ISNULL',
    'JOIN',
    'KEY',
    'LAST', 'LEFT', 'LIKE', 'LIMIT',
    'MATCH', 'MATERIALIZED',
    'NATURAL', 'NO', 'NOT', 'NOTHING', 'NOTNULL', 'NULL', 'NULLS',
    'OF', 'OFFSET', 'ON', 'OR', 'ORDER', 'OTHERS', 'OUTER', 'OVER',
    'PARTITION', 'PLAN', 'PRAGMA', 'PRECEDING', 'PRIMARY',
    'QUERY',
    'RAISE', 'RANGE', 'RECURSIVE', 'REFERENCES', 'REGEXP', 'REINDEX',
    'RELEASE', 'RENAME', 'REPLACE', 'RESTRICT', 'RETURNING', 'RIGHT',
    'ROLLBACK', 'ROW', 'ROWS',
    'SAVEPOINT', 'SELECT', 'SET',
    'TABLE', 'TEMP', 'TEMPORARY', 'THEN', 'TIES', 'TO', 'TRANSACTION',
    'TRIGGER',
    'UNBOUNDED', 'UNION', 'UNIQUE', 'UPDATE', 'USING',
    'VACUUM', 'VALUES', 'VIEW', 'VIRTUAL',
    'WHEN', 'WHERE', 'WINDOW', 'WITH', 'WITHOUT',
}
class GenericCSVImporter(BaseCSVImporter):
"""Generic importer that can handle any CSV structure by creating tables dynamically"""
def __init__(self, db_session: Session, table_name: str, import_id: str = None):
    """Prepare importer state, then hand off to the base-class initializer.

    Args:
        db_session: Session forwarded to ``BaseCSVImporter.__init__``.
        table_name: Requested target table name; stored lower-cased.
        import_id: Optional identifier forwarded to the base class.
    """
    # Populated later by process_csv_content().
    self.csv_headers = []
    self.dynamic_table = None
    # The backing field has to exist before the base initializer runs: per
    # the original author's note, BaseCSVImporter.__init__ reads
    # self.table_name, which is served from this attribute.
    self._table_name = table_name.lower()
    super().__init__(db_session, import_id)
@property
def table_name(self) -> str:
return self._table_name
@property
def required_fields(self) -> List[str]:
"""No required fields for generic import"""
return []
@property
def field_mapping(self) -> Dict[str, str]:
"""Dynamic mapping based on CSV headers"""
if self.csv_headers:
mapping = {}
for header in self.csv_headers:
safe_name = self._make_safe_name(header)
# Handle 'id' column renaming for conflict avoidance
if safe_name.lower() == 'id':
safe_name = 'csv_id'
mapping[header] = safe_name
return mapping
return {}
def create_model_instance(self, row_data: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``row_data`` untouched; generic imports build no model object."""
    processed_row = row_data
    return processed_row
def create_dynamic_table(self, headers: List[str]) -> Table:
    """Return the target table, creating it from the CSV headers if needed.

    If a table with the sanitized name already exists it is reflected and
    returned unchanged. Otherwise a new table is created with an
    auto-incrementing integer ``id`` primary key plus one ``Text`` column per
    usable header. A header that sanitizes to ``id`` becomes ``csv_id``, and a
    header that sanitizes to a name already taken is skipped so that the
    table never contains two columns with the same name.

    Side effects: sets ``self.actual_table_name`` and ``self._table_name`` to
    the sanitized table name.

    Args:
        headers: CSV header strings; blank entries are ignored.

    Returns:
        The reflected or newly created ``Table``.

    Raises:
        Exception: Any error from inspection, reflection or table creation
            is logged and re-raised.
    """
    import time
    from sqlalchemy import inspect

    try:
        metadata = MetaData()
        safe_table_name = self._make_safe_name(self.table_name)

        # Look the table up before building a Table object for it.
        inspector = inspect(self.db_session.bind)
        if safe_table_name in inspector.get_table_names():
            logger.info(f"Table '{safe_table_name}' already exists, will use existing table structure")
            metadata.reflect(bind=self.db_session.bind, only=[safe_table_name])
            existing_table = metadata.tables[safe_table_name]
            # Remember the name that INSERT statements must target.
            self.actual_table_name = safe_table_name
            self._table_name = safe_table_name
            logger.info(f"Using existing table: '{safe_table_name}'")
            return existing_table

        logger.info(f"Creating new table: '{safe_table_name}'")
        columns = [Column('id', Integer, primary_key=True, autoincrement=True)]
        taken_names = {'id'}
        for header in headers:
            if not (header and header.strip()):
                continue
            safe_column_name = self._make_safe_name(header.strip())
            if safe_column_name.lower() == 'id':
                # Keep the CSV's own id clear of the generated primary key.
                safe_column_name = 'csv_id'
            if safe_column_name in taken_names:
                # Distinct headers can sanitize to the same name (for
                # example "First Name" and "first_name"); a second column
                # with that name would make table creation fail.
                logger.warning(
                    f"Header '{header}' maps to column '{safe_column_name}' "
                    f"which is already defined; skipping duplicate column"
                )
                continue
            taken_names.add(safe_column_name)
            columns.append(Column(safe_column_name, Text))

        table = Table(safe_table_name, metadata, *columns)
        # Remember the sanitized name that INSERT statements must target.
        self.actual_table_name = safe_table_name
        self._table_name = safe_table_name
        logger.info(f"Using table name for data insertion: '{safe_table_name}'")

        # Create the table, retrying with exponential backoff while the
        # database reports a lock.
        max_retries = 3
        retry_delay = 1.0
        for attempt in range(max_retries):
            try:
                # NOTE(review): Session.begin() can raise when the session
                # already has a transaction in progress - confirm that
                # callers hand in a session with no open transaction.
                self.db_session.begin()
                metadata.create_all(self.db_session.bind)
                self.db_session.commit()
                logger.info(f"Created dynamic table '{safe_table_name}' with {len(columns)} columns")
                return table
            except Exception as create_error:
                self.db_session.rollback()
                error_text = str(create_error).lower()
                if "database is locked" in error_text and attempt < max_retries - 1:
                    logger.warning(f"Database locked, retrying in {retry_delay}s (attempt {attempt + 1}/{max_retries})")
                    time.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                    continue
                if "already exists" in error_text or "already present" in error_text:
                    # Another process won the race to create the table.
                    logger.info(f"Table '{safe_table_name}' created by another process, reflecting existing table")
                    try:
                        metadata.reflect(bind=self.db_session.bind, only=[safe_table_name])
                        return metadata.tables[safe_table_name]
                    except Exception:
                        # Reflection failed too: surface the original error.
                        raise create_error
                # Not recoverable (or the last retry): propagate.
                raise
    except Exception as e:
        logger.error(f"Error creating dynamic table: {e}")
        raise
def _make_safe_name(self, name: str) -> str:
"""Make a database-safe name from any string"""
import re
# Remove special characters and replace with underscore
safe_name = re.sub(r'[^a-zA-Z0-9_]', '_', name)
# Remove multiple underscores
safe_name = re.sub(r'_+', '_', safe_name)
# Remove trailing underscore
safe_name = safe_name.strip('_')
# Ensure it's not empty
if not safe_name:
safe_name = 'unnamed_column'
# Special handling for purely numeric names or names starting with numbers
if safe_name.isdigit() or (safe_name and safe_name[0].isdigit()):
safe_name = f'col_{safe_name}'
# Ensure it starts with a letter or underscore (final check)
elif safe_name and not (safe_name[0].isalpha() or safe_name[0] == '_'):
safe_name = 'col_' + safe_name
return safe_name.lower()
def _quote_column_name(self, column_name: str) -> str:
"""Quote column name if it's a SQL reserved keyword"""
if column_name.upper() in SQL_RESERVED_KEYWORDS:
return f'"{column_name}"'
return column_name
def _parse_date_value(self, value: str) -> Optional[str]:
"""Try to parse a date value and return it in ISO format"""
if not value or value.strip() == '':
return None
value = value.strip()
# Common date formats to try
date_formats = [
'%m/%d/%Y', # MM/DD/YYYY
'%m/%d/%y', # MM/DD/YY
'%Y-%m-%d', # YYYY-MM-DD
'%d/%m/%Y', # DD/MM/YYYY
'%d-%m-%Y', # DD-MM-YYYY
'%Y/%m/%d', # YYYY/MM/DD
]
for fmt in date_formats:
try:
parsed_date = datetime.strptime(value, fmt)
return parsed_date.strftime('%Y-%m-%d') # Return in ISO format
except ValueError:
continue
# If no format matches, return the original value
return value
def process_csv_content(self, csv_content: str, encoding: str = "utf-8") -> ImportResult:
    """Import CSV text into the dynamic table and report the outcome.

    Blank lines are dropped, the header row is detected, the target table is
    created or reflected via ``create_dynamic_table``, and every data row is
    written with ``INSERT OR IGNORE``. Only rows that were actually inserted
    count as imported; rows ignored by the database are logged as duplicates.

    Args:
        csv_content: Full CSV text with the header on the first non-blank
            line.
        encoding: Not read by this method; kept so the signature is
            unchanged.

    Returns:
        A fresh ``ImportResult`` carrying one error when the content is empty
        or has no usable header; otherwise ``self.result`` with the row
        counters filled in. ``success`` is true when at least one row was
        inserted, or when the file holds a header but no data rows.
    """
    try:
        non_empty_lines = [
            line for line in csv_content.strip().splitlines() if line.strip()
        ]
        if not non_empty_lines:
            result = ImportResult()
            result.add_error("CSV file is empty or contains only empty lines")
            return result
        cleaned_csv_content = '\n'.join(non_empty_lines)

        csv_reader = self._open_csv_reader(cleaned_csv_content)
        headers = csv_reader.fieldnames if csv_reader is not None else None
        if not headers:
            result = ImportResult()
            result.add_error("No headers found in CSV file")
            return result

        self.csv_headers = [h.strip() for h in headers if h and h.strip()]
        if not self.csv_headers:
            result = ImportResult()
            result.add_error("No valid headers found in CSV file")
            return result

        self.dynamic_table = self.create_dynamic_table(self.csv_headers)

        # Reading ``fieldnames`` consumed the header line, so iterating the
        # same reader now yields only the data rows.
        rows = list(csv_reader)
        if not rows:
            logger.info(f"CSV file for table '{self.table_name}' contains headers only, no data rows to import")
            self.result.success = True
            self.result.total_rows = 0
            self.result.imported_rows = 0
            self.result.error_rows = 0
            self.result.add_warning("File contains headers only, no data rows found")
            return self.result

        # Everything below is identical for every row, so compute it once.
        existing_columns = set()
        if self.dynamic_table is not None:
            existing_columns = {str(key) for key in self.dynamic_table.columns.keys()}
        column_plan = self._plan_columns(headers, existing_columns)
        # Several headers may feed one column; keep first-seen column order.
        insert_columns = list(dict.fromkeys(column for _, column, _ in column_plan))

        table_name = getattr(self, 'actual_table_name', self.dynamic_table.name)
        logger.debug(f"Inserting into table: '{table_name}' (original: '{self._table_name}', dynamic: '{self.dynamic_table.name}')")

        insert_statement = None
        if insert_columns:
            column_names = ', '.join(self._quote_column_name(col) for col in insert_columns)
            placeholders = ', '.join(f':param{i}' for i in range(len(insert_columns)))
            # The table name is quoted like the columns: a table called
            # e.g. "order" or "group" would otherwise break the statement.
            quoted_table = self._quote_column_name(table_name)
            insert_statement = text(
                f"INSERT OR IGNORE INTO {quoted_table} ({column_names}) VALUES ({placeholders})"
            )

        imported_count = 0
        error_count = 0
        total_count = 0
        for row_num, row in enumerate(rows, start=2):
            total_count += 1
            if insert_statement is None:
                logger.warning(f"Row {row_num}: No valid columns found for insertion")
                continue
            try:
                row_data = {}
                for source_key, column_name, is_temporal in column_plan:
                    raw_value = row.get(source_key)
                    value = raw_value.strip() if raw_value else None
                    if not value:
                        # Missing and blank cells are both stored as NULL.
                        value = None
                    elif is_temporal:
                        value = self._parse_date_value(value)
                    row_data[column_name] = value

                params = {
                    f'param{i}': row_data[column]
                    for i, column in enumerate(insert_columns)
                }
                outcome = self.db_session.execute(insert_statement, params)
                # rowcount 0 means INSERT OR IGNORE skipped the row.
                if outcome.rowcount == 0:
                    logger.debug(f"Row {row_num}: Skipped duplicate record")
                else:
                    logger.debug(f"Row {row_num}: Inserted successfully")
                    imported_count += 1
            except Exception as e:
                error_count += 1
                error_msg = str(e)
                # Translate common database errors into short messages.
                if "NOT NULL constraint failed" in error_msg:
                    self.result.add_error(f"Row {row_num}: Missing required value in column")
                elif "UNIQUE constraint failed" in error_msg:
                    self.result.add_error(f"Row {row_num}: Duplicate value detected")
                elif "no such column" in error_msg:
                    self.result.add_error(f"Row {row_num}: Column structure mismatch")
                else:
                    self.result.add_error(f"Row {row_num}: {error_msg}")
                logger.warning(f"Error importing row {row_num}: {e}")

        # No commit is issued here; that is left to whoever owns the session.
        self.result.success = imported_count > 0
        self.result.total_rows = total_count
        self.result.imported_rows = imported_count
        self.result.error_rows = error_count
        if imported_count > 0:
            logger.info(f"Successfully imported {imported_count} rows into {self.table_name}")
        return self.result
    except Exception as e:
        logger.error(f"Error during CSV import: {e}")
        self.result.add_error(f"Import failed: {str(e)}")
        return self.result

def _open_csv_reader(self, csv_text: str) -> Optional[csv.DictReader]:
    """Return the first DictReader configuration that yields a header row, else None."""
    reader_options = (
        # 1: standard CSV parsing
        {},
        # 2: minimal quoting, ignoring spaces after delimiters
        {'skipinitialspace': True, 'quoting': csv.QUOTE_MINIMAL, 'strict': False},
        # 3: all fields quoted
        {'quoting': csv.QUOTE_ALL, 'strict': False},
        # 4: Excel dialect
        {'dialect': 'excel'},
        # 5: Unix dialect
        {'dialect': 'unix'},
        # 6: no quoting at all, backslash as escape character
        {'quoting': csv.QUOTE_NONE, 'escapechar': '\\', 'strict': False},
    )
    csv_file = io.StringIO(csv_text)
    for attempt, options in enumerate(reader_options, start=1):
        try:
            csv_file.seek(0)
            reader = csv.DictReader(csv_file, **options)
            if reader.fieldnames:
                logger.debug(f"CSV parsing successful with strategy {attempt}")
                return reader
        except (csv.Error, UnicodeDecodeError) as e:
            logger.debug(f"CSV parsing strategy {attempt} failed: {e}")
    return None

def _plan_columns(self, raw_headers, existing_columns):
    """Resolve each stored header to a (row key, target column, parse-as-date) triple."""
    # DictReader keys every row by the header text exactly as it appears in
    # the file, while self.csv_headers holds stripped copies. Remember the
    # raw spelling so padded headers such as " name " still find their cells.
    raw_key_for = {}
    for raw_header in raw_headers:
        stripped = raw_header.strip() if raw_header else ''
        if stripped:
            raw_key_for.setdefault(stripped, raw_header)

    plan = []
    for header in self.csv_headers:
        column_name = self._make_safe_name(header)
        if column_name.lower() == 'id':
            if 'csv_id' in existing_columns or 'id' not in existing_columns:
                # create_dynamic_table stores the CSV's own id in 'csv_id';
                # prefer that column whenever it exists so a re-import into
                # an existing table lands where the first import did.
                column_name = 'csv_id'
            else:
                # Only an 'id' column exists: write into it unless it is
                # explicitly flagged as auto-incrementing.
                id_col = getattr(self.dynamic_table.columns, 'id', None)
                is_autoincrement = getattr(id_col, 'autoincrement', False) is True
                column_name = 'csv_id' if is_autoincrement else 'id'

        if existing_columns and column_name not in existing_columns:
            logger.debug(f"Skipping column '{column_name}' (from '{header}') - not found in target table")
            continue

        lowered = header.lower()
        is_temporal = 'date' in lowered or 'time' in lowered
        plan.append((raw_key_for.get(header, header), column_name, is_temporal))
    return plan