Creating a C++ Source Stage
For this example, we are going to add a C++ implementation for the RabbitMQSourceStage
we designed in the Python examples. The Python implementation of this stage emits messages of the type MessageMeta
; as such, our C++ implementation must do the same.
For communicating with RabbitMQ we will be using the SimpleAmqpClient library, and libcudf for constructing the DataFrame
.
Our includes:
#include <SimpleAmqpClient/SimpleAmqpClient.h> // for AmqpClient::Channel::ptr_t
#include <cudf/io/types.hpp> // for cudf::io::table_with_metadata
#include <morpheus/messages/meta.hpp> // for MessageMeta
#include <mrc/segment/builder.hpp> // for Segment Builder
#include <mrc/segment/object.hpp> // for Segment Object
#include <pymrc/node.hpp> // for mrc::pymrc::PythonSource
#include <chrono> // for chrono::milliseconds
#include <memory> // for shared_ptr
#include <string>
The MRC includes bring in the definitions for MRC Builder
, SegmentObject
and PythonSource
.
Our namespace and class definition is:
namespace morpheus_rabbit {
// pybind11 sets visibility to hidden by default; we want to export our symbols
#pragma GCC visibility push(default)
using namespace std::literals;
using namespace morpheus;
class RabbitMQSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
{
public:
using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>;
using typename base_t::source_type_t;
using typename base_t::subscriber_fn_t;
Our base class defines source_type_t
as an alias for std::shared_ptr<MessageMeta>
which we are going to use as it will occur in some of our function signatures. The way to think about source_type_t
is that the stage we are writing emits objects of type MessageMeta
. The subscriber_fn_t
is an alias for a function which will receive an rxcpp::subscriber
instance and emit messages into the pipeline. The class we are deriving from, PythonSource
, defines both of these to make writing function signatures easier.
Our constructor is similar to the constructor of our Python class with the majority of the parameters being specific to communicating with RabbitMQ. In this case the default destructor is sufficient.
RabbitMQSourceStage(const std::string& host,
const std::string& exchange,
const std::string& exchange_type = "fanout"s,
const std::string& queue_name = ""s,
std::chrono::milliseconds poll_interval = 100ms);
~RabbitMQSourceStage() override = default;
Our class will require a few private methods
subscriber_fn_t build();
void source_generator(rxcpp::subscriber<source_type_t> subscriber);
cudf::io::table_with_metadata from_json(const std::string& body) const;
void close();
The build
method is responsible for returning a function with a signature matching subscriber_fn_t
, the result of which will be passed into our base’s constructor. Typically, this function is the center of a source stage, making calls to the subscriber
’s on_next
, on_error
, and on_completed
methods. For this example, the RabbitMQ-specific logic was broken out into the source_generator
method, which should be analogous to the source_generator
method from the Python class, and will emit new messages into the pipeline by calling subscriber.on_next(message)
.
The from_json
method parses a JSON string to a cuDF table_with_metadata. Lastly, the close
method disconnects from the RabbitMQ exchange.
We will also need three private attributes specific to our interactions with RabbitMQ: our polling interval, the name of the queue we are listening to, and a pointer to our channel object.
std::chrono::milliseconds m_poll_interval;
std::string m_queue_name;
AmqpClient::Channel::ptr_t m_channel;
Wrapping it all together, our header file should be similar to:
#pragma once
#include <SimpleAmqpClient/SimpleAmqpClient.h> // for AmqpClient::Channel::ptr_t
#include <cudf/io/types.hpp> // for cudf::io::table_with_metadata
#include <morpheus/messages/meta.hpp> // for MessageMeta
#include <mrc/segment/builder.hpp> // for Segment Builder
#include <mrc/segment/object.hpp> // for Segment Object
#include <pymrc/node.hpp> // for mrc::pymrc::PythonSource
#include <chrono> // for chrono::milliseconds
#include <memory> // for shared_ptr
#include <string>
namespace morpheus_rabbit {
// pybind11 sets visibility to hidden by default; we want to export our symbols
#pragma GCC visibility push(default)
using namespace std::literals;
using namespace morpheus;
class RabbitMQSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
{
public:
using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>;
using typename base_t::source_type_t;
using typename base_t::subscriber_fn_t;
RabbitMQSourceStage(const std::string& host,
const std::string& exchange,
const std::string& exchange_type = "fanout"s,
const std::string& queue_name = ""s,
std::chrono::milliseconds poll_interval = 100ms);
~RabbitMQSourceStage() override = default;
private:
subscriber_fn_t build();
void source_generator(rxcpp::subscriber<source_type_t> subscriber);
cudf::io::table_with_metadata from_json(const std::string& body) const;
void close();
std::chrono::milliseconds m_poll_interval;
std::string m_queue_name;
AmqpClient::Channel::ptr_t m_channel;
};
/****** RabbitMQSourceStageInferenceProxy**********************/
/**
* @brief Interface proxy, used to insulate Python bindings.
*/
struct RabbitMQSourceStageInterfaceProxy
{
/**
* @brief Create and initialize a RabbitMQSourceStage, and return the result.
*/
static std::shared_ptr<mrc::segment::Object<RabbitMQSourceStage>> init(mrc::segment::Builder& builder,
const std::string& name,
const std::string& host,
const std::string& exchange,
const std::string& exchange_type,
const std::string& queue_name,
std::chrono::milliseconds poll_interval);
};
#pragma GCC visibility pop
} // namespace morpheus_rabbit
Our includes section is:
#include "rabbitmq_source.hpp"
#include <cudf/io/json.hpp>
#include <cudf/table/table.hpp>
#include <glog/logging.h>
#include <pybind11/chrono.h> // for timedelta->chrono conversions
#include <pybind11/pybind11.h>
#include <exception>
#include <sstream>
#include <thread> // for std::this_thread::sleep_for
#include <vector>
The Google Logging Library (glog) is used internally by Morpheus for logging; however, the choice of a logger is up to the individual developer.
Authors of stages that require concurrency are free to choose their own concurrency models. Launching new processes, threads, or fibers from within a stage is permissible as long as the management of those resources is contained within the stage. Newly launched threads are also free to use thread-local storage so long as it doesn’t occur within the thread the stage is executed from.
The definition for our constructor is:
RabbitMQSourceStage::RabbitMQSourceStage(const std::string& host,
const std::string& exchange,
const std::string& exchange_type,
const std::string& queue_name,
std::chrono::milliseconds poll_interval) :
PythonSource(build()),
m_channel{AmqpClient::Channel::Create(host)},
m_poll_interval{poll_interval}
{
m_channel->DeclareExchange(exchange, exchange_type);
m_queue_name = m_channel->DeclareQueue(queue_name);
m_channel->BindQueue(m_queue_name, exchange);
}
The key thing to note is the invocation of our base’s constructor is the result of the build
method:
PythonSource(build()),
Our build
method returns a function, which needs to do three things:
Emit data into the pipeline by calling
rxcpp::subscriber
’son_next
method. In our example, this occurs in thesource_generator
method.When an error occurs, call the
rxcpp::subscriber
’son_error
method.When we are done, call the
rxcpp::subscriber
’son_complete
method.
Note: For some source stages, such as ones that read input data from a file, there is a clear point where the stage is complete. Others such as this one are intended to continue running until it is shut down. For the latter situation, the stage can poll the rxcpp::subscriber
’s is_subscribed
method, which will return a value of false
on shut down.
RabbitMQSourceStage::subscriber_fn_t RabbitMQSourceStage::build()
{
return [this](rxcpp::subscriber<source_type_t> subscriber) -> void {
try
{
this->source_generator(subscriber);
} catch (const std::exception& e)
{
LOG(ERROR) << "Encountered error while polling RabbitMQ: " << e.what() << std::endl;
subscriber.on_error(std::make_exception_ptr(e));
close();
return;
}
close();
subscriber.on_completed();
};
}
As a design decision, we left the majority of the RabbitMQ specific code in the source_generator
method, leaving Morpheus specific code in the build
method. For each message that we receive and can successfully parse, we call the rxcpp::subscriber
’s on_next
method. When there are no messages in the queue, we yield the thread by sleeping, potentially allowing the scheduler to perform a context switch.
void RabbitMQSourceStage::source_generator(rxcpp::subscriber<RabbitMQSourceStage::source_type_t> subscriber)
{
const std::string consumer_tag = m_channel->BasicConsume(m_queue_name, "", true, false);
while (subscriber.is_subscribed())
{
AmqpClient::Envelope::ptr_t envelope;
if (m_channel->BasicConsumeMessage(consumer_tag, envelope, 0))
{
try
{
auto table = from_json(envelope->Message()->Body());
auto message = MessageMeta::create_from_cpp(std::move(table), 0);
subscriber.on_next(std::move(message));
} catch (const std::exception& e)
{
LOG(ERROR) << "Error occurred converting RabbitMQ message to Dataframe: " << e.what();
}
m_channel->BasicAck(envelope);
}
else
{
// Sleep when there are no messages
std::this_thread::sleep_for(m_poll_interval);
}
}
}
We don’t yet know the size of the messages that we are going to receive from RabbitMQ, but we should assume that they may be quite large. As such, we try to limit the number of copies of this data, preferring to instead pass by reference or move data. The SimpleAmqpClient
’s Body()
method returns a const reference to the payload, which we also pass by reference into the from_json
method. Since our stage has no need for the data itself after it’s emitted into the pipeline, we move our cuDF data table when we construct our MessageMeta
instance, and then we once again move the message into the subscriber’s on_next
method.
Our from_json
and close
methods are rather straightforward:
cudf::io::table_with_metadata RabbitMQSourceStage::from_json(const std::string& body) const
{
cudf::io::source_info source{body.c_str(), body.size()};
auto options = cudf::io::json_reader_options::builder(source).lines(true);
return cudf::io::read_json(options.build());
}
void RabbitMQSourceStage::close()
{
// disconnect
if (m_channel)
{
m_channel.reset();
}
}
std::shared_ptr<mrc::segment::Object<RabbitMQSourceStage>> RabbitMQSourceStageInterfaceProxy::init(
mrc::segment::Builder& builder,
const std::string& name,
const std::string& host,
const std::string& exchange,
const std::string& exchange_type,
const std::string& queue_name,
std::chrono::milliseconds poll_interval)
{
return builder.construct_object<RabbitMQSourceStage>(
name, host, exchange, exchange_type, queue_name, poll_interval);
}
namespace py = pybind11;
// Define the pybind11 module m.
PYBIND11_MODULE(morpheus_rabbit, m)
{
mrc::pymrc::import(m, "morpheus._lib.messages");
py::class_<mrc::segment::Object<RabbitMQSourceStage>,
mrc::segment::ObjectProperties,
std::shared_ptr<mrc::segment::Object<RabbitMQSourceStage>>>(
m, "RabbitMQSourceStage", py::multiple_inheritance())
.def(py::init<>(&RabbitMQSourceStageInterfaceProxy::init),
py::arg("builder"),
py::arg("name"),
py::arg("host"),
py::arg("exchange"),
py::arg("exchange_type") = "fanout",
py::arg("queue_name") = "",
py::arg("poll_interval") = 100ms);
}
Wrapping it all together, our source file should be:
#include "rabbitmq_source.hpp"
#include <cudf/io/json.hpp>
#include <cudf/table/table.hpp>
#include <glog/logging.h>
#include <pybind11/chrono.h> // for timedelta->chrono conversions
#include <pybind11/pybind11.h>
#include <exception>
#include <sstream>
#include <thread> // for std::this_thread::sleep_for
#include <vector>
namespace morpheus_rabbit {
RabbitMQSourceStage::RabbitMQSourceStage(const std::string& host,
const std::string& exchange,
const std::string& exchange_type,
const std::string& queue_name,
std::chrono::milliseconds poll_interval) :
PythonSource(build()),
m_channel{AmqpClient::Channel::Create(host)},
m_poll_interval{poll_interval}
{
m_channel->DeclareExchange(exchange, exchange_type);
m_queue_name = m_channel->DeclareQueue(queue_name);
m_channel->BindQueue(m_queue_name, exchange);
}
RabbitMQSourceStage::subscriber_fn_t RabbitMQSourceStage::build()
{
return [this](rxcpp::subscriber<source_type_t> subscriber) -> void {
try
{
this->source_generator(subscriber);
} catch (const std::exception& e)
{
LOG(ERROR) << "Encountered error while polling RabbitMQ: " << e.what() << std::endl;
subscriber.on_error(std::make_exception_ptr(e));
close();
return;
}
close();
subscriber.on_completed();
};
}
void RabbitMQSourceStage::source_generator(rxcpp::subscriber<RabbitMQSourceStage::source_type_t> subscriber)
{
const std::string consumer_tag = m_channel->BasicConsume(m_queue_name, "", true, false);
while (subscriber.is_subscribed())
{
AmqpClient::Envelope::ptr_t envelope;
if (m_channel->BasicConsumeMessage(consumer_tag, envelope, 0))
{
try
{
auto table = from_json(envelope->Message()->Body());
auto message = MessageMeta::create_from_cpp(std::move(table), 0);
subscriber.on_next(std::move(message));
} catch (const std::exception& e)
{
LOG(ERROR) << "Error occurred converting RabbitMQ message to Dataframe: " << e.what();
}
m_channel->BasicAck(envelope);
}
else
{
// Sleep when there are no messages
std::this_thread::sleep_for(m_poll_interval);
}
}
}
cudf::io::table_with_metadata RabbitMQSourceStage::from_json(const std::string& body) const
{
cudf::io::source_info source{body.c_str(), body.size()};
auto options = cudf::io::json_reader_options::builder(source).lines(true);
return cudf::io::read_json(options.build());
}
void RabbitMQSourceStage::close()
{
// disconnect
if (m_channel)
{
m_channel.reset();
}
}
std::shared_ptr<mrc::segment::Object<RabbitMQSourceStage>> RabbitMQSourceStageInterfaceProxy::init(
mrc::segment::Builder& builder,
const std::string& name,
const std::string& host,
const std::string& exchange,
const std::string& exchange_type,
const std::string& queue_name,
std::chrono::milliseconds poll_interval)
{
return builder.construct_object<RabbitMQSourceStage>(
name, host, exchange, exchange_type, queue_name, poll_interval);
}
namespace py = pybind11;
// Define the pybind11 module m.
PYBIND11_MODULE(morpheus_rabbit, m)
{
mrc::pymrc::import(m, "morpheus._lib.messages");
py::class_<mrc::segment::Object<RabbitMQSourceStage>,
mrc::segment::ObjectProperties,
std::shared_ptr<mrc::segment::Object<RabbitMQSourceStage>>>(
m, "RabbitMQSourceStage", py::multiple_inheritance())
.def(py::init<>(&RabbitMQSourceStageInterfaceProxy::init),
py::arg("builder"),
py::arg("name"),
py::arg("host"),
py::arg("exchange"),
py::arg("exchange_type") = "fanout",
py::arg("queue_name") = "",
py::arg("poll_interval") = 100ms);
}
} // namespace morpheus_rabbit
Previously, our stage connected to the RabbitMQ server in the constructor. This is no longer advantageous to us when C++ execution is enabled. Instead, we will record our constructor arguments and move the connection code to a new connect
method. Our new constructor and connect
methods are updated to:
def __init__(self,
config: Config,
host: str,
exchange: str,
exchange_type: str = 'fanout',
queue_name: str = '',
poll_interval: str = '100millis'):
super().__init__(config)
self._host = host
self._exchange = exchange
self._exchange_type = exchange_type
self._queue_name = queue_name
self._connection = None
self._channel = None
self._poll_interval = pd.Timedelta(poll_interval)
# Flag to indicate whether or not we should stop
self._stop_requested = False
def connect(self):
self._connection = pika.BlockingConnection(pika.ConnectionParameters(host=self._host))
self._channel = self._connection.channel()
self._channel.exchange_declare(exchange=self._exchange, exchange_type=self._exchange_type)
result = self._channel.queue_declare(queue=self._queue_name, exclusive=True)
# When queue_name='' we will receive a randomly generated queue name
self._queue_name = result.method.queue
self._channel.queue_bind(exchange=self._exchange, queue=self._queue_name)
Lastly, our _build_source
method needs to be updated to build a C++ node when morpheus.config.CppConfig.get_should_use_cpp()
is configured to True
by using the self._build_cpp_node()
method.
def _build_source(self, builder: mrc.Builder) -> StreamPair:
if self._build_cpp_node():
node = morpheus_rabbit_cpp.RabbitMQSourceStage(builder,
self.unique_name,
self._host,
self._exchange,
self._exchange_type,
self._queue_name,
self._poll_interval.to_pytimedelta())
else:
self.connect()
node = builder.make_source(self.unique_name, self.source_generator)
return node, MessageMeta