8.4. NVIDIA Clara Pipeline Driver Guidance
The NVIDIA Clara Pipeline Driver, or cpd, is an essential piece of pipeline orchestration. The cpd is provided as a library to be included as part of your pipeline stage’s worker process.
Bind the library to your source code. In C or C++ this is easy as adding
libnvclara.so
to your MAKE file and includingclara.h
in your source code. If you’re using a language like Python, Java, Node.js, or C#, use that language’s method of binding compiled binaries:Python uses C-Types. For more information, see: Python Standard Library. Additionally, NVIDIA has provided an already generated Python library (see: Python APIs section).
Once your process has started, call
nvidia_clara_cpd__create
, and keep a handle on thecpd_out
value.Have your code do whatever it needs to do to get started, then have it wait on the callbacks from cpd.
When your process is ready, call
nvidia_clara_cpd__start
with the cpd pointer fromnvidia_clara_cpd__create
, and appropriate function pointers.The driver calls your provided functions in a specific event order:
prepare
,execute
,cleanup
.When the process has completed, call
nvidia_clara_cpd__release
with the reference you took fromnvidia_clara_cpd__create
.Optionally, your process can block the current thread and wait for callback completion to occur by calling
nvidia_clara_cpd__wait_for_completion
and passing in the cpd pointer fromnvidia_clara_cpd__create
. This blocks the calling thread until all other life-cycle events have completed.
Clara Pipeline Driver [cpd] provides life-cycle orchestration for your process. Once the cpd instance has been started, your process receives event callbacks in the following order:
Prepare
This is a pre-execute phase. By this point in the operator life-cycle all other pipeline stages have been initialized, and the creator of the job (pipeline instance) has supplied all required inputs.
Stage inputs are not yet available.
Execute
This is the execution phase where your process most likely does the majority of its work.
Once this phase completes, the next stage is told to execute. If this is the last stage in the pipeline, the job creator is notified that its job has completed.
Clean Up
This phase occurs after the execute phase has completed and the job has been notified that this stage is complete. Clean up resources that need to be cleaned up, freed, or released.
Do not delete your results as other stages or the job creator may need those.
example:
/* The example provided is for the Clara Pipeline Driver C Client. */
static const char data_path_name[] = "ct-dataset";
// Execute callback provided to cpd for handling execution.
int my_execute_callback(nvidia_clara_payload *payload) {
char *buffer = NULL;
size_t buffer_len = 4096;
data_t data;
size_t data_path_name_len = strlen(data_path_name);
nvidia_clara_payload_entry **input_entries = NULL;
result r = 0;
// Initialize our dataset.
initialize_data(&data);
// Get a list of payload entries from the payload.
r = nvidia_clara_payload__input_entries(payload, &input_entries);
if (r != 0) {
fprintf(stderr, "Failed to read payload input entries (%d)", r);
goto _exit;
}
// Allocate a buffer to use for finding the input we care about.
buffer = calloc(buffer_len, sizeof(char));
if (!buffer) {
fprintf(stderr, "fatal: out of memory");
r = 1;
goto _exit;
}
// Iterate through the input entries until we find the correct dataset. The list is `NULL` terminated.
for (int i = 0; input_entries[i]; i += 1) {
size_t len;
// Read the entry's name
r = nvidia_clara_payload_entry__name(input_entries[i], buffer, buffer_len, &len);
if (r != 0) {
// The read failed because our buffer was too small.
// Resize the buffer and try again.
buffer = realloc(buffer, len);
buffer_len = len;
i -= 1;
continue;
}
// Skip any input which does not match `data_path_name`.
if (strncmp(data_path_name, buffer, len) != 0)
continue;
// Found our entry, read the path from it.
r = nvidia_clara_payload_entry__path(input_entries[i], buffer, buffer_len, &len);
if (r != 0) {
// The read failed because our buffer was too small.
// Resize the buffer and try again.
buffer = realloc(buffer, len);
buffer_len = len;
i -= 1;
continue;
}
// Read the input content from `path` into `data`.
data_read(&data, path);
// Found our dataset, no need to keep looking.
break;
}
// Now to do the exciting bits...
do_inference(&data);
_exit:
cleanup_data(&data);
if (buffer) {
free(buffer);
}
return r;
}
void error_callback(int code, char *message, char *source, int is_fatal) {
const char _error[] = "error";
const char _fatal[] = "fatal";
const char _info[] = "info";
const char *prefix = NULL;
if (is_fatal) {
prefix = _fatal;
}
else if (code == 0) {
prefix = _info;
}
else {
prefix = _error;
}
printf("%s: [%s (%d)] %s\n", prefix, source, code, message);
}
// Entry-point for the application.
int main(int arc, char *argv[]) {
nvidia_clara_cpd *cpd;
// Create the pipeline driver instance and start the operator life-cycle.
if (nvidia_clara_cpd__create(error_callback, &cpd) == 0
&& nvidia_clara_cpd__start(cpd, NULL, my_execute_callback, NULL) == 0) {
// Block the current thread and wait for the pipeline driver to complete.
// The driver will call our execution callback when input is ready to consume.
nvidia_clara_cpd__wait_for_completion(cpd);
// Clean up our cpd allocation.
nvidia_clara_cpd__release(cpd);
// Return 0 (success!)
return 0;
}
// Return -1 (error)
return -1;
}
The NVIDIA Clara Pipeline Driver provides a query interface for discovering state about the current pipeline job and/or stage. The query interface is provided by nvidia_clara_info__read
with the nvidia_clara_info info_kind
parameter determining the type of information returned.
The API provides the following information:
Job Identifier
This is the unique identifier for the currently running pipeline job. Unique identifiers are 32-character hexadecimal strings which represent 128-bit values.
Job Name
This is the human-readable name given to the currently running pipeline job. This value is intended to provide an easy reference point for humans when looking at user interfaces or log files.
Stage Name
This is the human-readable name given to the currently running pipeline stage. This value is intended to provide an easy reference point for humans when looking at user interfaces or log files.
Stage Timeout
This is a string representing the number of seconds the currently running pipeline stage has been allocated for completion. The stage can be terminated if it exceeds this value. If this value is null or empty, then there is no assigned timeout.
example:
/* The example provided is for the Clara Pipeline Driver C Client. */
static const size_t INIT_BUFFER_SIZE = 4096;
static char *buffer = NULL;
static size_t buffer_len = 0;
// Query Clara for information about the current pipeline.
int query_clara(nvidia_clara_cpd *cpd, nvidia_clara_info info) {
size_t len = 0;
result r = 0;
// Ensure that a buffer for the information has been allocated.
if (!buffer) {
buffer = (char *)calloc(INIT_BUFFER_SIZE, sizeof(char));
if (!buffer) {
fprintf(stderr, "fatal: Out of memory");
r = -1;
goto _exit;
}
buffer_len = INIT_BUFFER_SIZE;
}
_query:
/* Query clara for the desired information. */
if ((r = nvidia_clara_info__read(cpd, info, buffer, buffer_len, &len)) != 0) {
// The function will error if the buffer is not large enough.
if (buffer < len>) {
// The buffer is insufficient, let's resize it and try again.
buffer = realloc(buffer, len);
if (!buffer) {
fprintf(stderr, "fatal: Out of memory");
goto _exit;
}
buffer_len = len;
goto _query;
}
// The buffer was large enough, the failure is something else.
goto _exit;
}
_exit:
return r;
}
// Entry-point for the application.
int main(int arc, char *argv[]) {
nvidia_clara_cpd *cpd = NULL;
result r = 0;
if ((r = nvidia_clara_cpd__create(NULL, &cpd)) != 0) {
fprintf(stderr, "fatal: Creating a Clara Pipeline Driver instance failed with error: %d\n", r);
return 1;
}
/* Query Clara to discover the unique identifier for the current job. */
if ((r = nvidia_clara_info__read(cpd, NVIDIA_CLARA_INFO_JOB_ID, buffer, buffer_len, &len)) != 0) {
fprintf(stderr, "Querying Clara for Job ID failed with error: %d\n", r);
}
printf("The current Job ID is %s.", buffer);
/* Query Clara to discover the name of the current job. */
if ((r = query_clara(cpd, NVIDIA_CLARA_INFO_JOB_NAME)) != 0) {
fprintf(stderr, "Querying Clara for Job Name failed with error: %d\n", r);
}
printf("The current Job Name is %s.", buffer);
/* Query Clara to discover the name of the current job stage. */
if ((r = query_clara(cpd, NVIDIA_CLARA_INFO_STAGE_NAME)) != 0) {
fprintf(stderr, "Querying Clara for Stage Name failed with error: %d\n", r);
}
printf("The current Stage Name is %s.", buffer);
/* Query Clara to discover how long the current job stage has to complete its work. */
if ((r = query_clara(cpd, NVIDIA_CLARA_INFO_STAGE_TIMEOUT)) != 0) {
fprintf(stderr, "Querying Clara for Stage Timeout failed with error: %d\n", r);
}
printf("The current Stage Timeout is %s seconds.", buffer);
_exit:
if (cpd) {
nvidia_clara_cpd__release(cpd);
}
if (buffer) {
free(buffer);
}
return 0;
}