ManagedPipelineJob

The Python 3 class datacatalog.managers.pipelinejobs.ManagedPipelineJob provides all the critical job management and metadata linkage functionality. Other methods build on it, providing alternative interfaces suited to particular use cases.

Configuration

The class interacts with the Agave API, the SD2 MongoDB, and the PipelineJobs Manager Reactor. Configuration details for these resources can be passed as follows:

>>> from datacatalog.managers.pipelinejobs import ManagedPipelineJob
>>> from agavepy.agave import Agave
>>> mongodb = {'authn': 'bW9uZ29kYjov...jRWJTI2SCUyQiy1zdGFnIwL2W1hcnk='}
>>> pipelines = {'job_manager_id': 'G1p783PxpalBB',
...              'job_manager_nonce': 'SD2E_G1p783PxpalBB',
...              'pipeline_uuid': '10650844-1baa-55c5-a481-5e945b19c065'}
>>> agave_client = Agave.restore()
>>> mpj = ManagedPipelineJob(mongodb, pipelines, agave=agave_client, ...)
>>> mpj.setup()

Warning

Never include the real value of job_manager_nonce in a public source repository or Docker image.
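One way to keep the nonce out of source control is to read it from the environment at run time. The following is a minimal sketch, not part of datacatalog: the environment variable name JOB_MANAGER_NONCE and the helper build_pipelines_config are hypothetical names chosen for illustration.

```python
import os


def build_pipelines_config(env=None):
    """Assemble the pipelines dict, taking the nonce from the environment.

    JOB_MANAGER_NONCE is a hypothetical variable name, not one defined
    by datacatalog.
    """
    env = os.environ if env is None else env
    nonce = env.get('JOB_MANAGER_NONCE')
    if not nonce:
        # Fail early rather than sending requests with an empty nonce
        raise RuntimeError('JOB_MANAGER_NONCE is not set')
    return {
        'job_manager_id': 'G1p783PxpalBB',
        'job_manager_nonce': nonce,
        'pipeline_uuid': '10650844-1baa-55c5-a481-5e945b19c065',
    }
```

The resulting dictionary can then be passed as the pipelines argument shown above, with the secret supplied by the deployment environment rather than the repository.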

Parameterization

ManagedPipelineJob accepts many options at setup time. These are covered in detail in ManagedPipelineJob: A Deep Dive.

Sending Events

All job events have a corresponding method in ManagedPipelineJob. Calling an event method causes the job to attempt a state transition. If an invalid transition is requested, a transitions.core.MachineError is raised. Otherwise, the job’s state changes and the event is appended to the job’s history. Events can be sent with an optional data dictionary, which is attached to the event in the history. Event methods can be chained, so long as doing so does not violate allowable state transitions.

>>> mpj.run(data={'msg': 'Job is now running!'})
>>> mpj.run(data={'msg': 'Job is still running!'}).update(data={'msg': 'This is an update'})
>>> mpj.finish()
>>> mpj.run()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
transitions.core.MachineError: 'Cannot transition to RUNNING from FINISHED'
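The behavior described above (event methods that validate a transition, record history, and return the job so calls can be chained) can be illustrated with a toy model. This is a sketch under stated assumptions, not the ManagedPipelineJob implementation: ToyJob, ToyMachineError, and the transition table are all hypothetical, and the real class relies on the transitions library.

```python
class ToyMachineError(Exception):
    """Stand-in for transitions.core.MachineError in this sketch."""


class ToyJob:
    # event name -> (allowed source states, destination state).
    # This table is an assumption made for illustration only.
    TRANSITIONS = {
        'run': ({'CREATED', 'RUNNING'}, 'RUNNING'),
        'update': ({'RUNNING'}, 'RUNNING'),
        'finish': ({'RUNNING'}, 'FINISHED'),
        'fail': ({'CREATED', 'RUNNING'}, 'FAILED'),
    }

    def __init__(self):
        self.state = 'CREATED'
        self.history = []

    def handle(self, event, data=None):
        sources, dest = self.TRANSITIONS[event]
        if self.state not in sources:
            raise ToyMachineError(
                'Cannot transition to {} from {}'.format(dest, self.state))
        self.state = dest
        # Attach the optional data dictionary to the recorded event
        self.history.append({'name': event, 'data': data or {}})
        return self  # returning self is what allows chaining

    def run(self, data=None):
        return self.handle('run', data)

    def update(self, data=None):
        return self.handle('update', data)

    def finish(self, data=None):
        return self.handle('finish', data)

    def fail(self, data=None):
        return self.handle('fail', data)


job = ToyJob()
job.run(data={'msg': 'started'}).update(data={'msg': 'halfway'}).finish()
```

After the chained calls, job.state is 'FINISHED' and job.history holds three events. A further job.run() raises ToyMachineError because FINISHED is not a permitted source state for the run event in this table.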

Canceling a Job

A PipelineJob may be canceled until it processes an event. If the job has not yet entered a RUNNING or FAILED state, do the following to delete the nascent job record from the Data Catalog.

>>> mpj.cancel()

If a job needs to be marked as unsuccessful after beginning its lifecycle, the only recourse (barring administrator intervention) is via job.fail(). Canceling an active job will result in a ManagedPipelineJobError exception. This design helps ensure an adequate record is maintained of all computational work managed by the system.
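A caller that does not know whether a job has started can attempt to cancel it and fall back to failing it. The sketch below shows that pattern with stand-in classes: StubJob and StubJobError are hypothetical substitutes for a ManagedPipelineJob instance and ManagedPipelineJobError, and cancel_or_fail is an illustrative helper rather than part of datacatalog.

```python
class StubJobError(Exception):
    """Stand-in for ManagedPipelineJobError in this sketch."""


class StubJob:
    """Minimal stand-in for a job; models only cancel() and fail()."""

    def __init__(self, state='CREATED'):
        self.state = state
        self.deleted = False

    def cancel(self):
        # Assumption: only a job that has not started may be canceled
        if self.state != 'CREATED':
            raise StubJobError('Cannot cancel a job in state ' + self.state)
        self.deleted = True  # models removal of the nascent record
        return self

    def fail(self, data=None):
        self.state = 'FAILED'
        return self


def cancel_or_fail(job, cancel_error, reason):
    """Cancel the job if possible; otherwise record a failure event."""
    try:
        job.cancel()
        return 'canceled'
    except cancel_error:
        # The job is already active, so keep its record and mark it failed
        job.fail(data={'msg': reason})
        return 'failed'
```

With a real job, the call would pass the job object and the ManagedPipelineJobError class in place of the stubs, for example cancel_or_fail(mpj, ManagedPipelineJobError, 'aborted by user').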

Failing a PipelineJob

As mentioned above, FAILED is a valid terminal state. Set it by sending the fail event.

>>> mpj.fail()
>>> print(mpj.state)
FAILED

Resetting a PipelineJob

A job may be rolled back to its initial CREATED state, provided a valid administrator-class token is passed with the request. The standard update token associated with the job cannot authorize a reset action.

>>> mpj.reset(token='rkz78NEcsD7ZmhVc')
>>> print(mpj.state)
CREATED

The contents of the terminal directory in the job’s archive path are removed, but the directory itself remains.

Deleting a PipelineJob

A job may be deleted entirely (including references to it in the linkage fields of the other LinkedStore documents), but only by passing an administrator-class token to authorize the action.

>>> mpj.delete(token='rkz78NEcsD7ZmhVc')

Currently, the job archive path and its contents are left intact.

Deferred Updates

It is possible to update a job’s status after the initiating process has exited, so long as the job’s current token is known. The token must be included in JSON messages to ManagedPipelineJobInstance or in web service callbacks posted to the Jobs Manager Reactor.
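As a rough illustration, a deferred update message could be assembled as shown below. The field names (uuid, name, token, data) and the helper build_event_message are assumptions made for this sketch; consult the Jobs Manager Reactor message schema for the authoritative format.

```python
import json


def build_event_message(job_uuid, event_name, token, data=None):
    """Serialize a deferred event message as JSON.

    All field names here are assumed for illustration; they are not
    confirmed by this document.
    """
    if not token:
        # The job's current token is required to authorize the update
        raise ValueError('A job token is required for deferred updates')
    message = {
        'uuid': job_uuid,
        'name': event_name,
        'token': token,
        'data': data or {},
    }
    return json.dumps(message, sort_keys=True)


# Placeholder identifiers, not real credentials
payload = build_event_message(
    'example-job-uuid', 'finish', 'example-update-token',
    data={'msg': 'Completed by a downstream process'})
```

The resulting string can be sent as the body of a message or callback by whatever client the deployment uses. As with the nonce, the token should be supplied at run time rather than committed to a repository.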