Source code for datacatalog.managers.pipelinejobs.jobmanager

import arrow
import copy
import json
import os
import re
import sys
import validators
import logging
from pprint import pprint
from ...tokens import validate_admin_token
from ...tokens.admin import internal_get_admin_token
from ...tokens import admin
from ..common import Manager, data_merge
from .exceptions import ManagedPipelineJobError
from .config import DEFAULT_ARCHIVE_SYSTEM


class JobManager(Manager):

    PARAMS = [('archive_path', False, 'archive_path', None),
              ('archive_patterns', False, 'archive_patterns', []),
              ('archive_system', False, 'archive_system',
               DEFAULT_ARCHIVE_SYSTEM),
              ('pipeline_uuid', False, 'pipeline_uuid', None),
              ('token', False, 'token', None),
              ('uuid', False, 'uuid', None),
              ('state', False, 'state', None),
              ('last_event', False, 'last_event', None)]

    ADMIN_EVENTS = ['reset', 'ready', 'delete', 'purge']

    def __init__(self, mongodb, agave=None, *args, **kwargs):
        self.cancelable = False
        self.job = None
        self._enforce_auth = True
        super(JobManager, self).__init__(mongodb, agave)
        # Read in core kwargs per PARAMS
        for param, required, key, default in self.PARAMS:
            kval = kwargs.get(param, None)
            if kval is None and required is True:
                raise ManagedPipelineJobError(
                    'Parameter "{}" is required'.format(param))
            else:
                if kval is None:
                    kval = default
                setattr(self, key, kval)
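
    # Illustrative usage sketch (comments only; not part of the module).
    # Assumes `db_settings` is the MongoDB connection mapping expected by
    # the parent Manager class; all literal values are placeholders.
    #
    #   mgr = JobManager(db_settings,
    #                    uuid='<job-typeduuid>',
    #                    pipeline_uuid='<pipeline-typeduuid>',
    #                    archive_path='/products/v2/<job-dir>')
    #   mgr.archive_system  # defaults to DEFAULT_ARCHIVE_SYSTEM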

    def setup(self, *args, **kwargs):
        return self

    def attach(self, token=None):
        """Simplified proxy for load

        Materializes just enough of the job to interact with it
        """
        return self.load(job_uuid=self.uuid, token=token)

    def load(self, job_uuid=None, token=None):
        """Load up a JobManager instance for the given UUID

        This function is the opposite of ``setup()``. It populates a
        minimum attribute set from the current contents of a job.

        Args:
            job_uuid (string): A known job TypedUUID
            token (string, optional): Update token for the job

        Raises:
            ManagedPipelineJobError is raised on any unrecoverable error

        Returns:
            object: ``self``
        """
        if job_uuid is None:
            job_uuid = getattr(self, 'uuid', None)
        if job_uuid is None:
            raise ManagedPipelineJobError('Unable to load job contents')
        loaded_job = self.stores['pipelinejob'].find_one_by_uuid(job_uuid)
        if loaded_job is None:
            raise ManagedPipelineJobError(
                'No job {} was found'.format(job_uuid))
        for param, required, key, default in self.PARAMS:
            kval = loaded_job.get(param, None)
            if kval is None and required is True:
                raise ManagedPipelineJobError(
                    'Parameter "{}" is required'.format(param))
            else:
                if kval is None:
                    kval = default
                setattr(self, key, kval)
        return self
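
    # Hedged sketch: re-materializing an existing job by UUID. The UUID is
    # a placeholder; load() raises ManagedPipelineJobError if no matching
    # job document exists.
    #
    #   mgr = JobManager(db_settings, uuid='<job-typeduuid>').load()
    #   mgr.state         # populated from the stored job document
    #   mgr.archive_path  # likewise restored from the job record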

    def cancel(self, token=None):
        """Cancel the job, deleting it from the system
        """
        if token is not None:
            htoken = token
        else:
            htoken = getattr(self, 'token', None)
        try:
            if self.uuid is None:
                raise ValueError('Job UUID cannot be empty')
            if getattr(self, 'cancelable') is not False:
                self.stores['pipelinejob'].delete(self.uuid, htoken,
                                                  force=True)
                self.job = None
                return self.job
            else:
                raise ManagedPipelineJobError(
                    'Cannot cancel a job once it is running. '
                    'Send a "fail" event instead.')
        except Exception as cexc:
            raise ManagedPipelineJobError(cexc)

    def delete(self, token=None):
        """Delete the job once, even if it has processed events
        """
        if token is not None:
            htoken = token
        else:
            htoken = getattr(self, 'token', None)
        try:
            if self.uuid is None:
                raise ValueError('Job UUID cannot be empty')
            else:
                self.stores['pipelinejob'].delete(self.uuid, htoken,
                                                  force=True)
                self.job = None
                for param, required, key, default in self.PARAMS:
                    setattr(self, param, None)
                return self
        except Exception as cexc:
            raise ManagedPipelineJobError(cexc)

    def handle(self, event_name, data={}, token=None, **kwargs):
        """Handle a named event
        """
        # Passed token >> current token to permit
        # passing admin token as an argument
        if token is None:
            htoken = getattr(self, 'token', None)
        else:
            htoken = token
        try:
            if event_name in self.ADMIN_EVENTS:
                validate_admin_token(htoken,
                                     key=admin.get_admin_key(),
                                     permissive=False)
            # Bootstrap the job via setup() if it has no UUID yet
            if getattr(self, 'uuid', None) is None:
                self.setup(update_indexes=kwargs.get('update_indexes',
                                                     False))
            self.job = self.stores['pipelinejob'].handle({
                'name': event_name.lower(),
                'uuid': self.uuid,
                'token': htoken,
                'data': data
            })
            if getattr(self, 'cancelable'):
                setattr(self, 'cancelable', False)
            for param, required, key, default in self.PARAMS:
                setattr(self, param, self.job.get(param, None))
            return self.job
        except Exception as hexc:
            raise ManagedPipelineJobError(hexc)
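
    # Sketch of the dispatch path (placeholder token): named events funnel
    # through handle(), which authorizes admin events, forwards the event
    # to the pipelinejob store, and refreshes local attributes from the
    # returned job document.
    #
    #   mgr.handle('update', {'progress': 0.5}, token='<job-token>')
    #   mgr.state  # reflects the store's post-event state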

    def run(self, data={}, token=None):
        """Wrapper for **run**
        """
        return self.handle('run', data, token=token)

    def resource(self, data={}, token=None):
        """Wrapper for **resource**
        """
        return self.handle('resource', data, token=token)

    def update(self, data={}, token=None):
        """Wrapper for **update**
        """
        return self.handle('update', data, token=token)

    def fail(self, data={}, token=None):
        """Wrapper for **fail**
        """
        # This lets job workflows simply call fail() if the job
        # has not yet started running. We could also allow CREATED
        # to go to FAILED, thus preserving history. To do that, we
        # need to back this code out and update the FSM
        if getattr(self, 'cancelable', False):
            return self.cancel(token=internal_get_admin_token())
        else:
            return self.handle('fail', data, token=token)

    def finish(self, data={}, token=None):
        """Wrapper for **finish**
        """
        return self.handle('finish', data, token=token)

    def index(self, data={}, token=None, **kwargs):
        """Wrapper for **index**
        """
        return self.handle('index', data, token=token)

    def indexed(self, data={}, token=None, **kwargs):
        """Wrapper for **indexed**
        """
        return self.handle('indexed', data, token=token)

    def reset(self, data={}, no_clear_path=False, token=None,
              permissive=False):
        """Wrapper for **reset**

        Note:
            This event encapsulates both the 'reset' and subsequent
            'ready' event, as the resetting process needs to be
            thread-locked.
        """
        validate_admin_token(token, key=admin.get_admin_key(),
                             permissive=False)
        resp = self.handle('reset', data, token=token)
        if not no_clear_path:
            self._clear_archive_path(permissive=permissive)
        # print('Sending READY')
        resp = self.handle('ready', data, token=token)
        return resp
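
    # Hedged admin example: reset() requires a valid admin token and, by
    # default, also clears the archive path before sending 'ready'. The
    # token value is a placeholder.
    #
    #   mgr.reset(token='<admin-token>', no_clear_path=True)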

    def ready(self, data={}, token=None):
        """Wrapper for **ready**
        """
        validate_admin_token(token, key=admin.get_admin_key(),
                             permissive=False)
        return self.handle('ready', data, token=token)

    def serialize_data(self):
        """Serializes self.data into a minified string
        """
        return json.dumps(getattr(self, 'data', {}),
                          sort_keys=True,
                          separators=(',', ':'))
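
    # For reference, these json.dumps() options sort keys and strip
    # whitespace, e.g.:
    #
    #   json.dumps({'b': 1, 'a': 2}, sort_keys=True, separators=(',', ':'))
    #   # -> '{"a":2,"b":1}'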

    def archive_uri(self):
        """Formats archive system and path into a URI
        """
        return 'agave://{}{}'.format(getattr(self, 'archive_system', 'NA'),
                                     getattr(self, 'archive_path', 'NA'))
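
    # With placeholder values archive_system='data-example' and
    # archive_path='/products/v2/abc', this yields
    # 'agave://data-example/products/v2/abc'. Note that archive_path must
    # begin with '/' since the format string inserts no separator.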

    def __repr__(self):
        vals = list()
        vals.append('uuid: {}'.format(getattr(self, 'uuid')))
        vals.append('pipeline: {}'.format(getattr(self, 'pipeline_uuid')))
        vals.append('archive_uri: {}'.format(self.archive_uri()))
        return '\n'.join(vals)

    def _validate_clearable_archive_path(self, path, permissive=True):
        PREFIXES = ('/products/v2', '/sample/tacc-cloud')
        for p in PREFIXES:
            if path.startswith(p):
                return True
        if permissive:
            return False
        else:
            raise ValueError(
                'Only specific paths may be cleared: {}'.format(PREFIXES))

    def _clear_archive_path(self, mock=False, permissive=True):
        """Administratively clears a job's archive path

        Path is cleared quickly by deleting the directory then recreating
        it. Preview the actions to be taken by this function by passing
        ``mock=True`` as a parameter.

        Args:
            mock (bool, optional): Whether to simulate running the delete

        Raises:
            ManagedPipelineJobError is raised for any error state

        Returns:
            bool
        """
        try:
            ag_sys = getattr(self, 'archive_system', None)
            ag_path = getattr(self, 'archive_path', None)
            helper = self.stores['pipelinejob']._helper
            self._validate_clearable_archive_path(ag_path)
            if not helper.isdir(ag_path, storage_system=ag_sys):
                raise ValueError('Path does not appear to exist')
            if mock:
                print('clear_archive_path.mock.delete', ag_path, ag_sys)
                print('clear_archive_path.mock.mkdir', ag_path, ag_sys)
            else:
                helper.delete(ag_path, ag_sys)
                helper.mkdir(ag_path, ag_sys)
        except Exception as clexc:
            if permissive is False:
                raise ManagedPipelineJobError(clexc)
            else:
                return False
        return True
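
# Hedged preview sketch: _clear_archive_path(mock=True) only prints the
# delete/mkdir actions it would take, allowing a dry run before a real
# administrative reset (token value is a placeholder):
#
#   mgr._clear_archive_path(mock=True)  # print planned actions
#   mgr.reset(token='<admin-token>')    # perform the actual reset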