import os
import sys
from datacatalog import settings
from ...linkedstores.file import FileRecord, infer_filetype
from ...agavehelpers import from_agave_uri, AgaveError
from ..common import Manager, data_merge
from .indexrequest import ArchiveIndexRequest, ProductIndexRequest, get_index_request
from .indexrequest import IndexingError, IndexType, InvalidIndexingRequest
from .indexrequest import ARCHIVE, PRODUCT
class Indexer(Manager):
    """Indexes a PipelineJob's archived outputs and products into the catalog.

    Walks the job's archive path listing, creates/updates file records,
    establishes provenance linkages, and (optionally) records fixity.
    """

    # Class-level default for the cached archive-path listing. sync_listing()
    # replaces this with a per-instance list via setattr(), so the shared
    # mutable default is never mutated in place.
    _path_listing = list()
[docs] def sync_listing(self, force=False):
"""Updates the job's cache of archive_path contents
"""
if force or len(getattr(self, '_path_listing', [])) <= 0:
storage_system = self.stores['pipelinejob'].find_one_by_uuid(
self.uuid).get('archive_system', None)
setattr(self, '_path_storage_system', storage_system)
listing = self.stores['pipelinejob'].list_job_archive_path(
self.uuid, recurse=True, directories=False)
if isinstance(listing, list):
setattr(self, '_path_listing', listing)
else:
raise IndexingError('Failed to list archive path')
return self
[docs] def index_if_exists(self,
abs_filename,
storage_system=None,
check_exists=True):
"""Index a file if it can be confirmed to exist
"""
if storage_system is None:
storage_system is settings.STORAGE_SYSTEM
# elif storage_system != settings.STORAGE_SYSTEM:
# # NOTE - This is temporary until comprehensive support for storageSystems is complete
# raise ValueError(
# 'Only storage system {} is currently supported'.format(
# settings.STORAGE_SYSTEM))
if check_exists:
if not self.stores['pipelinejob']._helper.exists(
abs_filename, storage_system):
raise ValueError(
'Path does not exist: {}'.format(abs_filename))
# TODO - Add storage_system=storage_system to File/FixityStore.index()
self.logger.info('Indexing referenced file {}'.format(
os.path.basename(abs_filename)))
opt_args = dict()
if getattr(self, 'uuid', None) is not None:
opt_args['child_of'] = [self.uuid]
resp = self.stores['file'].index(abs_filename,
storage_system=storage_system,
**opt_args)
# raise SystemError(resp)
try:
self.stores['fixity'].index(abs_filename,
storage_system=storage_system)
except Exception:
if settings.LOG_FIXITY_ERRORS:
self.logger.exception(
'Fixity indexing failed for {}'.format(abs_filename))
return resp
[docs] def file_or_ref_uuid(self, string_reference):
"""Resolves a string as a file or reference UUID
"""
uuidt = self.get_uuidtype(string_reference)
if uuidt in ('file', 'reference'):
return string_reference
else:
raise ValueError('Not a file or reference UUID')
[docs] def file_or_ref_identifier(self, string_reference):
"""Resolves a string identifier into a files or reference UUID
"""
doc = self.get_by_identifier(string_reference, permissive=False)
if doc is not None:
uuidt = self.get_uuidtype(doc['uuid'])
if uuidt in ('file', 'reference'):
return doc['uuid']
else:
# Try to index a file path if it wasn't in the database
if string_reference.startswith('/'):
doc = self.index_if_exists(string_reference)
if doc is not None:
return doc['uuid']
raise ValueError('Not a valid file or reference identifier {}'.format(
string_reference))
[docs] def file_agave_url(self, string_reference, check_exists=True):
"""Resolves an Agave URL into a file UUID
"""
# Needs to be wrapped in try..except block since from_agave_uri
# raises AgaveError or ValueError when it cannot resolve a URI
try:
system, directory, fname = from_agave_uri(string_reference)
abs_filename = os.path.join(directory, fname)
resp = self.index_if_exists(abs_filename,
system,
check_exists=check_exists)
return self.file_or_ref_identifier(abs_filename)
except Exception:
raise ValueError('Unable to resolve or index Agave files URI')
[docs] def file_job_relative_path(self, string_reference, check_exists=True):
"""Resolves a filename relative to a job's archive path as a file UUID
"""
if not string_reference.startswith('./'):
raise ValueError(
'Job output-relative filenames must begin with ./')
abs_filename = os.path.normpath(
os.path.join(self.archive_path, string_reference))
fname_uuid = self.stores['file'].get_typeduuid(abs_filename,
binary=False)
if check_exists:
self.index_if_exists(abs_filename, self.archive_system)
return fname_uuid
[docs] def resolve_derived_references(self, reference_set, permissive=False):
"""Resolves a list of linkages to UUIDs
"""
resolved = set()
if not isinstance(reference_set, (list, tuple)):
reference_set = [reference_set]
for ref in reference_set:
self.logger.debug('resolving {}'.format(ref))
# UUID
try:
refuuid = self.file_or_ref_uuid(ref)
resolved.add(refuuid)
self.logger.debug('Was a UUID')
continue
except ValueError:
self.logger.debug('Not a UUID')
# Identifier
try:
refuuid = self.file_or_ref_identifier(ref)
resolved.add(refuuid)
self.logger.debug('Was a string identifier')
continue
except ValueError:
self.logger.debug('Not a string identifier')
try:
refuuid = self.file_agave_url(ref)
resolved.add(refuuid)
self.logger.debug('Was an Agave files url')
continue
except ValueError:
self.logger.debug('Not a valid Agave files url')
# Relative path
try:
# A relative pathname must ALWAYS exist to be resolved
refuuid = self.file_job_relative_path(ref, check_exists=True)
resolved.add(refuuid)
self.logger.debug('Was a relative path')
continue
except ValueError:
self.logger.debug('Not a relative path')
if not permissive:
raise ValueError(
'String reference {} was not resolved'.format(ref))
# list of resolved references
resolved_list = list(resolved)
resolved_list.sort()
self.logger.info('Resolved {} records'.format(len(resolved_list)))
return resolved_list
[docs] def single_index_request(self,
index_request,
token=None,
refresh=False,
fixity=True,
permissive=False):
"""Processes a single indexing request
"""
self.sync_listing(refresh)
idxr = get_index_request(**index_request)
self.logger.debug('IndexRequest: {}'.format(idxr))
resp = list()
if idxr.kind is ARCHIVE:
gen_by = idxr.get('generated_by', [])
# Enforce presence of current job.uuid in ARCHIVE requests
if self.uuid not in gen_by:
gen_by.append(self.uuid)
idxr['generated_by'] = gen_by
resp = self._handle_single_archive_request(idxr,
token=token,
fixity=fixity,
permissive=permissive)
elif idxr.kind is PRODUCT:
resp = self._handle_single_product_request(idxr,
token=token,
fixity=fixity,
permissive=permissive)
return resp
def _handle_single_product_request(self,
request,
token=None,
fixity=False,
permissive=False):
"""Private: Services a products indexing request
"""
indexed = set()
try:
if request.filters != []:
patts = request.regex()
else:
patts = None
for file_name in self._path_listing:
if patts is not None:
if not patts.search(os.path.basename(file_name)):
continue
# Create a files record
ftype = infer_filetype(file_name,
check_exists=False,
permissive=True).label
fdict = {
'name': file_name,
'storage_system': self._path_storage_system,
'type': ftype
}
resp = self.stores['file'].add_update_document(fdict)
self.logger.debug('product_request_target: {}'.format(resp))
# if resp is not None:
self.logger.debug('Adding product linkages')
# Resolve UUIDs, indentifiers, and finally relative paths
# into UUIDs that can be used for linkages
derived_using = self.resolve_derived_references(
request.derived_using, permissive=permissive)
derived_from = self.resolve_derived_references(
request.derived_from, permissive=permissive)
self.logger.debug('derived_from: {}'.format(derived_from))
self.logger.debug('derived_using: {}'.format(derived_using))
self.logger.debug('add_link.derived_using')
self.stores['file'].add_link(resp['uuid'], derived_using,
'derived_using')
self.logger.debug('add_link.derived_from')
self.stores['file'].add_link(resp['uuid'], derived_from,
'derived_from')
self.logger.debug('product_request.add_link.child_of')
self.stores['file'].add_link(resp['uuid'], [self.uuid],
'child_of')
# Fixity is cheap - do it unless told not to
if fixity:
try:
self.logger.debug('Storing fixity')
resp = self.stores['fixity'].index(file_name)
except Exception:
# It's not the end of the world if fixity indexing fails
self.logger.debug(
'Fixity indexing failed on {} for job {}'.format(
file_name, self.uuid))
self.logger.info(
'Adding {} to list of indexed files'.format(file_name))
indexed.add(file_name)
indexed_list = list(indexed)
indexed_list.sort()
self.logger.debug('indexed {} items'.format(len(indexed_list)))
return indexed_list
except Exception as mexc:
if permissive:
return indexed
else:
raise IndexingError(mexc)
def _handle_single_archive_request(self,
request,
token=None,
fixity=True,
permissive=False):
"""Private: Services an archive path indexing request
"""
indexed = set()
try:
if request.filters != []:
patts = request.regex()
else:
patts = None
for file_name in self._path_listing:
if patts is not None:
if not patts.search(os.path.basename(file_name)):
continue
# Create a files record. We use permissive=True here to save
# the exists lookup in favor of just using the filename
ftype = infer_filetype(file_name,
check_exists=False,
permissive=True).label
fdict = {
'name': file_name,
'storage_system': self._path_storage_system,
'type': ftype
}
if request.level is not None:
fdict['level'] = request.level
resp = self.stores['file'].add_update_document(fdict)
self.logger.debug('archive_request_target: {}'.format(resp))
self.logger.debug('archive_request.add_link.child_of')
self.stores['file'].add_link(resp['uuid'], [self.uuid],
'child_of')
# if resp is not None:
# self.logger.debug('generated_by: {}'.format(
# request.generated_by))
# self.logger.debug('writing generated_by')
# self.stores['file'].add_link(resp['uuid'],
# request.generated_by)
# Fixity is cheap - do it unless told not to
if fixity:
try:
self.logger.debug('Storing fixity')
resp = self.stores['fixity'].index(file_name)
except Exception:
# It's not the end of the world if fixity indexing fails
self.logger.debug(
'Fixity indexing failed on {} for job {}'.format(
file_name, self.uuid))
self.logger.debug(
'Adding {} to list of indexed files'.format(file_name))
indexed.add(file_name)
indexed_list = list(indexed)
indexed_list.sort()
self.logger.debug('indexed {} items'.format(len(indexed_list)))
return indexed_list
except Exception as mexc:
if permissive:
return indexed
else:
raise IndexingError(mexc)