datacatalog.managers.pipelinejobs package

class datacatalog.managers.pipelinejobs.ManagedPipelineJob(mongodb, pipelines, instanced=False, agave=None, *args, **kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.jobmanager.JobManager

Specialized PipelineJob that supports archiving to defined stores and deferred updates

  1. uuid is assigned as hash of pipeline_uuid and data during setup()
  2. archive_path is either specified in init() or generated from XXXX. The latter is the default behavior and is considered SD2 project best practice.
  3. When archive_path is generated, it is “instanced” by default. This means the root of the generated path is predictable, but the path ends in a terminal adjective-animal-date directory for collision avoidance. Pass instanced=False to disable this behavior.
  4. A job’s linkages enable discoverability of its outputs relative to the primary experimental metadata. Consequently, they are very carefully specified. A job is generated_by one computational pipeline and is a child_of one or more measurements.
  5. The file contents of archive_path are directly associated with the job via a generated_by relationship after the job runs to completion.

The child_of relationship is established when the job is initialized: kwargs are searched for measurement_id, sample_id, then experiment_id, and the values found are resolved into a list of measurements. Each can be passed either as a single string or as a list of strings. Either canonical identifiers (measurement_id, sample_id, and experiment_id values) or their corresponding UUID5 values can be used. At least one of (experiment_id, sample_id, measurement_id) must be passed to init() to connect a job to its upstream metadata.
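The lookup order described above can be sketched as follows. This is an illustration only, not the library's code: resolve_parent_ids is a hypothetical helper, and it assumes that the first identifier type found (in COLL_PARAMS order) takes precedence.

```python
# Order mirrors COLL_PARAMS: most specific collection first.
COLL_PARAMS = [
    ("measurement_id", "measurement"),
    ("sample_id", "sample"),
    ("experiment_id", "experiment"),
]


def resolve_parent_ids(**kwargs):
    """Return (collection, [identifiers]) for the first identifier kwarg found.

    Hypothetical helper: accepts a single string or a list of strings.
    """
    for param, collection in COLL_PARAMS:
        value = kwargs.get(param)
        if value is None:
            continue
        if isinstance(value, str):
            # Normalize a single identifier to a one-element list
            value = [value]
        return collection, list(value)
    raise ValueError(
        "One of measurement_id, sample_id, or experiment_id is required"
    )
```

Resolving each identifier to a UUID5 and then to its measurements requires a catalog lookup and is omitted here.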

The job’s archive_path is generated as follows (if archive_path is not specified at init()): It begins with a prefix /products/v2, to which is added a compressed version of the pipeline UUID. Next, the UUIDs of the metadata associations (experiment, sample, measurement) are hashed and added to the path. Finally, contents of the job’s data key are serialized, hashed, and added to the path. The net result of this strategy is that each combination of pipeline, metadata linkage, and run-time parameterization can be uniquely referenced and is stored in a collision-proofed location on the storage resource.
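A minimal sketch of this path-building strategy is shown below. The function names, the use of SHA-256, the 16-character truncation, and the dash-free hex form standing in for the "compressed" pipeline UUID are all assumptions made for illustration; the library's actual hashing and compression schemes are not described here.

```python
import hashlib
import json
import uuid


def short_hash(text, length=16):
    """Hex digest of text, truncated (hash choice and length are assumptions)."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


def sketch_archive_path(pipeline_uuid, metadata_uuids, data, prefix="/products/v2"):
    """Compose prefix / pipeline / metadata hash / data hash."""
    # Stand-in for the "compressed" pipeline UUID: hex form without dashes
    pipeline_part = uuid.UUID(pipeline_uuid).hex
    # Sort so the same set of linkages always yields the same hash
    metadata_part = short_hash("".join(sorted(metadata_uuids)))
    # Serialize deterministically so key order does not change the path
    serialized = json.dumps(data, sort_keys=True, separators=(",", ":"))
    data_part = short_hash(serialized)
    return "/".join([prefix, pipeline_part, metadata_part, data_part])
```

Because every component is derived deterministically, the same pipeline, linkages, and parameters always map to the same path, while a change to any one of them yields a different location.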

Parameters:
  • mongodb (mongo.Connection) – Connection to system MongoDB with write access to jobs
  • pipelines (dict/PipelineJobsConfig) – Pipelines configuration
Keyword Arguments:
 
  • experiment_id (str/list, optional) – Identifier(s) for the experiment(s) to which the job is linked
  • sample_id (str/list, optional) – Identifier(s) for the sample(s) to which the job is linked
  • measurement_id (str/list, optional) – Identifier(s) for the measurement(s) to which the job is linked
  • data (dict, optional) – Defines the job’s parameterization.
  • archive_path (str, optional) – Override value for automatically-generated archive_path
  • archive_system (str, optional) – Override default Agave archive_system
  • archive_patterns (list, optional) – List of ArchiveIndexRequest objects
  • product_patterns (list, optional) – List of ProductIndexRequest objects
  • instanced (bool, optional) – Should archive_path be extended with a randomized session name
  • setup_archive_path (bool, optional) – Should archive_path be created at job setup?
  • session (str, optional) – A short alphanumeric correlation string
  • agave (agavepy.agave.Agave, optional) – Active TACC.cloud API client. Needed only to resolve references to Agave or Abaco entities.
Other Parameters:
 
  • agent (str, optional) – Abaco actorId or Agave appId managing the pipeline
  • archive_collection_level (str, optional) – Overrides default of measurement for aggregating outputs
  • inputs (list, optional) – Data files and references being computed upon. This supplements values of inputs discovered in data
  • generated_by (str, optional) – String UUID5 of a named process
  • pipeline_uuid (str, optional) – Overrides value of pipelines.pipeline_uuid
  • task (str, optional) – The specific instance of agent

Note

Only one of (experiment_id, sample_id, measurement_id) may be passed when initializing an instance of ManagedPipelineJob.

COLL_PARAMS = [('measurement_id', 'measurement'), ('sample_id', 'sample'), ('experiment_id', 'experiment')]
METADATA_ARG_NAMES = ('experiment_design_id', 'experiment_id', 'sample_id', 'measurement_id')
PARAMS = [('agent', False, 'agent', None), ('task', False, 'task', None), ('session', False, 'session', None), ('data', False, 'data', {}), ('level_store', False, 'level_store', 'product'), ('archive_path', False, 'archive_path', None), ('setup_archive_path', False, 'setup_archive_path', True), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('uuid', False, 'uuid', None)]
agave_notifications()[source]

Returns a minimal set of Agave job notifications

build_webhook()[source]

Return a webhook to update this job via web callback

Sending event messages over HTTP POST to this webhook will result in the jobs-manager Reactor making managed updates to this job’s state.

Returns:A callback URL
Return type:str
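As a rough illustration of how a client might prepare an event message for such a webhook, the sketch below builds (but does not send) an HTTP POST request using only the standard library. The payload shape ("name" and "data" keys), the helper name, and the example URL are assumptions; consult the jobs-manager Reactor for the message schema it actually accepts.

```python
import json
import urllib.request


def build_event_request(webhook_url, event_name, data=None):
    """Prepare a JSON POST carrying an event message (payload shape is assumed)."""
    payload = {"name": event_name, "data": data or {}}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; a real one would come from build_webhook()
request = build_event_request("https://example.org/callback", "finish")
```

Passing the prepared request to urllib.request.urlopen() would deliver it.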
canonicalize_actor(actor_id)[source]
canonicalize_app(app_id)[source]
canonicalize_execution(actor_id, exec_id)[source]
canonicalize_job(job_id)[source]
canonicalize_system(system_id)[source]
get_archive_path(instanced=True, *args, **kwargs)[source]

Computes and returns the document’s archive_path

instanced_directory(session=None)[source]

Extend a path with an instanced directory name

Parameters:session (str, optional) – Short alphanumeric session string
Returns:The new instance directory name
Return type:str
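The adjective-animal-date naming pattern can be sketched as below. The word lists, the timestamp format, and the function signature are assumptions chosen for illustration and will not match the names the library generates.

```python
import random
from datetime import datetime, timezone

# Tiny illustrative vocabularies (assumed, not the library's word lists)
ADJECTIVES = ["brave", "calm", "eager"]
ANIMALS = ["otter", "heron", "lynx"]


def sketch_instanced_directory(base_path, session=None, now=None, rng=random):
    """Append a session-timestamp directory to base_path."""
    now = now or datetime.now(timezone.utc)
    if session is None:
        # No session supplied: pick a random adjective-animal pair
        session = "{}-{}".format(rng.choice(ADJECTIVES), rng.choice(ANIMALS))
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    return "{}/{}-{}".format(base_path.rstrip("/"), session, stamp)
```

The random component keeps repeated runs of the same job from writing into the same directory, while the shared root keeps related runs easy to find.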
refs_from_data_dict(data={}, store='files')[source]

Find agave-canonical URI from data dicts

Discover late-bound links to files and references from the contents of inputs and parameters keys in a data dictionary.

Parameters:
  • data (dict) – A data dictionary
  • store (string, optional) – Which store to resolve against (files|references)
Returns:

Discovered list of managed file- or reference URIs

Return type:

list
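One way to picture this discovery step is the sketch below, which scans the inputs and parameters keys of a data dictionary for strings that look like Agave URIs. The agave:// prefix test, the helper name, and the assumption that each key holds a list or a dict of values are illustrative guesses; the real method also resolves references against the selected store, which is not shown.

```python
def sketch_refs_from_data(data, scheme="agave://"):
    """Collect unique URI-like strings from 'inputs' and 'parameters'."""
    candidates = []
    for key in ("inputs", "parameters"):
        section = data.get(key, [])
        # Accept either a mapping of name -> value or a plain list of values
        values = section.values() if isinstance(section, dict) else section
        candidates.extend(v for v in values if isinstance(v, str))
    found = []
    for value in candidates:
        # Keep first-seen order and drop duplicates
        if value.startswith(scheme) and value not in found:
            found.append(value)
    return found
```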

set_archive_path(instanced=True, level_store='product', *args, **kwargs)[source]

Sets the document’s archive_path

set_callbacks()[source]

Establish the web service callbacks for the job

setup(data={})[source]

Finish initializing the manager

Parameters:data (dict, optional) – Override or set value for job data

Returns:self
Return type:object
class datacatalog.managers.pipelinejobs.ReactorManagedPipelineJob(reactor, data={}, *args, **kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.store.ManagedPipelineJob

class datacatalog.managers.pipelinejobs.ManagedPipelineJobInstance(mongodb, uuid, agave=None, **kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.indexer.Indexer

Supports working with an existing ManagedPipelineJob

Parameters:
  • mongodb (mongo.Connection) – Connection to MongoDB with write access to jobs
  • uuid (str) – Job UUID
  • token (str) – Update token for the job
PARAMS = [('state', False, 'state', None), ('archive_path', False, 'archive_path', None), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('generated_by', False, 'generated_by', []), ('child_of', False, 'child_of', []), ('acted_on', False, 'acted_on', []), ('acted_using', False, 'acted_using', []), ('pipeline_uuid', False, 'pipeline_uuid', None), ('last_event', False, 'last_event', None)]
handle(event_doc, token=None)[source]

Override super().handle to process events directly rather than by name

index(token=None, transition=False, level='1', fixity=False, filters=None, permissive=False, **kwargs)[source]

Index the contents of the job’s archive path

indexed(token=None)[source]

Mark job outputs indexing as completed

exception datacatalog.managers.pipelinejobs.ManagedPipelineJobError[source]

Bases: Exception

An error happened in the context of a ManagedPipelineJob

Submodules

datacatalog.managers.pipelinejobs.config module

class datacatalog.managers.pipelinejobs.config.CorePipelinesConfig(**kwargs)[source]

Bases: datacatalog.extensible.ExtensibleAttrDict

class datacatalog.managers.pipelinejobs.config.PipelineJobsConfig(**kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.config.CorePipelinesConfig

A PipelineJobs configuration.

Implements sanity checking on init() to avoid misconfigured Pipeline Jobs agents.

class datacatalog.managers.pipelinejobs.config.PipelinesConfig(**kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.config.CorePipelinesConfig

datacatalog.managers.pipelinejobs.exceptions module

exception datacatalog.managers.pipelinejobs.exceptions.ManagedPipelineJobError[source]

Bases: Exception

An error happened in the context of a ManagedPipelineJob

datacatalog.managers.pipelinejobs.indexer module

class datacatalog.managers.pipelinejobs.indexer.Indexer(mongodb, agave=None, *args, **kwargs)[source]

Bases: datacatalog.managers.common.Manager

file_agave_url(string_reference, check_exists=True)[source]

Resolves an Agave URL into a file UUID

file_job_relative_path(string_reference, check_exists=True)[source]

Resolves a filename relative to a job’s archive path as a file UUID

file_or_ref_identifier(string_reference)[source]

Resolves a string identifier into a files or reference UUID

file_or_ref_uuid(string_reference)[source]

Resolves a string as a file or reference UUID

index_if_exists(abs_filename, storage_system=None, check_exists=True)[source]

Index a file if it can be confirmed to exist

resolve_derived_references(reference_set, permissive=False)[source]

Resolves a list of linkages to UUIDs

single_index_request(index_request, token=None, refresh=False, fixity=True, permissive=False)[source]

Processes a single indexing request

sync_listing(force=False)[source]

Updates the job’s cache of archive_path contents

datacatalog.managers.pipelinejobs.indexrequest module

class datacatalog.managers.pipelinejobs.indexrequest.ArchiveIndexRequest(**kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.indexrequest.IndexRequest

PARAMS = [('patterns', True, 'filters', []), ('note', False, 'note', None), ('fixity', False, 'fixity', True), ('generated_by', False, 'generated_by', []), ('level', True, 'level', '1')]
kind = 'archive'
class datacatalog.managers.pipelinejobs.indexrequest.ProductIndexRequest(**kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.indexrequest.IndexRequest

PARAMS = [('patterns', True, 'filters', []), ('note', False, 'note', None), ('fixity', False, 'fixity', False), ('generated_by', False, 'generated_by', []), ('derived_from', True, 'derived_from', []), ('derived_using', False, 'derived_using', [])]
kind = 'product'
class datacatalog.managers.pipelinejobs.indexrequest.IndexType[source]

Bases: str

A named indexing type

MEMBERS = ['archive', 'product']
kind = None
patterns_field
exception datacatalog.managers.pipelinejobs.indexrequest.IndexingError[source]

Bases: Exception

An error has occurred during setup or execution of an indexing task

exception datacatalog.managers.pipelinejobs.indexrequest.InvalidIndexingRequest[source]

Bases: ValueError

An error has occurred during setup or execution of an indexing task

datacatalog.managers.pipelinejobs.indexrequest.get_index_request(**kwargs)[source]

Transform an index request dict into a typed IndexRequest object

datacatalog.managers.pipelinejobs.instanced module

class datacatalog.managers.pipelinejobs.instanced.ManagedPipelineJobInstance(mongodb, uuid, agave=None, **kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.indexer.Indexer

Supports working with an existing ManagedPipelineJob

Parameters:
  • mongodb (mongo.Connection) – Connection to MongoDB with write access to jobs
  • uuid (str) – Job UUID
  • token (str) – Update token for the job
PARAMS = [('state', False, 'state', None), ('archive_path', False, 'archive_path', None), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('generated_by', False, 'generated_by', []), ('child_of', False, 'child_of', []), ('acted_on', False, 'acted_on', []), ('acted_using', False, 'acted_using', []), ('pipeline_uuid', False, 'pipeline_uuid', None), ('last_event', False, 'last_event', None)]
handle(event_doc, token=None)[source]

Override super().handle to process events directly rather than by name

index(token=None, transition=False, level='1', fixity=False, filters=None, permissive=False, **kwargs)[source]

Index the contents of the job’s archive path

indexed(token=None)[source]

Mark job outputs indexing as completed

datacatalog.managers.pipelinejobs.jobmanager module

class datacatalog.managers.pipelinejobs.jobmanager.JobManager(mongodb, agave=None, *args, **kwargs)[source]

Bases: datacatalog.managers.common.Manager

ADMIN_EVENTS = ['reset', 'ready', 'delete', 'purge']
PARAMS = [('archive_path', False, 'archive_path', None), ('archive_patterns', False, 'archive_patterns', []), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('pipeline_uuid', False, 'pipeline_uuid', None), ('token', False, 'token', None), ('uuid', False, 'uuid', None), ('state', False, 'state', None), ('last_event', False, 'last_event', None)]
archive_uri()[source]

Formats archive system and path into a URI
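A minimal sketch of such formatting, assuming the agave:// scheme implied by the "agave-canonical URI" wording elsewhere on this page. The helper name and the exact output form are assumptions, not the method's confirmed behavior.

```python
def sketch_archive_uri(archive_system, archive_path, scheme="agave"):
    """Join a storage system ID and an absolute path into a single URI."""
    # Strip the leading slash so the joined URI has exactly one separator
    return "{}://{}/{}".format(scheme, archive_system, archive_path.lstrip("/"))
```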

attach(token=None)[source]

Simplified proxy for load

Materializes just enough of the job to interact with it

cancel(token=None)[source]

Cancel the job, deleting it from the system

delete(token=None)[source]

Delete the job once, even if it has processed events

fail(data={}, token=None)[source]

Wrapper for fail

finish(data={}, token=None)[source]

Wrapper for finish

handle(event_name, data={}, token=None, **kwargs)[source]

Handle a named event

index(data={}, token=None, **kwargs)[source]

Wrapper for index

indexed(data={}, token=None, **kwargs)[source]

Wrapper for indexed

load(job_uuid=None, token=None)[source]

Load up a JobManager instance for the given UUID

This function is the opposite of setup(). It populates a minimum attribute set from the current contents of a job.

Parameters:
  • job_uuid (string) – A known job TypedUUID
  • token (string, optional) – Update token for the job
Raises:

ManagedPipelineJobError is raised on any unrecoverable error

Returns:

self

Return type:

object

ready(data={}, token=None)[source]

Wrapper for ready

reset(data={}, no_clear_path=False, token=None, permissive=False)[source]

Wrapper for reset

Note: This event encapsulates both the ‘reset’ and the subsequent ‘ready’ events, as the resetting process needs to be thread-locked.

resource(data={}, token=None)[source]

Wrapper for resource

run(data={}, token=None)[source]

Wrapper for run

serialize_data()[source]

Serializes self.data into a minified string
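Minified serialization of a dictionary is commonly done with the standard json module, as in the sketch below. Sorting the keys is an assumption added here so that equal dictionaries always produce the same string (useful when the result is later hashed); the method's actual options are not documented on this page.

```python
import json


def sketch_serialize_data(data):
    """Serialize a dict to compact JSON with a stable key order."""
    # separators=(",", ":") removes the default whitespace after , and :
    return json.dumps(data, sort_keys=True, separators=(",", ":"))
```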

setup(*args, **kwargs)[source]
update(data={}, token=None)[source]

Wrapper for update

datacatalog.managers.pipelinejobs.reactor module

class datacatalog.managers.pipelinejobs.reactor.ReactorManagedPipelineJob(reactor, data={}, *args, **kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.store.ManagedPipelineJob

datacatalog.managers.pipelinejobs.store module

class datacatalog.managers.pipelinejobs.store.ManagedPipelineJob(mongodb, pipelines, instanced=False, agave=None, *args, **kwargs)[source]

Bases: datacatalog.managers.pipelinejobs.jobmanager.JobManager

Specialized PipelineJob that supports archiving to defined stores and deferred updates

  1. uuid is assigned as hash of pipeline_uuid and data during setup()
  2. archive_path is either specified in init() or generated from XXXX. The latter is the default behavior and is considered SD2 project best practice.
  3. When archive_path is generated, it is “instanced” by default. This means the root of the generated path is predictable, but the path ends in a terminal adjective-animal-date directory for collision avoidance. Pass instanced=False to disable this behavior.
  4. A job’s linkages enable discoverability of its outputs relative to the primary experimental metadata. Consequently, they are very carefully specified. A job is generated_by one computational pipeline and is a child_of one or more measurements.
  5. The file contents of archive_path are directly associated with the job via a generated_by relationship after the job runs to completion.

The child_of relationship is established when the job is initialized: kwargs are searched for measurement_id, sample_id, then experiment_id, and the values found are resolved into a list of measurements. Each can be passed either as a single string or as a list of strings. Either canonical identifiers (measurement_id, sample_id, and experiment_id values) or their corresponding UUID5 values can be used. At least one of (experiment_id, sample_id, measurement_id) must be passed to init() to connect a job to its upstream metadata.

The job’s archive_path is generated as follows (if archive_path is not specified at init()): It begins with a prefix /products/v2, to which is added a compressed version of the pipeline UUID. Next, the UUIDs of the metadata associations (experiment, sample, measurement) are hashed and added to the path. Finally, contents of the job’s data key are serialized, hashed, and added to the path. The net result of this strategy is that each combination of pipeline, metadata linkage, and run-time parameterization can be uniquely referenced and is stored in a collision-proofed location on the storage resource.

Parameters:
  • mongodb (mongo.Connection) – Connection to system MongoDB with write access to jobs
  • pipelines (dict/PipelineJobsConfig) – Pipelines configuration
Keyword Arguments:
 
  • experiment_id (str/list, optional) – Identifier(s) for the experiment(s) to which the job is linked
  • sample_id (str/list, optional) – Identifier(s) for the sample(s) to which the job is linked
  • measurement_id (str/list, optional) – Identifier(s) for the measurement(s) to which the job is linked
  • data (dict, optional) – Defines the job’s parameterization.
  • archive_path (str, optional) – Override value for automatically-generated archive_path
  • archive_system (str, optional) – Override default Agave archive_system
  • archive_patterns (list, optional) – List of ArchiveIndexRequest objects
  • product_patterns (list, optional) – List of ProductIndexRequest objects
  • instanced (bool, optional) – Should archive_path be extended with a randomized session name
  • setup_archive_path (bool, optional) – Should archive_path be created at job setup?
  • session (str, optional) – A short alphanumeric correlation string
  • agave (agavepy.agave.Agave, optional) – Active TACC.cloud API client. Needed only to resolve references to Agave or Abaco entities.
Other Parameters:
 
  • agent (str, optional) – Abaco actorId or Agave appId managing the pipeline
  • archive_collection_level (str, optional) – Overrides default of measurement for aggregating outputs
  • inputs (list, optional) – Data files and references being computed upon. This supplements values of inputs discovered in data
  • generated_by (str, optional) – String UUID5 of a named process
  • pipeline_uuid (str, optional) – Overrides value of pipelines.pipeline_uuid
  • task (str, optional) – The specific instance of agent

Note

Only one of (experiment_id, sample_id, measurement_id) may be passed when initializing an instance of ManagedPipelineJob.

COLL_PARAMS = [('measurement_id', 'measurement'), ('sample_id', 'sample'), ('experiment_id', 'experiment')]
METADATA_ARG_NAMES = ('experiment_design_id', 'experiment_id', 'sample_id', 'measurement_id')
PARAMS = [('agent', False, 'agent', None), ('task', False, 'task', None), ('session', False, 'session', None), ('data', False, 'data', {}), ('level_store', False, 'level_store', 'product'), ('archive_path', False, 'archive_path', None), ('setup_archive_path', False, 'setup_archive_path', True), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('uuid', False, 'uuid', None)]
agave_notifications()[source]

Returns a minimal set of Agave job notifications

build_webhook()[source]

Return a webhook to update this job via web callback

Sending event messages over HTTP POST to this webhook will result in the jobs-manager Reactor making managed updates to this job’s state.

Returns:A callback URL
Return type:str
canonicalize_actor(actor_id)[source]
canonicalize_app(app_id)[source]
canonicalize_execution(actor_id, exec_id)[source]
canonicalize_job(job_id)[source]
canonicalize_system(system_id)[source]
get_archive_path(instanced=True, *args, **kwargs)[source]

Computes and returns the document’s archive_path

instanced_directory(session=None)[source]

Extend a path with an instanced directory name

Parameters:session (str, optional) – Short alphanumeric session string
Returns:The new instance directory name
Return type:str
refs_from_data_dict(data={}, store='files')[source]

Find agave-canonical URI from data dicts

Discover late-bound links to files and references from the contents of inputs and parameters keys in a data dictionary.

Parameters:
  • data (dict) – A data dictionary
  • store (string, optional) – Which store to resolve against (files|references)
Returns:

Discovered list of managed file- or reference URIs

Return type:

list

set_archive_path(instanced=True, level_store='product', *args, **kwargs)[source]

Sets the document’s archive_path

set_callbacks()[source]

Establish the web service callbacks for the job

setup(data={})[source]

Finish initializing the manager

Parameters:data (dict, optional) – Override or set value for job data

Returns:self
Return type:object