8.3. Operators
Operators in Clara are containerized applications that perform a function on given data. Operators are the “units” of a pipeline, which is itself a dependency tree of operators. A dependency between operators is created by declaring that the input of one or more operators is the output of another.
At the heart of any operator declaration is its container declaration, specifically the image it uses to perform the function. The image property defines the name of the container image Clara will fetch, deploy, and execute as part of the pipeline execution.
As part of the container declaration, tag and command can also be declared. The tag property determines which version of a given image the Clara Deploy SDK should select from its container image repository. The command property is an array (delimited by the [ and ] characters) that allows a set of strings to be passed to the container and used as the command it executes upon starting.
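For example, a minimal container declaration might look like the following sketch (the operator name, image, tag, and command values are illustrative, not prescriptive):

name: my-operator
container:
  image: ubuntu
  tag: 18.04
  command: ['sh', '-c', 'echo hello']   # array of strings executed as the container's startup command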
Operators can define sets of inputs and outputs which, depending on the Clara pipeline api-version used, may or may not specify type information about the inputs and outputs.
In api-version 0.4.0 and below, operators may not specify type information about their inputs and outputs; as a result, operators may only exchange data through disk I/O (volumes mounted as local directories in the container). In api-version 0.5.0 and above, operators must specify type information for inputs and outputs, which allows data exchange between operators through both shared memory and disk I/O. Type information allows for pre-runtime validation of data compatibility between operators, automatic selection of the channel of communication between operators (shared memory or disk), and scoping and access of data exchanged through shared memory.
NOTE: This functionality is only available with Clara orchestration (not Argo).
When operators exchange data via disk I/O, data is loaded into a volume inside the container, where it is accessible by the user’s application. To the application, these mounted volumes appear as locally accessible directories. Alternatively, when type information is made available in operator inputs and outputs, the Clara orchestrator has the ability to choose either shared memory or a mounted volume to exchange the data. Before going into the details of which types are assigned to shared memory or disk, let us first review the types supported for inputs and outputs in the operator definition.
Clara pipeline operators can request that a Triton Inference Server be made available to the operator and preloaded with a set of inference models. Requesting a preloaded Triton server instance is as easy as providing a list of inference models, by name (see the Models reference).
NOTE: Preloaded models must be available via the Clara Deploy SDK Platform Model Repository. Any attempt to create or start a pipeline-job which references a model which is not contained in the Model Repository will result in an error.
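As a sketch, and assuming a models list keyed by model name as described in the Models reference (the operator, image, and model names below are hypothetical), such a request might look like:

name: my-inference-operator
container:
  image: my-inference-app
  tag: 0.1.0
models:                           # assumed property name; see the Models reference
- name: segmentation_ct_spleen    # must exist in the platform Model Repository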
Clara I/O types can be divided into two groups: primitives and structures. Under the hood, Clara primitives are merely short-hand representations of structures (more on this below).
There are three Clara structures at present: array, string, and stream.
array is used for shared memory I/O; it indicates to the Clara orchestrator that the input (or output) should be allocated in shared memory. array requires an element-type, which determines the type of each element in the array and must be a Clara primitive (see table below), and a shape, which determines the number of dimensions and the size of the array.

string is syntactic sugar for array<uint8>[-1], which is short-hand notation for “a dynamic array of bytes” (-1 is used to indicate arrays resizable at runtime).

stream is used for disk I/O; it indicates to the Clara orchestrator that the input (or output) is mounted as a local directory from disk. stream requires an element-type attribute, which is free-form: the user may enter anything as an element-type that describes the meaning of the input or output, but when a stream connects the output of one operator to the input of another, the element-types must match. stream also requires a path attribute, which specifies the locally mounted volume from which data is read and/or to which data is written.
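As an illustrative sketch of these three structures in an operator’s output declaration (the output names and the pdf element-type are hypothetical):

output:
- name: text-result     # `string`: sugar for array<uint8>[-1], a dynamic array of bytes
  type: string
- name: mask            # `array`: allocated in shared memory
  type: array
  element-type: uint8   # must be a Clara primitive
  shape: [512, 512]
- name: report          # `stream`: mounted as a local directory from disk
  type: stream
  element-type: pdf     # free-form; must match the consuming operator's input
  path: /report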
Clara primitives are also syntactic sugar for various array representations, all of which use shared memory allocations.
Clara Primitive | Logical Interpretation
---|---
uint8 | 8-bit unsigned integer
uint16 | 16-bit unsigned integer
uint32 | 32-bit unsigned integer
uint64 | 64-bit unsigned integer
int8 | 8-bit signed integer
int16 | 16-bit signed integer
int32 | 32-bit signed integer
int64 | 64-bit signed integer
float16 | 16-bit floating-point
float32 | 32-bit floating-point
float64 | 64-bit floating-point
So far we’ve covered the concepts of typed and untyped operator inputs and outputs, and Clara types and their properties. Now, let us explore the structure of an operator declaration. Clara allows operators to be bound (declared within a pipeline) or unbound (declared independently and imported into a pipeline at a later point).
8.6.1. Disk-based I/O
Let us examine the unbound operator below.
name: i-copy-data
container:
  image: ubuntu
  tag: 18.04
  command: ['sh', '-c', 'cp -r /input/. /output']
input:
- path: /input
  type: stream          # type `stream` requires `path`
  element-type: dicom   # free-form for `stream`, but during pipeline validation it must match the upstream operator's output `element-type`
output:
- name: my-output
  path: /output
  type: stream
  element-type: dicom   # free-form for `stream`, but during pipeline validation it must match the input `element-type` of all downstream operators
Let us examine this operator in more detail.

The /input folder is a Clara-managed folder made available to the operator’s container because of the operator’s input declaration. The path property maps the input payload to the operator container’s /input folder as a read-only mount. The container execution can then make use of the contents of the folder in any way its author sees fit.

Next, notice that the /output folder is declared as part of the operator’s output property. This means that it too is Clara-managed storage, and the folder will be mapped as a writable mount. Other operators can read from this path by referencing its name property, my-output.
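For illustration, a downstream operator in the same pipeline could consume this output with an input declaration along the following lines (a sketch: the consuming operator is hypothetical, and the from/name reference form is described in the Pipeline documentation):

name: i-consume-data
container:
  image: ubuntu
  tag: 18.04
  command: ['sh', '-c', 'ls /input']
input:
- from: i-copy-data     # upstream operator providing the data
  name: my-output       # which of its outputs to read
  path: /input          # mounted read-only in this container
  type: stream
  element-type: dicom   # must match the upstream output's element-type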
Note that if the pipeline definition uses api-version 0.4.0 or below, the fields type and element-type will be ignored; they are mandatory only for pipeline api-version 0.5.0 and above. For clarity, when using api-version 0.4.0 the operator could be declared as follows:
name: i-copy-data
container:
  image: ubuntu
  tag: 18.04
  command: ['sh', '-c', 'cp -r /input/. /output']
input:
- path: /input
output:
- name: my-output
  path: /output
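Note that api-version is declared at the pipeline level rather than on the operator itself. As a sketch (the pipeline name is illustrative; see the Pipeline documentation for the full structure):

api-version: 0.5.0
name: my-pipeline
operators:
- name: i-copy-data
  # remainder of the operator declaration, or an import of the unbound operator, as shown above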
8.6.2. Shared Memory I/O
To avoid having an I/O bound pipeline where data communication between operators is the bottleneck, one can use shared memory as a medium of communication. Shared memory communication between operators is available through the Clara orchestration engine as of pipeline api-version 0.5.0.
Let us examine the following operator definition.
name: dicom-to-array
container:
  image: my-converter
  tag: 0.1.0
  command: ['sh', '-c', 'python -u converter.py /input']
input:
- path: /input
  type: stream
  element-type: dicom
output:
- name: output-array
  type: array
  element-type: float32
  shape: [256, 256, 128]
Here, it is assumed that a converter.py utility has been developed by the user and containerized in an image tagged my-converter:0.1.0 with no predefined entrypoint (therefore a command attribute has to be specified for the container to perform a function).
The container expects a series of DICOM files in its one input, which is mapped into the container at the /input directory.

The container outputs one item named output-array, which is a 3-dimensional array of shape (256, 256, 128) where each element is a single-precision floating-point number. The container image effectively reads a series of DICOM files and constructs, in shared memory, a 3D array representing the volume with its intensity values in single-precision floating-point. The latter can then be read by any operator that consumes the output output-array of dicom-to-array (see Pipelines with Typed Operators or Operator Reuse Across Pipelines).
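For illustration, an operator consuming this shared memory output might declare its input as follows (a sketch: the operator and image names are hypothetical, and the from/name reference form is described in the Pipeline documentation):

name: array-processor
container:
  image: my-processor
  tag: 0.1.0
  command: ['sh', '-c', 'python -u process.py']
input:
- from: dicom-to-array   # upstream operator
  name: output-array     # its shared memory output
  type: array
  element-type: float32  # must match the upstream declaration
  shape: [256, 256, 128] # must match the upstream declaration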
So far, we have shown only examples of unbound operators, which by themselves can perform no function. Operators must be either imported into pipelines when declared as unbound (i.e. in their own yaml file) or declared within the pipeline definition. It is recommended that operators be declared unbound and imported into pipelines so that they may be reused without having to be declared anew in each pipeline. For a detailed overview of the relationship between pipelines and operators please review the Pipeline documentation.
One important aspect of operators is the slight but significant differences in their design depending on the orchestration engine used when executing the pipeline. The next section highlights the high-level design of operators designed for Argo orchestration and those designed for Clara orchestration.

Operator development follows a different paradigm based on which orchestration engine is used.
When using Argo, operators can be as simple as a Bash or Python script; there are no specific libraries that need to be used to make things work. However, Argo will start operators lazily, incurring an overhead of approximately 2 seconds per operator. This is because Argo assigns one operator per Kubernetes pod, and the cost of starting up a pod is about 2 seconds, depending on the system on which it runs.
Clara orchestration requires the use of the Clara libraries, currently supporting C++ or Python. The Clara orchestration engine loads pipelines lazily, but operators greedily, thereby incurring a cost of about 2 seconds to start a pipeline. This is because the Clara orchestrator starts all operators in the same Kubernetes pod. For these reasons, it is recommended that Argo orchestration be used during operator development and integration testing, but Clara orchestration be used for more performance-demanding applications.
The following examples show minimal operators following each orchestrator’s programming paradigm.
Argo orchestrator-compatible operator.
import glob

input_path = '/input'

def execute():
    # list all DICOM files in the input directory
    print(glob.glob(input_path + '/*.dcm'))

if __name__ == "__main__":
    execute()
It is apparent that some information has to be assumed when using Argo, for instance the /input path: if the input path is changed at the level of the operator description (YAML), there is no utility allowing the updated value to be retrieved from within the code.
It is possible to retrieve this information from environment variables passed by the Clara platform, for instance by querying os.getenv('NVIDIA_CLARA_INPUTPATHS'); however, this approach is not recommended since the developer’s code might not be compatible with any updates made to the structure of NVIDIA_CLARA_INPUTPATHS.
Clara orchestrator-compatible operator.
import glob

from clara import Driver, Payload, Error
from clara.logging import perf_logger

def execute(driver: Driver, payload: Payload):
    # locate the input path among the payload's input entries
    input_path = None
    for entry in payload.input_entries:
        if entry.name == 'payload':
            input_path = entry.path
    if input_path:
        print(glob.glob(input_path + "/*.dcm"))
    else:
        raise Error("No input path provided.")

if __name__ == "__main__":
    perf_logger.processing_started("Started")
    driver = Driver(execute_handler=execute)  # instantiate the Clara orchestration driver with `execute` as callback
    driver.start()                            # start the execution thread
    driver.wait_for_completion()              # block until the callback completes
    perf_logger.processing_ended("Ended")
When using Clara orchestration, the user code is wrapped by the Clara Pipeline Driver (CPDriver) code. All operators are started at the same time in the same pod, but all operators with an upstream I/O dependency (whether shared memory or disk I/O) are blocked until all upstream operators complete. The callback function provided in execute_handler is triggered once all the locks the operator is waiting on are released.
There are two parameters in the callback function: driver and payload.

driver holds CPDriver metadata as dictated by the Clara platform. For more information on the driver object please refer to the CPDriver documentation.

payload provides all payload information related to both disk-based and shared memory inputs and outputs. For more information on the payload object please refer to the Payloads documentation.