import base64
import inspect
import json
import os
import sys
from pprint import pprint
import random
import string
import tempfile
from datacatalog import settings
from ... import identifiers
from ...dicthelpers import data_merge
from ..basestore import LinkedStore, HeritableDocumentSchema, linkages
from ..basestore import CatalogUpdateFailure
from ..basestore import SoftDelete, AgaveClient
from ..basestore import get_token, validate_token, validate_admin_token
from .exceptions import JobError, JobCreateFailure, JobUpdateFailure, \
DuplicateJobError, UnknownPipeline, UnknownJob
from .schema import JobDocument, HistoryEventDocument
from .job import PipelineJob, PipelineJobError
from .graphfsm import render_graph, build_graph
# Linkage types used as PipelineJobStore.LINK_FIELDS.
DEFAULT_LINK_FIELDS = [
    linkages.CHILD_OF,
    linkages.ACTED_ON,
    linkages.ACTED_USING,
    linkages.GENERATED_BY,
]
class PipelineJobStore(AgaveClient, SoftDelete, LinkedStore):
    """Document store for pipeline jobs.

    Extends the inherited store with job-specific operations: creating a
    job, applying an event to a job through ``PipelineJob.handle``,
    checking that a job's pipeline exists in the ``pipelines`` collection,
    listing a job's archive path, and rendering a job's state graph.
    """

    # Fields that should never be indexed. The trailing comma is required:
    # without it ``('data')`` is just the string ``'data'``, and membership
    # tests against it would match any substring (e.g. ``'a'`` or ``'at'``).
    NEVER_INDEX_FIELDS = ('data',)
    LINK_FIELDS = DEFAULT_LINK_FIELDS

    def __init__(self, mongodb, config=None, session=None, agave=None,
                 **kwargs):
        """Initialize the store and configure it from the job schema.

        Args:
            mongodb: Passed through to the parent initializer.
            config (dict, optional): Passed through to the parent
                initializer. Defaults to a new empty dict.
            session: Passed through to the parent initializer.
            agave: Passed through to the parent initializer.
            **kwargs: Forwarded to ``JobDocument``. The ``update_indexes``
                key (default ``False``) is also forwarded to ``setup``.
        """
        # Build a fresh dict per call rather than sharing one mutable
        # default object between every instance.
        if config is None:
            config = {}
        super(PipelineJobStore, self).__init__(mongodb, config, session, agave)
        # setup based on schema extended properties
        schema = JobDocument(**kwargs)
        super(PipelineJobStore, self).update_attrs(schema)
        self._enforce_auth = True
        self.setup(update_indexes=kwargs.get('update_indexes', False))
        # Extend Store so it can validate the pipeline UUID
        self.pipes_coll = self.db['pipelines']

    def uuid_from_properties(self, job_document, **kwargs):
        """Return the UUID for a job document after validating its pipeline.

        Args:
            job_document (dict): Job document. Its ``pipeline_uuid``,
                ``actor_id`` and ``uuid`` keys are read.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The document's ``uuid`` value when present, otherwise the value
            computed by ``get_typeduuid``.

        Raises:
            UnknownPipeline: If ``pipeline_uuid`` matches no pipeline.
            Exception: If the pipeline lookup itself fails.
        """
        self.validate_pipeline_uuid(job_document.get('pipeline_uuid'))
        # The actor ID check is advisory: a failure is reported in the log
        # rather than silently discarded, but it never blocks the job.
        actor_id = job_document.get('actor_id')
        try:
            identifiers.abaco_hashid.validate(actor_id)
        except Exception as exc:
            self.logger.warning(
                "actor_id '{}' failed validation: {}".format(actor_id, exc))
        # NOTE: get_typeduuid() is evaluated even when 'uuid' is present,
        # because it is passed as the default argument of dict.get().
        return job_document.get('uuid', self.get_typeduuid(job_document))

    def create(self, job_document, **kwargs):
        """Create a new job from a job document and write it to the store.

        Args:
            job_document (dict): Job document. It is modified in place: its
                ``uuid`` key is set to the resolved job UUID.
            **kwargs: Forwarded to ``uuid_from_properties``.

        Returns:
            The result of ``add_update_document`` for the new job.
        """
        job_document['uuid'] = self.uuid_from_properties(
            job_document, **kwargs)
        new_job = PipelineJob(job_document).new()
        return self.add_update_document(new_job.to_dict())

    def handle(self, event_document, token=None, data=None, **kwargs):
        """Apply an event document to a stored job and save the result.

        Args:
            event_document (dict): Event to apply. Its ``uuid`` key names
                the job; ``name`` is logged; ``token``, when present, takes
                precedence over the ``token`` argument.
            token (optional): Update token, used when the event document
                carries none.
            data (optional): Accepted for interface compatibility; not used
                here.
            **kwargs: Accepted for interface compatibility; not used here.

        Returns:
            The result of ``add_update_document`` for the updated job.

        Raises:
            JobUpdateFailure: If the event document has no job UUID.
            UnknownJob: If no job exists with the given UUID.
        """
        self.logger.info("handling event '{}'".format(
            event_document.get('name', None)))
        # dict.get() never raises KeyError, so a missing UUID has to be
        # detected explicitly instead of with a try/except.
        job_uuid = event_document.get('uuid')
        if job_uuid is None:
            raise JobUpdateFailure(
                'Cannot process an event without a job UUID')
        db_record = self.find_one_by_uuid(job_uuid)
        if db_record is None:
            raise UnknownJob('{} is not a valid job ID'.format(job_uuid))
        # Allow handle() to accept token as an argument or as a key
        # in the event_document
        passed_token = event_document.get('token', token)
        # Token must validate
        # TODO - Extend validate_token to honor one or more admin tokens
        # set in env
        validate_token(passed_token, db_record['_salt'],
                       self.get_token_fields(db_record))
        db_job = PipelineJob(db_record).handle(event_document)
        return self.add_update_document(db_job.to_dict())

    def delete(self, job_uuid, token, force=True):
        """Delete a job document; requires a valid admin token.

        Args:
            job_uuid: UUID of the job to delete.
            token: Admin token, checked with ``validate_admin_token`` in
                non-permissive mode before anything is deleted.
            force (bool, optional): Forwarded to ``delete_document``.

        Returns:
            The result of ``delete_document``.
        """
        # Special kind of event
        validate_admin_token(token, permissive=False)
        return self.delete_document(job_uuid, token=token, force=force)

    def history(self, job_uuid, limit=None, skip=None):
        """Placeholder for job history retrieval; not implemented.

        Currently does nothing and returns ``None`` for every input.
        """
        pass

    def validate_pipeline_uuid(self, pipeline_uuid):
        """Check that a pipeline with the given UUID exists.

        Args:
            pipeline_uuid: UUID to look up in the ``pipelines`` collection.

        Returns:
            bool: ``True`` when a matching pipeline is found.

        Raises:
            UnknownPipeline: If no pipeline has this UUID.
            Exception: If the lookup itself fails.
        """
        # Only the lookup is wrapped, so that UnknownPipeline reaches the
        # caller instead of being re-wrapped as a generic Exception.
        try:
            pipe = self.pipes_coll.find_one({'uuid': pipeline_uuid})
        except Exception as exc:
            raise Exception('Failed to validate pipeline UUID', exc)
        if pipe is None:
            raise UnknownPipeline(
                'No pipeline exists with UUID {}'.format(str(pipeline_uuid)))
        return True

    def list_job_archive_path(self, job_uuid, recurse=True,
                              directories=False, **kwargs):
        """Returns contents of a job's archive_path on the job's archive_system

        Args:
            job_uuid (str): UUID of the job
            recurse (bool, optional): List recursively
            directories (bool, optional): Include directories in response
            **kwargs: Accepted for interface compatibility; not used here.

        Notes:
            PipelineJobStore must be initialized with a valid Agave API client

        Returns:
            list: Agave-canonical absolute filenames in job.archive_path

        Raises:
            UnknownJob: If no job exists with the given UUID.
        """
        db_record = self.find_one_by_uuid(job_uuid)
        if db_record is None:
            raise UnknownJob('{} is not a valid job ID'.format(job_uuid))
        # Fall back to the configured storage system when the job record
        # does not name an archive system.
        storage_system = db_record.get('archive_system',
                                       settings.STORAGE_SYSTEM)
        return self._helper.listdir(
            db_record['archive_path'],
            recurse=recurse,
            storage_system=storage_system,
            directories=directories)

    def fsm_state_png(self, uuid):
        """Render a job's state graph as a base64-encoded PNG.

        Args:
            uuid (str): UUID of the job.

        Returns:
            The base64 encoding of the rendered PNG file's contents.

        Raises:
            JobError: If the job cannot be retrieved or rendering fails.
        """
        try:
            job = self.find_one_by_uuid(uuid)
            if job is None:
                raise ValueError('Unable to retrieve job {}'.format(uuid))
            events = [hist['name'] for hist in job['history']]
            graph = build_graph(job['state'], 'job:' + uuid, events)
            # mkstemp creates a uniquely named file; only the path is
            # needed, so the descriptor is closed straight away.
            fd, tmpfp = tempfile.mkstemp()
            os.close(fd)
            try:
                graph.draw(tmpfp, format='png', prog='dot')
                with open(tmpfp, 'rb') as png_file:
                    return base64.b64encode(png_file.read())
            finally:
                # Remove the temporary file even when rendering fails.
                os.unlink(tmpfp)
        except Exception as exc:
            raise JobError('Failed to get_graph', exc)
class StoreInterface(PipelineJobStore):
    """Alias of ``PipelineJobStore`` that adds no behavior of its own."""