datacatalog.managers.pipelinejobs package¶
-
class
datacatalog.managers.pipelinejobs.ManagedPipelineJob(mongodb, pipelines, instanced=False, agave=None, *args, **kwargs)[source]¶
Bases: datacatalog.managers.pipelinejobs.jobmanager.JobManager
Specialized PipelineJob that supports archiving to defined stores and deferred updates
- uuid is assigned as a hash of pipeline_uuid and data during setup()
- archive_path is either specified in init() or generated from XXXX. The latter is the default behavior and is considered SD2 project best practice.
- When archive_path is generated, it is “instanced” by default. This means the root of the generated path is predictable, but the path ends in a terminal directory named adjective-animal-date for collision avoidance. Pass instanced=False to disable this behavior.
- A job’s linkages enable discoverability of its outputs relative to the primary experimental metadata. Consequently, they are very carefully specified. A job is generated_by one computational pipeline and is a child_of one or more measurements.
- The file contents of archive_path are directly associated with the job via a generated_by relationship after the job runs to completion.
The child_of relationship is established when the job is initialized, by looking in kwargs for measurement_id, sample_id, then experiment_id, and resolving what is found into a list of measurements. These can be passed either as single strings or as a list of strings. Either canonical identifiers (measurement_id, sample_id, and experiment_id values) or their corresponding UUID5 values can be used. At least one of (experiment_id, sample_id, measurement_id) must be passed to init() to connect a job to its upstream metadata.
The job’s archive_path is generated as follows (if archive_path is not specified at init()): It begins with a prefix /products/v2, to which is added a compressed version of the pipeline UUID. Next, the UUIDs of the metadata associations (experiment, sample, measurement) are hashed and added to the path. Finally, the contents of the job’s data key are serialized, hashed, and added to the path. The net result of this strategy is that each combination of pipeline, metadata linkage, and run-time parameterization can be uniquely referenced and is stored in a collision-proofed location on the storage resource.
Parameters:
- mongodb (mongo.Connection) – Connection to system MongoDB with write access to jobs
- pipelines (dict/PipelineJobsConfig) – Pipelines configuration
Keyword Arguments:
- experiment_id (str/list, optional) – Identifier(s) for the experiment(s) to which the job is linked
- sample_id (str/list, optional) – Identifier(s) for the sample(s) to which the job is linked
- measurement_id (str/list, optional) – Identifier(s) for the measurement(s) to which the job is linked
- data (dict, optional) – Defines the job’s parameterization.
- archive_path (str, optional) – Override value for automatically-generated archive_path
- archive_system (str, optional) – Override default Agave archive_system
- archive_patterns (list, optional) – List of ArchiveIndexRequest objects
- product_patterns (list, optional) – List of ProductIndexRequest objects
- instanced (bool, optional) – Should archive_path be extended with a randomized session name?
- setup_archive_path (bool, optional) – Should archive_path be created at job setup?
- session (str, optional) – A short alphanumeric correlation string
- agave (agavepy.agave.Agave, optional) – Active TACC.cloud API client. Needed only to resolve references to Agave or Abaco entities.
Other Parameters:
- agent (str, optional) – Abaco actorId or Agave appId managing the pipeline
- archive_collection_level (str, optional) – Overrides default of measurement for aggregating outputs
- inputs (list, optional) – Data files and references being computed upon. This supplements values of inputs discovered in data
- generated_by (str, optional) – String UUID5 of a named process
- pipeline_uuid (str, optional) – Overrides value of pipelines.pipeline_uuid
- task (str, optional) – The specific instance of agent
Note
Only one of (experiment_id, sample_id, measurement_id) may be passed when initializing an instance of ManagedPipelineJob.
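The archive_path strategy described for this class (prefix, compressed pipeline UUID, hashed metadata linkage, hashed data) can be illustrated with a small standalone sketch. It does not reproduce the library's code: the function names, the use of SHA-256, the 16-character truncation, and the use of the UUID hex form as the "compressed" UUID are all assumptions made for illustration.

```python
import hashlib
import json
import uuid

PREFIX = "/products/v2"  # prefix named in the class description


def short_hash(text, length=16):
    """Hypothetical stand-in for the library's hashing step."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()[:length]


def sketch_archive_path(pipeline_uuid, metadata_uuids, data):
    """Build an illustrative archive path from the three ingredients."""
    # 1. "Compressed" pipeline UUID: assumed here to be the 32-char hex form.
    pipeline_part = uuid.UUID(pipeline_uuid).hex
    # 2. Hash the metadata linkage; sorting makes the result order-independent.
    metadata_part = short_hash("|".join(sorted(metadata_uuids)))
    # 3. Serialize the job's data deterministically, then hash it.
    serialized = json.dumps(data, sort_keys=True, separators=(",", ":"))
    data_part = short_hash(serialized)
    return "/".join([PREFIX, pipeline_part, metadata_part, data_part])
```

Because every component is derived deterministically, the same pipeline, linkage, and data always map to the same path, while a change to any one of them yields a different path. This is the property the description calls collision-proofing.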
-
COLL_PARAMS= [('measurement_id', 'measurement'), ('sample_id', 'sample'), ('experiment_id', 'experiment')]¶
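The order of COLL_PARAMS matches the lookup order given in the class description (measurement_id, sample_id, then experiment_id). The sketch below shows one way such a table could drive argument handling. The helper pick_linkage is hypothetical, and the "first match wins" rule is an assumption rather than documented behavior.

```python
COLL_PARAMS = [
    ("measurement_id", "measurement"),
    ("sample_id", "sample"),
    ("experiment_id", "experiment"),
]


def pick_linkage(**kwargs):
    """Return (collection, identifiers) for the first linkage argument found."""
    for arg_name, collection in COLL_PARAMS:
        value = kwargs.get(arg_name)
        if value:
            # Accept either a single string or a list of strings.
            identifiers = [value] if isinstance(value, str) else list(value)
            return collection, identifiers
    raise ValueError(
        "one of measurement_id, sample_id, experiment_id is required"
    )
```

For example, pick_linkage(sample_id="sample.x") returns ("sample", ["sample.x"]), and calling it with no linkage argument raises ValueError.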
-
INIT_LINK_PARAMS= [('pipeline_uuid', True, 'uuid', None, 'pipeline', 'child_of')]¶
-
LINK_FIELDS= ['child_of', 'acted_on', 'acted_using', 'generated_by']¶
-
METADATA_ARG_NAMES= ('experiment_design_id', 'experiment_id', 'sample_id', 'measurement_id')¶
-
PARAMS= [('agent', False, 'agent', None), ('task', False, 'task', None), ('session', False, 'session', None), ('data', False, 'data', {}), ('level_store', False, 'level_store', 'product'), ('archive_path', False, 'archive_path', None), ('setup_archive_path', False, 'setup_archive_path', True), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('uuid', False, 'uuid', None)]¶
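Each PARAMS entry is a 4-tuple. The field meanings are not stated in this reference; the sketch below assumes they are (keyword name, required flag, attribute name, default value), which is consistent with entries such as ('patterns', True, 'filters', []) elsewhere in the package. The helper apply_params is hypothetical and only illustrates how a table of this shape could be consumed.

```python
import copy

# A shortened table in the same shape as the PARAMS attribute above.
PARAMS = [
    ("data", False, "data", {}),
    ("level_store", False, "level_store", "product"),
    ("archive_system", False, "archive_system", "data-sd2e-community"),
]


def apply_params(params, **kwargs):
    """Resolve keyword arguments against a PARAMS-style table."""
    resolved = {}
    for kwarg_name, required, attr_name, default in params:
        if kwarg_name in kwargs:
            resolved[attr_name] = kwargs[kwarg_name]
        elif required:
            raise KeyError("missing required argument: " + kwarg_name)
        else:
            # Copy mutable defaults so callers never share a list or dict.
            resolved[attr_name] = copy.deepcopy(default)
    return resolved
```

Copying the default matters because several defaults are mutable ({} and []); handing out the same object to every caller would let one job's changes leak into another's.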
-
build_webhook()[source]¶ Return a webhook to update this job via web callback
Sending event messages over HTTP POST to this webhook will result in the jobs-manager Reactor making managed updates to this job’s state.
Returns: A callback URL
Return type: str
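As a rough illustration of posting an event to the URL returned by build_webhook(), the sketch below prepares (but does not send) an HTTP POST using only the standard library. The payload layout is an assumption: this reference does not document the event message schema, so the name and data keys are placeholders.

```python
import json
import urllib.request


def build_event_request(webhook_url, event_name, details=None):
    """Prepare a JSON POST request for a job event; nothing is sent here."""
    # Illustrative payload only; not the documented event schema.
    payload = {"name": event_name, "data": details or {}}
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would perform the actual request.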
-
get_archive_path(instanced=True, *args, **kwargs)[source]¶ Computes and returns the document’s archive_path
-
instanced_directory(session=None)[source]¶ Extend a path with an instanced directory name
Parameters: session (str, optional) – Short alphanumeric session string
Returns: The new instance directory name
Return type: str
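The class description says an instanced path ends in a terminal directory of the form adjective-animal-date. A minimal sketch of that naming scheme follows. The word lists, the YYYYMMDD date format, and the function name are invented for illustration and are not taken from the library.

```python
import random
from datetime import datetime, timezone

# Tiny illustrative vocabularies; the real word lists are not documented here.
ADJECTIVES = ["eager", "quiet", "bright"]
ANIMALS = ["otter", "heron", "lynx"]


def sketch_instanced_directory(base_path, now=None, rng=random):
    """Append an adjective-animal-date directory name to base_path."""
    now = now or datetime.now(timezone.utc)
    name = "-".join(
        [rng.choice(ADJECTIVES), rng.choice(ANIMALS), now.strftime("%Y%m%d")]
    )
    return base_path.rstrip("/") + "/" + name
```

The predictable root is kept intact and only the final path component varies, so repeated runs of the same job land in sibling directories rather than overwriting each other.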
-
refs_from_data_dict(data={}, store='files')[source]¶ Find agave-canonical URI from data dicts
Discover late-bound links to files and references from the contents of the inputs and parameters keys in a data dictionary.
Parameters:
- data (dict) – A data dictionary
- store (string, optional) – Which store to resolve against (files|references)
Returns: Discovered list of managed file or reference URIs
Return type:
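To make the idea of discovering links in a data dictionary concrete, the sketch below collects Agave-style URIs from the inputs and parameters keys. It is a simplification: it only filters strings by the agave:// scheme and does not resolve anything against the files or references stores. The function name and the assumed shapes of inputs (list or dict) and parameters (dict) are illustrative assumptions.

```python
def sketch_refs_from_data(data=None):
    """Collect agave:// URIs found under 'inputs' and 'parameters'."""
    data = data or {}
    candidates = []
    inputs = data.get("inputs", [])
    # Assumed shapes: inputs is a list or a dict; parameters is a dict.
    candidates.extend(inputs.values() if isinstance(inputs, dict) else inputs)
    candidates.extend(data.get("parameters", {}).values())
    found = []
    for value in candidates:
        is_uri = isinstance(value, str) and value.startswith("agave://")
        if is_uri and value not in found:
            found.append(value)  # keep first-seen order, drop duplicates
    return found
```

Non-string values and strings without the agave:// scheme are ignored, so ordinary parameters such as numbers or free-text notes do not produce spurious links.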
-
class
datacatalog.managers.pipelinejobs.ReactorManagedPipelineJob(reactor, data={}, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.store.ManagedPipelineJob
-
class
datacatalog.managers.pipelinejobs.ManagedPipelineJobInstance(mongodb, uuid, agave=None, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexer.Indexer
Supports working with an existing ManagedPipelineJob
Parameters: -
PARAMS= [('state', False, 'state', None), ('archive_path', False, 'archive_path', None), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('archive_patterns', False, 'archive_patterns', []), ('product_patterns', False, 'product_patterns', []), ('generated_by', False, 'generated_by', []), ('child_of', False, 'child_of', []), ('acted_on', False, 'acted_on', []), ('acted_using', False, 'acted_using', []), ('pipeline_uuid', False, 'pipeline_uuid', None), ('last_event', False, 'last_event', None)]¶
-
handle(event_doc, token=None)[source]¶ Override super().handle to process events directly rather than by name
-
-
exception
datacatalog.managers.pipelinejobs.ManagedPipelineJobError[source]¶ Bases:
Exception
An error happened in the context of a ManagedPipelineJob
Submodules¶
datacatalog.managers.pipelinejobs.config module¶
-
class
datacatalog.managers.pipelinejobs.config.PipelineJobsConfig(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.config.CorePipelinesConfig
A PipelineJobs configuration.
Implements sanity checking on init() to avoid misconfigured Pipeline Jobs agents.
-
class
datacatalog.managers.pipelinejobs.config.PipelinesConfig(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.config.CorePipelinesConfig
datacatalog.managers.pipelinejobs.exceptions module¶
datacatalog.managers.pipelinejobs.indexer module¶
-
class
datacatalog.managers.pipelinejobs.indexer.Indexer(mongodb, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.common.Manager
-
file_job_relative_path(string_reference, check_exists=True)[source]¶ Resolves a filename relative to a job’s archive path as a file UUID
-
file_or_ref_identifier(string_reference)[source]¶ Resolves a string identifier into a files or reference UUID
-
index_if_exists(abs_filename, storage_system=None, check_exists=True)[source]¶ Index a file if it can be confirmed to exist
-
resolve_derived_references(reference_set, permissive=False)[source]¶ Resolves a list of linkages to UUIDs
-
datacatalog.managers.pipelinejobs.indexrequest module¶
-
class
datacatalog.managers.pipelinejobs.indexrequest.ArchiveIndexRequest(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexrequest.IndexRequest
-
PARAMS= [('patterns', True, 'filters', []), ('note', False, 'note', None), ('fixity', False, 'fixity', True), ('generated_by', False, 'generated_by', []), ('level', True, 'level', '1')]¶
-
kind= 'archive'¶
-
-
class
datacatalog.managers.pipelinejobs.indexrequest.ProductIndexRequest(**kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexrequest.IndexRequest
-
PARAMS= [('patterns', True, 'filters', []), ('note', False, 'note', None), ('fixity', False, 'fixity', False), ('generated_by', False, 'generated_by', []), ('derived_from', True, 'derived_from', []), ('derived_using', False, 'derived_using', [])]¶
-
kind= 'product'¶
-
-
class
datacatalog.managers.pipelinejobs.indexrequest.IndexType[source]¶ Bases:
str
A named indexing type
-
MEMBERS= ['archive', 'product']¶
-
kind= None¶
-
patterns_field¶
-
-
exception
datacatalog.managers.pipelinejobs.indexrequest.IndexingError[source]¶ Bases:
Exception
An error has occurred during setup or execution of an indexing task
-
exception
datacatalog.managers.pipelinejobs.indexrequest.InvalidIndexingRequest[source]¶ Bases:
ValueError
An error has occurred during setup or execution of an indexing task
datacatalog.managers.pipelinejobs.instanced module¶
-
class
datacatalog.managers.pipelinejobs.instanced.ManagedPipelineJobInstance(mongodb, uuid, agave=None, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.indexer.Indexer
Supports working with an existing ManagedPipelineJob
datacatalog.managers.pipelinejobs.jobmanager module¶
-
class
datacatalog.managers.pipelinejobs.jobmanager.JobManager(mongodb, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.common.Manager
-
ADMIN_EVENTS= ['reset', 'ready', 'delete', 'purge']¶
-
PARAMS= [('archive_path', False, 'archive_path', None), ('archive_patterns', False, 'archive_patterns', []), ('archive_system', False, 'archive_system', 'data-sd2e-community'), ('pipeline_uuid', False, 'pipeline_uuid', None), ('token', False, 'token', None), ('uuid', False, 'uuid', None), ('state', False, 'state', None), ('last_event', False, 'last_event', None)]¶
-
attach(token=None)[source]¶ Simplified proxy for load
Materializes just enough of the job to interact with it
-
load(job_uuid=None, token=None)[source]¶ Load up a JobManager instance for the given UUID
This function is the opposite of setup(). It populates a minimum attribute set from the current contents of a job.
Parameters:
- job_uuid (string) – A known job TypedUUID
- token (string, optional) – Update token for the job
Raises: ManagedPipelineJobError – Raised on any unrecoverable error
Returns: self
Return type:
-
datacatalog.managers.pipelinejobs.reactor module¶
-
class
datacatalog.managers.pipelinejobs.reactor.ReactorManagedPipelineJob(reactor, data={}, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.store.ManagedPipelineJob
datacatalog.managers.pipelinejobs.store module¶
-
class
datacatalog.managers.pipelinejobs.store.ManagedPipelineJob(mongodb, pipelines, instanced=False, agave=None, *args, **kwargs)[source]¶ Bases:
datacatalog.managers.pipelinejobs.jobmanager.JobManager
Specialized PipelineJob that supports archiving to defined stores and deferred updates