ReactorManagedPipelineJob

The ReactorManagedPipelineJob class provides the same create-and-manage functions as ManagedPipelineJob, but leverages the Reactors client-side environment to simplify initialization.

Configuration

Like ManagedPipelineJob, this class interacts with Agave API, SD2 MongoDB, and the PipelineJobs Manager to launch a job, but the configuration is taken from the Reactor’s config.yml and environment variables. Inside a Reactor ( based on the sd2e/reactors:python3-edge), configuration is as follows:

from datacatalog.managers.pipelinejobs import ReactorManagedPipelineJobInstance
rx = Reactor()
# pass additional keyword arguments like experiment_id or instanced
# after `data`
rmpj = ReactorManagedPipelineJob(rx, data={}, ...)
rmpj.setup()
# Get the job's callback URL
my_callback = rmpj.callback
# Send an event
rmpj.run()

Note

Make sure to have a plan to capture and propagate update_token returned by any PipelineJob actions or events. It will be required for future updates and it may change after an unspecified number of uses.

Parameterization

See documentation for ManagedPipelineJob to see additional run-time parameterization options for this class.

Sending Events

All job events inherit a named method from ManagedPipelineJob. Thus, sending an event looks like so:

# just send 'run'
rmpj.run(data={'Job is now running!'})
# chain 'run' and 'update'
rmpj.run(data={'msg': 'Job is still running!'}).update(data={'msg': 'This is an update'})

Canceling a Job

As with ManagedPipelineJob, this class can cancel a job until it has progressed to the RUNNING state.

# cancel the job
rmpj.cancel()

Resetting a Job

As with ManagedPipelineJob, this class can reset a job to its original CREATED state, erasing contents of the job’s archive path in the process. This action requires an administrator-level token.

# cancel the job
admin_tok = 'ka5r54v83dmp3tf8'
rmpj.reset(token=admin_tok)

Deleting a Job

As with ManagedPipelineJob, this class can delete a job. At present, the job’s archive path and contents are preserved. This action requires an administrator-level token.

# cancel the job
admin_tok = 'ka5r54v83dmp3tf8'
rmpj.delete(token=admin_tok)