Source code for datacatalog.stores.pathmapping

import inspect
import json
import os
from datacatalog import settings
from datacatalog.extensible import ExtensibleAttrDict
from .agave import StorageSystem
from .level import Level
from ..utils import normalize

[docs]class PathMapping(ExtensibleAttrDict): """Representation of a managed store with levels """ def __init__(self, system_id, path_mapping, agave=None): super().__init__() setattr(self, '_system', StorageSystem(system_id, agave=agave)) setattr(self, '_mapping', path_mapping) @property def hpc_path(self): """Path on TACC CLI systems """ return self._system.root_dir
[docs] def hpc_path_uri(self, username='<username>'): """SFTP URI Implements RFC-3986 (expired) encoding of an SFTP URI """ return 'sftp://' + username + '@' + self._system.ssh_host + self.hpc_path
@property def agave_path(self): """Agave absolute path """ return self._system.home_dir @property def agave_path_uri(self): """Agave files URI """ url = settings.TACC_API_SERVER + self._system + self.agave_path return url @property def jupyter_path(self): """Path within Jupyter notebooks """ return self._mapping.get('jupyter') @property def jupyterhub_path(self): """User-agnostic path in JupyterHub application """ return self._mapping.get('jupyterhub') @property def jupyterhub_path_uri(self): """JupyterHub application URI """ url = settings.TACC_JUPYTER_SERVER + self.jupyterhub_path return url
[docs]class PathMappings(object): _maps = list() """Maps managed stores to levels and physical file paths across platforms """
[docs] @classmethod def map(cls, system_id, agave=None): mod = inspect.getmodule(cls) modfile = inspect.getfile(mod) storefile = os.path.join(os.path.dirname(modfile), 'mappings.json') contents = json.load(open(storefile, 'r')) for k, v in contents.items(): if k == system_id: m = PathMapping(k, v, agave=agave) if m not in cls._maps: cls._maps.append(m) return m