ManagedPipelineJob: A Deep Dive

This document demonstrates the specifics of parameterizing a ManagedPipelineJob. Detailed descriptions of its parameterization options can be found in its Python API docs. The same basic parameterization applies to ReactorManagedPipelineJob, as it is a subclass of ManagedPipelineJob.

Overview

The essential function of the ManagedPipelineJob class is to link the Data Catalog representation of a PipelineJob with its Pipeline, inputs, references, and relevant experimental metadata.

A foundation for this linkage between data and compute is the Archive functionality in the Agave Jobs API. Briefly, an Agave job’s archive path is the combination of an Agave storage system and an absolute path to which the job’s outputs are copied after successful execution of the job code. We extend this by minting a distinct archive path for each PipelineJob based on our knowledge of the job’s attributes. This lets us know that any files found under that path were generated by that PipelineJob.
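
The fragment below is a minimal sketch of that idea (the function name, attribute list, and hashing scheme are illustrative assumptions, not the library’s actual algorithm): hashing a job’s salient attributes yields a stable directory name, so identical attributes always resolve to the same archive path.

>>> # Illustrative sketch only: NOT the library's real algorithm
>>> import hashlib
>>> def archive_fragment(pipeline_uuid, parent_uuids, data):
...     key = '|'.join([pipeline_uuid] + sorted(parent_uuids) + [str(sorted(data.items()))])
...     return hashlib.sha256(key.encode()).hexdigest()[:24]
...
>>> frag_a = archive_fragment('pipeline-uuid', ['measurement-uuid'], {})
>>> frag_b = archive_fragment('pipeline-uuid', ['measurement-uuid'], {})
>>> frag_a == frag_b
True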

When a PipelineJob is created, its parameters are inspected to establish which pipeline generated it and whether it refers to any managed file or reference entities. These connections provide additional context to the job. To connect the nascent metadata subgraph surrounding the job to the experimental metadata, we connect the job to one or more experiments, samples, or measurements using the child_of linkage. This allows very straightforward queries to be constructed that return all the jobs (and thus their archive paths and other metadata) that were run using data from specific experiments, samples, or measurements.
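
For example, a jobs-by-experiment lookup can be a single query on the child_of field. The snippet below is a hedged sketch that assumes direct pymongo access, a database named catalog, and a jobs collection; it is not the documented python-datacatalog query API.

>>> # Sketch only: the database and collection names here are assumptions
>>> from pymongo import MongoClient
>>> db = MongoClient('mongodb://localhost:27017')['catalog']
>>> experiment_uuid = '102e95e6-67a8-5a06-9484-3131c6907890'
>>> job_uuids = [job['uuid'] for job in db.jobs.find({'child_of': experiment_uuid})]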

Additional metadata associations at the level of individual output files are managed by processes that run after the job has completed. However, information about how those associations are made can be specified on the job, and the Pipelines framework will ensure that they happen.

There are two kinds of indexing that tie outputs to the rest of the metadata catalog. The first is a simple scheme that runs at the end of every job and creates a files record (if one does not already exist) for each file found under the job’s archive path. These files are designated as being generated_by the job. The other kind of indexing is not yet implemented, but will soon connect individual file records to their source data files and references by way of the derived_from and derived_using linkages.
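
Once that first indexing pass has run, the same query pattern applies to outputs. Reusing the db handle from the sketch above (and again assuming a files collection, which is not guaranteed by the documented API), the files attributed to a particular job can be recovered via their generated_by linkage.

>>> # Sketch only: 'files' collection name and the job UUID are illustrative
>>> job_uuid = '107596b8-25b2-557d-9702-853f0690c576'
>>> generated_files = list(db.files.find({'generated_by': job_uuid}))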

Setup

Several example parameterizations, addressing specific use cases, are presented as interactive Python sessions. If you wish to work through them on your own, you will need to set up your environment and initialize your session.

Prepare your environment

The project Makefile includes targets for building, testing, and working with the python-datacatalog package. A subset of these is used to prepare for this session. You only need to do these steps if you’re working through the code samples on your own from a checkout of the python-datacatalog repo.

$ make virtualenv
virtualenv env
...
..
.
Successfully installed agavepy-0.7.3 jsonschema-3.0.0a3
$ make mongo-up
cd docker && docker-compose up -d --force-recreate --quiet-pull
Creating network "docker_mongotest" with the default driver
Creating docker_mongodb_1 ... done
$ make bootstrap-tests
python -m bootstrap.create_database
__main__ - DEBUG - Reading project config
...
..
.
manage_files.py.INFO: Registered /uploads/tacc/example/123.txt
$ make user-smoketests

Initialize an interactive python-datacatalog session

The python-datacatalog package is usually used in a scripting context, but its main classes can easily be instantiated in an interactive Python session.

>>> from settings import settings
>>> from datacatalog.managers.pipelinejobs import ManagedPipelineJob as Job
>>> experiments = ['experiment.tacc.10001']
>>> samples = ['sample.tacc.20001']
>>> measurements1 = ['measurement.tacc.0xDEADBEF1']
>>> measurements2 = ['10483e8d-6602-532a-8941-176ce20dd05a', 'measurement.tacc.0xDEADBEF0']
>>> measurements3 = ['measurement.tacc.0xDEADBEEF', 'measurement.tacc.0xDEADBEF0']
>>> data_w_inputs = {'alpha': 0.5, 'inputs': ['agave://data-sd2e-community/uploads/tacc/example/345.txt'], 'parameters': {'ref1': 'agave://data-sd2e-community/reference/novel_chassis/uma_refs/MG1655_WT/MG1655_WT.fa'}}

Attention

The Python imports, variable names, and variable values from this code block are implicit in the worked examples. For example, when measurements1 is used, it always refers to ['measurement.tacc.0xDEADBEF1']. You can always directly set experiment_id, measurement_id, or sample_id to the desired values.

UUID, Parentage, and Archive Path

A ManagedPipelineJob is configured by passing keyword arguments at instantiation, e.g. ManagedPipelineJob(mongodb, pipelines, param1=value1, ...). The combination and values of these arguments establish the UUID, metadata linkages, and archive path for the PipelineJob. This is entirely deterministic, as will be demonstrated in the examples below.

For more details on what arguments are available (and how to use them), please consult the ManagedPipelineJob API documentation.

Measurement(s)

If one passes one or more values for measurement_id:

  • Job UUID is based on pipeline, those measurements, and an empty data object
  • Job is a child of the specified measurement(s)
  • Job archive path reflects the combination of measurement(s)
>>> mpj1 = Job(settings.mongodb, settings.pipelines, measurement_id=measurements1)
>>> mpj1.setup()
uuid : 107596b8-25b2-557d-9702-853f0690c576
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['1041ab3f-5221-5c79-8781-8838dfb6eef9']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/kZpoopq2GBKYy6DQ9pqqNbk8/PAVpwrObxp5YjYRvrJOd5yVp

Note the contents of child_of: a single measurement UUID referencing the specified measurement_id value. Note also the second component of the archive path, kZpoopq2GBKYy6DQ9pqqNbk8, which is unique to this measurement. Now look at what happens with multiple measurements.

>>> mpj2 = Job(settings.mongodb, settings.pipelines, measurement_id=measurements2)
>>> mpj2.setup()
uuid : 10704831-8017-5e39-a4d7-dd73a08b4a4b
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['10483e8d-6602-532a-8941-176ce20dd05a', '104dae4d-a677-5991-ae1c-696d2ee9884e']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/3pGOjjEYq3BZ2DJ6e0jLQqWb/PAVpwrObxp5YjYRvrJOd5yVp

Now, the UUID is different, as is the archive path, and the job is linked as a child of both measurements.

Sample

Passing in a sample (or samples) via sample_id works the same way. In the following case:

  • Job UUID is based on pipeline, the sample, and an empty data object
  • Job is a child of the sample
  • Job archive path reflects the sample
>>> mpk = Job(settings.mongodb, settings.pipelines, sample_id=samples)
>>> mpk.setup()
uuid : 107fd3cf-23c6-522f-9270-19932d06def4
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['103246e1-bcdf-5b6e-a8dc-4c7e81b91141']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/kZgygQV2EDAAkDLRzrep1gO2/PAVpwrObxp5YjYRvrJOd5yVp

The child_of linkage now points to a sample UUID.

Experiment

Finally, specifying the experiment or experiments works the same way as for samples and measurements. Thus, in this example:

  • Job UUID is based on pipeline, the experiment(s), and an empty data object
  • Job is a child of the designated experiment(s)
  • Job archive path reflects the specific experiment(s)
>>> mpr = Job(settings.mongodb, settings.pipelines, experiment_id=experiments)
>>> mpr.setup()
uuid : 107a90b7-f083-5167-a63d-41a3bf278bf8
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/PAVpwrObxp5YjYRvrJOd5yVp

Parameterization using the “data” object

The contents of the data keyword argument are attached verbatim to PipelineJob.data and are also used to establish the terminal directory in the archive path.

>>> mpn = Job(settings.mongodb, settings.pipelines, sample_id=samples, experiment_id=experiments, data={'alpha': 0.5})
>>> mpn.setup()
uuid : 1077a959-7163-516a-8fc1-dd11bd958190
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {'alpha': 0.5}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/0p5yeV3VR3OELzgoJ5kk6Yxw

The UUID is different from the one in the mpr example above, as is the name of the last directory in the archive path, but the first and second levels of the archive path are identical. Let us try changing just the value of alpha in the data object and see what happens…

>>> mpn = Job(settings.mongodb, settings.pipelines, sample_id=samples, experiment_id=experiments, data={'alpha': 0.6})
>>> mpn.setup()
uuid : 107e7a19-6d8a-540d-954c-09836d67de49
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {'alpha': 0.6}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/3pGLppQE69r3Z36EY3jlxxpN

The job’s UUID and terminal directory name have changed. This illustrates the central tenet of the PipelineJob design, which is that processing a specific set of experiments (or samples/measurements) using a specific pipeline and compute parameters yields a distinct job with its own unique storage location.

References and Files

The contents of the data object are not constrained. However, if one includes the keys inputs or parameters, the system attempts to resolve the provided values to known reference or file records.

>>> mpo = Job(settings.mongodb, settings.pipelines, sample_id=samples, data=data_w_inputs)
>>> mpo.setup()
uuid : 1077bb63-d586-5519-a576-33b5d9f85d40
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {'alpha': 0.5, 'parameters': {'ref1': 'agave://data-sd2e-community/reference/novel_chassis/uma_refs/MG1655_WT/MG1655_WT.fa'}, 'inputs': ['agave://data-sd2e-community/uploads/tacc/example/345.txt']}
child_of : ['103246e1-bcdf-5b6e-a8dc-4c7e81b91141']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : ['105fb204-530b-5915-9fd6-caf88ca9ad8a']
acted_using : ['1099ee04-0412-5566-bb4d-0efc2af3eea3']
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/kZgygQV2EDAAkDLRzrep1gO2/RbQyWyezlxlvXOYeG81qVbG4

The reference asset (MG1655_WT.fa) was identified and associated via acted_using, while the file asset was associated via acted_on.

Interpretable inputs and parameters can be included in data in three JSON formats. It is vastly preferable to use the URI scheme to refer to a specific asset where possible, rather than the path-relative form, which is provided only for edge-case compatibility with old pipelines.

List-style inputs
{"inputs": [
  "/uploads/..",
  "/products/..",
  "/reference/..",
  "agave://<system>/<path>",
  "http://<external_ref>/",
  "https://<external_ref>"]
}
Agave-style parameters
{"parameters": {
    "param_name_1": "agave://<system>/<path>",
    "param_name_2: ""http://<external_ref>/",
    "param_name_3: ""https://<external_ref>/"},
    "param_name_4": "/uploads/..",
    "param_name_5": "/reference/..",
    "param_name_6": "/products/.."
}
Agave-style inputs and parameters
{"inputs": {
    "input_name_1": "agave://<system>/<path>",
    "input_name_2": "/uploads/...",
    "input_name_3": "/reference/...",
    "input_name_4": "/products/..."
 "parameters": {
    "param_name_2: ""http://<external_ref>/",
    "param_name_3: ""https://<external_ref>/"},
    "param_name_4": "/uploads/...",
    "param_name_5": "/reference/...",
    "param_name_6": "/products/..."
}

Instanced Archive Paths

To assist with debugging or general collision avoidance, it is possible to extend the normally deterministic archive path with a named/date-stamped directory.

>>> mpp = Job(settings.mongodb, settings.pipelines, experiment_id=experiments, instanced=True)
>>> mpp.setup()
uuid : 107a90b7-f083-5167-a63d-41a3bf278bf8
pipeline_uuid : 106c46ff-8186-5756-a934-071f4497b58d
data : {}
child_of : ['102e95e6-67a8-5a06-9484-3131c6907890']
generated_by : ['106c46ff-8186-5756-a934-071f4497b58d']
acted_on : []
acted_using : []
archive_uri: agave://data-sd2e-community/products/v2/106c46ff81865756a934071f4497b58d/Db6rzKZnnyA8E5qvwvxjpwZ4/PAVpwrObxp5YjYRvrJOd5yVp/sensible-adder-20190215T213009Z

This appends an adjective-animal-utcZ directory beneath the archive path, preserving the contents of the original archive path, should there be any.

Indexing Configurations

There are two types of indexing that can occur. Both can be run automatically when a job reaches the FINISHED state, as well as by secondary processes such as Reactors or even user scripts.

Job to Products

The first type of indexing connects a job with its products. The behavior for this indexing is controlled using the value of a job’s archive_patterns array. If it is empty, all files found under the archive path will be associated with the job using a generated_by linkage and will be assigned a processing level of “1”. This behavior can be changed by specifying one or more indexing patterns as demonstrated below:

>>> archive_patterns = [
...    {'processing_level': '1', 'patterns': ['.csv$']},
...    {'processing_level': '2', 'patterns': ['.xls$', '.pdf$']}
... ]
>>> mpq = Job(settings.mongodb, settings.pipelines, experiment_id=experiments, archive_patterns=archive_patterns)

These indexing patterns will:

  1. Mark CSV outputs as generated by the job
  2. Mark those CSV files as level “1” data products
  3. Mark XLS and PDF outputs as generated by the job
  4. Mark XLS and PDF outputs as level “2” data products

This approach is useful when a job generates more than one data level at once.

Note

Lists of patterns are processed asynchronously and in indeterminate order for a given job. Make sure not to build workflow logic that assumes any specific order of indexing operations.

Another case where archive patterns are helpful is in sub-selecting outputs whose filenames are derived from experimental or parameter metadata. For example, multiple jobs might share an archive path: job A processes sample 0xDEADBEF3 and job B processes sample 0xDEADBEF4. A bit of pattern engineering will make their products distinguishable and discoverable at the job level.

Job A: 0xDEADBEF3
>>> archive_patterns = [
...    {'processing_level': '1', 'patterns': ['0xDEADBEF3_']}
... ]
Job B: 0xDEADBEF4
>>> archive_patterns = [
...    {'processing_level': '1', 'patterns': ['0xDEADBEF4_']}
... ]

Note

The output filenames from each job must contain the string(s) defined in their respective archive_patterns. You may have to revise some file-handling logic in your computational pipeline to make this work.
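
For instance, a pipeline wrapper can embed the measurement label in each output filename so that the patterns above will match. This is a hypothetical snippet; neither the variable names nor the filename template are prescribed by the framework.

>>> # Hypothetical naming convention for job outputs
>>> measurement_label = '0xDEADBEF3'
>>> output_name = '{}_counts.csv'.format(measurement_label)
>>> output_name
'0xDEADBEF3_counts.csv'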

Products to Inputs and References

This section remains to be written