import os
import sys
import inspect
import json
from pprint import pprint
from slugify import slugify
from ...jsonschemas import JSONSchemaBaseObject
from ...jsonschemas import formatChecker, DateTimeEncoder
from ...identifiers import typeduuid
from ...settings import MONGO_DELETE_FIELD
from ...utils import camel_to_snake, current_time, msec_precision, time_stamp
[docs]class DocumentSchema(JSONSchemaBaseObject):
"""Extends the JSON schema-driven document class with LinkedStore functions
DocumentSchema objects validate against the schema defined in
`schema.json`, have a defined LinkedStore type and specify fields used to
uniquely identify the document. Their `get_schemas` method can to emit both
document (which contains all administrative fields) and object schema (only
core data fields).
Attributes
_filters (list): A private attribute defining how to render document and object schemas from the larger JSON schema
"""
DELETE_FIELD = MONGO_DELETE_FIELD
TYPED_UUID_TYPE = 'generic'
"""The named type for UUIDs assigned to this class of LinkedStore documents"""
TYPED_UUID_FIELD = ['id']
"""List of fields used to generate a typed UUID"""
DEFAULT_DOCUMENT_NAME = 'schema.json'
"""Filename of the JSON schema document, relative to __file__."""
DEFAULT_FILTERS_NAME = 'filters.json'
"""Filename of the JSON schema filters document, relative to __file__."""
RETURN_DOC_FILTERS = ['_id', '_salt', '_admin',
'_properties', '_update_token', DELETE_FIELD]
"""These keys should never be returned in a document"""
def __init__(self, **kwargs):
doc_file = kwargs.get('document', self.DEFAULT_DOCUMENT_NAME)
filt_file = kwargs.get('filters', self.DEFAULT_FILTERS_NAME)
modfile = inspect.getfile(self.__class__)
try:
# Get default schema and filter documents from this base class
class_schemafile = os.path.join(os.path.dirname(__file__), self.DEFAULT_DOCUMENT_NAME)
class_filtersfile = os.path.join(os.path.dirname(__file__), self.DEFAULT_FILTERS_NAME)
# Get instance schema and filter documents
schemafile = os.path.join(os.path.dirname(modfile), doc_file)
filtersfile = os.path.join(os.path.dirname(modfile), filt_file)
if os.path.isfile(schemafile):
schemaj = json.load(open(schemafile, 'r'))
else:
schemaj = json.load(open(class_schemafile, 'r'))
if os.path.isfile(filtersfile):
filtersj = json.load(open(filtersfile, 'r'))
elif os.path.isfile(class_filtersfile):
filtersj = json.load(open(class_filtersfile, 'r'))
else:
filtersj = dict()
setattr(self, '_filters', filtersj)
except Exception:
schemaj = dict()
params = {**schemaj, **kwargs}
super(DocumentSchema, self).__init__(**params)
# print(self.get_collection())
self.update_id()
[docs] def to_dict(self, private_prefix='_', document=False, **kwargs):
"""Render LinkedStore object as a dict suitable for serialization
Args:
private_prefix (str): Key prefix used exclude keys from included in the dict
document (bool): Whether to generate a document- or object-type dict
Returns:
dict: A dictionary containing fields represented in the document's JSON schema
"""
schema_class = 'document' if document is True else 'object'
response_dict = dict()
self.update_id(document)
filters = getattr(self, '_filters', {})
properties_to_filter = filters.get(schema_class, {}).get('properties', [])
# Fetch the parent schema dictionary
# FIXME - check that this does not return values in parent dict
super_dict = super(DocumentSchema, self).to_dict(private_prefix, **kwargs)
# The filters.json definition doesn't support any kind of discovery
# One can only filter a document's 'properties' by top-level key
for key, val in super_dict.items():
# Filter named properties from the schema
if key == 'properties':
filtered_props = dict()
for fkey, fval in val.items():
if fkey not in properties_to_filter:
filtered_props[fkey] = fval
val = filtered_props
elif key == 'required':
filtered_reqs = list()
for lval in val:
if lval not in properties_to_filter:
filtered_reqs.append(lval)
val = filtered_reqs
response_dict[key] = val
return response_dict
[docs] def get_filename(self, document=False):
"""Returns basename for the schema file
When a LinkedStore's schema is rendered, its relationship with other
datacatalog-managed schemas is established via a common base URL. The
basename of the URI that is embedded in the ``$id`` field of the schema
is defined in ``__filename`` in the extended JSON schema document and
is returned by this method.
Returns:
str: The filename at which this schema is expected to be resolvable
"""
fn = getattr(self, '_filename', 'schema')
if document is False:
return fn
else:
return fn + '_document'
[docs] def update_id(self, document=False):
"""Update the ``id`` field in the JSON schema
This method is used solely to let us differentiate object- from
document-form JSON schemas by incorporating a specific string
into the schema's ``id`` field.
Args:
document (bool): Whether the schema is a document schema
Returns:
string: The updated value for schema ``id``
"""
temp_fname = getattr(self, '_filename')
if self._snake_case:
temp_fname = camel_to_snake(temp_fname)
schema_id = self.BASEREF + temp_fname
schema_id = schema_id.lower()
if document:
schema_id = schema_id + '_document'
if not schema_id.endswith('.json'):
schema_id = schema_id + '.json'
setattr(self, 'id', schema_id)
return schema_id
[docs] def get_identifiers(self):
"""Returns the list of top-level keys that are identifiers
In the extended-form schema, ``__identifiers`` describes which keys
can be used to uniquely identify documents written using this schema:
Returns:
list: The list of identifying key names
"""
return getattr(self, '_identifiers', [])
[docs] def get_indexes(self):
"""Returns the list of indexes declared for documents of this schema
In the extended-form schema, ``__indexes`` declare the indexing strategy
for documents written using this schema.
Returns:
list: The list of indexes declared for this schema
"""
return getattr(self, '_indexes', [])
[docs] def get_required(self):
"""Returns the list of required fields
This is defined by ``__required`` in extended-form schema.
Returns:
list: The list of indexes declared for this schema
"""
return getattr(self, 'required', [])
[docs] def get_collection(self):
"""Returns the name of the MongoDB containing documents with this schema
Documents from a LinkedStore are stored in a specific named MongoDB
collection. This method returns the collection name. It is good
practive for the collection and name of the LinkedStore-derived class
to be related intuitively.
Returns:
str: The name of a MongoDB collection
"""
return getattr(self, '_collection', self.COLLECTION)
[docs] def get_uuid_type(self):
"""Returns the TypedUUID name for documents with this schema
Each document is assigned a UUID which is a hash of values of specific
named keys in the document. The UUID is typed with a prefix to indicate
which kind of object it is. All LinkedStore documents have typed UUIDs,
but there are several other types as well.
Returns:
str: One of the list of UUID types known to the datacatalog library
"""
return getattr(self, '_uuid_type', self.TYPED_UUID_TYPE)
[docs] def get_uuid_fields(self):
"""Returns the key names used to generate the document's TypedUUID
Returns:
list: A list of key names found in the document that contribute to its UUID
"""
return getattr(self, '_uuid_fields', self.TYPED_UUID_FIELD)
[docs] def get_typeduuid(self, payload, binary=False):
"""Generate a UUID with the appropriate type prefix
Args:
payload (str/dict): If ``payload`` is string, the UUID is generated
directly from it. Otherwise, it is serialized before being used to
generate the UUID.
binary (bool, optional): Whether to return a Binary-encoded UUID.
Defaults to `False`.
Returns:
str: A string validating as UUID5 with a 3-character typing prefix
"""
# print('TYPED_UUID_PAYLOAD: {}'.format(payload))
if isinstance(payload, dict):
identifier_string = self.get_serialized_document(payload)
else:
identifier_string = str(payload)
new_uuid = typeduuid.catalog_uuid(identifier_string, uuid_type=self.get_uuid_type(), binary=binary)
# print('TYPED_UUID: {}'.format(new_uuid))
return new_uuid
[docs] def get_serialized_document(self, document, **kwargs):
"""Serializes a complex object into a string
Some UUIDs are constructed from complex data structures like Agave job
definitions. Rather than implement specific strategies for selecting
from arbitrary nested structures, this method provides guaranteed-
order serialization of the object to a linear string.
Args:
document (object): A dict or list object to serialize
Returns:
str: JSON serialized and minified representation of ``document``
"""
# Serialize values of specific keys to generate a UUID
union = {**document, **kwargs}
uuid_fields = self.get_uuid_fields()
serialized = dict()
for k in union:
if k in uuid_fields:
# print('TYPED_UUID_KEY: {}'.format(k))
serialized[k] = union.get(k)
serialized_document = json.dumps(serialized, indent=0, sort_keys=True,
separators=(',', ':'))
return serialized_document
[docs] def filter_keys(self):
defined_filters = self._filters.get('object', {}).get('properties', [])
return list(set(defined_filters + self.RETURN_DOC_FILTERS))
[docs] @classmethod
def time_stamp(cls):
"""Get a UTC time stamp rounded to millisecond precision
Returns:
object: datetime.datetime representation of utc_now()
"""
return msec_precision(time_stamp())