# Source code for datacatalog.formats.converter
import json
import os
import sys
import inspect
import csv
from openpyxl import load_workbook
from shutil import copyfile
from jsonschema import validate, FormatChecker, ValidationError
from ..tenancy import Projects
# from .runner import convert_file
class ConversionError(Exception):
    """Something happened that prevented conversion of the target document.

    Raised by ``Converter`` when an input document or a schema file cannot
    be loaded, or when an unexpected error occurs while checking a document
    against a schema.
    """
class formatChecker(FormatChecker):
    """A simple JSON format validator.

    Thin subclass of ``jsonschema.FormatChecker`` that adds no behavior of
    its own; it is constructed with the parent's default arguments.
    """

    def __init__(self):
        # Delegate to the parent with no arguments, as the original did.
        super().__init__()
class Converter(object):
    """Base class implementing a document converter.

    A converter knows one or more *input* schemas (used to recognize and
    validate source documents) and one *target* schema (describing the
    converted output).

    Attributes:
        VERSION (str): Version string copied to ``self.version``.
        FILENAME (str): Name copied to ``self.filename``.
        PROJECT: ``tacc_name`` of the ``SD2`` entry returned by
            ``Projects.sync()``; copied to ``self.project``.
        TENANT: ``tenant`` of the same ``SD2`` entry; copied to
            ``self.tenant``.
    """

    VERSION = '0.0.0'
    FILENAME = 'baseclass'
    # Implementing subclasses should override
    # NOTE: Projects.sync() runs once, when this class body is executed.
    projects = Projects.sync()
    PROJECT = projects.SD2.tacc_name
    TENANT = projects.SD2.tenant

    def __init__(self, schemas=None, targetschema=None, options=None, reactor=None):
        """Set up input schemas, the target schema and metadata attributes.

        Args:
            schemas (str or iterable of str, optional): Path, or paths, of
                additional input schema files. They are appended after the
                default ``schema.json`` that sits next to the module
                defining the concrete class.
            targetschema (optional): Output schema. Defaults to a ``$ref``
                to the SD2E ``sample_set.json`` schema.
            options (dict, optional): Stored as ``self.options``. A fresh
                dict is created per instance when omitted.
            reactor (optional): Stored as ``self.reactor``.

        Raises:
            OSError: If an additional schema path does not exist.
        """
        # Discover the default input schema
        here = os.path.abspath(inspect.getfile(self.__class__))
        schema_path = os.path.join(os.path.dirname(here), 'schema.json')

        # Input schema(s)
        # FIXME move to a single schema definition per class
        self.schemas = [schema_path]
        self.name = type(self).__name__

        # Normalize the argument to a list of paths; a bare string is a
        # single path, not an iterable of characters.
        if schemas is None:
            extra_schemas = []
        elif isinstance(schemas, str):
            extra_schemas = [schemas]
        else:
            extra_schemas = list(schemas)
        for extra_path in extra_schemas:
            if not os.path.exists(extra_path):
                raise OSError('schema file {} not found'.format(extra_path))
            self.schemas.append(extra_path)

        # Default output schema
        if targetschema is None:
            self.targetschema = {
                "$ref": "https://schema.catalog.sd2e.org/schemas/sample_set.json"
            }
        else:
            self.targetschema = targetschema

        # Never share one dict between instances.
        self.options = {} if options is None else options
        self.reactor = reactor

        # Schema metadata
        self.filename = self.FILENAME
        self.version = self.VERSION
        self.project = self.PROJECT
        self.tenant = self.TENANT

    def convert(self, input_fp, output_fp=None, verbose=True, config=None, enforce_validation=True):
        """Convert between formats.

        This is a pass-through method that invokes a runner script.

        Args:
            input_fp (str): Path to input file
            output_fp (str): Path to output file
            verbose (bool, optional): Print verbose output while running
            config (dict, optional): Generic configuration object. An empty
                dict is passed to the runner when omitted.
            enforce_validation (bool, optional): Whether to force validation
                of outputs

        Returns:
            The return value of ``runner.convert_file`` (documented upstream
            as a bool telling whether the conversion succeeded).
        """
        # Import lazily because of the SBH requirement
        from .runner import convert_file

        if config is None:
            config = {}
        return convert_file(
            self.targetschema,
            input_fp,
            output_path=output_fp,
            verbose=verbose,
            config=config,
            enforce_validation=enforce_validation,
        )

    def test(self, input_fp, output_fp, verbose=True, config=None):
        """Smoketest method to see if Converter discovery is working.

        All arguments are accepted for signature compatibility and ignored.

        Returns:
            bool: Always True
        """
        return True

    @staticmethod
    def _load_input_schema(schema_path):
        """Load one input schema file, wrapping any failure in ConversionError."""
        try:
            with open(schema_path) as schema:
                return json.load(schema)
        except Exception as exc:
            raise ConversionError(
                'Failed to load schema for validation', exc) from exc

    @staticmethod
    def _header_options(schema_json, kind):
        """Yield each alternative list of required headers for ``kind``.

        A schema describes a tabular format when its ``properties`` contain
        a truthy ``kind`` entry ("xlsx" or "csv") and a ``headers`` entry.
        Each ``headers.oneOf`` alternative contributes the first ``enum``
        value of every one of its ``items``. Nothing is yielded for schemas
        that do not describe ``kind``.
        """
        schema_properties = schema_json["properties"]
        if not (kind in schema_properties
                and schema_properties[kind]
                and "headers" in schema_properties):
            return
        for header_values in schema_properties["headers"]["oneOf"]:
            yield [enum_item["enum"][0] for enum_item in header_values["items"]]

    @staticmethod
    def _validation_failure(input_fp, validation_errors, permissive):
        """Return False when permissive, otherwise raise ValidationError.

        The message is always a string so the exception can be printed; the
        individual per-schema errors are attached as ``context``.
        """
        if permissive:
            return False
        message = '{} did not validate against any known schema'.format(input_fp)
        details = '; '.join(str(err.message) for err in validation_errors)
        if details:
            message = '{}: {}'.format(message, details)
        raise ValidationError(message, context=validation_errors)

    def _validate_json_input(self, input_fp, encoding, permissive):
        """Validate a JSON document against each input schema in turn."""
        try:
            with open(input_fp, 'r', encoding=encoding) as jsonfile:
                jsondata = json.load(jsonfile)
        except Exception as exc:
            raise ConversionError(
                'Failed to load {} for validation'.format(input_fp), exc) from exc

        validation_errors = []
        for schema_path in self.schemas:
            schema_json = self._load_input_schema(schema_path)
            try:
                validate(jsondata, schema_json, format_checker=formatChecker())
            except ValidationError as err:
                # Remember the failure and try the next schema.
                validation_errors.append(err)
            except Exception as exc:
                raise ConversionError(exc) from exc
            else:
                return True
        # If we have not returned True, all schemas failed
        return self._validation_failure(input_fp, validation_errors, permissive)

    def _validate_xlsx_input(self, input_fp, permissive):
        """Check whether any sheet's first row has a schema's required headers."""
        try:
            # load all sheets
            workbook = load_workbook(input_fp, read_only=True)
        except Exception as exc:
            raise ConversionError(
                'Failed to load {} for validation'.format(input_fp), exc) from exc

        # try/finally closes the workbook on every exit path, including the
        # successful early return.
        try:
            for schema_path in self.schemas:
                schema_json = self._load_input_schema(schema_path)
                try:
                    for required in self._header_options(schema_json, "xlsx"):
                        for sheetname in workbook.sheetnames:
                            rows = workbook[sheetname].iter_rows(min_row=1, max_row=1)
                            first_row = next(rows, None)
                            if first_row is None:
                                # Empty sheet: nothing to match against.
                                continue
                            excel_headers = [cell.value for cell in first_row]
                            if all(header in excel_headers for header in required):
                                return True
                except Exception as exc:
                    raise ConversionError(exc) from exc
        finally:
            workbook.close()
        # If we have not returned True, all schemas failed
        return self._validation_failure(input_fp, [], permissive)

    def _validate_csv_input(self, input_fp, encoding, permissive):
        """Check whether the CSV header row has a schema's required headers."""
        try:
            # Read the header row once so every schema alternative is
            # compared with the same row, and the file is closed promptly.
            with open(input_fp, newline='', encoding=encoding) as csvfile:
                csv_headers = next(csv.reader(csvfile), None)
        except Exception as exc:
            raise ConversionError(
                'Failed to load {} for validation'.format(input_fp), exc) from exc

        for schema_path in self.schemas:
            schema_json = self._load_input_schema(schema_path)
            try:
                for required in self._header_options(schema_json, "csv"):
                    # An empty file has no header row and never matches.
                    if csv_headers is not None and all(
                            header in csv_headers for header in required):
                        return True
            except Exception as exc:
                raise ConversionError(exc) from exc
        # If we have not returned True, all schemas failed
        return self._validation_failure(input_fp, [], permissive)

    def validate_input(self, input_fp, encoding, permissive=False):
        """Validate a generic input file against schemas known to Converter.

        The file type is chosen from the extension of ``input_fp``:
        ``.json`` documents are validated with jsonschema, while ``.xlsx``
        and ``.csv`` files are accepted when their header row contains every
        header required by one of a schema's ``headers.oneOf`` alternatives.

        Args:
            input_fp (str): path to the validation target file
            encoding (str): text encoding used to read JSON and CSV inputs;
                also stored as ``self.encoding``
            permissive (bool): whether to return False on failure to validate

        Raises:
            ConversionError: Raised when schema or target can't be loaded
            ValidationError: Raised when validation fails

        Returns:
            bool: True on success, False on failure when ``permissive`` is
            set. ``None`` is returned for any other file extension.
        """
        # set encoding
        self.encoding = encoding

        if input_fp.endswith(".json"):
            return self._validate_json_input(input_fp, encoding, permissive)
        if input_fp.endswith(".xlsx"):
            return self._validate_xlsx_input(input_fp, permissive)
        if input_fp.endswith(".csv"):
            return self._validate_csv_input(input_fp, encoding, permissive)
        # Unsupported extension: kept as the historical implicit None.
        return None

    def validate(self, output_fp, permissive=False):
        """Validate a file against the target schema known to Converter.

        Note:
            Yes, this is redundant with validate_input()

        Args:
            output_fp (str): path to the validation target file
            permissive (bool): whether to return False on failure to validate

        Raises:
            ValidationError: Raised when the file or schema cannot be loaded
                or when validation fails

        Returns:
            bool: True on success
        """
        try:
            with open(output_fp, 'r') as jsonfile:
                jsondata = json.load(jsonfile)
        except Exception as exc:
            raise ValidationError(
                'Unable to load {} for validation'.format(output_fp),
                cause=exc) from exc

        if isinstance(self.targetschema, dict):
            # The default target schema is an in-memory {"$ref": ...}
            # object; use it directly instead of treating it as a path.
            schema_json = self.targetschema
        else:
            try:
                with open(self.targetschema) as schema:
                    schema_json = json.load(schema)
            except Exception as exc:
                raise ValidationError(
                    'Unable to load schema for validation', cause=exc) from exc

        try:
            validate(jsondata, schema_json, format_checker=formatChecker())
        except ValidationError as err:
            if permissive:
                return False
            raise ValidationError('Schema validation failed', cause=err) from err
        except Exception as exc:
            # The message must be a string for the error to be printable.
            raise ValidationError(str(exc), cause=exc) from exc
        return True

    def get_schema(self):
        """Pass-through to ``get_classifier_schema()``"""
        return self.get_classifier_schema()

    def get_classifier_schema(self):
        """Get the JSON schema that Converter is using for classification.

        The classifier schema is the first entry of ``self.schemas``.

        Raises:
            ConversionError: Returned on all Exceptions, including when no
                schema is configured

        Returns:
            dict: JSON schema in dictionary form
        """
        schemas = getattr(self, 'schemas', None)
        if not schemas:
            raise ConversionError('No classifier schema is configured')
        schema_fp = schemas[0]
        try:
            with open(schema_fp, 'r') as jsonfile:
                return json.load(jsonfile)
        except Exception as exc:
            raise ConversionError('Failed to load {}'.format(schema_fp), exc) from exc