ManagedPipelineJob: A Deep Dive¶
This document demonstrates the specifics of parameterizing a
ManagedPipelineJob. Detailed descriptions of its parameterization options
can be found in its Python API docs. The same basic parameterization
applies to ReactorManagedPipelineJob, as it is a subclass of
ManagedPipelineJob.
Overview¶
The essential function of the ManagedPipelineJob class is to link the Data
Catalog representation of a PipelineJob with its Pipeline, inputs,
references, and relevant experimental metadata.
A foundation for this linkage between data and compute is the Archive functionality in the Agave Jobs API. Briefly, an Agave job’s archive path is the combination of an Agave storage system and an absolute path to which the job’s outputs are copied after successful execution of the job code. We extend this by minting a distinct archive path for each PipelineJob based on our knowledge of the job’s attributes. This lets us know that any files found under that path were generated by that PipelineJob.
When a PipelineJob is created, its parameters are inspected to establish
which pipeline generated it and whether it refers to any managed file or
reference entities. These connections provide additional context to the
job. To connect the nascent metadata subgraph surrounding the job to the
experimental metadata, we connect the job to one or more experiments,
samples, or measurements using the child_of linkage. This allows very
straightforward queries to be constructed that return all the jobs (and
thus their archive paths and other metadata) that were run using data from
specific experiments, samples, or measurements.
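The child_of traversal described above can be pictured with a short sketch. This is an illustration only: the record shapes mirror the job outputs shown later in this document, but the helper name and in-memory list are hypothetical, not the package’s actual query API.

```python
# Hypothetical sketch of the child_of linkage: collect every job record
# whose child_of list contains a given parent (experiment/sample/measurement)
# UUID. Record shapes mirror the setup() outputs shown later in this page.
def jobs_for_parent(job_records, parent_uuid):
    """Return the jobs that are child_of the given parent UUID."""
    return [job for job in job_records
            if parent_uuid in job.get('child_of', [])]

jobs = [
    {'uuid': '107a90b7-f083-5167-a63d-41a3bf278bf8',
     'child_of': ['102e95e6-67a8-5a06-9484-3131c6907890']},
    {'uuid': '107596b8-25b2-557d-9702-853f0690c576',
     'child_of': ['1041ab3f-5221-5c79-8781-8838dfb6eef9']},
]
matches = jobs_for_parent(jobs, '102e95e6-67a8-5a06-9484-3131c6907890')
```

In the live system the same traversal is a single query against the catalog, which is what makes "all jobs for experiment X" cheap to answer.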
Associations of additional metadata with individual output files are managed by processes that run after the job has completed. However, information about how those associations are made can be specified on the job, and the Pipelines framework will ensure that they happen.
There are two kinds of indexing which tie outputs to the rest of the metadata
catalog. The first is a simple scheme that runs at the end of every job and
creates (if one doesn’t already exist) a files record for each file found
under the job’s archive path. These files are designated as being
generated_by the job. The other kind of indexing is not yet implemented, but
will soon connect individual file records to their source data files and
references by way of the derived_from and derived_using linkages.
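The first indexing scheme can be sketched as follows. This is a conceptual illustration, not the package’s implementation: the function name and the minimal record shape are assumptions, and the real indexer operates on the storage system rather than an in-memory list.

```python
# Hypothetical sketch of the end-of-job indexing scheme: any file found
# under the job's archive path gets a minimal files record marked as
# generated_by the job. Paths and field names are illustrative only.
def index_outputs(archive_path, file_paths, job_uuid):
    """Build files records for outputs located under the archive path."""
    return [{'name': path, 'generated_by': [job_uuid]}
            for path in file_paths if path.startswith(archive_path)]

records = index_outputs(
    '/products/v2/abc',
    ['/products/v2/abc/out.csv', '/uploads/raw.txt'],
    'job-uuid-1')
```

Note that files outside the archive path are ignored; only outputs the job actually archived are claimed by it.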
Setup¶
Several example parameterizations, addressing specific use cases, are presented as interactive Python sessions. If you wish to work through them on your own, you will need to set up your environment and initialize your session.
Prepare your environment¶
The project Makefile includes targets for building, testing, and working with
the python-datacatalog package. A subset of these are used to prepare for
this session. You only need to do these steps if you’re working through the
code samples on your own from a checkout of the python-datacatalog repo.
$ make virtualenv
virtualenv env
...
..
.
Successfully installed agavepy-0.7.3 jsonschema-3.0.0a3
$ make mongo-up
cd docker && docker-compose up -d --force-recreate --quiet-pull
Creating network "docker_mongotest" with the default driver
Creating docker_mongodb_1 ... done
$ make bootstrap-tests
python -m bootstrap.create_database
__main__ - DEBUG - Reading project config
...
..
.
manage_files.py.INFO: Registered /uploads/tacc/example/123.txt
$ make user-smoketests
Initialize an interactive python-datacatalog session¶
The python-datacatalog package is usually used in a scripting context, but its main classes can easily be instantiated in an interactive Python session.
>>> from settings import settings
>>> from datacatalog.managers.pipelinejobs import ManagedPipelineJob as Job
>>> experiments = ['experiment.tacc.10001']
>>> samples = ['sample.tacc.20001']
>>> measurements1 = ['measurement.tacc.0xDEADBEF1']
>>> measurements2 = ['10483e8d-6602-532a-8941-176ce20dd05a', 'measurement.tacc.0xDEADBEF0']
>>> measurements3 = ['measurement.tacc.0xDEADBEEF', 'measurement.tacc.0xDEADBEF0']
>>> data_w_inputs = {'alpha': 0.5, 'inputs': ['agave://data-sd2e-community/uploads/tacc/example/345.txt'], 'parameters': {'ref1': 'agave://data-sd2e-community/reference/novel_chassis/uma_refs/MG1655_WT/MG1655_WT.fa'}}
Attention
The Python imports, variable names, and variable values from this code
block are implicit in the worked examples. For example, when measurements1
is used, it always refers to ['measurement.tacc.0xDEADBEF1']. You can
always directly set experiment_id, measurement_id, or sample_id to the
desired values.
UUID, Parentage, and Archive Path¶
A ManagedPipelineJob is configured by passing keyword arguments at
instantiation, e.g. ManagedPipelineJob(mongodb, pipelines, <param1=value1>...).
The combination and value of these arguments establish the UUID, metadata
linkage, and archive path for the PipelineJob. This is entirely
deterministic, which will be demonstrated in the examples below.
For more details on what arguments are available (and how to use them), please consult the ManagedPipelineJob API documentation.
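The determinism described above is analogous to name-based (version 5) UUID hashing: identical pipeline, parents, and data always yield an identical identifier. The sketch below illustrates the principle only; the namespace, the serialization scheme, and the helper name are assumptions, not python-datacatalog’s actual derivation.

```python
import uuid

# Illustration of deterministic job identity: the same pipeline, parents,
# and data always hash to the same UUID; changing any input changes it.
# This is NOT the package's real algorithm, just the underlying principle.
NAMESPACE = uuid.NAMESPACE_URL  # stand-in namespace for the sketch

def job_identifier(pipeline_uuid, parents, data):
    name = '|'.join([pipeline_uuid] + sorted(parents)
                    + [repr(sorted(data.items()))])
    return uuid.uuid5(NAMESPACE, name)

a = job_identifier('106c46ff-8186-5756-a934-071f4497b58d',
                   ['sample.tacc.20001'], {'alpha': 0.5})
b = job_identifier('106c46ff-8186-5756-a934-071f4497b58d',
                   ['sample.tacc.20001'], {'alpha': 0.5})
c = job_identifier('106c46ff-8186-5756-a934-071f4497b58d',
                   ['sample.tacc.20001'], {'alpha': 0.6})
```

Here `a == b` while `a != c`, mirroring how re-running setup() with identical arguments reproduces the same job UUID and archive path.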
Measurement(s)¶
If one passes one or more values for measurement_id:
- Job UUID is based on the pipeline, those measurements, and an empty data
- Job is a child of the specified measurement(s)
- Job archive path reflects the combination of measurement(s)
>>> mpj1 = Job(settings.mongodb, settings.pipelines, measurement_id=measurements1)
>>> mpj1.setup()
uuid : 107596b8-25b2-557d-9702-853f0690c576
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['1041ab3f-5221-5c79-8781-8838dfb6eef9']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/kZpoopq2GBKYy6DQ9pqqNbk8/PAVpwrObxp5YjYRvrJOd5yVp
Note the contents of child_of: a single measurement UUID referencing the
specified measurement_id value. Note also the second component of the
archive path, kZpoopq2GBKYy6DQ9pqqNbk8, which is unique to this measurement.
Now look at what happens with multiple measurements.
>>> mpj2 = Job(settings.mongodb, settings.pipelines, measurement_id=measurements2)
>>> mpj2.setup()
uuid : 10704831-8017-5e39-a4d7-dd73a08b4a4b
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['10483e8d-6602-532a-8941-176ce20dd05a', '104dae4d-a677-5991-ae1c-696d2ee9884e']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/3pGOjjEYq3BZ2DJ6e0jLQqWb/PAVpwrObxp5YjYRvrJOd5yVp
Now, the UUID is different, as is the archive path, and the job is linked as a child of both measurements.
Sample¶
Passing in a sample or samples via sample_id works the same way. In the
following case:
- Job UUID is based on the pipeline, the sample, and an empty data
- Job is a child of the sample
- Job archive path reflects the sample
>>> mpk = Job(settings.mongodb, settings.pipelines, sample_id=samples)
>>> mpk.setup()
uuid : 107fd3cf-23c6-522f-9270-19932d06def4
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['103246e1-bcdf-5b6e-a8dc-4c7e81b91141']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/kZgygQV2EDAAkDLRzrep1gO2/PAVpwrObxp5YjYRvrJOd5yVp
The child_of linkage now points to a sample UUID.
Experiment¶
Finally, specifying the experiment or experiments works the same way as for
samples and measurements. Thus, in this example:
- Job UUID is based on the pipeline, the experiment(s), and an empty data
- Job is a child of the designated experiment(s)
- Job archive path reflects the specific experiment(s)
>>> mpr = Job(settings.mongodb, settings.pipelines, experiment_id=experiments)
>>> mpr.setup()
uuid : 107a90b7-f083-5167-a63d-41a3bf278bf8
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/PAVpwrObxp5YjYRvrJOd5yVp
Parameterization using the “data” object¶
The contents of the data keyword argument are attached verbatim to
PipelineJob.data and are also used to establish the terminal directory in
the archive path.
>>> mpn = Job(settings.mongodb, settings.pipelines, sample_id=samples, experiment_id=experiments, data={'alpha': 0.5})
>>> mpn.setup()
uuid : 1077a959-7163-516a-8fc1-dd11bd958190
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {'alpha': 0.5}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/0p5yeV3VR3OELzgoJ5kk6Yxw
The UUID is different from that in the mpr example above, as is the name of
the last directory in the archive path, but the first and second levels of
the archive path are identical. Let us try changing the value of alpha in
the data object and see what happens…
>>> mpn = Job(settings.mongodb, settings.pipelines, sample_id=samples, experiment_id=experiments, data={'alpha': 0.6})
>>> mpn.setup()
uuid : 107e7a19-6d8a-540d-954c-09836d67de49
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {'alpha': 0.6}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/3pGLppQE69r3Z36EY3jlxxpN
The job’s UUID and terminal directory name have changed. This illustrates the central tenet of the PipelineJob design, which is that processing a specific set of experiments (or samples/measurements) using a specific pipeline and compute parameters yields a distinct job with its own unique storage location.
References and Files¶
The contents of the data object are not constrained. However, if one
includes the keys inputs or parameters, the system attempts to resolve the
provided values to known reference or file records.
>>> mpo = Job(settings.mongodb, settings.pipelines, sample_id=samples, data=data_w_inputs)
>>> mpo.setup()
uuid : 1077bb63-d586-5519-a576-33b5d9f85d40
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {'alpha': 0.5, 'parameters': {'ref1': 'agave://data-sd2e-community/reference/novel_chassis/uma_refs/MG1655_WT/MG1655_WT.fa'}, 'inputs': ['agave://data-sd2e-community/uploads/tacc/example/345.txt']}
child_of : ['103246e1-bcdf-5b6e-a8dc-4c7e81b91141']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : ['105fb204-530b-5915-9fd6-caf88ca9ad8a']
acted_using : ['1099ee04-0412-5566-bb4d-0efc2af3eea3']
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/kZgygQV2EDAAkDLRzrep1gO2/RbQyWyezlxlvXOYeG81qVbG4
The reference asset (MG1655_WT.fa) was identified and associated via
acted_using, while the file asset was associated via acted_on.
Interpretable inputs and parameters can be included in data in three JSON
formats. Where possible, it is vastly preferable to use the URI scheme to
refer to a specific asset, rather than the path-relative form, which is
provided only for edge-case compatibility with old pipelines.
{"inputs": [
    "/uploads/..",
    "/products/..",
    "/reference/..",
    "agave://<system>/<path>",
    "http://<external_ref>/",
    "https://<external_ref>/"]
}
{"parameters": {
    "param_name_1": "agave://<system>/<path>",
    "param_name_2": "http://<external_ref>/",
    "param_name_3": "https://<external_ref>/",
    "param_name_4": "/uploads/..",
    "param_name_5": "/reference/..",
    "param_name_6": "/products/.."}
}
{"inputs": {
    "input_name_1": "agave://<system>/<path>",
    "input_name_2": "/uploads/...",
    "input_name_3": "/reference/...",
    "input_name_4": "/products/..."},
 "parameters": {
    "param_name_2": "http://<external_ref>/",
    "param_name_3": "https://<external_ref>/",
    "param_name_4": "/uploads/...",
    "param_name_5": "/reference/...",
    "param_name_6": "/products/..."}
}
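Values in all three forms fall into three classes: agave:// URIs, external http(s) references, and path-relative strings. The helper below is a hypothetical illustration of how such values might be classified before resolution; the function name and return labels are assumptions, not the package’s resolver.

```python
from urllib.parse import urlparse

# Hypothetical classifier for the value styles shown above. The real
# resolver maps these to catalog file/reference records; this sketch only
# distinguishes the three syntactic forms.
def classify(value):
    scheme = urlparse(value).scheme
    if scheme == 'agave':
        return 'agave-uri'
    if scheme in ('http', 'https'):
        return 'external-reference'
    return 'path-relative'
```

For example, `classify('agave://data-sd2e-community/uploads/tacc/example/345.txt')` yields the URI class, while a bare `/uploads/...` path falls through to the path-relative class.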
Instanced Archive Paths¶
To assist with debugging or general collision avoidance, it is possible to extend the normally deterministic archive path with a named/date-stamped directory.
>>> mpp = Job(settings.mongodb, settings.pipelines, experiment_id=experiments, instanced=True)
>>> mpp.setup()
uuid : 107a90b7-f083-5167-a63d-41a3bf278bf8
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/PAVpwrObxp5YjYRvrJOd5yVp/sensible-adder-20190215T213009Z
This appends an adjective-animal-utcZ directory as a subdirectory of the
archive path, preserving the contents of the original archive path should
there be any.
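The naming pattern of the instance directory can be sketched as follows. The word choices and helper name here are stand-ins; only the adjective-animal-timestamp shape matches the output shown above.

```python
from datetime import datetime, timezone

# Illustration of the instanced directory naming convention seen above
# (e.g. "sensible-adder-20190215T213009Z"). The word lists and function
# name are hypothetical; only the name shape reflects the real behavior.
def instance_directory(adjective, animal, when):
    """Compose adjective-animal-<UTC timestamp> as a directory name."""
    stamp = when.strftime('%Y%m%dT%H%M%SZ')
    return '{}-{}-{}'.format(adjective, animal, stamp)

name = instance_directory(
    'sensible', 'adder',
    datetime(2019, 2, 15, 21, 30, 9, tzinfo=timezone.utc))
```

Because the timestamp differs on every run, each instanced setup() lands in a fresh subdirectory rather than overwriting earlier outputs.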
Indexing Configurations¶
There are two types of indexing that can occur. Both can be run automatically
when a job reaches the FINISHED state, as well as by secondary processes
such as Reactors or even user scripts.
Job to Products¶
The first type of indexing connects a job with its products. The behavior of
this indexing is controlled by the value of the job’s archive_patterns
array. If it is empty, all files found under the archive path will be
associated with the job using a generated_by linkage and will be assigned a
processing level of “1”. This behavior can be changed by specifying one or
more indexing patterns, as demonstrated below:
>>> archive_patterns = [
...     {'processing_level': '1', 'patterns': ['.csv$']},
...     {'processing_level': '2', 'patterns': ['.xls$', '.pdf$']}
... ]
>>> mpq = Job(settings.mongodb, settings.pipelines, experiment_id=experiments, archive_patterns=archive_patterns)
These indexing patterns will:
- Mark CSV outputs as generated by the job
- Mark the found CSV files as level “1” data products
- Mark XLS and PDF outputs as generated by the job
- Mark XLS and PDF outputs as level “2” data products
This approach is useful when a job generates more than one data level at once.
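The matching behavior the list above describes can be sketched as a simple regex pass over filenames. This is an illustration under assumptions (the helper name and first-match-wins ordering are mine); the real indexer runs server-side against the archive path.

```python
import re

# Hypothetical sketch of applying archive_patterns: each filename is tested
# against each entry's regex patterns, and the first matching entry's
# processing_level is assigned. Unmatched files get no level here.
archive_patterns = [
    {'processing_level': '1', 'patterns': ['.csv$']},
    {'processing_level': '2', 'patterns': ['.xls$', '.pdf$']},
]

def level_for(filename, patterns=archive_patterns):
    """Return the processing level for a filename, or None if unmatched."""
    for entry in patterns:
        if any(re.search(p, filename) for p in entry['patterns']):
            return entry['processing_level']
    return None
```

Under this sketch, `out.csv` is assigned level "1" and `report.pdf` level "2", matching the bullet list above; recall from the Note that the real pattern lists are processed in indeterminate order, so avoid patterns whose results depend on ordering.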
Note
Lists of patterns are processed asynchronously and in indeterminate order for a given job. Make sure not to build workflow logic that assumes any specific order of indexing operations.
Another case where archive patterns are helpful is in sub-selecting outputs
whose filenames are derived from experimental or parameter metadata. For
example, multiple jobs might share an archive path (say, job A processes
sample 0xDEADBEF3 and job B processes 0xDEADBEF4). A bit of pattern
engineering will make their products distinguishable and discoverable at
the job level.
>>> archive_patterns = [
...     {'processing_level': '1', 'patterns': ['0xDEADBEF3_']}
... ]
>>> archive_patterns = [
...     {'processing_level': '1', 'patterns': ['0xDEADBEF4_']}
... ]
Note
The output filenames from each job must contain the string(s) defined in their respective archive_patterns. You may have to revise some file-handling logic in your computational pipeline to make this work.
Products to Inputs and References¶
This section remains to be written