# Source code for datacatalog.linkedstores.basestore.merge

from datacatalog import linkages
from jsonmerge import merge, Merger

# Names exported by a star-import of this module.
__all__ = [
    'APPEND',
    'MERGE',
    'OVERWRITE',
    'DISCARD',
    'ALL',
    'DEFAULT_JSONMERGE_STRATEGY',
    'json_merge',
]

# Every merge strategy name this module accepts, in canonical spelling.
ALL = ('append', 'arrayMergeById', 'overwrite', 'discard')

# Individual strategy names, unpacked in the same order as ALL.
APPEND, MERGE, OVERWRITE, DISCARD = ALL

# Strategy json_merge falls back to when the caller does not choose one.
DEFAULT_JSONMERGE_STRATEGY = APPEND

class JSONMergeError(Exception):
    """Error raised when a merge strategy name fails validation."""

class JSONMergeStrategy(str):
    """A validated JSON merge strategy name.

    Instances are ordinary strings whose value is always one of the
    entries of ``ALL``, spelled exactly as it appears there.
    """

    def __new__(cls, value):
        """Create a strategy from ``value``, ignoring letter case.

        Args:
            value: Strategy name. It is converted with ``str()`` and
                matched against the entries of ``ALL`` case-insensitively.

        Returns:
            JSONMergeStrategy: The matching entry of ``ALL`` in its
            canonical spelling.

        Raises:
            JSONMergeError: If ``value`` matches no entry of ``ALL``.
        """
        value = str(value).lower()
        # Match on lower-cased names but return the canonical spelling:
        # 'arrayMergeById' is mixed-case, so comparing the lower-cased input
        # directly against ALL could never accept it.
        canonical = {name.lower(): name for name in ALL}
        if value not in canonical:
            raise JSONMergeError('"{}" is not a valid {}'.format(value, cls.__name__))
        return str.__new__(cls, canonical[value])

def get_merger(merge_strategy):
    """Build a jsonmerge ``Merger`` applying one strategy to every linkage.

    Args:
        merge_strategy: Strategy name; validated by ``JSONMergeStrategy``.

    Returns:
        Merger: A merger whose schema assigns ``merge_strategy`` to each
        field named in ``linkages.ALL``.

    Raises:
        JSONMergeError: If ``merge_strategy`` is not a recognised strategy.
    """
    # Validate once up front instead of once per linkage.
    strategy = JSONMergeStrategy(merge_strategy)
    # jsonmerge looks up a field's ``mergeStrategy`` in that field's subschema
    # under ``properties``. Entries placed at the top level of the schema are
    # not consulted, which left the requested strategy without any effect.
    schema = {
        'properties': {
            lkg: {'mergeStrategy': strategy} for lkg in linkages.ALL
        }
    }
    return Merger(schema)

def json_merge(a, b, merge_strategy=DEFAULT_JSONMERGE_STRATEGY):
    """Merge two JSON documents using a jsonmerge ``Merger``.

    Args:
        a: First document, passed as the first argument to ``Merger.merge``.
        b: Second document, passed as the second argument to ``Merger.merge``.
        merge_strategy: Strategy name handed to ``get_merger``. Defaults to
            ``DEFAULT_JSONMERGE_STRATEGY``.

    Returns:
        Whatever ``Merger.merge(a, b)`` returns for the two documents.
    """
    merger = get_merger(merge_strategy)
    return merger.merge(a, b)