datacatalog.managers.pipelinejobs package¶
-
class
datacatalog.managers.pipelinejobs.
ManagedPipelineJob
(mongodb, pipelines, instanced=False, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.jobmanager.JobManager
Specialized PipelineJob that supports archiving to defined stores and deferred updates
- uuid is assigned as a hash of pipeline_uuid and data during setup()
- archive_path is either specified in init() or generated from XXXX. The latter is the default behavior and is considered SD2 project best practice.
- When archive_path is generated, it is “instanced” by default. This means the root of the generated path will be predictable, but it will include a terminal adjective-animal-date directory for collision avoidance. Pass instanced=False to disable this behavior.
- A job’s linkages enable discoverability of its outputs relative to the primary experimental metadata. Consequently, they are very carefully specified. A job is generated_by one computational pipeline and is a child_of one or more measurements.
- The file contents of archive_path are directly associated with the job via a generated_by relationship after the job runs to completion.
The child_of relationship is established when the job is initialized, by looking in kwargs for measurement_id, sample_id, then experiment_id and resolving them into a list of measurements. These can be passed either as single strings or as lists of strings. Either canonical identifiers (measurement_id, sample_id, and experiment_id values) or their corresponding UUID5 values can be used. At least one of (experiment_id, sample_id, measurement_id) must be passed to init() to connect a job to its upstream metadata.
The job’s archive_path is generated as follows (if archive_path is not specified at init()): it begins with the prefix /products/v2, to which is added a compressed version of the pipeline UUID. Next, the UUIDs of the metadata associations (experiment, sample, measurement) are hashed and added to the path. Finally, the contents of the job’s data key are serialized, hashed, and added to the path. The net result of this strategy is that each combination of pipeline, metadata linkage, and run-time parameterization can be uniquely referenced and is stored in a collision-proofed location on the storage resource.
Parameters: - mongodb (mongo.Connection) – Connection to system MongoDB with write access to jobs
- pipelines (dict/PipelineJobsConfig) – Pipelines configuration
Keyword Arguments: - experiment_id (str/list, optional) – Identifier(s) for the experiment(s) to which the job is linked
- sample_id (str/list, optional) – Identifier(s) for the sample(s) to which the job is linked
- measurement_id (str/list, optional) – Identifier(s) for the measurement(s) to which the job is linked
- data (dict, optional) – Defines the job’s parameterization.
- archive_path (str, optional) – Override value for automatically-generated archive_path
- archive_system (str, optional) – Override default Agave archive_system
- archive_patterns (list, optional) – List of ArchiveIndexRequest objects
- product_patterns (list, optional) – List of ProductIndexRequest objects
- instanced (bool, optional) – Should archive_path be extended with a randomized session name
- setup_archive_path (bool, optional) – Should archive_path be created at job setup?
- session (str, optional) – A short alphanumeric correlation string
- agave (agavepy.agave.Agave, optional) – Active TACC.cloud API client. Needed only to resolve references to Agave or Abaco entities.
Other Parameters: - agent (str, optional) – Abaco actorId or Agave appId managing the pipeline
- archive_collection_level (str, optional) – Overrides the default of measurement for aggregating outputs
- inputs (list, optional) – Data files and references being computed upon. This supplements values of inputs discovered in data
- generated_by (str, optional) – String UUID5 of a named process
- pipeline_uuid (str, optional) – Overrides the value of pipelines.pipeline_uuid
- task (str, optional) – The specific instance of agent
Note
Only one of (experiment_id, sample_id, measurement_id) may be passed when initializing an instance of ManagedPipelineJob.
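The archive_path derivation described above can be sketched in plain Python. This is an illustration only: the compression of the pipeline UUID, the hash function, and the truncation lengths used by datacatalog are not documented here, so those details below are assumptions.

```python
import hashlib
import json

def sketch_archive_path(pipeline_uuid, linkage_uuids, data):
    """Approximate the documented scheme: prefix + compressed pipeline
    UUID + hashed metadata linkages + hashed data dictionary."""
    prefix = '/products/v2'
    # "Compressed" pipeline UUID: here just the first hex digits (assumption)
    pipe_part = pipeline_uuid.replace('-', '')[:8]
    # Hash the sorted metadata association UUIDs so ordering cannot matter
    link_part = hashlib.sha1(
        '|'.join(sorted(linkage_uuids)).encode()).hexdigest()[:16]
    # Serialize and hash the job's data key
    data_part = hashlib.sha1(
        json.dumps(data, sort_keys=True).encode()).hexdigest()[:16]
    return '/'.join([prefix, pipe_part, link_part, data_part])
```

The sketch shows why the strategy is collision-proof: any change to the pipeline, the metadata linkages, or the run-time data yields a distinct path, while identical inputs always map to the same path.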
-
COLL_PARAMS
= [('measurement_id', 'measurement'), ('sample_id', 'sample'), ('experiment_id', 'experiment')]¶
-
INIT_LINK_PARAMS
= [('pipeline_uuid', True, 'uuid', None, 'pipeline', 'child_of')]¶
-
LINK_FIELDS
= ['child_of', 'acted_on', 'acted_using', 'generated_by']¶
-
METADATA_ARG_NAMES
= ('experiment_design_id', 'experiment_id', 'sample_id', 'measurement_id')¶
-
PARAMS
= [('agent', False, 'agent', None), ('task', False, 'task', None), ('session', False, 'session', None), ('data', False, 'data', {}), ('level_store', False, 'level_store', 'product'), ('archive_path', False, 'archive_path', None), ('setup_archive_path', False, 'setup_archive_path', True), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('uuid', False, 'uuid', None)]¶
-
build_webhook
()[source]¶ Return a webhook to update this job via web callback
Sending event messages over HTTP POST to this webhook will result in the jobs-manager Reactor making managed updates to this job’s state.
Returns: A callback URL
Return type: str
-
get_archive_path
(instanced=True, *args, **kwargs)[source]¶ Computes and returns the document’s archive_path
-
instanced_directory
(session=None)[source]¶ Extend a path with an instanced directory name
Parameters: session (str, optional) – Short alphanumeric session string
Returns: The new instance directory name
Return type: str
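The instancing behavior can be sketched as follows. The word lists and name format are placeholders standing in for the real adjective-animal-date generator, which is not shown in this documentation.

```python
import datetime
import random

# Placeholder vocabularies; the real generator's word lists differ
ADJECTIVES = ['hilarious', 'prompt', 'gentle']
ANIMALS = ['sparrow', 'ocelot', 'walrus']

def sketch_instanced_directory(base_path, session=None):
    """Extend base_path with a session directory, minting an
    adjective-animal-date name when no session is supplied."""
    if session is None:
        today = datetime.date.today().strftime('%Y%m%d')
        session = '-'.join([random.choice(ADJECTIVES),
                            random.choice(ANIMALS), today])
    return base_path.rstrip('/') + '/' + session
```

The root of the path stays predictable while the terminal directory varies per run, which is the collision-avoidance property described above.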
-
refs_from_data_dict
(data={}, store='files')[source]¶ Find agave-canonical URIs from data dicts
Discover late-bound links to files and references from the contents of the inputs and parameters keys in a data dictionary.
Parameters: - data (dict) – A data dictionary
- store (string, optional) – Which store to resolve against (files|references)
Returns: Discovered list of managed file or reference URIs
Return type:
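The discovery logic can be sketched as a simple walk over the two documented keys. This is an illustration of the idea only; the real method also resolves each URI against the chosen store, which this sketch omits.

```python
def sketch_refs_from_data(data):
    """Collect agave-canonical URIs from the 'inputs' and 'parameters'
    keys of a data dictionary (illustrative, no store resolution)."""
    found = []
    for key in ('inputs', 'parameters'):
        values = data.get(key, [])
        if isinstance(values, dict):
            values = list(values.values())
        elif isinstance(values, str):
            values = [values]
        for val in values:
            if isinstance(val, str) and val.startswith('agave://'):
                found.append(val)
    return found
```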
-
class
datacatalog.managers.pipelinejobs.
ReactorManagedPipelineJob
(reactor, data={}, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.store.ManagedPipelineJob
-
class
datacatalog.managers.pipelinejobs.
ManagedPipelineJobInstance
(mongodb, uuid, agave=None, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexer.Indexer
Supports working with an existing ManagedPipelineJob
Parameters: -
PARAMS
= [('state', False, 'state', None), ('archive_path', False, 'archive_path', None), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('generated_by', False, 'generated_by', []), ('child_of', False, 'child_of', []), ('acted_on', False, 'acted_on', []), ('acted_using', False, 'acted_using', []), ('pipeline_uuid', False, 'pipeline_uuid', None), ('last_event', False, 'last_event', None)]¶
-
handle
(event_doc, token=None)[source]¶ Override super().handle to process events directly rather than by name
-
-
exception
datacatalog.managers.pipelinejobs.
ManagedPipelineJobError
[source]¶ Bases:
Exception
An error happened in the context of a ManagedPipelineJob
Submodules¶
datacatalog.managers.pipelinejobs.config module¶
-
class
datacatalog.managers.pipelinejobs.config.
PipelineJobsConfig
(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.config.CorePipelinesConfig
A PipelineJobs configuration.
Implements sanity checking on init() to avoid misconfigured Pipeline Jobs agents.
-
class
datacatalog.managers.pipelinejobs.config.
PipelinesConfig
(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.config.CorePipelinesConfig
datacatalog.managers.pipelinejobs.exceptions module¶
datacatalog.managers.pipelinejobs.indexer module¶
-
class
datacatalog.managers.pipelinejobs.indexer.
Indexer
(mongodb, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.common.Manager
-
file_job_relative_path
(string_reference, check_exists=True)[source]¶ Resolves a filename relative to a job’s archive path as a file UUID
-
file_or_ref_identifier
(string_reference)[source]¶ Resolves a string identifier into a files or reference UUID
-
index_if_exists
(abs_filename, storage_system=None, check_exists=True)[source]¶ Index a file if it can be confirmed to exist
-
resolve_derived_references
(reference_set, permissive=False)[source]¶ Resolves a list of linkages to UUIDs
-
datacatalog.managers.pipelinejobs.indexrequest module¶
-
class
datacatalog.managers.pipelinejobs.indexrequest.
ArchiveIndexRequest
(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexrequest.IndexRequest
-
PARAMS
= [('patterns', True, 'filters', []), ('note', False, 'note', None), ('fixity', False, 'fixity', True), ('generated_by', False, 'generated_by', []), ('level', True, 'level', '1')]¶
-
kind
= 'archive'¶
-
-
class
datacatalog.managers.pipelinejobs.indexrequest.
ProductIndexRequest
(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexrequest.IndexRequest
-
PARAMS
= [('patterns', True, 'filters', []), ('note', False, 'note', None), ('fixity', False, 'fixity', False), ('generated_by', False, 'generated_by', []), ('derived_from', True, 'derived_from', []), ('derived_using', False, 'derived_using', [])]¶
-
kind
= 'product'¶
-
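The PARAMS tuples shown for the index request classes follow a (name, required, mapped key, default) shape. A small sketch of how such tuples could drive validation and defaulting is below; this is an illustration of the pattern, not the datacatalog validator, and normalize_request is a hypothetical helper.

```python
# Tuples copied from the ArchiveIndexRequest PARAMS above:
# (request key, required?, stored key, default value)
ARCHIVE_PARAMS = [('patterns', True, 'filters', []),
                  ('note', False, 'note', None),
                  ('fixity', False, 'fixity', True),
                  ('generated_by', False, 'generated_by', []),
                  ('level', True, 'level', '1')]

def normalize_request(request, params=ARCHIVE_PARAMS):
    """Map request keys to their stored names, applying defaults and
    rejecting requests that omit a required parameter."""
    doc = {}
    for name, required, mapped, default in params:
        if name in request:
            doc[mapped] = request[name]
        elif required:
            raise ValueError('missing required parameter: %s' % name)
        else:
            doc[mapped] = default
    return doc
```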
-
class
datacatalog.managers.pipelinejobs.indexrequest.
IndexType
[source]¶ Bases:
str
A named indexing type
-
MEMBERS
= ['archive', 'product']¶
-
kind
= None¶
-
patterns_field
¶
-
-
exception
datacatalog.managers.pipelinejobs.indexrequest.
IndexingError
[source]¶ Bases:
Exception
An error has occurred during setup or execution of an indexing task
-
exception
datacatalog.managers.pipelinejobs.indexrequest.
InvalidIndexingRequest
[source]¶ Bases:
ValueError
An error has occurred during setup or execution of an indexing task
datacatalog.managers.pipelinejobs.instanced module¶
-
class
datacatalog.managers.pipelinejobs.instanced.
ManagedPipelineJobInstance
(mongodb, uuid, agave=None, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexer.Indexer
Supports working with an existing ManagedPipelineJob
Parameters: -
PARAMS
= [('state', False, 'state', None), ('archive_path', False, 'archive_path', None), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('generated_by', False, 'generated_by', []), ('child_of', False, 'child_of', []), ('acted_on', False, 'acted_on', []), ('acted_using', False, 'acted_using', []), ('pipeline_uuid', False, 'pipeline_uuid', None), ('last_event', False, 'last_event', None)]¶
-
handle
(event_doc, token=None)[source]¶ Override super().handle to process events directly rather than by name
-
datacatalog.managers.pipelinejobs.jobmanager module¶
-
class
datacatalog.managers.pipelinejobs.jobmanager.
JobManager
(mongodb, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.common.Manager
-
ADMIN_EVENTS
= ['reset', 'ready', 'delete', 'purge']¶
-
PARAMS
= [('archive_path', False, 'archive_path', None), ('archive_patterns', False, 'archive_patterns', []), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('pipeline_uuid', False, 'pipeline_uuid', None), ('token', False, 'token', None), ('uuid', False, 'uuid', None), ('state', False, 'state', None), ('last_event', False, 'last_event', None)]¶
-
attach
(token=None)[source]¶ Simplified proxy for load
Materializes just enough of the job to interact with it
-
load
(job_uuid=None, token=None)[source]¶ Load up a JobManager instance for the given UUID
This function is the opposite of setup(). It populates a minimum attribute set from the current contents of a job.
Parameters: - job_uuid (string) – A known job TypedUUID
- token (string, optional) – Update token for the job
Raises: ManagedPipelineJobError is raised on any unrecoverable error
Returns: self
Return type:
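Because load() returns self, calls can be chained fluently. The stand-in class below illustrates that pattern only; MiniJobManager and its fields are hypothetical, and the real manager reads the job document from MongoDB rather than fabricating state.

```python
class MiniJobManager:
    """Toy stand-in showing the load-then-interact, return-self pattern."""

    def __init__(self):
        self.uuid = None
        self.token = None
        self.state = None

    def load(self, job_uuid, token=None):
        # The real JobManager populates these from the stored job document;
        # here we just record the identifiers and return self so calls chain.
        self.uuid = job_uuid
        self.token = token
        self.state = 'CREATED'
        return self

# mgr = MiniJobManager().load(job_uuid) then mgr.state, mgr.uuid, ...
```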
-
datacatalog.managers.pipelinejobs.reactor module¶
-
class
datacatalog.managers.pipelinejobs.reactor.
ReactorManagedPipelineJob
(reactor, data={}, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.store.ManagedPipelineJob
datacatalog.managers.pipelinejobs.store module¶
-
class
datacatalog.managers.pipelinejobs.store.
ManagedPipelineJob
(mongodb, pipelines, instanced=False, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.jobmanager.JobManager
Specialized PipelineJob that supports archiving to defined stores and deferred updates
- uuid is assigned as a hash of pipeline_uuid and data during setup()
- archive_path is either specified in init() or generated from XXXX. The latter is the default behavior and is considered SD2 project best practice.
- When archive_path is generated, it is “instanced” by default. This means the root of the generated path will be predictable, but it will include a terminal adjective-animal-date directory for collision avoidance. Pass instanced=False to disable this behavior.
- A job’s linkages enable discoverability of its outputs relative to the primary experimental metadata. Consequently, they are very carefully specified. A job is generated_by one computational pipeline and is a child_of one or more measurements.
- The file contents of archive_path are directly associated with the job via a generated_by relationship after the job runs to completion.
The child_of relationship is established when the job is initialized, by looking in kwargs for measurement_id, sample_id, then experiment_id and resolving them into a list of measurements. These can be passed either as single strings or as lists of strings. Either canonical identifiers (measurement_id, sample_id, and experiment_id values) or their corresponding UUID5 values can be used. At least one of (experiment_id, sample_id, measurement_id) must be passed to init() to connect a job to its upstream metadata.
The job’s archive_path is generated as follows (if archive_path is not specified at init()): it begins with the prefix /products/v2, to which is added a compressed version of the pipeline UUID. Next, the UUIDs of the metadata associations (experiment, sample, measurement) are hashed and added to the path. Finally, the contents of the job’s data key are serialized, hashed, and added to the path. The net result of this strategy is that each combination of pipeline, metadata linkage, and run-time parameterization can be uniquely referenced and is stored in a collision-proofed location on the storage resource.
Parameters: - mongodb (mongo.Connection) – Connection to system MongoDB with write access to jobs
- pipelines (dict/PipelineJobsConfig) – Pipelines configuration
Keyword Arguments: - experiment_id (str/list, optional) – Identifier(s) for the experiment(s) to which the job is linked
- sample_id (str/list, optional) – Identifier(s) for the sample(s) to which the job is linked
- measurement_id (str/list, optional) – Identifier(s) for the measurement(s) to which the job is linked
- data (dict, optional) – Defines the job’s parameterization.
- archive_path (str, optional) – Override value for automatically-generated archive_path
- archive_system (str, optional) – Override default Agave archive_system
- archive_patterns (list, optional) – List of ArchiveIndexRequest objects
- product_patterns (list, optional) – List of ProductIndexRequest objects
- instanced (bool, optional) – Should archive_path be extended with a randomized session name
- setup_archive_path (bool, optional) – Should archive_path be created at job setup?
- session (str, optional) – A short alphanumeric correlation string
- agave (agavepy.agave.Agave, optional) – Active TACC.cloud API client. Needed only to resolve references to Agave or Abaco entities.
Other Parameters: - agent (str, optional) – Abaco actorId or Agave appId managing the pipeline
- archive_collection_level (str, optional) – Overrides the default of measurement for aggregating outputs
- inputs (list, optional) – Data files and references being computed upon. This supplements values of inputs discovered in data
- generated_by (str, optional) – String UUID5 of a named process
- pipeline_uuid (str, optional) – Overrides the value of pipelines.pipeline_uuid
- task (str, optional) – The specific instance of agent
Note
Only one of (experiment_id, sample_id, measurement_id) may be passed when initializing an instance of ManagedPipelineJob.
-
COLL_PARAMS
= [('measurement_id', 'measurement'), ('sample_id', 'sample'), ('experiment_id', 'experiment')]¶
-
INIT_LINK_PARAMS
= [('pipeline_uuid', True, 'uuid', None, 'pipeline', 'child_of')]¶
-
LINK_FIELDS
= ['child_of', 'acted_on', 'acted_using', 'generated_by']¶
-
METADATA_ARG_NAMES
= ('experiment_design_id', 'experiment_id', 'sample_id', 'measurement_id')¶
-
PARAMS
= [('agent', False, 'agent', None), ('task', False, 'task', None), ('session', False, 'session', None), ('data', False, 'data', {}), ('level_store', False, 'level_store', 'product'), ('archive_path', False, 'archive_path', None), ('setup_archive_path', False, 'setup_archive_path', True), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('uuid', False, 'uuid', None)]¶
-
build_webhook
()[source]¶ Return a webhook to update this job via web callback
Sending event messages over HTTP POST to this webhook will result in the jobs-manager Reactor making managed updates to this job’s state.
Returns: A callback URL
Return type: str
-
get_archive_path
(instanced=True, *args, **kwargs)[source]¶ Computes and returns the document’s archive_path
-
instanced_directory
(session=None)[source]¶ Extend a path with an instanced directory name
Parameters: session (str, optional) – Short alphanumeric session string
Returns: The new instance directory name
Return type: str
-
refs_from_data_dict
(data={}, store='files')[source]¶ Find agave-canonical URIs from data dicts
Discover late-bound links to files and references from the contents of the inputs and parameters keys in a data dictionary.
Parameters: - data (dict) – A data dictionary
- store (string, optional) – Which store to resolve against (files|references)
Returns: Discovered list of managed file or reference URIs
Return type: