Application Development Workflow#
GXF applications, also called graphs, can be created programmatically in Python or C++, or in a no-code environment using the Graph Composer tool. All of these approaches are described below.
Python Application Development#
The GXF framework provides a rich Python API for creating graphs. The following sections describe the workflow for creating GXF applications in Python.
Python API Setup#
Prerequisites (skip if already finished):
Install the Graph Composer runtime Debian package. The GXF Python core APIs are then available at /opt/nvidia/graph-composer/core. See Installation.
Sync the GXF extensions you need using the registry tool. See Sync Extensions.
Steps:
Install the GXF extensions you need
E.g., for frequently used extensions:
registry extn install -e StandardExtension
registry extn install -e PythonCodeletExtension
registry extn install -e MultimediaExtension
registry extn install -e SampleExtension
The extensions are installed to the default GXF workspace path at /var/tmp/gxf/.cache/gxf_workspace.
/var/tmp/gxf/.cache/gxf_workspace
└── gxf
    ├── core -> /opt/nvidia/graph-composer/core
    ├── multimedia
    ├── python_codelet
    ├── sample
    └── std
Add the workspace path to the Python path
Suggested practice:
Create a gxf.pth file at /home/$USER/.local/lib/python<VERSION>/site-packages/gxf.pth, and add the GXF workspace path to this file.
Example commands in Python:
import os
import site

# Default GXF workspace path
default_extensions_path = "/var/tmp/gxf/.cache/gxf_workspace"

# /home/<USER_NAME>/.local/lib/python<VERSION>/site-packages
user_site_packages_dir = site.getusersitepackages()
# The user site-packages directory may not exist yet
os.makedirs(user_site_packages_dir, exist_ok=True)

# Construct the path to the .pth file
# /home/<USER_NAME>/.local/lib/python<VERSION>/site-packages/gxf.pth
pth_file_path = os.path.join(user_site_packages_dir, "gxf.pth")

# Write the workspace path to the .pth file
with open(pth_file_path, "w") as pth_file:
    pth_file.write(default_extensions_path)

print(f"Created gxf.pth file at {pth_file_path} with the following content:")
print(default_extensions_path)
Note
Alternatively, the GXF workspace can be added to the Python path by updating the PYTHONPATH environment variable:
export PYTHONPATH=$PYTHONPATH:/var/tmp/gxf/.cache/gxf_workspace
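After either approach, it can help to confirm that the interpreter actually sees the workspace. The following is a minimal sketch using only the standard library; the helper names `workspace_on_path` and `gxf_importable` are hypothetical and not part of GXF, and the workspace path is the default quoted above.

```python
import importlib.util
import sys

# Default workspace path quoted in the text; adjust if yours differs.
GXF_WORKSPACE = "/var/tmp/gxf/.cache/gxf_workspace"


def workspace_on_path(workspace=GXF_WORKSPACE, search_path=None):
    """Return True if `workspace` is one of the import search locations."""
    search_path = sys.path if search_path is None else search_path
    # Ignore trailing slashes so "/a/b" and "/a/b/" compare equal.
    return workspace.rstrip("/") in {p.rstrip("/") for p in search_path}


def gxf_importable():
    """Return True if a top-level `gxf` package can be located (without importing it)."""
    return importlib.util.find_spec("gxf") is not None


if __name__ == "__main__":
    print("workspace on sys.path:", workspace_on_path())
    print("gxf importable:", gxf_importable())
```

If the first check fails in a fresh interpreter, the `.pth` file or `PYTHONPATH` entry was probably not picked up; if only the second fails, the extensions may not have been installed into the workspace yet.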
ComputeEntity#
ComputeEntity automatically adds an entity to the graph with basic scheduling terms. A codelet implemented in C++ with Python bindings can be added to a ComputeEntity.
Steps to compose a graph:
Create a GXF graph object
Set clock and scheduler to the graph object
Add ComputeEntity with codelet
Connect the transmitter and receiver from the ComputeEntity
Load extensions, run the graph, and destroy it
Example graph:
import os
from gxf.core import Graph
from gxf import std
from gxf.std import RealtimeClock
from gxf.std import GreedyScheduler
from gxf.std import ComputeEntity
from gxf.sample import PingTx
from gxf.sample import PingRx

def main():
    g = Graph()
    # set the clock and scheduler for the graph
    clock = std.set_clock(g, RealtimeClock(name='clock'))
    std.set_scheduler(g, GreedyScheduler(
        max_duration_ms=1000000, clock=clock))
    # add one entity per codelet, then connect transmitter to receiver
    g.add(ComputeEntity("tx", count=5)).add_codelet(PingTx(clock=clock))
    g.add(ComputeEntity("rx")).add_codelet(PingRx())
    std.connect(g.tx.signal, g.rx.signal)
    g.load_extensions()
    g.run()
    g.destroy()

if __name__ == "__main__":
    main()
PyComputeEntity#
PyComputeEntity automatically adds an entity to the graph with basic scheduling terms. A Python codelet can be added to a PyComputeEntity.
Steps to compose a graph:
Implement a Python codelet by inheriting from CodeletAdapter
Create a GXF graph object
Set clock and scheduler to the graph object
Add PyComputeEntity with python codelet
Connect the transmitter and receiver of the PyComputeEntity instances
Load extensions, run the graph, and destroy it
Example graph with a Python codelet implementation:
import os
import numpy as np
from gxf.core import Graph
import gxf.std as std
from gxf.std import RealtimeClock
from gxf.std import GreedyScheduler
from gxf.python_codelet import CodeletAdapter
from gxf.core import MessageEntity
from gxf.python_codelet import PyComputeEntity
from gxf.std import Tensor

# implement a GXF Codelet
class PyPingTx(CodeletAdapter):
    def __init__(self, some_param="what"):
        super().__init__()
        self.txs = ["tx"]
        self.some_param = some_param

    def start(self):
        self.params = self.get_params()
        self.count = 1

    def tick(self):
        msg = MessageEntity(self.context())
        t = Tensor(np.array([1+self.count, 2+self.count, 3+self.count]))
        Tensor.add_to_entity(msg, t)
        self.tx.publish(msg, 1)
        print("PyPingTx " + self.name() + ": Message Sent: " + str(self.count))
        self.count += 1

    def stop(self):
        pass

# implement a GXF Codelet
class PyPingRx(CodeletAdapter):
    def __init__(self):
        super().__init__()
        self.rxs = ["input"]

    def start(self):
        self.count = 1

    def tick(self):
        msg = self.input.receive()
        t = Tensor.get_from_entity(msg)
        assert(np.array_equal(np.array(t), [1+self.count, 2+self.count, 3+self.count]))
        print("PyPingRx " + self.name() + ": Message Received: " + str(self.count))
        self.count += 1

    def stop(self):
        pass

def main():
    g = Graph()
    clock = std.set_clock(g, RealtimeClock(name='clock'))
    std.set_scheduler(g, GreedyScheduler(max_duration_ms=1000000, clock=clock))
    ptx = g.add(PyComputeEntity("PingTx", count=5))
    ptx.add_codelet("somename", PyPingTx(some_param="some_value"))
    prx = g.add(PyComputeEntity("PingRx", count=5))
    prx.add_codelet("codelet", PyPingRx())
    std.connect(g.PingTx.tx, prx.input)
    g.load_extensions()
    g.run()
    g.destroy()

if __name__ == "__main__":
    main()
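The codelets above implement start(), tick(), and stop(), and each entity is created with count=5. As a rough mental model only, and assuming the usual codelet lifecycle (start once, tick up to `count` times, stop once), the call order can be sketched without GXF. `RecordingCodelet` and `run_lifecycle` are hypothetical stand-ins, not GXF APIs, and no scheduler, clock, or message passing is modelled.

```python
class RecordingCodelet:
    """Stand-in for a codelet: records the lifecycle calls it receives."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def tick(self):
        self.calls.append("tick")

    def stop(self):
        self.calls.append("stop")


def run_lifecycle(codelet, count):
    """Call start() once, tick() `count` times, then stop() once."""
    codelet.start()
    try:
        for _ in range(count):
            codelet.tick()
    finally:
        # stop() runs even if a tick raises, mirroring a clean shutdown
        codelet.stop()
    return codelet.calls


if __name__ == "__main__":
    print(run_lifecycle(RecordingCodelet(), count=5))
```

This is why per-run state such as `self.count` is initialised in start() rather than in tick() in the example codelets.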
Tensor zero-copy with other frameworks#
GXF supports zero-copy tensor data exchange with deep learning frameworks and array libraries such as NumPy, CuPy, PyTorch, and TensorFlow.
See Tensor for API details.
From other frameworks to GXF#
import numpy as np
import cupy as cp
np_tensor = np.random.rand(3, 4)
cp_tensor = cp.random.rand(3, 4)
from gxf.std import Tensor
gxf_tensor_host = Tensor.as_tensor(np_tensor)
gxf_tensor_cuda = Tensor.as_tensor(cp_tensor)
The tensor data can then travel through the GXF graph wrapped in a message entity, e.g. gxf_msg_out in the example below:
Tensor.add_to_entity(gxf_msg_out, gxf_tensor_host, "out_tensor_on_host")
Tensor.add_to_entity(gxf_msg_out, gxf_tensor_cuda, "out_tensor_on_cuda")
From GXF to other frameworks#
NumPy / CuPy asarray()
np_tensor = np.asarray(gxf_tensor_host)
cp_tensor = cp.asarray(gxf_tensor_cuda)
NumPy / CuPy from_dlpack()
np_tensor = np.from_dlpack(gxf_tensor_host)
cp_tensor = cp.from_dlpack(gxf_tensor_cuda)
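To make concrete what "zero-copy" means in the conversions above, here is a library-independent illustration. It uses the standard library buffer protocol (`memoryview` over an `array`) rather than DLPack or the GXF Tensor API, so it is an analogy under that assumption: a zero-copy view observes later writes to the source, while a copy does not.

```python
from array import array


def zero_copy_demo():
    """Show that a view shares memory with its source while a copy does not."""
    source = array("f", [1.0, 2.0, 3.0])  # owns the underlying buffer
    view = memoryview(source)             # zero-copy: same memory, no data moved
    copied = array("f", source)           # independent copy of the data

    source[0] = 42.0                      # write through the original owner

    return view[0], copied[0]


if __name__ == "__main__":
    seen_by_view, seen_by_copy = zero_copy_demo()
    print("view sees:", seen_by_view)   # reflects the write
    print("copy sees:", seen_by_copy)   # still the old value
```

The practical consequence is the same for tensors exchanged this way: modifying the source array after wrapping it also changes what the consumer reads, so the source should not be reused until the consumer is done with it.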
Example Application#
import os
import cupy as cp
import numpy as np
import gxf.std as std
from gxf.core import (
Graph,
MessageEntity
)
from gxf.std import (
Entity,
GPUDevice,
GreedyScheduler,
RealtimeClock,
Tensor,
)
from gxf.python_codelet import (
PyComputeEntity,
CodeletAdapter,
)

class TensorPairGenerator(CodeletAdapter):
    """Python codelet to generate a stream of tensors on tick()

    Transmitter:
        * host_out
            Transmits one message containing one pair of GXF Tensors zero-copied from NumPy, on every tick()
        * cuda_out
            Transmits one message containing one pair of GXF Tensors zero-copied from CuPy, on every tick()
    """

    def __init__(self):
        super().__init__()
        self.txs = ["host_out", "cuda_out"]

    def start(self):
        self.params = self.get_params()
        rows = self.params.get("rows", 16)
        cols = self.params.get("cols", 64)
        self.shape = (rows, cols)
        self.dtype = np.float32

    def tick(self):
        gxf_msg_out_cuda = MessageEntity(self.context())
        gxf_msg_out_host = MessageEntity(self.context())

        # Initialize a pair of tensors using NumPy
        np1 = np.arange(self.shape[0] * self.shape[1], dtype=self.dtype).reshape(self.shape)
        np2 = np.ascontiguousarray(np1.transpose())
        # Convert NumPy tensors to GXF tensors
        # GXF implements zero-copy from NumPy tensor
        for i, arr in enumerate([np1, np2]):
            gxf_tensor = Tensor.as_tensor(arr)
            Tensor.add_to_entity(gxf_msg_out_host, gxf_tensor, f"host_tensor{i + 1}")

        # Initialize a pair of cuda tensors using CuPy
        cp1 = cp.asarray(np1)
        cp2 = cp.asarray(np2)
        # Convert CuPy tensors to GXF tensors
        # GXF implements zero-copy from CuPy tensor, via gxf.Tensor as_tensor() and from_dlpack()
        for i, dev_arr in enumerate([cp1, cp2]):
            gxf_tensor_cuda = Tensor.as_tensor(dev_arr)
            Tensor.add_to_entity(gxf_msg_out_cuda, gxf_tensor_cuda, f"cuda_tensor{i + 1}")

        self.host_out.publish(gxf_msg_out_host)
        self.cuda_out.publish(gxf_msg_out_cuda)

    def stop(self):
        pass

class DotProduct(CodeletAdapter):
    """Python codelet to do dot product of a pair of tensors.

    Receiver:
        * rx
            Receives one message containing one pair of GXF Tensors zero-copied from NumPy or CuPy, on every tick()
    Transmitter:
        * tx
            Transmits one message containing one GXF Tensor zero-copied from NumPy or CuPy, on every tick()

    The `device` parameter can be set to either 'cpu' or 'gpu'.
    """

    def __init__(self):
        super().__init__()
        self.txs = ["tx"]
        self.rxs = ["rx"]

    def start(self):
        self.params = self.get_params()
        # use NumPy or CuPy based on the 'device' parameter
        device = self.params.get("device", "cpu")
        if (not isinstance(device, str) or device.lower() not in ["cpu", "gpu"]):
            raise ValueError("device parameter must be one of {'cpu', 'gpu'}")
        # compare case-insensitively, consistent with the validation above
        self.xp = cp if device.lower() == "gpu" else np

    def tick(self):
        xp = self.xp
        gxf_msg_in = self.rx.receive()

        # Get GXF tensors from GXF graph
        gxf_tensors = Tensor.find_all_from_entity(gxf_msg_in)
        np.testing.assert_equal(len(gxf_tensors), 2)

        # Convert GXF tensor to NumPy / CuPy tensor
        # GXF implements zero-copy interface to NumPy / CuPy tensor, via NumPy / CuPy asarray() or from_dlpack()
        xp_tensor_in0 = xp.asarray(gxf_tensors[0])
        xp_tensor_in1 = xp.asarray(gxf_tensors[1])

        # NumPy / CuPy libraries to process NumPy / CuPy tensors
        xp_tensor_out = xp.dot(xp_tensor_in0, xp_tensor_in1)

        gxf_msg_out = MessageEntity(self.context())
        # Convert NumPy / CuPy tensor back to GXF Graph
        # GXF implements zero-copy from NumPy / CuPy tensor, via gxf.Tensor as_tensor() and from_dlpack()
        gxf_tensor = Tensor.as_tensor(xp_tensor_out)
        Tensor.add_to_entity(gxf_msg_out, gxf_tensor, "")
        self.tx.publish(gxf_msg_out)

    def stop(self):
        pass

class VerifyEqual(CodeletAdapter):
    """Python codelet to compare a GPU tensor to a CPU tensor to within a tolerance.

    Receiver:
        * host_in
            Receives one message containing one GXF Tensor zero-copied from NumPy, on every tick()
        * cuda_in
            Receives one message containing one GXF Tensor zero-copied from CuPy, on every tick()
    """

    def __init__(self):
        super().__init__()
        self.rxs = ["host_in", "cuda_in"]

    def start(self):
        self.params = self.get_params()

    def tick(self):
        gxf_msg_in_host = self.host_in.receive()
        gxf_msg_in_cuda = self.cuda_in.receive()

        # Get GXF tensors from GXF graph
        gxf_tensors_cuda = Tensor.find_all_from_entity(gxf_msg_in_cuda)
        np.testing.assert_equal(len(gxf_tensors_cuda), 1)
        gxf_tensors_host = Tensor.find_all_from_entity(gxf_msg_in_host)
        np.testing.assert_equal(len(gxf_tensors_host), 1)

        # Convert GXF tensor to NumPy / CuPy tensor
        # GXF implements zero-copy interface to NumPy / CuPy tensor, via NumPy / CuPy asarray() or from_dlpack()
        cp_tensor = cp.asarray(gxf_tensors_cuda[0])
        np_tensor = np.asarray(gxf_tensors_host[0])

        # NumPy / CuPy libraries to process NumPy / CuPy tensors
        # Check if the two arrays are element-wise equal within a tolerance
        cp.testing.assert_allclose(cp_tensor, np_tensor, rtol=1e-5)

    def stop(self):
        pass

class DLPackSimpleApp:
    def run(self, count=20):
        g = Graph()
        clock = std.set_clock(g, RealtimeClock(name="clock"))
        std.set_scheduler(g, GreedyScheduler(max_duration_ms=1000000, clock=clock))
        std.enable_job_statistics(g, clock=clock)

        # create the tensor generator entity
        source_entity = g.add(PyComputeEntity("TensorPairGenerator", count=count))
        source_entity.add_codelet(
            "tensor_pair_generator",
            TensorPairGenerator(),
            # Codelet own params
            rows=16,
            cols=64,
        )

        # create the host matrix multiply entity
        process_entity_host = g.add(PyComputeEntity("DotProductHost", count=-1))
        process_entity_host.add_codelet(
            "host_dot_product",
            DotProduct(),
            # Codelet own params
            device="cpu",
        )

        # create the device matrix multiply entity
        process_entity_cuda = g.add(PyComputeEntity("DotProductCuda", count=-1))
        process_entity_cuda.add_codelet(
            "cuda_dot_product",
            DotProduct(),
            # Codelet own params
            device="gpu",
        )

        # create the tensor verification entity
        sink_entity = g.add(PyComputeEntity("VerifyEqual", count=-1))
        sink_entity.add_codelet(
            "verify_equal",
            VerifyEqual(),
        )

        std.connect(source_entity.host_out, process_entity_host.rx)
        std.connect(source_entity.cuda_out, process_entity_cuda.rx)
        std.connect(process_entity_host.tx, sink_entity.host_in)
        std.connect(process_entity_cuda.tx, sink_entity.cuda_in)

        # add a GPUDevice for use by the default entity group
        g.add(Entity("GPU_0")).add(GPUDevice(name="GPU_0", dev_id=0))

        g.load_extensions(workspace=self._get_default_extensions_path())
        g.run_async()
        g.wait()
        g.destroy()

    # default gxf extension path base, /var/tmp/gxf/.cache/gxf_workspace
    def _get_default_extensions_path(self):
        home_dir = os.path.expanduser("~")
        return os.path.join(home_dir, ".cache", "gxf_workspace")

if __name__ == "__main__":
    app = DLPackSimpleApp()
    app.run()
UCX Transmit / Receive#
Transmit data to and receive data from a remote host or device. For example, system 1 transmits a Tensor with device data to remote system 2.
Note
The UCXSource, UCXSink, and UCX entities are referenced in the example app.
A future App API will wrap the UCXSource, UCXSink, and UCX entity implementations and expose more concise, higher-level APIs for distributed execution.
Graph with UCX Tx#
Remote process 1 sends a tensor with device data via a UCX Transmitter.
Local data with memory on host or device is transmitted via the UCX context and UCX Transmitter to a remote host or device.
The following components are needed on top of a regular graph:
UCX entity containing the UCX context
UCXSink entity containing the UCX Transmitter
class PyPingTx(CodeletAdapter):
    """ Python codelet to send a msg on tick()

    Python implementation of Ping Tx.
    Sends a message to the transmitter on every tick()
    """

    def __init__(self):
        super().__init__()
        self.txs = ["tx"]

    def start(self):
        self.params = self.get_params()
        self.allocator = Allocator.get(self.context(), self.cid(), self.params["allocator"])
        self.shape = self.params.get("shape", [1, 2])

    def tick(self):
        msg = MessageEntity(self.context())

        # add ones tensor allocated on cupy
        cp_tensor = cp.ones(self.shape)
        gxf_tensor = Tensor.as_tensor(cp_tensor)
        Tensor.add_to_entity(msg, gxf_tensor, "ones_tensor")

        # add uninitialized tensor allocated by gxf allocator
        tensor_on_allocator = Tensor.add_to_entity(msg, "zeros_tensor")
        td = TensorDescription(
            name="zeros_tensor",
            storage_type=MemoryStorageType.kDevice,
            shape=Shape(self.shape),
            element_type=PrimitiveType.kFloat32,
            bytes_per_element=4
        )
        tensor_on_allocator.reshape(td, self.allocator)

        self.tx.publish(msg, 1)

    def stop(self):
        pass

def run_ucx_tx_graph():
    g = Graph()
    g.set_severity(logging.DEBUG)
    clock = std.set_clock(g, RealtimeClock(name='clock'))
    std.enable_job_statistics(g, clock=clock)
    std.set_scheduler(g, GreedyScheduler(
        max_duration_ms=5000, clock=clock, stop_on_deadlock=False))

    # device memory pool used by the codelet's allocator parameter
    g.add(Entity("mem_pool")).add(
        BlockMemoryPool(
            "device_image_pool",
            storage_type=1,
            block_size=1920 * 1080 * 4,
            num_blocks=150,
        )
    )
    # g.add(UCX("ucx", allocator=g.mem_pool.device_image_pool))
    g.add(UCX("ucx"))

    g.add(PyComputeEntity("PingTx", count=5)).add_codelet(
        "pingtx",
        PyPingTx(),
        allocator=g.mem_pool.device_image_pool,
        shape=[2, 3],
    )
    g.add(UCXSink("sink", count=-1, address="localhost", port=13338))
    std.connect(g.PingTx.tx, g.sink.input)

    g.load_extensions()
    g.run()
    g.destroy()

if __name__ == '__main__':
    run_ucx_tx_graph()
Graph with UCX Rx#
Remote process 2 receives a tensor with device data via a UCX Receiver.
The UCX context and UCX Receiver receive data onto the local host or device from a remote host or device.
The following components are needed on top of a regular graph:
UCX entity containing the UCX context
UCXSource entity containing the UCX Receiver
class PyPingRx(CodeletAdapter):
    """ Python codelet to receive a msg on tick()

    Python implementation of Ping Rx.
    Receives a message from the receiver on every tick() and checks its tensors
    """

    def __init__(self):
        super().__init__()
        self.rxs = ["rx"]

    def start(self):
        self.params = self.get_params()
        self.shape_expected = self.params.get("shape_expected", [1, 2])
        self.expected_ones = cp.ones(self.shape_expected)
        self.expected_zeros = cp.zeros(self.shape_expected)

    def tick(self):
        msg = self.rx.receive()

        ones_tensor = Tensor.get_from_entity(msg, "ones_tensor")
        actual_ones = cp.asarray(ones_tensor)
        cp.testing.assert_allclose(actual_ones, self.expected_ones, rtol=1e-5)
        print("Correctly received tensor from remote CuPy over UCX:\n" + str(actual_ones))

        zeros_tensor = Tensor.get_from_entity(msg, "zeros_tensor")
        actual_zeros = cp.asarray(zeros_tensor)
        cp.testing.assert_allclose(actual_zeros, self.expected_zeros, rtol=1e-5)
        print("Correctly received tensor from remote Allocator over UCX:\n" + str(actual_zeros))

    def stop(self):
        pass

def run_ucx_rx_graph():
    g = Graph()
    g.set_severity(logging.DEBUG)
    clock = std.set_clock(g, RealtimeClock(name="clock"))
    std.enable_job_statistics(g, clock=clock)
    std.set_scheduler(g, GreedyScheduler(
        max_duration_ms=5000, clock=clock, stop_on_deadlock=False))

    g.add(Entity("mem_pool")).add(
        BlockMemoryPool(
            "device_image_pool",
            storage_type=1,
            block_size=1920 * 1080 * 4,
            num_blocks=150,
        )
    )
    g.add(UCX("ucx", allocator=g.mem_pool.device_image_pool))

    g.add(UCXSource("source", address="localhost", port=13338))
    g.add(PyComputeEntity("PingRx", count=-1)).add_codelet(
        "pingrx",
        PyPingRx(),
        allocator=g.mem_pool.device_image_pool,
        shape_expected=[2, 3],
    )
    std.connect(g.source.output, g.PingRx.rx)

    g.load_extensions()
    g.run()
    g.destroy()

if __name__ == '__main__':
    run_ucx_rx_graph()
UCXSource, UCXSink, and context implementation for reference#
from gxf.serialization import Endpoint
from gxf.multimedia import VideoBuffer
from gxf.core import Entity
from gxf.std import Forward
from gxf.std import DoubleBufferTransmitter, DoubleBufferReceiver
from gxf.std import DownstreamReceptiveSchedulingTerm, CountSchedulingTerm
from gxf.std import UnboundedAllocator
from gxf.std import MessageAvailableSchedulingTerm

class UCXSource(Entity):
    """UCXSource Entity containing all the required components to receive data on ucx address:port"""

    def __init__(self, name, address="0.0.0.0", port=13337, count=-1, capacity=1, min_message_reception=1,
                 allocator_type=None, **kwargs):
        super().__init__(name, True)
        self._address = address
        self._port = port
        self._count = count
        self._capacity = capacity
        self._min_message_reception = min_message_reception
        self._allocator_type = allocator_type
        self._kwargs = kwargs
        self.add(UnboundedAllocator(name="allocator"))
        self.add(UcxSerializationBuffer(
            name="serialization_buffer", allocator=self.allocator))
        self.add(UcxReceiver(name="input", port=self._port, address=self._address,
                             buffer=self.serialization_buffer))
        self.add(MessageAvailableSchedulingTerm(name='mast', receiver=self.input,
                                                min_size=min_message_reception))
        self.add(DoubleBufferTransmitter(name="output", capacity=capacity))
        # 'in' is a keyword in python. can't access as an attribute
        self.add(Forward(name="forward"))
        self.forward._params["in"] = self.input
        self.forward._params["out"] = self.output
        self.add(DownstreamReceptiveSchedulingTerm(name='drst', transmitter=self.output,
                                                   min_size=min_message_reception))
        if count >= 0:
            # use the stored count, as UCXSink does
            self.add(CountSchedulingTerm(name="cst", count=self._count))

class UCXSink(Entity):
    """UCXSink Entity containing all the required components to push data on a ucx address:port"""

    def __init__(self, name, address="0.0.0.0", port=13337, count=-1, capacity=1, min_message_available=1,
                 allocator_type=None, **kwargs):
        super().__init__(name, True)
        self._address = address
        self._port = port
        self._count = count
        self._capacity = capacity
        self._min_message_available = min_message_available
        self._allocator_type = allocator_type
        self._kwargs = kwargs
        self.add(UnboundedAllocator(name="allocator"))
        self.add(UcxSerializationBuffer(
            name="serialization_buffer", allocator=self.allocator))
        self.add(UcxTransmitter(name="output", port=self._port,
                                buffer=self.serialization_buffer, receiver_address=self._address))
        self.add(DoubleBufferReceiver(name="input", capacity=capacity))
        # 'in' is a keyword in python. can't access as an attribute
        self.add(Forward(name="forward"))
        self.forward._params["in"] = self.input
        self.forward._params["out"] = self.output
        self.add(MessageAvailableSchedulingTerm(name='mast', receiver=self.input,
                                                min_size=min_message_available))
        if count >= 0:
            self.add(CountSchedulingTerm(name="cst", count=self._count))

class UCX(Entity):
    """UCX Entity required to add UCXSource and UCXSink"""

    def __init__(self, name, allocator=None):
        super().__init__(name, True)
        if not allocator:
            allocator = self.add(UnboundedAllocator(name="allocator"))
        self.add(UcxComponentSerializer(
            name="component_serializer", allocator=allocator))
        self.add(UcxEntitySerializer(name="entity_serializer",
                                     component_serializers=[self.component_serializer]))
        self.add(UcxContext(name="ucx_context",
                            serializer=self.entity_serializer))
Note
In addition to the existing Python API, a Python App API will be available in a future release.
The Python App API will be similar to the C++ App API.
[Coming Soon] More sample applications will be available in the GXF GitHub repo https://github.com/NVIDIA-AI-IOT/deepstream_gxf_ref_apps
C++ Application Development#
The traditional method of creating GXF applications involved composing entities in a YAML file, which was executed by the GXE runtime along with an extension manifest. The GXF application layer simplifies this process by allowing a user to create applications programmatically.
The implementation of this layer is compiled into a shared library, libgxf_app.so, and packaged in the GXF runtime Debian installers.
See GXF App C++ APIs for the API reference of the C++ application layer.
The basic building blocks of this layer are Graph Entity, Segment, and Application.
Graph Entity#
A GXF graph consists of one or more entities, each composed of multiple components. These components typically include a codelet along with other components such as resources, scheduling terms, and message queues (transmitters and receivers). The nvidia::gxf::GraphEntity class simplifies the process of creating and managing programmable graph entities in an application.
Graph entities in an application are created using the nvidia::gxf::Segment::makeEntity<T>(...) API. This is a templated API that accepts a parameter pack of arguments. The template type specifies the type of codelet to add to the entity, and the parameter pack specifies the remaining components to add as well as the arguments for the codelet itself.
Here is an example of an entity containing a PingTx codelet represented in a yaml
The example below creates a simple graph entity consisting of the nvidia::gxf::PingTx codelet.
// create a codelet to generate 10 messages
auto tx_entity = makeEntity<PingTx>("Tx", makeTerm<CountSchedulingTerm>("count", Arg("count", 10)));
Scheduling terms are created with the makeTerm API, and their parameter values are passed as a list of Args.
// create an entity to copy messages from device to host
auto copier = makeEntity<TensorCopier>("Copier", Arg("allocator") = makeResource<UnboundedAllocator>("allocator"), Arg("mode", 1));
Resources such as memory pools and allocators can be created with the makeResource API. In this example, the TensorCopier codelet is configured with an UnboundedAllocator.
The transmitter and receiver queues of an entity are not specified when creating a graph entity. These components are added automatically by the connect APIs when an entity is connected to another. The scheduling terms related to the transmitter and receiver are also added automatically by the connect APIs.
Segment#
A segment is an individual runtime context consisting of one or more graph entities. Each segment has its own scheduler and corresponding clock component. The Segment::compose() API is a virtual function used to create the entities of a segment and customize their properties. The Segment layer provides multiple APIs to simplify the creation of graph entities.
A complex distributed application can consist of multiple segments working with each other. Intra-segment connections between entities of a segment use double buffer transmitters and receivers, and all inter-segment connections use UCX transmitters and receivers.
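The transport rule in the paragraph above can be summarised in a few lines. This is a conceptual sketch only: `transport_for` is a hypothetical helper, the returned strings are labels chosen for illustration, and GXF selects the actual transmitter and receiver components internally.

```python
def transport_for(source_segment: str, target_segment: str) -> str:
    """Return the queue family used for a connection between two entities.

    Entities in the same segment are linked with double buffer queues;
    entities in different segments are linked over UCX.
    """
    if source_segment == target_segment:
        return "double-buffer"
    return "ucx"


if __name__ == "__main__":
    print(transport_for("TxSegment", "TxSegment"))  # same segment
    print(transport_for("TxSegment", "RxSegment"))  # crosses a segment boundary
```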
Connecting Graph Entities#
The simplest connection API accepts only the source and destination graph entities. For such entities, it is assumed that their interfaces are compatible, i.e. the sending entity must have a codelet with only one registered Parameter<Handle<Transmitter>> and the receiving entity must have a codelet with only one registered Parameter<Handle<Receiver>>. The scheduling terms related to the transmitter and receiver are also added automatically by the connect APIs. The names of the automatically added message queues and scheduling terms are the same as the corresponding parameter key of the codelet to which they are connected.
// create a codelet to generate 10 messages
auto tx_entity = makeEntity<PingTx>("Tx", makeTerm<CountSchedulingTerm>("count", Arg("count", 10)));
// create a codelet to receive the messages
auto rx_entity = makeEntity<PingRx>("Rx");
// add data flow connection tx -> rx
connect(tx_entity, rx_entity);
For entities with codelets having multiple inputs and outputs, the connection mapping can be specified in the connect api.
void compose() override {
  // create two codelets, each generating 100 messages at 50Hz
  auto left_tx = makeEntity<PingTx>(
      "Left Tx", makeTerm<PeriodicSchedulingTerm>("periodic", Arg("recess_period", "50Hz")),
      makeTerm<CountSchedulingTerm>("count", Arg("count", 100)));
  auto right_tx = makeEntity<PingTx>(
      "Right Tx", makeTerm<PeriodicSchedulingTerm>("periodic", Arg("recess_period", "50Hz")),
      makeTerm<CountSchedulingTerm>("count", Arg("count", 100)));
  // create a codelet to receive the messages
  auto multi_ping_rx = makeEntity<MultiPingRx>("Multi Rx");
  // add data flow connections tx -> rx
  connect(left_tx, multi_ping_rx, PortPair{"signal", "receivers"});
  connect(right_tx, multi_ping_rx, PortPair{"signal", "receivers"});
}
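The difference between the two connect forms above can be modelled as a small resolution rule: with exactly one transmitter parameter and one receiver parameter the pair is unambiguous, otherwise an explicit port pair is needed. The sketch below is a hypothetical model of that rule in plain Python, not the GXF implementation; `resolve_port_pair` and its arguments are illustrative names, and the parameter keys are taken from the examples.

```python
def resolve_port_pair(tx_keys, rx_keys, port_pair=None):
    """Return the (transmitter key, receiver key) used for a connection.

    tx_keys / rx_keys: transmitter and receiver parameter keys of the
    sending and receiving codelets. port_pair: optional explicit mapping.
    """
    if port_pair is not None:
        tx, rx = port_pair
        if tx not in tx_keys or rx not in rx_keys:
            raise KeyError(f"unknown port in pair {port_pair!r}")
        return tx, rx
    # Without an explicit pair, each side must expose exactly one queue.
    if len(tx_keys) != 1 or len(rx_keys) != 1:
        raise ValueError("ambiguous connection: specify a port pair")
    return tx_keys[0], rx_keys[0]


if __name__ == "__main__":
    # PingTx -> PingRx: one transmitter and one receiver, both keyed "signal"
    print(resolve_port_pair(["signal"], ["signal"]))
    # PingTx -> MultiPingRx: mapping given explicitly, as with PortPair
    print(resolve_port_pair(["signal"], ["receivers"], ("signal", "receivers")))
```

In this model the resolved keys also double as the names of the automatically added queues, matching the naming rule described earlier.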
Application#
The Application class is a scaffolding layer to create applications imperatively in GXF. It provides a virtual compose() function where individual building blocks of an application can be constructed, configured, and connected with each other. The Application class also provides functions to set the configuration file, load extensions, create segments, and run the application.
The Application class extends the Segment class and provides all the benefits of the segment APIs in the application layer as well.
In its simplest form, an application does not have any segments. It has multiple graph entities governed by a scheduler. The following example creates two graph entities and connects them with each other.
class PingSimpleApp : public Application {
 public:
  void compose() override {
    // create a codelet to generate 10 messages
    auto tx_entity = makeEntity<PingTx>("Tx", makeTerm<CountSchedulingTerm>("count", Arg("count", 10)));
    // create a codelet to receive the messages
    auto rx_entity = makeEntity<PingRx>("Rx");
    // add data flow connection tx -> rx
    connect(tx_entity, rx_entity);
    // configure the scheduler
    setScheduler(SchedulerType::kGreedy);
  }
};

int main(int argc, char** argv) {
  auto app = create_app<PingSimpleApp>();
  app->loadExtensionManifest(kManifestFilename);
  app->compose();
  auto result = app->run();
  return ToResultCode(result);
}
The manifest file in the above example is a YAML file with a single top-level entry, 'extensions', followed by a list of filenames of GXF extension shared libraries.
Example:
extensions:
- gxf/std/libgxf_std.so
- gxf/sample/libgxf_sample.so
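Because the manifest is just a top-level 'extensions' key with a list of library paths, it is easy to generate from a script. The following is a minimal sketch using only the standard library; `build_manifest` is a hypothetical helper, not a GXF tool, and the library paths are the two from the example.

```python
def build_manifest(libraries):
    """Return manifest text: an 'extensions' key followed by one list item per library."""
    lines = ["extensions:"]
    lines += [f"- {lib}" for lib in libraries]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    manifest = build_manifest([
        "gxf/std/libgxf_std.so",
        "gxf/sample/libgxf_sample.so",
    ])
    print(manifest, end="")
```

The returned text can be written to whatever file the application passes to loadExtensionManifest().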
The Application class supports different modes of execution, which are represented by the ExecutionMode enum.
The different modes are:
kUnset: The default mode, which is unset.
kSingleSegment: This mode is used when the application contains only one segment.
kMultiSegment: This mode is used when the application contains multiple segments being executed in parallel in a single process.
kDistributed: This mode is used when the application is distributed across multiple nodes, and the segments are executed in parallel on different nodes.
Large applications with complex execution patterns can be broken down into an application containing multiple segments. The example below refactors the single segment application into a multi segment application. Individual graph entities are composed in the segment, and those segments are composed and configured in the application.
class PingTxSegment : public Segment {
 public:
  void compose() override {
    // create a codelet to generate 10 messages
    auto tx_entity = makeEntity<PingTx>("Tx", makeTerm<CountSchedulingTerm>("count", Arg("count", 10)));
    // add a scheduler component and configure the clock
    auto scheduler = setScheduler<Greedy>(Arg("stop_on_deadlock", false), Arg("max_duration_ms", 5000));
  }
};

class PingRxSegment : public Segment {
 public:
  void compose() override {
    // create a codelet to receive the messages
    auto rx_entity = makeEntity<PingRx>("Rx");
    // add a scheduler component and configure the clock
    auto scheduler = setScheduler<Greedy>(Arg("max_duration_ms", 5000), Arg("stop_on_deadlock", false));
  }
};

class PingSegmentApp : public Application {
 public:
  void compose() override {
    auto tx_segment = createSegment<PingTxSegment>("TxSegment");
    auto rx_segment = createSegment<PingRxSegment>("RxSegment");
    // add data flow connection tx -> rx
    connect(tx_segment, rx_segment, {SegmentPortPair("Tx.signal", "Rx.signal")});
  }
};

int main(int argc, char** argv) {
  auto app = create_app<PingSegmentApp>();
  app->loadExtensionManifest(kManifestFilename);
  app->compose();
  auto result = app->run();
  return ToResultCode(result);
}
Runtime Codelet Registration#
The application layer further simplifies the process of creating apps by enabling runtime registration of codelets. All the prior examples used predefined codelets from the sample extension, which was preloaded by the application.
The application layer also supports registering new codelets on the fly, as shown in the following example. A new codelet, PingTxRuntime, is defined along with the sample application it is used in. The new codelet does not have to be wrapped in an extension library to be used in the application.
class PingTxRuntime : public Codelet {
 public:
  virtual ~PingTxRuntime() = default;

  gxf_result_t registerInterface(Registrar* registrar) override {
    Expected<void> result;
    result &= registrar->parameter(signal_, "signal", "Signal", "Transmitter channel publishing messages to other graph entities");
    return ToResultCode(result);
  }

  gxf_result_t tick() override {
    auto message = Entity::New(context());
    if (!message) {
      GXF_LOG_ERROR("Failure creating message entity.");
      return message.error();
    }
    auto result = signal_->publish(message.value());
    GXF_LOG_INFO("Message Sent: %d", this->count);
    this->count = this->count + 1;
    // report the outcome of publish(), not of the entity creation
    return ToResultCode(result);
  }

 private:
  Parameter<Handle<Transmitter>> signal_;
  int count = 1;
};

class PingRuntimeApp : public Application {
 public:
  void compose() override {
    // create a codelet to generate 10 messages
    auto tx_entity = makeEntity<PingTxRuntime>("Tx", makeTerm<CountSchedulingTerm>("count", Arg("count", 10)));
    // create a codelet to receive the messages
    auto rx_entity = makeEntity<PingRx>("Rx");
    // add data flow connection tx -> rx
    connect(tx_entity, rx_entity);
    // configure the scheduler component
    setScheduler(SchedulerType::kGreedy);
  }
};

int main(int argc, char** argv) {
  auto app = create_app<PingRuntimeApp>();
  app->loadExtensionManifest(kManifestFilename);
  app->compose();
  auto result = app->run();
  return ToResultCode(result);
}
Distributed Application#
A multi segment application can also be distributed across multiple nodes as a distributed application.
As introduced in the Distributed Execution section, a segment is the basic unit of distribution.
Segments communicate with each other via UCX transmitters and receivers.
However, users do not need to add the UCX connections themselves. The Application API implementation completes the connection automatically when the connect() API is called on a pair of segments. The connected pair of segments can run in a single process on a single node, in multiple processes on a single node, or in multiple processes on multiple nodes. In every case, since a UCX transmitter and receiver are used between the connected pair of segments, it is considered a distributed execution use case.
Compared to a single segment application, in a multi segment or distributed application, segments are created and connected with each other instead of entities, as shown in the example below.
class SampleSegmentApp : public Application {
 public:
  void compose() override {
    // create segments
    auto tx_segment = createSegment<PingTxSegment>("TxSegment");
    auto fwd_segment = createSegment<ForwardSegment>("FwdSegment");
    auto rx_segment = createSegment<PingRxSegment>("RxSegment");
    // connect segments
    connect(tx_segment, fwd_segment, {SegmentPortPair("Tx.signal", "Fwd.in")});
    connect(fwd_segment, rx_segment, {SegmentPortPair("Fwd.out", "Rx.signal")});
  }
};
Creating a Segment
- const char *name,
In a distributed application, the same application binary is executed on multiple nodes. This does not necessarily mean that a node has to execute all the segments in an application. Each application binary instance enables only a subset of the segments created in the source code.
Each node can be configured with the specific segments that it is supposed to execute. This is done with a segment config file, as shown below. If no segment config file is provided when executing the app, all segments are enabled in that instance.
Connecting Segments
- std::shared_ptr<nvidia::gxf::Segment> source,
- std::shared_ptr<nvidia::gxf::Segment> target,
- std::vector<SegmentPortPair> port_maps,
Segments are connected via SegmentPortPair to form a global segment connection map.
port_maps is a vector because more than one connection is supported between a pair of segments.
In the most common case, the size of port_maps is 1, i.e. there is only one connection between the source and target segments.
Note that it is normal for the source and target segments not to be enabled in the same application instance. The app instance that enables a segment automatically completes the corresponding UCX components, and GraphDriver later resolves the connection address between the two instances.
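To make the idea of a global segment connection map concrete, here is a minimal Python sketch. It is not GXF code: `PortPair` and `build_connection_map` are hypothetical names that only model how several port pairs can be grouped under one (source segment, target segment) key, using the segment and port names from the SampleSegmentApp example.

```python
from collections import defaultdict
from typing import NamedTuple


class PortPair(NamedTuple):
    """Hypothetical stand-in for SegmentPortPair, holding 'Entity.port' strings."""
    source: str
    target: str


def build_connection_map(connections):
    """Group port pairs by (source segment, target segment)."""
    connection_map = defaultdict(list)
    for source_segment, target_segment, port_pairs in connections:
        # A pair of segments may carry more than one connection, so extend the list.
        connection_map[(source_segment, target_segment)].extend(port_pairs)
    return dict(connection_map)


connections = [
    ("TxSegment", "FwdSegment", [PortPair("Tx.signal", "Fwd.in")]),
    ("FwdSegment", "RxSegment", [PortPair("Fwd.out", "Rx.signal")]),
]
connection_map = build_connection_map(connections)
for (source, target), pairs in connection_map.items():
    print(f"{source} -> {target}: {len(pairs)} connection(s)")
```

In this model each key holds a list, which mirrors why port_maps is a vector even though it usually has a single element.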
Segment config
Segment config is a YAML file consisting of one node with identifier segment_config. There are 3 member types:
- segment_config.member: enabled_segments
  Mandatory. Specifies the segments to enable, by the names given when they were created in the source code.
- segment_config.member: worker
  Mandatory. Specifies the driver IP and port; optionally specifies the worker's own port.
- segment_config.member: driver
  Optional. The driver member can be enabled in any app instance or in a standalone instance.
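The mandatory/optional rules for the three member types can be expressed as a small check. The following Python sketch is an illustration only, not part of GXF: `check_segment_config` is a hypothetical helper, and it assumes the config has already been loaded into a list of dictionaries with a `member` key.

```python
MANDATORY_MEMBERS = {"enabled_segments", "worker"}
KNOWN_MEMBERS = MANDATORY_MEMBERS | {"driver"}


def check_segment_config(members):
    """Return a list of problems found in a list of member dictionaries."""
    problems = []
    names = [member.get("member") for member in members]
    # Every mandatory member type must appear at least once.
    for name in sorted(MANDATORY_MEMBERS - set(names)):
        problems.append(f"missing mandatory member: {name}")
    # Anything outside the three documented member types is flagged.
    for name in names:
        if name not in KNOWN_MEMBERS:
            problems.append(f"unknown member: {name}")
    return problems


complete = [
    {"member": "enabled_segments", "parameters": {"enabled": ["TxSegment"]}},
    {"member": "worker", "parameters": {"enabled": True, "port": 50001}},
]
print(check_segment_config(complete))
print(check_segment_config([{"member": "driver"}]))
```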
For example, we can run the same SampleSegmentApp executable binary in 3 remote processes and each process executes one segment from the app. Process 1 to run TxSegment; Process 2 to run FwdSegment; Process 3 to run RxSegment.
segment config for instance in process 1:
---
segment_config:
- member: enabled_segments
  parameters:
    enabled:
      - TxSegment
- member: worker
  parameters:
    enabled: True
    name: worker_Tx
    port: 50001
    driver_ip: "localhost"
    driver_port: 50000
- member: driver
  parameters:
    enabled: True
    name: driver_50000
    port: 50000
segment config for instance in process 2:
---
segment_config:
- member: enabled_segments
  parameters:
    enabled:
      - FwdSegment
- member: worker
  parameters:
    enabled: True
    name: worker_Fwd
    port: 50002
    driver_ip: "localhost"
    driver_port: 50000
segment config for instance in process 3:
---
segment_config:
- member: enabled_segments
  parameters:
    enabled:
      - RxSegment
- member: worker
  parameters:
    enabled: True
    name: worker_Rx
    port: 50003
    driver_ip: "localhost"
    driver_port: 50000
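The three configs differ only in the enabled segment, the worker name and port, and whether the driver member is present. A small generator makes that pattern explicit. This is a sketch under assumptions: `render_segment_config` is a hypothetical helper, the YAML indentation it emits is an assumed layout, and the values are taken from the three-process example.

```python
def render_segment_config(segment, worker_name, worker_port,
                          driver_ip="localhost", driver_port=50000,
                          run_driver=False):
    """Render one segment config as YAML text (layout is an assumption)."""
    lines = [
        "---",
        "segment_config:",
        "- member: enabled_segments",
        "  parameters:",
        "    enabled:",
        f"      - {segment}",
        "- member: worker",
        "  parameters:",
        "    enabled: True",
        f"    name: {worker_name}",
        f"    port: {worker_port}",
        f'    driver_ip: "{driver_ip}"',
        f"    driver_port: {driver_port}",
    ]
    if run_driver:
        # Only one instance needs to host the driver.
        lines += [
            "- member: driver",
            "  parameters:",
            "    enabled: True",
            f"    name: driver_{driver_port}",
            f"    port: {driver_port}",
        ]
    return "\n".join(lines) + "\n"


processes = [
    ("TxSegment", "worker_Tx", 50001, True),
    ("FwdSegment", "worker_Fwd", 50002, False),
    ("RxSegment", "worker_Rx", 50003, False),
]
configs = {
    segment: render_segment_config(segment, name, port, run_driver=driver)
    for segment, name, port, driver in processes
}
print(configs["TxSegment"])
```

Each worker gets its own port while all of them point at the same driver address, which is the one invariant the three files share.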
Note
Distributed execution with the C++ App API currently hits a segfault during teardown, after execution has finished. The equivalent YAML API execution does not show this issue.
An equivalent Python App API for distributed execution will be available in a future release.
[Coming Soon] More sample applications will be available in the GXF GitHub repo https://github.com/NVIDIA-AI-IOT/deepstream_gxf_ref_apps
Graph Composer#
This section helps you get familiar with the application development workflow using Graph Composer which includes the following:
Launch Graph Composer
Sync extensions from NVIDIA Cloud repository
Create simple application using Graph Composer
Run application
Create container image for the application
We will start by setting up the system and explaining the basic layout of the Composer on Ubuntu 22.04 x86_64. Then, we will load, understand, and run a simple application, which will show how the Composer works. Finally, we will create a simple application without writing a single line of code. Graph development is currently supported only on x86. The Graph Composer package for arm64 can be used to deploy or execute graphs on Jetson.
The installation step installs all tools in the /opt/nvidia/graph-composer directory, with links to the tools in the /usr/bin directory, so you can access the tools without switching to the installation directory. After installation, check whether it was successful by running the following commands in a terminal:
registry --help
usage: registry [-h] [-v] ...
positional arguments:
cache Perform actions on cache
repo Perform actions on repositories
comp Perform actions on components
extn Perform actions on extensions
graph Perform actions on graph
optional arguments:
-h, --help show this help message and exit
-v, --version Print registry tool and GXF Spec version
container_builder --help
usage: container_builder [-h] [-v] [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [--log-file LOG_FILE] ...
build docker images from config file
positional arguments:
build Build container image using config files
push Push local container image to remote repo
optional arguments:
-h, --help show this help message and exit
-v, --version Container Builder Version
--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
set log level, default is INFO
--log-file LOG_FILE Optional, set log output file
If you still don’t see the components, check the FAQ section.
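The same installation check can be scripted. The sketch below only looks the tool names up on PATH with the standard library; `find_tools` is a hypothetical helper, and finding a tool on PATH does not prove that it runs correctly.

```python
import shutil


def find_tools(names=("registry", "container_builder", "composer")):
    """Map each tool name to its resolved path, or None if it is not on PATH."""
    return {name: shutil.which(name) for name in names}


for tool, path in find_tools().items():
    print(f"{tool}: {path or 'NOT FOUND'}")
```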
Launch Graph Composer#
There are two options to launch Composer:
Native workstation:
Launch Composer from the native workstation using the following command:
composer
DeepStream SDK devel container image:
Launch Composer from the DeepStream SDK devel container image. This option does not require installing Graph Composer on the local system:
docker pull nvcr.io/nvidia/deepstream:7.1-gc-triton-devel
xhost +
docker run -it --entrypoint /bin/bash --gpus all --rm --network=host -e DISPLAY=${DISPLAY} -v /tmp/.X11-unix/:/tmp/.X11-unix --privileged -v /var/run/docker.sock:/var/run/docker.sock nvcr.io/nvidia/deepstream:7.1-gc-triton-devel
composer
Note
When using the Composer from the devel container image, you may have trouble browsing the “/” folder in the file browser. In that case, type the file path directly or copy and paste it.
Sync Extensions#
Before any graph can be executed or any container built, extensions from the NGC public repo must be synced. Follow the steps below to sync the extensions:
Ensure that gxf_server has started by running the following command in a terminal:
systemctl --user status gxf_server
If the service is running, you will see “Active: active (running)” in the output. If the service is not running, run systemctl --user start gxf_server to start it.
By default, gxf_server runs on port 50051. The port can be changed by running export GXF_SERVER_PORT=<port_number> before installing graph-composer.
To change the port after the service has already been started automatically post-installation, use sudo systemctl set-environment GXF_SERVER_PORT=<port_number> to set the new port. Then restart the service to apply the change: systemctl --user restart gxf_server
Also change the port number in Composer:
- Open the Preferences window.
- Change the port number in the Server tab.
Be sure no graph is open. If a graph is open, it must be closed to make the Registry menu usable.
Open the Registry menu from the menubar at the top and click on Sync Repo.
Select ngc-public from the drop-down list and click on Sync.
The composer reports the current status using a progress bar.
Once the extension sync is complete, the composer displays a success message.
On clicking ‘OK’, the composer automatically refreshes the component list. You can see the refreshed list in the component list window on the right.
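If the sync fails to start, it can help to confirm that something is actually listening on the gxf_server port from the first step. The sketch below is a plain TCP reachability test using the standard library. It assumes the server listens on localhost and, as a convenience that is not confirmed by the documentation, reads GXF_SERVER_PORT from the current environment with a fallback to 50051. A successful connection shows only that a process accepts connections on that port, not that it is gxf_server.

```python
import os
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Assumption: fall back to the documented default port when the variable is unset.
port = int(os.environ.get("GXF_SERVER_PORT", "50051"))
print(f"localhost:{port} accepting connections: {is_port_open('localhost', port)}")
```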
Create a Graph#
Now, let’s create a simple graph and run it. For this example we will create a simple Ping Graph using components from the Sample Extension and the Standard Extension. In this Ping Graph, we periodically send a message from one entity to another for a fixed number of counts. It uses the following components:
- Transmitter:
  - DoubleBufferTransmitter - A queue which holds a message being transmitted.
  - PingTx - This component creates and publishes a message every time it’s executed.
  - PeriodicSchedulingTerm - Scheduling Terms determine when to execute an entity, in this case the Transmitter. PeriodicSchedulingTerm is used to execute entities periodically.
  - CountSchedulingTerm - CountSchedulingTerm is used to stop the execution after a certain count. If you want to keep it running, skip adding this component.
- Receiver:
  - DoubleBufferReceiver - A queue which holds the messages sent by other components.
  - PingRx - This component receives a message on DoubleBufferReceiver every time it’s executed.
  - MessageAvailableSchedulingTerm - This Scheduling Term determines whether a new message has arrived; only then is the PingRx codelet ticked.
- Scheduler:
  - GreedyScheduler - The Scheduler determines the order in which components are executed. GreedyScheduler is a simple single-threaded scheduler which executes components one after another.
  - RealtimeClock - A clock used by the Scheduler to track time.
Follow the steps:
Add PingTx, PingRx and GreedyScheduler by dragging and dropping them from the components panel to the graph window.
Add the rest of the components, such as CountSchedulingTerm, PeriodicSchedulingTerm and MessageAvailableSchedulingTerm, by dragging and dropping them into the respective entity node.
Now, right-click on the signal in PingTx and click Create DoubleBufferTransmitter. Follow the same steps for PingRx’s signal and GreedyScheduler’s clock.
We can create a graph by simply dragging and dropping components from the Component Panel and adding more components to it.
After adding the components your graph will look like the image below:
Now we make connections between components. For instance, you will have to connect a DoubleBufferTransmitter to a DoubleBufferReceiver to pass messages between them. PingTx/clock needs to be linked to GreedyScheduler/RealtimeClock. These connections are made by creating an edge between the components as shown below:
Finally, we have to set the required parameters for the components:
In PingRx/MessageAvailableSchedulingTerm: set min_size to 1
In PingTx/CountSchedulingTerm: set count to 5
In PingTx/PeriodicSchedulingTerm: set recess_period to 5
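To see how count and min_size interact, the following toy model walks through the Ping Graph in plain Python. It is a simplified illustration and not GXF's scheduler: `run_ping_graph` is a hypothetical function, a single deque stands in for the transmitter/receiver queue pair, and the timing imposed by recess_period is ignored.

```python
from collections import deque


def run_ping_graph(count=5, min_size=1):
    """Toy single-threaded loop: Tx ticks `count` times, Rx ticks when messages wait."""
    if min_size < 1:
        raise ValueError("min_size must be at least 1")
    queue = deque()  # stands in for the transmitter/receiver queue pair
    sent = 0
    received = []
    while sent < count or len(queue) >= min_size:
        # CountSchedulingTerm: Tx stays ready only until it has ticked `count` times.
        if sent < count:
            sent += 1
            queue.append(sent)
        # MessageAvailableSchedulingTerm: Rx is ready once min_size messages are queued.
        if len(queue) >= min_size:
            received.append(queue.popleft())
    return received


print(run_ping_graph())  # prints [1, 2, 3, 4, 5]
```

With count set to 5 and min_size set to 1, the receiver ticks once per message and the loop ends when the transmitter has used up its count and the queue is empty.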
Now you can save the graph using File -> Save Graph (as). This will create a yaml file with all the components and the connections.
application:
  name: MyGraph
---
dependencies:
- extension: SampleExtension
  uuid: a6ad78b6-1682-11ec-9621-0242ac130002
  version: 1.5.0
- extension: StandardExtension
  uuid: 8ec2d5d6-b5df-48bf-8dee-0252606fdd7e
  version: 2.5.0
---
components:
- name: ping_tx0
  parameters:
    clock: GreedyScheduler/realtime_clock12
    signal: double_buffer_transmitter10
  type: nvidia::gxf::PingTx
- name: periodic_scheduling_term3
  type: nvidia::gxf::PeriodicSchedulingTerm
- name: count_scheduling_term4
  type: nvidia::gxf::CountSchedulingTerm
- name: double_buffer_transmitter10
  type: nvidia::gxf::DoubleBufferTransmitter
name: PingTx
ui_property:
  position:
    x: 56.0
    y: 103.0
---
components:
- name: ping_rx1
  parameters:
    signal: double_buffer_receiver11
  type: nvidia::gxf::PingRx
- name: message_available_scheduling_term5
  parameters:
    receiver: double_buffer_receiver11
  type: nvidia::gxf::MessageAvailableSchedulingTerm
- name: double_buffer_receiver11
  type: nvidia::gxf::DoubleBufferReceiver
name: PingRx
ui_property:
  position:
    x: 489.0
    y: 106.0
---
components:
- name: greedy_scheduler2
  parameters:
    clock: realtime_clock12
  type: nvidia::gxf::GreedyScheduler
- name: realtime_clock12
  type: nvidia::gxf::RealtimeClock
name: GreedyScheduler
ui_property:
  position:
    x: 486.0
    y: 314.0
---
components:
- name: connection13
  parameters:
    source: PingTx/double_buffer_transmitter10
    target: PingRx/double_buffer_receiver11
  type: nvidia::gxf::Connection
name: node1
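A saved graph file can be inspected quickly without a YAML parser, for example to see which extensions and component types it references. The sketch below uses only regular expressions from the standard library; `summarize_graph` is a hypothetical helper, and the embedded sample is a shortened excerpt of the file above rather than a complete graph.

```python
import re


def summarize_graph(yaml_text):
    """Return (extension names, sorted unique component types) found in the text."""
    extensions = re.findall(r"^\s*-?\s*extension:\s*(\S+)", yaml_text, flags=re.MULTILINE)
    types = re.findall(r"^\s*type:\s*(\S+)", yaml_text, flags=re.MULTILINE)
    return extensions, sorted(set(types))


sample = """\
dependencies:
- extension: SampleExtension
  version: 1.5.0
---
components:
- name: ping_tx0
  type: nvidia::gxf::PingTx
- name: count_scheduling_term4
  type: nvidia::gxf::CountSchedulingTerm
name: PingTx
"""
extensions, types = summarize_graph(sample)
print(extensions)  # prints ['SampleExtension']
print(types)       # prints ['nvidia::gxf::CountSchedulingTerm', 'nvidia::gxf::PingTx']
```

For anything beyond a quick look, a real YAML parser that handles multi-document files is the safer choice.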
Run Graph from Graph Composer#
You can deploy the graph using one of the following methods:
To execute the currently open graph, click on the Run Graph button from the toolbar on the left. This will open the Run Graph dialog.
Local System#
Make sure gxf_server is running on the local system and that the IP address in Edit/Preferences is that of the local host.
Select the appropriate Platform config file using the file browser.
Click on Run. The graph execution progress will be reported via logs in the console window.
Remote System#
Execute on Jetson or another remote system
Make sure gxf_server is running on the remote system and that the IP address in Edit/Preferences is that of the remote host.
Select the appropriate Platform config file (aarch64 or x86_64) based on the remote machine configuration.
Click on Run. The graph execution progress will be reported via logs in the console window.
Please note that this requires the Graph Composer package to be installed on the remote system.
Execute on Jetson or another remote system through Windows
Executing a graph through Windows is very similar to executing a graph on Jetson or another remote system. Please note that this requires the Graph Composer package to be installed on the remote system.
Run Graph from Command line#
Execute Graph using the command line (execute_graph.sh script)
The execute_graph.sh script provided with Graph Composer helps with graph execution and provides added functionality.
Complete usage reference:
Usage: /opt/nvidia/graph-composer/execute_graph.sh [options] <graph-file> [additional graph files]
Options:
-d, --graph-target "<graph-target-file>" [Required] Graph target config file
-s, --subgraphs <subgraph1>,<subgraph2>,... [Optional] Paths of subgraphs used by the application, comma-separated list
--resources <graph-resources-file> [Optional] Graph resources file
-f, --fresh-manifest [Optional] Re-install graph and generate a new manifest file
-g, --with-gdb [Optional] Execute the graph under gdb
-m, --use-manifest <existing-manifest> [Optional] Use an existing manifest file
-r, --remove-install-dir [Optional] Remove graph installation directory during exit
-t, --target <username@host> [Optional] Target to execute the graph on. SSH will be used
--target-env-vars "<env-vars>" [Optional] Separated list of environment variables to be set before running on target
-a --app-root <app-root> [Optional] Root path for gxe to search subgraphs
Note
To execute graphs on a remote target:
- Graph Composer package must already be installed on the target
- It is recommended that a password-less login method be used for SSH
To execute a graph locally, run:
/opt/nvidia/graph-composer/execute_graph.sh <graph-file> -d <graph-target>
For example, on dGPU host, run:
/opt/nvidia/graph-composer/execute_graph.sh <graph-file> -d /opt/nvidia/graph-composer/config/target_x86_64.yaml
To execute on a remote Jetson target, run:
/opt/nvidia/graph-composer/execute_graph.sh <graph-file> -d /opt/nvidia/graph-composer/config/target_aarch64.yaml \
-t <username@host> --target-env-vars "DISPLAY=:0"
Note
If a graph has resources associated with it that are described in a resources YAML file, an additional argument --resources <resources.yaml> can be passed to the script. The resources will be copied to the remote target before graph execution.
Note
When executing a graph that uses subgraphs, you must pass the additional argument -s <subgraph1>,<subgraph2>,... containing the paths to the subgraph files. You must not pass the subgraphs as graph file arguments without an option.
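When the script is driven from automation, assembling the argument list programmatically avoids quoting mistakes. The following Python sketch only builds and prints a command line from the options in the usage reference above; it does not run anything. `build_command` is a hypothetical helper, and `my_graph.yaml` and `user@jetson` are placeholder values.

```python
import shlex

SCRIPT = "/opt/nvidia/graph-composer/execute_graph.sh"


def build_command(graph_file, target_config, subgraphs=(), resources=None,
                  remote=None, target_env_vars=None):
    """Build the execute_graph.sh argument list from the documented options."""
    cmd = [SCRIPT, graph_file, "-d", target_config]
    if subgraphs:
        # Subgraphs go through -s as one comma-separated list, never as extra graph files.
        cmd += ["-s", ",".join(subgraphs)]
    if resources:
        cmd += ["--resources", resources]
    if remote:
        cmd += ["-t", remote]
    if target_env_vars:
        cmd += ["--target-env-vars", target_env_vars]
    return cmd


cmd = build_command(
    "my_graph.yaml",  # placeholder graph file
    "/opt/nvidia/graph-composer/config/target_aarch64.yaml",
    remote="user@jetson",  # placeholder SSH target
    target_env_vars="DISPLAY=:0",
)
print(shlex.join(cmd))
```

The resulting list can be passed to `subprocess.run(cmd, check=True)` once the placeholders are replaced with real values.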
Note
To run the graph on the remote machine, install the following packages:
openssh-client
sshfs
Use ssh-keygen to generate an SSH key pair, then copy the public key to the target using ssh-copy-id ${TARGET}
Create Container Image from Graph Composer#
A container image can be created for Ubuntu 22.04 x86_64 or Jetson, but image creation is supported only on Ubuntu 22.04 x86_64. The following scenarios are supported.
To build a container, first click on the Build Container button from the toolbar on the top. This will open the Build Container window.
Local System#
To create a container on the local system:
Make sure gxf_server is running on the local system and that the IP address in Edit/Preferences is that of the local host.
Launch the file browser using the button next to the Configuration File input.
Select a container builder configuration file and open it.
Click the button next to the Platform config File input to launch the file browser. Select a platform config file and open it.
Click on Build to start the build process. Composer reports the container build status using a progress bar.
On successful completion, composer will show a success message.
Remote System (Windows)#
Building a container image through Windows is very similar to building a container image on a Linux system.
Add the remote system’s IP address and port number in the Server tab in the Edit/Preferences window.
To create a container on the remote system, choose the container builder config file and target config file and click on Build Image.
Please note that this requires the Graph Composer package to be installed on the remote system.
DeepStream Application#
The previous application was a simple one that demonstrated the application workflow. A similar workflow can be used to create, load and run DeepStream applications using GXF. This requires that DeepStream 7.1 and the reference graphs packages are installed on the system with all their dependencies.
Open the File menu from the menubar at the top and click on Open Graph to launch the file browser. You may alternatively use the Ctrl + O key combination.
Browse to a valid graph file, select it and click on Okay to open the graph.
Composer should now show the application graph.
To load component parameters from a separate file, right-click on the graph and select Load parameters from the context menu to launch the file browser.
Browse to an appropriate parameters file for the currently open and visible graph, select it and click on Okay to load parameter values from the file.
The rest of the steps to run the application or build a container image are the same as demonstrated earlier.