9.3. Operators

Operators in Clara are containerized applications that perform a function on given data. Operators are the “units” of a pipeline, which is itself a dependency tree of operators. A dependency between operators is created by declaring that the input of one or more operators is the output of another.

At the heart of any operator declaration is its container declaration, specifically the image it uses to perform the function. The image property defines the name of the container image Clara will fetch, deploy, and execute as part of the pipeline execution.

As part of the container declaration, tag and command can also be declared. The tag property determines which version of a given image the Clara Deploy SDK should select from its container image repository. The command property is an array (denoted by the [ and ] characters) of strings passed to the container and used as the command it executes upon starting.
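Putting these properties together, a minimal container declaration might look like the following sketch (the image, tag, and command shown are illustrative placeholders, not a prescribed configuration):

```yaml
container:
  image: ubuntu                     # container image Clara fetches and deploys
  tag: 18.04                        # image version selected from the repository
  command: ['sh', '-c', 'echo ok']  # command executed when the container starts
```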

Operators can define sets of inputs and outputs which, depending on the Clara pipeline api-version used, may or may not specify type information about those inputs and outputs.

  • In api-versions 0.4.0 and below, operators cannot specify type information about their inputs and outputs; as a result, operators may only exchange data through disk I/O (volumes mounted as local directories in the container).
  • In api-version 0.5.0 and above, operators must specify type information for inputs and outputs, which allows data exchange between operators through both shared memory and disk I/O. Type information allows for pre-runtime validation of data compatibility between operators, the automatic selection of the channel of communication between operators (shared memory or disk), and scoping and access of data exchanged through shared memory.

NOTE: This functionality is only available with Clara orchestration (not Argo).

When operators exchange data via disk I/O, data is placed in a volume mounted inside the container, where it is accessible to the user’s application. To the application, these mounted volumes appear as locally accessible directories. Alternatively, when type information is available for an operator’s inputs and outputs, the Clara orchestrator can choose either shared memory or a mounted volume for data access. Before going into the details of which types are assigned to shared memory or disk, let us first review the types supported for inputs and outputs in the operator definition.

9.4. Model Repository Integration (pipeline api-version 0.7.0 and above)

Clara pipeline operators can request that a Triton Inference Server be made available to the operator and preloaded with a set of inference models. Requesting a preloaded Triton server instance is as easy as providing a list of inference models, by name, as described in reference/models.md.

NOTE: Preloaded models must be available via the Clara Deploy SDK Platform Model Repository. Any attempt to create or start a pipeline-job which references a model which is not contained in the Model Repository will result in an error.
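As a rough sketch, such a request might look like the following in an operator definition. Note that the property name and placement below are assumptions for illustration only; the authoritative syntax is given in reference/models.md:

```yaml
name: my-inference-operator        # illustrative operator name
container:
  image: my-inference-container    # illustrative image
  tag: 0.1.0
models:                            # assumed property: models to preload on Triton
- name: my-segmentation-model      # must exist in the platform Model Repository
```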

9.5. Clara Types (pipeline api-version 0.5.0 and above)

Clara I/O types can be divided into two groups: primitives and structures. Although this is an implementation detail, Clara primitives are merely short-hand representations of structures (more on this below).

There are three Clara structures at present: array, string, and stream.

  • array is used for shared memory I/O; it indicates to the Clara orchestrator that the input (or output) should be allocated into shared memory.
    • array requires
      • an element-type which determines the type of each element in the array and must be a Clara primitive (see table below),
      • and a shape which determines the number of dimensions and size of the array.
  • string is syntactic sugar for array<uint8>[-1], which is short-hand notation for “a dynamic array of bytes” (-1 indicates an array resizable at runtime).
  • stream is used for disk I/O; it indicates to the Clara orchestrator that the input (or output) is mounted as a local directory from disk.
    • stream requires an element-type attribute, which is free-form: the user may enter anything that describes the meaning of the input or output. However, if the stream is connected to the output of another operator, or its output feeds another operator’s input, the element-types on both sides must match.
    • stream requires a path attribute, which is the path of a locally mounted volume from which data is read and/or to which data is written.
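Putting the three structures together, an operator’s I/O section could be sketched as follows (the names here are illustrative; the properties shown are the ones described above):

```yaml
input:
- path: /input
  type: stream            # disk I/O: mounted as a local directory
  element-type: dicom     # free-form, but must match the upstream output
output:
- name: my-volume
  type: array             # shared memory I/O
  element-type: float32   # must be a Clara primitive (see table below)
  shape: [256, 256]       # dimensions and sizes; -1 would mean resizable at runtime
- name: my-text
  type: string            # syntactic sugar for array<uint8>[-1]
```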

Clara primitives are also syntactic sugar for various array representations, all of which use shared memory allocations.

Clara Primitive   Logical Interpretation    array Representation
u8, uint8         8-bit unsigned integer    array<uint8>[1]
u16, uint16       16-bit unsigned integer   array<uint16>[1]
u32, uint32       32-bit unsigned integer   array<uint32>[1]
u64, uint64       64-bit unsigned integer   array<uint64>[1]
i8, int8          8-bit signed integer      array<int8>[1]
i16, int16        16-bit signed integer     array<int16>[1]
i32, int32        32-bit signed integer     array<int32>[1]
i64, int64        64-bit signed integer     array<int64>[1]
f16, float16      16-bit floating-point     array<float16>[1]
f32, float32      32-bit floating-point     array<float32>[1]
f64, float64      64-bit floating-point     array<float64>[1]
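Because primitives are only short-hand, a scalar output could plausibly be declared either way. The sketch below assumes that primitive names are accepted directly as a type value, which should be verified against the pipeline api reference:

```yaml
output:
- name: confidence            # primitive short-hand form
  type: float32
# ...equivalent explicit structure form:
- name: confidence-explicit
  type: array
  element-type: float32
  shape: [1]                  # i.e. array<float32>[1]
```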

9.6. The Anatomy of an Operator

So far we’ve covered the concepts of typed and untyped operator inputs and outputs, as well as Clara types and their properties. Now, let us explore the structure of an operator declaration. Clara allows operators to be bound (declared within a pipeline) or unbound (declared independently and imported into a pipeline at a later point).

9.6.1. Disk-based I/O

Let us examine the unbound operator below.

name: i-copy-data
container:
  image: ubuntu
  tag: 18.04
  command: ['sh', '-c', 'cp -r /input/* /output']
input:
- path: /input
  type: stream            # type `stream` requires `path`
  element-type: dicom     # anything is acceptable if type is stream; however, during pipeline validation the output `element-type` of the upstream operator must match this one
output:
- name: my-output
  path: /output
  type: stream
  element-type: dicom     # anything is acceptable if type is stream; however, during pipeline validation the input `element-type` of all downstream operators must match this one

Let us examine the operator.

  • The /input folder is a Clara-managed folder made available to the operator’s container because of the operator’s input declaration. The path property maps the input payload to the container’s /input folder as a read-only mount. The container can then make use of the contents of the folder in any way its author sees fit.
  • Next, notice that the /output folder is declared as part of the operator’s output property. This means that it too is Clara-managed storage, and the folder will be mapped as a writable mount. Other operators can reference this output as their input using its name property, my-output.
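For illustration, a containerized script equivalent to the shell command above could be written in Python. The /input and /output defaults come from the operator declaration; everything else is a hedged sketch, not part of any Clara API:

```python
import os
import shutil

def execute(input_path='/input', output_path='/output'):
    # Recursively copy every entry from the read-only input mount to the
    # writable output mount, mirroring the command `cp -r /input/* /output`.
    for entry in os.listdir(input_path):
        src = os.path.join(input_path, entry)
        dst = os.path.join(output_path, entry)
        if os.path.isdir(src):
            shutil.copytree(src, dst)
        else:
            shutil.copy2(src, dst)
```

Such a script, containerized with a suitable command declaration, would behave like the i-copy-data operator above.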

Note that if the pipeline definition uses api-version 0.4.0 or below, the fields type and element-type are ignored; they are mandatory only for pipeline api-version 0.5.0 and above. For clarity, when using api-version 0.4.0 the operator could be declared as

name: i-copy-data
container:
  image: ubuntu
  tag: 18.04
  command: ['sh', '-c', 'cp -r /input/* /output']
input:
- path: /input
output:
- name: my-output
  path: /output

9.6.2. Shared Memory I/O

To avoid having an I/O bound pipeline where data communication between operators is the bottleneck, one can use shared memory as a medium of communication. Shared memory communication between operators is available through the Clara orchestration engine as of pipeline api-version 0.5.0.

Let us examine the following operator definition.

name: dicom-to-array
container:
  image: my-container
  tag: 0.1.0
  command: ['sh', '-c', 'python -u converter.py /input']
input:
- path: /input
  type: stream
  element-type: dicom
output:
- name: output-array
  type: array
  element-type: float32
  shape: [256, 256, 128]

Here, it is assumed that a converter.py utility has been developed by the user and containerized in an image named my-container and tagged 0.1.0, with no predefined entrypoint (therefore a command attribute has to be specified for the container to perform a function).

  • The container expects a series of DICOM files in its one input, which is mounted in the container at the /input directory.
  • The container produces one output named output-array: a 3-dimensional array of shape (256, 256, 128) where each element is a single-precision floating-point number. The container image effectively reads a series of DICOM files and constructs, in shared memory, a 3D array representing the volume with intensity values in single-precision floating-point. That array can then be read by any operator that consumes the output output-array of dicom-to-array (see Pipelines with Typed Operators or Operator Reuse Across Pipelines).
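A hedged sketch of what converter.py might do is shown below. Reading DICOM files normally relies on a library such as pydicom, which is not shown here; instead a stand-in loader produces 2D slices, and the sketch focuses on stacking them into the declared (256, 256, 128) float32 volume:

```python
import numpy as np

def load_slice(index):
    # Stand-in for reading one DICOM slice (e.g. with pydicom); returns a
    # 256x256 array of intensity values. Zeros are placeholders for pixel data.
    return np.zeros((256, 256), dtype=np.float32)

def build_volume(num_slices=128):
    # Stack the 2D slices along the last axis to match the operator's declared
    # output: element-type float32, shape [256, 256, 128].
    volume = np.stack([load_slice(i) for i in range(num_slices)], axis=-1)
    assert volume.shape == (256, 256, num_slices)
    return volume.astype(np.float32)
```

The resulting array would then be written to the shared memory allocation Clara provides for the output-array output.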

So far, we have shown only examples of unbound operators, which by themselves perform no function. Operators must either be imported into pipelines when declared as unbound (i.e., in their own YAML file) or be declared within the pipeline definition. It is recommended that operators be declared unbound and imported into pipelines so that they may be reused without being declared anew in each pipeline. For a detailed overview of the relationship between pipelines and operators, please review the Pipeline documentation.

One important aspect of operators is the slight but significant differences in their design depending on the orchestration engine used to execute the pipeline. The next section highlights the high-level design of operators targeting Argo orchestration and those targeting Clara orchestration.

9.7. Argo vs. Clara Orchestration

Operator development follows a different paradigm depending on which orchestration engine is used.

  • When using Argo, operators can be as simple as a Bash or Python script; there are no specific libraries that need to be used to make things work. However, Argo starts operators lazily, incurring an overhead of approximately 2 seconds per operator. This is because Argo assigns one operator per Kubernetes pod, and the cost of starting up a pod is about 2 seconds, depending on the system it runs on.
  • Clara orchestration requires the use of the Clara libraries, currently supporting C++ or Python. The Clara orchestration engine loads pipelines lazily, but operators greedily, thereby incurring a cost of about 2 seconds to start a pipeline. This is because the Clara orchestrator starts all operators in the same Kubernetes pod. For these reasons, it is recommended that Argo orchestration be used during operator development and integration testing, but Clara orchestration be used for more performance-demanding applications.

The following examples show minimal operators following each orchestrator’s programming paradigm.

Argo orchestrator-compatible operator.

import glob

input_path = '/input'

def execute():
  print(glob.glob(input_path + '/*.dcm'))

if __name__ == "__main__":
  execute()

It is apparent that some information has to be assumed when using Argo, for instance the /input path: if the input path is changed at the level of the operator description (YAML), there is no utility that allows the code to retrieve this update.

  • It is possible to retrieve this information from environment variables passed by the Clara platform, for instance by querying os.getenv('NVIDIA_CLARA_INPUTPATHS'); however, this approach is not recommended, since the developer’s code might not be compatible with future updates to the structure of NVIDIA_CLARA_INPUTPATHS.
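A minimal sketch of that discouraged approach follows. The value format used here, a plain path string with a fallback, is an assumption: the real structure of NVIDIA_CLARA_INPUTPATHS may differ and may change between platform releases:

```python
import os

def get_input_path(default='/input'):
    # Read the path from the platform-provided environment variable, falling
    # back to the conventional /input mount. Treating the value as a plain
    # path is an assumption that may not hold for all platform versions.
    return os.getenv('NVIDIA_CLARA_INPUTPATHS', default)
```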

Clara orchestrator-compatible operator.

import glob
from clara import Driver, Payload, Error
from clara.logging import perf_logger

def execute(driver: Driver, payload: Payload):
  input_path = None
  for entry in payload.input_entries:
    if entry.name == 'payload':
      input_path = entry.path

  if input_path:
    print(glob.glob(input_path + "/*.dcm"))
  else:
    raise Error("No input path provided.")

if __name__ == "__main__":
  perf_logger.processing_started("Started")
  driver = Driver(execute_handler=execute)        # instantiate Clara orchestration driver with `execute` as callback
  driver.start()                                  # start execution thread
  driver.wait_for_completion()                    # block until callback completes
  perf_logger.processing_ended("Ended")

When using Clara orchestration, the user code is wrapped by the Clara Pipeline Driver (CPDriver) code. All operators are started at the same time in the same pod, but any operator with an upstream I/O dependency (whether shared memory or disk I/O) is blocked until all of its upstream operators complete. The callback function provided in execute_handler is triggered once all the locks the operator is waiting on are released.

There are two parameters in the callback function: driver and payload.

  • driver holds CPDriver metadata as dictated by Clara platform. For more information on the driver object please refer to the CPDriver documentation.
  • payload provides all payload information related to both disk-based and shared memory inputs and outputs. For more information on the payload object please refer to the Payloads documentation.