8.7. Pipelines

A Clara Pipeline is a directed acyclic graph that describes I/O dependencies between Clara Operators. Each operator is defined by a container and a set of services that the operator can use. I/O dependencies can be disk-based or shared-memory payloads, where a downstream operator reads the outputs of one or more upstream operators. The examples later in this section illustrate both kinds.

As of release 0.5.0, Clara Pipelines can be either typed or untyped.

  • Untyped pipelines, supported in API versions 0.4.0 and below with either Argo or Clara orchestration, have the following properties.
    • They do not automatically support shared memory management. Shared memory allocations are managed manually by the operator developer, so the type information assumed for those allocations cannot be exposed.
    • They do not provide the pipeline developer with the ability to perform pre-runtime validation of operator compatibility, which is necessary when different developers are responsible for different operators which may be used in multiple pipelines.
  • Typed pipelines, supported with API versions 0.5.0 and above and only when using the Clara orchestration engine, address the drawbacks of untyped pipelines.
    • Shared memory variables and disk-based payloads are declared with explicit typing in the operator properties, allowing pre-runtime validation of I/O compatibility between operators.
    • Shared memory variable life-cycles are automatically managed by the Clara orchestration engine, where variables are allocated and de-allocated based on the scope where they are used in the pipeline.

The following sections start with a simple example and then introduce untyped and typed pipelines in greater detail.

8.7.1. A simple pipeline example

Below is a simple example of a pipeline with no inputs or outputs and only a single operator named whalesay.

# Simple "hello world" pipeline definition example.
api-version: 0.4.0
orchestrator: Clara
name: hello-world
operators:
  - name: whalesay
    # The container property is the heart of the operator. Container images
    # contain the executable code which defines what the operator does.
    container:
      # The image container::property is the only required property in an
      # operator's definition. Without it, there is literally nothing to do.
      image: clara/examples/whalesay
      tag: latest
      # `sh -c` takes the whole command line as a single string
      command: ['sh', '-c', 'whalesay "hello world!"']

Pipeline definitions use YAML as their domain specific language (DSL). YAML is widely used and supported, and most text editors and IDEs support it without any need for third-party plug-ins. Each line of the definition declares a property, and the indentation at the beginning of the line determines the scope of the property.

Read Ansible’s YAML Basics for more information regarding YAML and a decent guide on how to read and write it.

Now, let’s look closer at the definition itself and how Clara Deploy SDK interprets it. The first line of the definition is a comment. Comments begin with the hash (also known as number sign, pound sign, and/or octothorpe) character, and end with a new line character. Comments are ignored by Clara Deploy SDK’s definition interpreter, but are a great way to convey useful information to human readers of the definition (most often yourself months after the fact).

The second line contains the api-version that the definition is expecting to be executed by. Including the api-version allows Clara Deploy SDK’s definition interpreter to make smart choices when handling definitions defined against other versions of Clara Deploy SDK, including applying rules defined by that version of the API. When api-version is not specified, Clara Deploy SDK’s definition interpreter assumes the current version is intended, regardless of the actual version the definition was created with.

The third line contains the orchestrator by which the operators in the job corresponding to the pipeline expect to be orchestrated. Clara currently has two orchestrators, Argo and Clara. Starting with api-version 0.4, if the orchestrator is not specified, the default orchestrator for Clara pipelines is Clara. You can override this default by setting orchestrator to Argo.

The fourth line declares the definition’s name. The name is intended primarily for human use (it is easier to discuss “the hello world pipeline” than it is to discuss “pipeline 9a16fd803f52489bb3331aa553605024”), but the value is still expected to be unique among all names declared in the definition.

The fifth line declares the set of operators that compose the pipeline. The operators property is a list, which can be inferred from each entry beginning with a - character.
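
The defaulting rules described above for api-version and orchestrator can be pictured with a short Python sketch. This is not the Clara Deploy SDK interpreter: the function names are hypothetical, the definition is represented as a plain dictionary rather than parsed YAML, and CURRENT_API_VERSION is an assumed stand-in for "the current version". The text does not state a default orchestrator for versions below 0.4, so the sketch leaves it unset in that case.

```python
# Hypothetical sketch of the documented defaults; not the SDK's own code.
CURRENT_API_VERSION = "0.5.0"  # assumption: stands in for "the current version"


def parse_version(text):
    """Turn '0.4.0' or '0.4' into a comparable tuple such as (0, 4, 0)."""
    parts = [int(p) for p in str(text).split(".")]
    return tuple(parts + [0] * (3 - len(parts)))


def resolve_header(definition):
    """Return api-version and orchestrator with the documented defaults applied."""
    api_version = definition.get("api-version", CURRENT_API_VERSION)
    orchestrator = definition.get("orchestrator")
    if orchestrator is None and parse_version(api_version) >= (0, 4, 0):
        orchestrator = "Clara"  # documented default from api-version 0.4 onward
    return {"api-version": api_version, "orchestrator": orchestrator}


# No api-version and no orchestrator: both fall back to their defaults.
print(resolve_header({"name": "hello-world"}))
# An explicit orchestrator always wins over the default.
print(resolve_header({"api-version": "0.4.0", "orchestrator": "Argo"}))
```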

8.7.2. Pipelines with Untyped Operators

Let us now consider a two-operator pipeline with disk-based inputs and outputs. In this pipeline we want

  • the first operator to read the pipeline’s payload (implicitly expected to be a DICOM series) and output an MHD volume,
  • and the second operator to read the MHD output of the first operator, perform an “intensity normalization” transformation, and output the normalized MHD volume.

The pipeline might look as shown below.

api-version: 0.4.0
orchestrator: Clara
name: dicom-intensity-normalization
operators:
  # the `dicom-reader` container image available in ngc.nvidia.com
  # converts a dicom series to an MHD volume
  - name: dicom-series-to-mhd-volume
    container:
      image: nvcr.io/nvidia/clara/dicom-reader
      tag: 0.6.0-2006.4
    input:
    - path: /input      # the pipeline's payload is mounted here
    output:
    - name: mhd-output  # label for the output volume (optional if operator produces only one output)
      path: /output     # the output path mounted inside the container image of this operator
  - name: normalize-mhd-volume-intensities
    container:
      image: my-intensity-normalization-container
      tag: 0.1.0
    input:
    - from: dicom-series-to-mhd-volume   # the name of the operator whose output should be loaded in this operator's input path
      name: mhd-output                   # the name of the output whose contents should be loaded in this operator's input path
      path: /input
    output:
    - name: normalized-mhd
      path: /output

Let us examine the components of the pipeline.

  • We have an operator dicom-series-to-mhd-volume, based on a container image from NGC, which reads a series of DICOM images and outputs an MHD representation of them in an output labelled mhd-output.
  • We have a second (custom) operator normalize-mhd-volume-intensities which reads from mhd-output of operator dicom-series-to-mhd-volume, performs an intensity-normalizing transformation, and outputs an MHD with the new values.
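
Because a pipeline is a directed acyclic graph, the order in which operators may run follows from their `from` entries. The Python sketch below illustrates that idea only: it is not how the Clara or Argo orchestrator is implemented, the function name is hypothetical, and the operators are written as dictionaries that mirror the YAML above.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# The operators of the pipeline above, reduced to their names and inputs.
operators = [
    {"name": "dicom-series-to-mhd-volume", "input": [{"path": "/input"}]},
    {
        "name": "normalize-mhd-volume-intensities",
        "input": [
            {"from": "dicom-series-to-mhd-volume", "name": "mhd-output", "path": "/input"}
        ],
    },
]


def execution_order(operators):
    """Order operators so that each one runs after every operator it reads from."""
    graph = {
        op["name"]: {i["from"] for i in op.get("input", []) if "from" in i}
        for op in operators
    }
    # static_order() raises graphlib.CycleError if the dependencies form a cycle.
    return list(TopologicalSorter(graph).static_order())


print(execution_order(operators))
```

An input without a `from` entry, such as the pipeline payload mounted at /input, adds no dependency, so the reader operator is free to run first.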

There is an implied contract that dicom-series-to-mhd-volume should accept a DICOM series and output an MHD volume, and that normalize-mhd-volume-intensities should read an MHD volume and normalize its values. In reality, there are no restrictions on the data types that each operator accepts, so the pipeline developer has no way to verify before runtime that one operator’s outputs are compatible with another operator’s inputs.

Typed pipelines, described next, partially address the implicit type contract between operators by allowing the operator developer to declare explicit types for data that is passed via shared memory.

8.7.3. Pipelines with Typed Operators for Performance and Explicit Data Typing

As of API version 0.5.0, a developer must explicitly declare the type of the inputs and outputs of each operator. We could write the pipeline definition above as:

api-version: 0.5.0
orchestrator: Clara
name: dicom-intensity-normalization
operators:
  # a custom `my-dicom-reader` container image
  # converts a DICOM series to typed shared-memory arrays
  - name: dicom-series-to-fastio
    container:
      image: my-dicom-reader
      tag: 0.1.0
    input:
    - path: /input            # the pipeline's payload is mounted here
      type: stream            # `stream` is a Clara type that is used only for file inputs
      element-type: dicom     # the entry for `element-type` can be anything that is descriptive of the content of the files if `type=stream`
    output:
    - name: intensity-values   # the operator now outputs a well-defined type
      type: array             # `array` is a Clara type that is used to indicate a shared-memory input or output (here: output)
      element-type: float32   # `float32` is a Clara sub-type that describes the size of each element in the `array`
      shape: [-1, -1, -1]     # `shape` determines the shape of the array (here: -1 indicates a dynamic shape determined at runtime)
    - name: voxel-spacing
      type: array
      element-type: float32
      shape: [3]
    - name: volume-orientation
      type: array
      element-type: float32
      shape: [4, 4]
    - name: volume-origin
      type: array
      element-type: float32
      shape: [3]
  - name: normalize-mhd-volume-intensities
    container:
      image: my-intensity-normalization-container
      tag: 0.1.0
    input:
    - from: dicom-series-to-fastio   # the name of the operator whose FastIO outputs should be accessed in this operator
      name: intensity-values         # the name of the FastIO output whose contents are to be read
      type: array                    # the input type `array` should match the output type of the upstream operator
      element-type: float32          # along with the element-type
      shape: [-1, -1, -1]            # and shape; all should match in order for this pipeline to pass pre-runtime validation
    - from: dicom-series-to-fastio
      name: voxel-spacing
      type: array
      element-type: float32
      shape: [3]
    - from: dicom-series-to-fastio
      name: volume-orientation
      type: array
      element-type: float32
      shape: [4, 4]
    - from: dicom-series-to-fastio
      name: volume-origin
      type: array
      element-type: float32
      shape: [3]
    output:
    - name: normalized-mhd
      path: /output
      type: stream
      element-type: mhd

Let us examine the components of the pipeline.

  • We have two custom operators, namely, dicom-series-to-fastio and normalize-mhd-volume-intensities.
    • dicom-series-to-fastio
      • reads the payload of the pipeline in the /input folder which is annotated to be a stream of dicom file(s),
      • and produces
        • intensity-values, whose exact shape is determined at runtime (note the -1 entries in the shape) but which is expected to be a 3D array since len(shape) == 3,
        • voxel-spacing, which should contain voxel spacing information and is expected to contain exactly 3 elements of type float32,
        • volume-orientation, which should contain an affine transformation matrix describing the orientation of intensity-values in space,
        • volume-origin, which should contain information about the volume’s offset from (0,0,0).
    • normalize-mhd-volume-intensities
      • reads the array named intensity-values, output by dicom-series-to-fastio, from shared memory,
      • performs an intensity-normalizing transformation on the array,
      • and uses voxel-spacing, volume-origin, and volume-orientation from dicom-series-to-fastio to output an MHD volume with normalized intensity values while retaining the relevant metadata such as pixel spacing, origin, and orientation.
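
The role of the -1 entries in a shape can be illustrated with a small Python sketch. It assumes, as described above, that -1 marks a dimension whose size is only known at runtime while every other dimension is fixed. The function name `conforms` is hypothetical and is not part of any Clara API.

```python
def conforms(declared_shape, actual_shape):
    """Check an array's actual shape against a declared shape.

    A declared dimension of -1 is dynamic and accepts any size. Every
    other dimension must match exactly, and the rank must be the same.
    """
    if len(declared_shape) != len(actual_shape):
        return False
    return all(d == -1 or d == a for d, a in zip(declared_shape, actual_shape))


print(conforms([-1, -1, -1], (512, 512, 128)))  # a 3D volume of any size
print(conforms([3], (3,)))                      # exactly 3 elements
print(conforms([3], (4,)))                      # wrong element count
print(conforms([-1, -1, -1], (512, 512)))       # wrong rank
```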

All inputs and outputs in the pipeline contain type information. However, different types have different restrictions and are assigned to different “channels of communication” (see Clara types).

8.7.3.1. Implicit Semantics, Explicit Types

Implicit semantic data contracts in pipelines are unavoidable considering operators are containerized applications. For example, dicom-series-to-fastio could swap the values in voxel-spacing and volume-origin while retaining the right type and shape, thereby breaking the semantic contract between the metadata (name of the output) and the values. The developer can address this problem during operator design by outputting a JSON string in shared memory that holds metadata about the application-specific semantics of the output data.

Nevertheless, the type contracts between operators

  • are explicit and verifiable pre-runtime (a float32 output of one operator will not be compatible with an int32 input to another operator - more on this below),
  • can be used to constrain the behavior of the containerized code of the operator (if dicom-series-to-fastio tries to place an array of int32 of 4 elements in voxel-spacing it will fail to do so).
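
A pre-runtime compatibility check of this kind can be sketched in Python as follows. The sketch assumes that an input is compatible with an upstream output when the two share a name and have identical type, element-type, and shape declarations. The function name and the dictionary representation are hypothetical, and the actual validation performed by the Clara orchestration engine may differ.

```python
def find_type_mismatches(upstream_outputs, downstream_inputs):
    """List inputs whose declaration differs from the upstream output of the same name."""
    outputs_by_name = {o["name"]: o for o in upstream_outputs}
    problems = []
    for inp in downstream_inputs:
        out = outputs_by_name.get(inp["name"])
        if out is None:
            problems.append(f"{inp['name']}: no upstream output with this name")
            continue
        for key in ("type", "element-type", "shape"):
            if out.get(key) != inp.get(key):
                problems.append(
                    f"{inp['name']}: {key} {out.get(key)!r} != {inp.get(key)!r}"
                )
    return problems


outputs = [{"name": "voxel-spacing", "type": "array", "element-type": "float32", "shape": [3]}]
good = [{"name": "voxel-spacing", "type": "array", "element-type": "float32", "shape": [3]}]
bad = [{"name": "voxel-spacing", "type": "array", "element-type": "int32", "shape": [3]}]

print(find_type_mismatches(outputs, good))  # nothing to report
print(find_type_mismatches(outputs, bad))   # reports the element-type mismatch
```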

8.7.3.2. Operator Reuse Across Pipelines

Let us assume we have hundreds of pipelines stemming from only a handful of operators combined in different ways (much like millions of songs can be composed of only eight notes). We would probably choose to “unbind” the operator definition from the pipeline definition, leaving the type information in the operator definition and the inter-operator dependency in the pipeline definition.

The unbound operator definitions for dicom-series-to-fastio and normalize-mhd-volume-intensities would look something like below.

dicom-series-to-fastio-op.yml

name: dicom-series-to-fastio
container:
  image: my-dicom-reader
  tag: 0.1.0
input:
- path: /input
  type: stream
  element-type: dicom
output:
- name: intensity-values
  type: array
  element-type: float32
  shape: [-1, -1, -1]
- name: voxel-spacing
  type: array
  element-type: float32
  shape: [3]
- name: volume-orientation
  type: array
  element-type: float32
  shape: [4, 4]
- name: volume-origin
  type: array
  element-type: float32
  shape: [3]

normalize-mhd-volume-intensities-op.yaml

name: normalize-mhd-volume-intensities
container:
  image: my-intensity-normalization-container
  tag: 0.1.0
input:
- name: intensity-values
  type: array
  element-type: float32
  shape: [-1, -1, -1]
- name: voxel-spacing
  type: array
  element-type: float32
  shape: [3]
- name: volume-orientation
  type: array
  element-type: float32
  shape: [4, 4]
- name: volume-origin
  type: array
  element-type: float32
  shape: [3]
output:
- path: /output
  type: stream
  element-type: mhd

In this last unbound operator definition, notice that we have not specified the from property as we did before. This is simply because unbound operators have no knowledge of other operators until they are bound to a pipeline. So let’s bind the two operators above to a pipeline.

dicom-intensity-normalization-pipe.yaml

api-version: 0.5.0
orchestrator: Clara
name: dicom-intensity-normalization
operators:
- name: my-imported-dicom-reader                         # the name for the operator can be overwritten in the pipeline definition
  import:
    path: ./dicom-series-to-fastio-op.yml
  output:
  - name: intensity-values                               # the name of any of the input and output definitions can be overwritten in the pipeline definition
  - name: voxel-spacing                                  # however the order must be preserved
  - name: volume-orientation
  - name: volume-origin
- name: my-imported-intensity-normalization-container
  import:
    path: ./normalize-mhd-volume-intensities-op.yaml
  input:
  - from: my-imported-dicom-reader
    name: intensity-values                               # the input name here must match one of the output names in the operator specified in `from`
  - from: my-imported-dicom-reader
    name: voxel-spacing
  - from: my-imported-dicom-reader
    name: volume-orientation
  - from: my-imported-dicom-reader
    name: volume-origin
  output:
  - name: normalized-mhd
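
One way to picture the binding step is as a merge of the imported operator definition with the overrides given in the pipeline definition. The Python sketch below assumes that overrides are matched to the declared inputs and outputs by position, which is consistent with the requirement that their order be preserved. The function `bind_operator` and the names `my-reader` and `my-normalizer` are hypothetical, and the sketch does not describe how Clara Deploy SDK resolves imports internally.

```python
import copy


def bind_operator(unbound, name=None, inputs=(), outputs=()):
    """Return a copy of an unbound operator with pipeline-level overrides applied.

    Overrides are matched to the declared inputs and outputs by position,
    so the order of the entries in the operator definition must be preserved.
    """
    bound = copy.deepcopy(unbound)  # leave the reusable definition untouched
    if name is not None:
        bound["name"] = name
    for key, overrides in (("input", inputs), ("output", outputs)):
        declared = bound.get(key, [])
        if len(overrides) > len(declared):
            raise ValueError(f"too many {key} overrides for {bound['name']}")
        for entry, override in zip(declared, overrides):
            entry.update(override)  # adds `from` or replaces `name`
    return bound


unbound = {
    "name": "normalize-mhd-volume-intensities",
    "input": [
        {"name": "intensity-values", "type": "array", "element-type": "float32", "shape": [-1, -1, -1]},
        {"name": "voxel-spacing", "type": "array", "element-type": "float32", "shape": [3]},
    ],
    "output": [{"path": "/output", "type": "stream", "element-type": "mhd"}],
}

bound = bind_operator(
    unbound,
    name="my-normalizer",
    inputs=[
        {"from": "my-reader", "name": "intensity-values"},
        {"from": "my-reader", "name": "voxel-spacing"},
    ],
    outputs=[{"name": "normalized-mhd"}],
)
print(bound["input"][0])
```

The type information stays with the operator definition, while the pipeline contributes only names and `from` dependencies, so the same operator file can be bound into many pipelines.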