Source code for datacatalog.linkedstores.annotations.text.store

import inspect
import json
import os
import sys
from pprint import pprint

from datacatalog import linkages
from datacatalog.dicthelpers import data_merge
from datacatalog.identifiers.typeduuid import get_uuidtype
from datacatalog.linkedstores.basestore import (
    LinkedStore, CatalogUpdateFailure, SoftDelete,
    JSONSchemaCollection)
from .document import (TextAnnotationDocument,
                       TextAnnotationSchema,
                       TYPE_SIGNATURE)
from ..exceptions import AnnotationError

[docs]class TextAnnotationStore(SoftDelete, LinkedStore): """Manage storage and retrieval of TagAnnotation documents""" LINK_FIELDS = [linkages.CHILD_OF] def __init__(self, mongodb, config={}, session=None, **kwargs): super(TextAnnotationStore, self).__init__(mongodb, config, session) schema = TextAnnotationSchema(**kwargs) super(TextAnnotationStore, self).update_attrs(schema) self._enforce_auth = True self.setup(update_indexes=kwargs.get('update_indexes', False))
[docs] def new_text(self, body, subject=None, owner=None, token=None): """Create a new TextAnnotation record Args: body (str): The body of the message owner (str): TACC.cloud username or valid email owner subject (str, optional): Optional subject for the message Returns: dict: Representation of the newly-created text annotation """ doc_dict = TextAnnotationDocument(subject=subject, body=body, owner=owner) return self.add_update_document(doc_dict)
[docs] def new_reply(self, parent_text_uuid, body, subject=None, owner=None, token=None): """Creates a new TextAnnotation record linked to its parent via child_of Args: parent_text_uuid (str): UUID5 of an existing Text Annotation body (str): The body of the message owner (str): TACC.cloud username or valid email owner subject (str, optional): Optional subject for the message Returns: dict: Representation of the newly-created reply """ if self.find_one_by_uuid(parent_text_uuid) is None: raise AnnotationError('Parent TextAnnotation does not exist') doc_dict = TextAnnotationDocument(subject=subject, body=body, owner=owner) doc_dict[linkages.CHILD_OF] = [parent_text_uuid] return self.add_update_document(doc_dict, token=token)
[docs]class StoreInterface(TextAnnotationStore): pass