Source code for datacatalog.managers.pipelinejobs.indexer

import os
import sys
from datacatalog import settings
from ...linkedstores.file import FileRecord, infer_filetype
from ...agavehelpers import from_agave_uri, AgaveError
from ..common import Manager, data_merge
from .indexrequest import ArchiveIndexRequest, ProductIndexRequest, get_index_request
from .indexrequest import IndexingError, IndexType, InvalidIndexingRequest
from .indexrequest import ARCHIVE, PRODUCT


class Indexer(Manager):

    _path_listing = list()
    def sync_listing(self, force=False):
        """Updates the job's cache of archive_path contents
        """
        if force or len(getattr(self, '_path_listing', [])) <= 0:
            storage_system = self.stores['pipelinejob'].find_one_by_uuid(
                self.uuid).get('archive_system', None)
            setattr(self, '_path_storage_system', storage_system)
            listing = self.stores['pipelinejob'].list_job_archive_path(
                self.uuid, recurse=True, directories=False)
            if isinstance(listing, list):
                setattr(self, '_path_listing', listing)
            else:
                raise IndexingError('Failed to list archive path')
        return self
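    # Usage sketch (illustrative, not part of the module): given an
    # already-constructed Indexer ``indexer`` bound to an existing job,
    # a forced sync repopulates the cached archive listing.
    #
    #   indexer.sync_listing(force=True)
    #   # indexer._path_listing now caches the files under archive_path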
    def index_if_exists(self, abs_filename, storage_system=None,
                        check_exists=True):
        """Index a file if it can be confirmed to exist
        """
        if storage_system is None:
            storage_system = settings.STORAGE_SYSTEM
        # elif storage_system != settings.STORAGE_SYSTEM:
        #     # NOTE - This is temporary until comprehensive support for
        #     # storageSystems is complete
        #     raise ValueError(
        #         'Only storage system {} is currently supported'.format(
        #             settings.STORAGE_SYSTEM))
        if check_exists:
            if not self.stores['pipelinejob']._helper.exists(
                    abs_filename, storage_system):
                raise ValueError(
                    'Path does not exist: {}'.format(abs_filename))
        # TODO - Add storage_system=storage_system to File/FixityStore.index()
        self.logger.info('Indexing referenced file {}'.format(
            os.path.basename(abs_filename)))
        opt_args = dict()
        if getattr(self, 'uuid', None) is not None:
            opt_args['child_of'] = [self.uuid]
        resp = self.stores['file'].index(
            abs_filename, storage_system=storage_system, **opt_args)
        try:
            self.stores['fixity'].index(
                abs_filename, storage_system=storage_system)
        except Exception:
            if settings.LOG_FIXITY_ERRORS:
                self.logger.exception(
                    'Fixity indexing failed for {}'.format(abs_filename))
        return resp
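    # Usage sketch (the absolute path shown is hypothetical):
    #
    #   resp = indexer.index_if_exists(
    #       '/products/example/output.csv', check_exists=True)
    #   # resp is the files-store record; fixity indexing is attempted as
    #   # a side effect and failures are only logged, never raised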
    def file_or_ref_uuid(self, string_reference):
        """Resolves a string as a file or reference UUID
        """
        uuidt = self.get_uuidtype(string_reference)
        if uuidt in ('file', 'reference'):
            return string_reference
        else:
            raise ValueError('Not a file or reference UUID')
    def file_or_ref_identifier(self, string_reference):
        """Resolves a string identifier into a file or reference UUID
        """
        doc = self.get_by_identifier(string_reference, permissive=False)
        if doc is not None:
            uuidt = self.get_uuidtype(doc['uuid'])
            if uuidt in ('file', 'reference'):
                return doc['uuid']
        else:
            # Try to index a file path if it wasn't in the database
            if string_reference.startswith('/'):
                doc = self.index_if_exists(string_reference)
                if doc is not None:
                    return doc['uuid']
        raise ValueError('Not a valid file or reference identifier {}'.format(
            string_reference))
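    # Usage sketch (illustrative path): a known identifier resolves to its
    # UUID; an absolute path not yet in the database is indexed on the fly
    # via index_if_exists() before resolution.
    #
    #   uuid = indexer.file_or_ref_identifier('/products/example/output.csv')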
    def file_agave_url(self, string_reference, check_exists=True):
        """Resolves an Agave URL into a file UUID
        """
        # Needs to be wrapped in try..except block since from_agave_uri
        # raises AgaveError or ValueError when it cannot resolve a URI
        try:
            system, directory, fname = from_agave_uri(string_reference)
            abs_filename = os.path.join(directory, fname)
            resp = self.index_if_exists(abs_filename, system,
                                        check_exists=check_exists)
            return self.file_or_ref_identifier(abs_filename)
        except Exception:
            raise ValueError('Unable to resolve or index Agave files URI')
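    # Usage sketch (illustrative URI): an agave:// URI is split into
    # (system, directory, filename) by from_agave_uri(), indexed, then
    # resolved to a file UUID.
    #
    #   uuid = indexer.file_agave_url(
    #       'agave://data-example/products/output.csv')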
    def file_job_relative_path(self, string_reference, check_exists=True):
        """Resolves a filename relative to a job's archive path as a file UUID
        """
        if not string_reference.startswith('./'):
            raise ValueError(
                'Job output-relative filenames must begin with ./')
        abs_filename = os.path.normpath(
            os.path.join(self.archive_path, string_reference))
        fname_uuid = self.stores['file'].get_typeduuid(abs_filename,
                                                       binary=False)
        if check_exists:
            self.index_if_exists(abs_filename, self.archive_system)
        return fname_uuid
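    # Usage sketch: job-relative references must start with './'; the UUID
    # is computed from the normalized absolute path under archive_path.
    #
    #   uuid = indexer.file_job_relative_path('./output.csv')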
    def resolve_derived_references(self, reference_set, permissive=False):
        """Resolves a list of linkages to UUIDs
        """
        resolved = set()
        if not isinstance(reference_set, (list, tuple)):
            reference_set = [reference_set]
        for ref in reference_set:
            self.logger.debug('resolving {}'.format(ref))
            # UUID
            try:
                refuuid = self.file_or_ref_uuid(ref)
                resolved.add(refuuid)
                self.logger.debug('Was a UUID')
                continue
            except ValueError:
                self.logger.debug('Not a UUID')
            # Identifier
            try:
                refuuid = self.file_or_ref_identifier(ref)
                resolved.add(refuuid)
                self.logger.debug('Was a string identifier')
                continue
            except ValueError:
                self.logger.debug('Not a string identifier')
            try:
                refuuid = self.file_agave_url(ref)
                resolved.add(refuuid)
                self.logger.debug('Was an Agave files url')
                continue
            except ValueError:
                self.logger.debug('Not a valid Agave files url')
            # Relative path
            try:
                # A relative pathname must ALWAYS exist to be resolved
                refuuid = self.file_job_relative_path(ref, check_exists=True)
                resolved.add(refuuid)
                self.logger.debug('Was a relative path')
                continue
            except ValueError:
                self.logger.debug('Not a relative path')
            if not permissive:
                raise ValueError(
                    'String reference {} was not resolved'.format(ref))
        # list of resolved references
        resolved_list = list(resolved)
        resolved_list.sort()
        self.logger.info('Resolved {} records'.format(len(resolved_list)))
        return resolved_list
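    # Usage sketch (illustrative values): mixed reference kinds are tried
    # in order (UUID, identifier, Agave URI, job-relative path) and the
    # resolved UUIDs come back sorted and de-duplicated.
    #
    #   uuids = indexer.resolve_derived_references(
    #       ['agave://data-example/refs/genome.fa', './output.csv'],
    #       permissive=True)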
    def single_index_request(self, index_request, token=None, refresh=False,
                             fixity=True, permissive=False):
        """Processes a single indexing request
        """
        self.sync_listing(refresh)
        idxr = get_index_request(**index_request)
        self.logger.debug('IndexRequest: {}'.format(idxr))
        resp = list()
        if idxr.kind is ARCHIVE:
            gen_by = idxr.get('generated_by', [])
            # Enforce presence of current job.uuid in ARCHIVE requests
            if self.uuid not in gen_by:
                gen_by.append(self.uuid)
                idxr['generated_by'] = gen_by
            resp = self._handle_single_archive_request(
                idxr, token=token, fixity=fixity, permissive=permissive)
        elif idxr.kind is PRODUCT:
            resp = self._handle_single_product_request(
                idxr, token=token, fixity=fixity, permissive=permissive)
        return resp
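    # Usage sketch: the request dict is validated and typed by
    # get_index_request(); its exact field names are defined in the
    # indexrequest module and are not assumed here.
    #
    #   idx_request = build_index_request()   # hypothetical helper
    #   indexed = indexer.single_index_request(idx_request, fixity=True,
    #                                          permissive=True)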
    def _handle_single_product_request(self, request, token=None,
                                       fixity=False, permissive=False):
        """Private: Services a products indexing request
        """
        indexed = set()
        try:
            if request.filters != []:
                patts = request.regex()
            else:
                patts = None
            for file_name in self._path_listing:
                if patts is not None:
                    if not patts.search(os.path.basename(file_name)):
                        continue
                # Create a files record
                ftype = infer_filetype(file_name,
                                       check_exists=False,
                                       permissive=True).label
                fdict = {
                    'name': file_name,
                    'storage_system': self._path_storage_system,
                    'type': ftype
                }
                resp = self.stores['file'].add_update_document(fdict)
                self.logger.debug('product_request_target: {}'.format(resp))
                # if resp is not None:
                self.logger.debug('Adding product linkages')
                # Resolve UUIDs, identifiers, and finally relative paths
                # into UUIDs that can be used for linkages
                derived_using = self.resolve_derived_references(
                    request.derived_using, permissive=permissive)
                derived_from = self.resolve_derived_references(
                    request.derived_from, permissive=permissive)
                self.logger.debug('derived_from: {}'.format(derived_from))
                self.logger.debug('derived_using: {}'.format(derived_using))
                self.logger.debug('add_link.derived_using')
                self.stores['file'].add_link(resp['uuid'], derived_using,
                                             'derived_using')
                self.logger.debug('add_link.derived_from')
                self.stores['file'].add_link(resp['uuid'], derived_from,
                                             'derived_from')
                self.logger.debug('product_request.add_link.child_of')
                self.stores['file'].add_link(resp['uuid'], [self.uuid],
                                             'child_of')
                # Fixity is cheap - do it unless told not to
                if fixity:
                    try:
                        self.logger.debug('Storing fixity')
                        resp = self.stores['fixity'].index(file_name)
                    except Exception:
                        # It's not the end of the world if fixity indexing fails
                        self.logger.debug(
                            'Fixity indexing failed on {} for job {}'.format(
                                file_name, self.uuid))
                self.logger.info(
                    'Adding {} to list of indexed files'.format(file_name))
                indexed.add(file_name)
            indexed_list = list(indexed)
            indexed_list.sort()
            self.logger.debug('indexed {} items'.format(len(indexed_list)))
            return indexed_list
        except Exception as mexc:
            if permissive:
                return indexed
            else:
                raise IndexingError(mexc)

    def _handle_single_archive_request(self, request, token=None,
                                       fixity=True, permissive=False):
        """Private: Services an archive path indexing request
        """
        indexed = set()
        try:
            if request.filters != []:
                patts = request.regex()
            else:
                patts = None
            for file_name in self._path_listing:
                if patts is not None:
                    if not patts.search(os.path.basename(file_name)):
                        continue
                # Create a files record. We use permissive=True here to save
                # the exists lookup in favor of just using the filename
                ftype = infer_filetype(file_name,
                                       check_exists=False,
                                       permissive=True).label
                fdict = {
                    'name': file_name,
                    'storage_system': self._path_storage_system,
                    'type': ftype
                }
                if request.level is not None:
                    fdict['level'] = request.level
                resp = self.stores['file'].add_update_document(fdict)
                self.logger.debug('archive_request_target: {}'.format(resp))
                self.logger.debug('archive_request.add_link.child_of')
                self.stores['file'].add_link(resp['uuid'], [self.uuid],
                                             'child_of')
                # if resp is not None:
                #     self.logger.debug('generated_by: {}'.format(
                #         request.generated_by))
                #     self.logger.debug('writing generated_by')
                #     self.stores['file'].add_link(resp['uuid'],
                #                                  request.generated_by)
                # Fixity is cheap - do it unless told not to
                if fixity:
                    try:
                        self.logger.debug('Storing fixity')
                        resp = self.stores['fixity'].index(file_name)
                    except Exception:
                        # It's not the end of the world if fixity indexing fails
                        self.logger.debug(
                            'Fixity indexing failed on {} for job {}'.format(
                                file_name, self.uuid))
                self.logger.debug(
                    'Adding {} to list of indexed files'.format(file_name))
                indexed.add(file_name)
            indexed_list = list(indexed)
            indexed_list.sort()
            self.logger.debug('indexed {} items'.format(len(indexed_list)))
            return indexed_list
        except Exception as mexc:
            if permissive:
                return indexed
            else:
                raise IndexingError(mexc)
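# End-to-end sketch (illustrative; assumes a fully configured Indexer named
# ``indexer`` and a request dict ``product_request`` built per the schema in
# the indexrequest module):
#
#   indexer.sync_listing(force=True)
#   indexed = indexer.single_index_request(product_request, permissive=True)
#   # indexed is the sorted list of archive files that matched the request's
#   # filters and were recorded and linked in the files store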