Source code for datacatalog.dicthelpers

import copy
import datetime
import jsondiff
from attrdict import AttrDict
from collections import Mapping, MutableMapping
from pprint import pprint
from functools import lru_cache
from datacatalog.hashable import picklecache, jsoncache

LISTKEYS = ['child_of']
FILTERKEYS = ('_id', 'uuid', 'properties', 'measurements_ids',
              'measurements', 'files', 'samples')
[docs]class DictionaryMergeError(Exception): pass
def __data_merge(a, b): aa = AttrDict(a) bb = AttrDict(b) ab = dict(aa + bb) return ab
[docs]def right_merge(right_value, left_value): merged = None if isinstance(right_value, list) and isinstance(left_value, list): merged = list(set(left_value) | set(right_value)) return merged else: return right_value
[docs]def data_merge(left, right, setkeys=LISTKEYS): """ Merge two mappings objects together, combining overlapping Mappings, and favoring right-values left: The left Mapping object. right: The right (favored) Mapping object. NOTE: This is not commutative (merge(a,b) != merge(b,a)). """ merged = {} left_keys = frozenset(left) right_keys = frozenset(right) # Items only in the left Mapping for key in left_keys - right_keys: merged[key] = left[key] # Items only in the right Mapping for key in right_keys - left_keys: merged[key] = right[key] # in both for key in left_keys & right_keys: left_value = left[key] right_value = right[key] if (isinstance(left_value, Mapping) and isinstance(right_value, Mapping)): # recursive merge merged[key] = data_merge(left_value, right_value) else: # overwrite with right value if key in setkeys: merged[key] = right_merge(left_value, right_value) else: merged[key] = right_value return merged
[docs]def data_merge_diff(a, b, filters=FILTERKEYS): ab = data_merge(a, b) df = json_diff(a, b, filters=FILTERKEYS) return ab, df
[docs]@picklecache.mcache(lru_cache(maxsize=128)) def json_diff(j1, j2, filters=FILTERKEYS): j1 = copy.deepcopy(j1) j2 = copy.deepcopy(j2) for f in filters: try: j1.pop(f) except KeyError: pass try: j2.pop(f) except KeyError: pass resp = jsondiff.diff(j1, j2, marshal=True) return resp
[docs]def filter_dict(target_dict, keys_to_filter): """Filters key(s) from top level of a dict Args: target_dict (dict): the dictionary to filter keys_to_filter (list, tuple, str): set of keys to filter Returns: A filtered copy of target_dict """ assert isinstance(target_dict, dict) d = copy.deepcopy(target_dict) # Let keys_to_filter be a single string or a tuple of values if isinstance(keys_to_filter, str): keys_to_filter = [keys_to_filter] elif isinstance(keys_to_filter, tuple): keys_to_filter = list(keys_to_filter) elif keys_to_filter is None: keys_to_filter = [] # Do the filtering for kf in keys_to_filter: try: d.pop(kf) except KeyError: pass return d
[docs]def dict_compare(a, b): '''Lexically compare values of all primitives in a pair of dicts''' # FIXME: This is not a great implementation aa = '.'.join(sorted([str(v) for v in list(flatten(a).values()) if is_primitive(v)])) bb = '.'.join(sorted([str(u) for u in list(flatten(b).values()) if is_primitive(u)])) if aa == bb: return False else: return True
[docs]def flatten(d, parent_key='', sep='_'): items = [] for k, v in list(d.items()): new_key = '{0}{1}{2}'.format(parent_key, sep, k) if parent_key else k if isinstance(v, MutableMapping): items.extend(list(flatten(v, new_key, sep=sep).items())) elif isinstance(v, list): # apply itself to each element of the list - that's it! items.append((new_key, list(map(flatten, v)))) else: items.append((new_key, v)) return dict(items)
[docs]def is_primitive(pyobj): """Determine if pyobj is one of (what other languages would deem) a primitive""" if type(pyobj) in (int, float, bool, str, bytes): return True else: return False
# Reference: https://gist.github.com/angstwad/bf22d1822c38a92ec0a9#gistcomment-1986197
[docs]def dict_merge(dct, merge_dct, add_keys=True): """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of updating only top-level keys, dict_merge recurses down into dicts nested to an arbitrary depth, updating keys. The ``merge_dct`` is merged into ``dct``. This version will return a copy of the dictionary and leave the original arguments untouched. The optional argument ``add_keys``, determines whether keys which are present in ``merge_dict`` but not ``dct`` should be included in the new dict. Args: dct (dict) onto which the merge is executed merge_dct (dict): dct merged into dct add_keys (bool): whether to add new keys Returns: dict: updated dict """ dct = copy.deepcopy(dct) if not add_keys: merge_dct = { k: merge_dct[k] for k in set(dct).intersection(set(merge_dct)) } for k, v in list(merge_dct.items()): if (k in dct and isinstance(dct[k], dict) and isinstance(merge_dct[k], Mapping)): dct[k] = dict_merge(dct[k], merge_dct[k], add_keys=add_keys) else: dct[k] = merge_dct[k] return dct
[docs]def list_merge(lst, merge_lst): # Does not preserve order return list(set(lst) | set(merge_lst))
def __data_merge(aa, bb): """Merges b into a and returns merged result NOTE: Tuples and arbitrary objects are not handled as it is ambiguous what should happen """ key = None a = copy.deepcopy(aa) b = copy.deepcopy(bb) try: if a is None or isinstance(a, (str, int, float, bool, bytes, datetime.datetime)): # border case for first run or if a is a primitive if b is not None: a = b elif isinstance(a, list): # lists can be only appended if isinstance(b, list): # merge lists a = list_merge(a, b) else: # append to list a = list_merge(a, b) elif isinstance(a, dict): # dicts must be merged if isinstance(b, dict): for key in b: if key in a: a[key] = data_merge(a[key], b[key]) else: a[key] = b[key] else: raise DictionaryMergeError( 'Cannot merge non-dict "%s" into dict "%s"' % (b, a)) else: raise DictionaryMergeError( 'NOT IMPLEMENTED "%s" into "%s"' % (b, a)) except TypeError as e: raise DictionaryMergeError( 'TypeError "%s" in key "%s" when merging "%s" into "%s"' % (e, key, b, a)) return a def _dictcompare(a, b, section=None): # Used to compare database records as dicts # https://stackoverflow.com/a/48652830 # This was not working for the comparisons I needed to make but am keeping # it around for reference return [(c, d, g, section) if all(not isinstance(i, dict) for i in [d, g]) and d != g else None if all(not isinstance(i, dict) for i in [d, g]) and d == g else _dictcompare(d, g, c) for [c, d], [h, g] in zip(list(a.items()), list(b.items()))]
[docs]def flatten_dict(dd, separator='_', prefix=''): return {prefix + separator + k if prefix else k: v for kk, vv in dd.items() for k, v in flatten_dict(vv, separator, kk).items() } if isinstance(dd, dict) else {prefix: dd}
[docs]def linearize_dict(dd, separator='|'): ary = list() ddd = flatten_dict(dd) for k, v in sorted(ddd.items()): ary.append(k + '=' + str(v)) # ary = sorted(ary) resp = separator.join(ary).replace(' ', '') # resp = resp.replace(' ', '') return resp