Source code for datacatalog.agavehelpers

import re
import os
from requests import HTTPError
from datacatalog import settings
from datacatalog.stores import StorageSystem
from agavepy.agave import Agave, AgaveError
from tenacity import (retry, retry_if_exception_type,
                      stop_after_delay, wait_exponential)

from .stores import ManagedStores
from .utils import normalize, normpath

# TODO Factor the command runners into a class that handles the setup
# TODO Implement a more declarative form of support for these commands based on plugins
# FIXME listdir returns full paths, which is at odds with the POSIX implementation

class AgaveHelperError(AgaveError):
    """Error raised when an AgaveHelper operation fails

    Derives from ``AgaveError`` so that handlers written for the Agave
    client's own errors also catch failures reported by the helper.
    """
class AgaveHelperException(AgaveHelperError):
    """Specialization of AgaveHelperError

    Adds no behavior of its own; it exists as a distinct type that can be
    raised and caught separately from its parent.
    """
@retry(retry=retry_if_exception_type(AgaveError),
       reraise=True,
       stop=stop_after_delay(8),
       wait=wait_exponential(multiplier=2, max=64))
def ag_files_list(client, systemId, filePath, limit=50, offset=0):
    """Call the client's ``files.list`` endpoint, retrying on AgaveError

    The call is retried with exponentially increasing waits whenever it
    raises ``AgaveError``. Retrying stops once 8 seconds have elapsed, at
    which point the last exception is re-raised as-is (``reraise=True``).

    Args:
        client: API client exposing ``files.list``
        systemId: Identifier of the storage system to list
        filePath: Path on the storage system to list
        limit (int, optional): Maximum number of records to request
        offset (int, optional): Number of records to skip

    Returns:
        Whatever ``client.files.list`` returns for the request
    """
    listing = client.files.list(
        systemId=systemId,
        filePath=filePath,
        limit=limit,
        offset=offset)
    return listing
class AgaveHelper(object):
    """Uses an active API client to provide various utility functions

    Args:
        client: An active API client. Must not be None.
        storage_system (str, optional): Storage system used whenever a
            method is called without an explicit ``storage_system``
    """

    def __init__(self, client, storage_system=settings.STORAGE_SYSTEM):
        assert client is not None, 'AgaveHelper requires a valid API client'
        self.client = client
        self.system = StorageSystem(storage_system, agave=client)

    def get_storage_system(self, storage_system):
        """Resolve a storage system, falling back to the helper's default

        Args:
            storage_system (str): A storage system to resolve, or None to
                use the system configured when the helper was created

        Returns:
            StorageSystem: The resolved storage system
        """
        if storage_system is None:
            return self.system
        return StorageSystem(storage_system, agave=self.client)

    def mapped_posix_path(self, path, storage_system=None):
        """Resolve the absolute POSIX path for an Agave directory

        The root directory is taken from the ``STORAGE_SYSTEM_PREFIX_OVERRIDE``
        environment variable when it is set, otherwise from the storage
        system's ``root_dir``.

        Args:
            path (str): Agave absolute path
            storage_system (str, optional): The storage system against which
                to resolve the POSIX path

        Returns:
            str: The path as a string
        """
        system = self.get_storage_system(storage_system)
        root_dir = os.environ.get('STORAGE_SYSTEM_PREFIX_OVERRIDE', None)
        if root_dir is None:
            root_dir = system.root_dir
        return os.path.join(root_dir, normalize(path))

    def paths_to_agave_uris(self, filepaths, storage_system=None):
        """Transform a list of paths on a storage system to agave URI

        Args:
            filepaths (list): A list of agave storage system paths
            storage_system (str, optional): The storage system where these
                paths reside

        Returns:
            list: The paths in `agave://` format

        Warning:
            Existence of resources described by the URI list is not validated
        """
        system = self.get_storage_system(storage_system)
        return [system.agave_path_uri(f) for f in filepaths]

    def exists(self, path, storage_system=None):
        """Check if a path exists on an Agave storage resource

        The mapped POSIX path is checked first. If it is not present, the
        path is listed via the API: a non-empty listing means the path
        exists (file or folder), and an HTTP 404 means it does not.

        Args:
            path (str): An Agave absolute path
            storage_system (str, optional): The storage system against which
                to resolve the POSIX path

        Raises:
            AgaveHelperError: The function has failed due an API error

        Returns:
            bool: Whether the path exists or not
        """
        system = self.get_storage_system(storage_system)
        try:
            if os.path.exists(self.mapped_posix_path(path, system)):
                return True
            try:
                listing = ag_files_list(
                    self.client, filePath=path, systemId=system, limit=2)
            except HTTPError as herr:
                if herr.response.status_code == 404:
                    return False
                # Any other HTTP failure is reported via the handler below
                raise
            # A folder exists just as much as a file does, so the entry's
            # format is deliberately not inspected here.
            return len(listing) > 0
        except Exception as exc:
            raise AgaveHelperError('Function failed', exc)

    def dirname(self, path, storage_system=None):
        """Not implemented

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError()

    def _agave_path_format(self, path, system):
        """Return the ``format`` field of the first API listing entry for path

        Raises:
            AgaveHelperError: The listing call failed or returned no entries
        """
        try:
            listing = ag_files_list(
                self.client, filePath=path, systemId=system, limit=2)
            return listing[0].get('format', None)
        except Exception as exc:
            raise AgaveHelperError('Function failed', exc)

    def isfile(self, path, storage_system=None):
        """Check if a path on an Agave storage resource is a file

        The mapped POSIX path is checked first; otherwise the path is listed
        via the API and treated as a file unless its format is ``folder``.

        Args:
            path (str): An Agave absolute path
            storage_system (str, optional): The storage system against which
                to resolve the POSIX path

        Raises:
            NotImplementedError: The API lookup failed. The underlying
                AgaveHelperError is passed as the exception argument.

        Returns:
            bool: Whether the path is a file or not
        """
        system = self.get_storage_system(storage_system)
        try:
            if os.path.isfile(self.mapped_posix_path(path, system)):
                return True
            return self._agave_path_format(path, system) != 'folder'
        except AgaveHelperError as aexc:
            raise NotImplementedError(aexc)

    def isdir(self, path, storage_system=None):
        """Check if a path on an Agave storage resource is a directory

        The mapped POSIX path is checked first; otherwise the path is listed
        via the API and treated as a directory if its format is ``folder``.

        Args:
            path (str): An Agave absolute path
            storage_system (str, optional): The storage system against which
                to resolve the POSIX path

        Raises:
            NotImplementedError: The API lookup failed. The underlying
                AgaveHelperError is passed as the exception argument.

        Returns:
            bool: Whether the path is a directory or not
        """
        system = self.get_storage_system(storage_system)
        try:
            if os.path.isdir(self.mapped_posix_path(path, system)):
                return True
            return self._agave_path_format(path, system) == 'folder'
        except AgaveHelperError as aexc:
            raise NotImplementedError(aexc)

    def listdir(self, path, recurse, storage_system=None, directories=True):
        """Get the contents of a directory on an Agave storage resource

        Args:
            path (str): An Agave absolute path to directory
            recurse (bool): Whether to descend into sub-directories
            storage_system (str, optional): The storage system where `path`
                is found
            directories (bool, optional): Whether to include directories in
                response

        Returns:
            list: Directory contents as a sorted list of unique strings
        """
        system = self.get_storage_system(storage_system)
        # NOTE: This method previously walked the POSIX tree and then
        # unconditionally discarded that result in favor of the API listing.
        # The walk was pure overhead, so the API listing is used directly.
        dirlisting = self.listdir_agave_native(
            path, recurse, system, directories)
        # De-duplicate defensively and return in a deterministic order
        return sorted(set(dirlisting))

    def listdir_agave_posix(self, path, recurse=True, storage_system=None,
                            directories=True):
        """List a directory by walking its mapped POSIX path

        Args:
            path (str): An Agave absolute path to directory
            recurse (bool, optional): Whether to descend into sub-directories
            storage_system (str, optional): The storage system where `path`
                is found
            directories (bool, optional): Whether to include directories in
                response

        Returns:
            list: Paths with the storage system's root directory removed
        """
        system = self.get_storage_system(storage_system)
        prefix = system.root_dir
        top = os.path.join(prefix, normalize(path))
        found = []
        for root, dirnames, filenames in os.walk(top):
            if directories is True:
                found.extend(os.path.join(root, d) for d in dirnames)
            found.extend(os.path.join(root, f) for f in filenames)
            if recurse is not True:
                # Only the top level was requested
                break
        # Strip the prefix only where it leads the path; a blanket
        # str.replace would also remove matches deeper in the path.
        return [entry[len(prefix):] if entry.startswith(prefix) else entry
                for entry in found]

    def listdir_agave_lustre(self, path, recurse=True, storage_system=None,
                             directories=True):
        """Not implemented

        Raises:
            NotImplementedError: Always
        """
        raise NotImplementedError(
            'Lustre support is not implemented. Consider using listdir_agave_posix().')

    def listdir_agave_native(self, path, recurse, storage_system=None,
                             directories=True, current_listing=None):
        """List a directory using paged calls to the files API

        Args:
            path (str): An Agave absolute path to directory
            recurse (bool): Whether to descend into sub-directories
            storage_system (str, optional): The storage system where `path`
                is found
            directories (bool, optional): Whether to include directories in
                response
            current_listing (list, optional): A list to append results to.
                Used by recursive calls; a new list is created when omitted.

        Returns:
            list: Sorted paths of the entries found
        """
        system = self.get_storage_system(storage_system)
        # Fall back to the resolved system so the API is never sent a None
        # systemId when the caller relies on the default storage system.
        system_id = storage_system if storage_system is not None else system
        pagesize = system.page_size
        # A fresh list per top-level call: a mutable default argument would
        # be shared between calls and leak earlier results into later ones.
        listing = [] if current_listing is None else current_listing
        skip = 0
        while True:
            sublist = ag_files_list(self.client,
                                    systemId=system_id,
                                    filePath=path,
                                    limit=pagesize,
                                    offset=skip)
            skip = skip + pagesize
            for f in sublist:
                if f['name'] == '.':
                    continue
                is_folder = f['format'] == 'folder'
                if not is_folder or directories is True:
                    listing.append(f['path'])
                if is_folder and recurse is True:
                    self.listdir_agave_native(
                        f['path'], recurse, storage_system, directories,
                        current_listing=listing)
            # A short page means there is nothing further to fetch
            if len(sublist) < pagesize:
                break
        listing.sort()
        return listing

    def delete(self, filePath, systemId):
        """Delete a path on a storage system via the files API

        Args:
            filePath (str): Path to delete
            systemId (str): Storage system where the path resides
        """
        self.client.files.delete(filePath=filePath, systemId=systemId)

    def mkdir(self, dirName, systemId, basePath='/', sync=False, timeOut=60):
        """Creates a directory dirName on a storage system at basePath

        Like mkdir -p this is idempotent. It will create the child path tree
        so long as paths are specified correctly, but will do nothing if all
        directories are already in place.

        Args:
            dirName (str): Directory to create, sent as the request's `path`
            systemId (str): Storage system on which to create the directory
            basePath (str, optional): Path the request is issued against
            sync (bool, optional): Currently unused
            timeOut (int, optional): Currently unused

        Raises:
            Exception: The API returned an HTTP error; the message is the
                HTTP reason phrase
            AgaveError: Any other failure

        Returns:
            bool: True on success
        """
        try:
            self.client.files.manage(systemId=systemId,
                                     body={'action': 'mkdir',
                                           'path': dirName},
                                     filePath=basePath)
        except HTTPError as h:
            http_err_resp = process_agave_httperror(h)
            raise Exception(http_err_resp)
        except Exception as e:
            raise AgaveError(
                "Unable to mkdir {} at {}/{}: {}".format(
                    dirName, systemId, basePath, e))
        return True
def from_agave_uri(uri=None, validate=False):
    """Partition an Agave storage URI into its components

    Args:
        uri (str): An agave-canonical files URI
        validate (bool, optional): Whether to validate the URL using an API
            call. Accepted for compatibility; currently unused.

    Raises:
        AgaveError: Occurs when invalid URI is passed

    Returns:
        tuple: Three strings are returned: storageSystem, directoryPath, and fileName
    """
    if uri is None:
        raise AgaveError("URI cannot be empty")

    match = re.search(r'agave:\/\/(.*)$', uri)
    if match is None:
        raise AgaveError("Unable identify protocol: {}".format(uri))

    resourcepath = match.group(1)
    firstSlash = resourcepath.find('/')
    # Compare by value: identity comparison against an int literal only
    # works by accident of interpreter-level integer caching.
    if firstSlash == -1:
        raise AgaveError("Unable to resolve systemId: {}".format(uri))

    try:
        systemId = resourcepath[0:firstSlash]
        origDirPath = resourcepath[firstSlash + 1:]
        dirPath = normpath('/' + os.path.dirname(origDirPath))
        fileName = os.path.basename(origDirPath)
        return (systemId, dirPath, fileName)
    except Exception as e:
        raise AgaveError(
            "Error resolving directory path or file name: {}".format(e))
def process_agave_httperror(http_error_object):
    """Extract the HTTP reason phrase from an HTTP error

    Args:
        http_error_object: An error object expected to carry a ``response``
            attribute that has a ``reason`` attribute

    Returns:
        The value of ``http_error_object.response.reason``, or the string
        'UNKNOWN ERROR' if it cannot be read
    """
    # The status code was previously extracted here as well but never used
    # or returned, so only the reason is read.
    try:
        return http_error_object.response.reason
    except Exception:
        return 'UNKNOWN ERROR'