Program Listing for File http_server_source_stage.hpp
↰ Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp)
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "morpheus/export.h" // for exporting symbols
#include "morpheus/messages/control.hpp" // for ControlMessage
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/utilities/http_server.hpp" // for HttpServer
#include "morpheus/utilities/json_types.hpp" // for control_message_task_t & json_t
#include <boost/beast/http/status.hpp> // for int_to_status, status
#include <boost/fiber/buffered_channel.hpp> // for buffered_channel
#include <boost/fiber/channel_op_status.hpp> // for channel_op_status
#include <boost/fiber/operations.hpp> // for sleep_for
#include <cudf/io/json.hpp> // for json_reader_options & read_json
#include <cudf/io/types.hpp> // for table_with_metadata
#include <glog/logging.h> // for CHECK & LOG
#include <mrc/segment/builder.hpp> // for segment::Builder
#include <mrc/segment/object.hpp> // for segment::Object
#include <nlohmann/json.hpp> // for json
#include <pybind11/pytypes.h> // for pybind11::object
#include <pymrc/node.hpp> // for PythonSource
#include <rxcpp/rx.hpp> // for subscriber
#include <atomic> // for atomic
#include <chrono> // for duration
#include <cstddef> // for size_t
#include <cstdint> // for int64_t
#include <exception> // for std::exception
#include <memory> // for shared_ptr & unique_ptr
#include <sstream> // needed by GLOG
#include <stdexcept> // for std::runtime_error
#include <string> // for string & to_string
#include <tuple> // for make_tuple
#include <utility> // for std::move & pair
#include <vector> // for vector
// IWYU thinks we're using thread::operator<<
// IWYU pragma: no_include <thread>
// IWYU thinks we need the http.hpp header for int_to_status, but it's defined in status.hpp
// IWYU pragma: no_include <boost/beast/http.hpp>
namespace morpheus {
// A parsed HTTP payload: the cudf table produced by `cudf::io::read_json` paired with the JSON describing the
// request's HTTP fields (built by `request_headers_to_json` in the payload request handler).
using table_with_http_fields_t = std::pair<cudf::io::table_with_metadata, morpheus::utilities::json_t>;
// Owning handle to a parsed payload; this is the element type handed from the HTTP worker threads to the source.
using table_t = std::unique_ptr<table_with_http_fields_t>;
// Bounded fiber channel used as the hand-off queue between the payload request handler (producer) and
// `HttpServerSourceStage::source_generator` (consumer).
using request_queue_t = boost::fibers::buffered_channel<table_t>;
/**
 * @brief Internal control-flow signal thrown by the source once `stop_after` records have been emitted.
 *
 * The subscriber function returned by `HttpServerSourceStage::build` catches it and completes the stream normally.
 * `what()` is overridden so that, should the exception ever escape and be logged, it carries a descriptive message
 * instead of the generic text returned by `std::exception::what()`.
 */
class SourceStageStopAfter : public std::exception
{
  public:
    const char* what() const noexcept override
    {
        return "HTTP server source stage stopped after emitting the requested number of records";
    }
};
// Per-type overloads that turn the MessageMeta built from a parsed HTTP payload into the stage's output message.
// `HttpServerSourceStage<OutputT>::source_generator` selects the overload through the type of `out_message`
// (`std::shared_ptr<OutputT>`).
//
// Parameters, as used by source_generator:
//   incoming_message - MessageMeta created from the payload's cudf table.
//   task             - optional task; may be nullptr (the stage passes `m_task.get()`, and `m_task` defaults to
//                      nullptr).
//   http_fields      - JSON describing the request's HTTP fields; passed as an rvalue and moved from.
//   out_message      - receives the output message.
//
// MessageMeta overload: per the note in source_generator, this simply hands the incoming pointer to out_message.
void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
control_message_task_t* task,
morpheus::utilities::json_t&& http_fields,
std::shared_ptr<MessageMeta>& out_message);
// ControlMessage overload. NOTE(review): presumably wraps incoming_message in a new ControlMessage and attaches the
// task and HTTP fields when present; the definition is not in this header, so confirm against the .cpp.
void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
control_message_task_t* task,
morpheus::utilities::json_t&& http_fields,
std::shared_ptr<ControlMessage>& out_message);
/****** HttpServerSourceStage *************************************/
/**
 * @brief Source stage that receives JSON payloads over HTTP and emits them as `OutputT` messages.
 *
 * An `HttpServer` is started with three endpoints: a payload endpoint, a liveness endpoint and a readiness endpoint.
 * Payloads are parsed into cudf tables on the server's worker threads and handed to the source through a bounded
 * fiber channel (`request_queue_t`). The source converts each table into a `MessageMeta` and then into `OutputT`
 * via `make_output_message`.
 *
 * @tparam OutputT Output message type; the `make_output_message` overloads in this header cover `MessageMeta` and
 *         `ControlMessage`.
 */
template <typename OutputT>
class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<OutputT>>
{
public:
using base_t = mrc::pymrc::PythonSource<std::shared_ptr<OutputT>>;
using typename base_t::source_type_t;
using typename base_t::subscriber_fn_t;
/**
 * @param bind_address Address the HTTP server binds to.
 * @param port Port the HTTP server listens on.
 * @param endpoint Path of the payload endpoint.
 * @param live_endpoint Path of the liveness endpoint (200 while the server is running, otherwise 500).
 * @param ready_endpoint Path of the readiness endpoint (200 while the server is running and the queued-payload count
 *        is below `max_queue_size`; 500 if the server is not running; 503 if the queue is full).
 * @param method HTTP method accepted by the payload endpoint.
 * @param live_method HTTP method accepted by the liveness endpoint.
 * @param ready_method HTTP method accepted by the readiness endpoint.
 * @param accept_status Status returned when a payload has been queued. It must map to a status known to Boost.Beast;
 *        this is verified with glog `CHECK`, so an invalid value aborts.
 * @param sleep_time Time the source sleeps between polls of an empty queue. See the constructor definition for how
 *        the value is converted to a duration.
 * @param queue_timeout Seconds the payload handler waits to push into a full queue before responding with 503.
 * @param max_queue_size Capacity of the request queue; also the threshold used by the readiness endpoint.
 *        NOTE(review): boost::fibers::buffered_channel requires a power-of-two capacity; confirm callers enforce this.
 * @param num_server_threads Number of HTTP server worker threads.
 * @param max_payload_size Maximum request payload size, forwarded to `HttpServer`.
 * @param request_timeout Request timeout, forwarded to `HttpServer`.
 * @param lines When true, payloads are parsed as JSON Lines (forwarded to cudf's `json_reader_options::lines`).
 * @param stop_after When non-zero, the source completes once at least this many records have been emitted.
 * @param task Optional task forwarded to `make_output_message` for every payload; may be null.
 */
HttpServerSourceStage(std::string bind_address = "127.0.0.1",
unsigned short port = 8080,
std::string endpoint = "/message",
std::string live_endpoint = "/live",
std::string ready_endpoint = "/ready",
std::string method = "POST",
std::string live_method = "GET",
std::string ready_method = "GET",
unsigned accept_status = 201,
float sleep_time = 0.1f,
long queue_timeout = 5,
std::size_t max_queue_size = 1024,
unsigned short num_server_threads = 1,
std::size_t max_payload_size = DefaultMaxPayloadSize,
std::chrono::seconds request_timeout = std::chrono::seconds(30),
bool lines = false,
std::size_t stop_after = 0,
std::unique_ptr<control_message_task_t> task = nullptr);
// Stops the server and closes the request queue (see close()).
~HttpServerSourceStage() override
{
close();
}
// Stops and releases the HTTP server (if still owned) and closes the request queue.
void close();
private:
// Returns the subscriber function handed to the PythonSource base: starts the server and runs source_generator.
subscriber_fn_t build();
// Drains the request queue, emitting one output message per parsed payload.
void source_generator(rxcpp::subscriber<source_type_t> subscriber);
// Number of payloads currently queued: incremented by the payload handler after a successful push and decremented
// by source_generator after a pop. Used by the readiness endpoint.
std::atomic<int> m_queue_cnt = 0;
// Time source_generator sleeps when the queue is empty and the server is still running.
std::chrono::steady_clock::duration m_sleep_time;
// How long the payload handler waits to push into the queue (duration<long> with the default period, i.e. seconds).
std::chrono::duration<long> m_queue_timeout;
std::unique_ptr<HttpServer> m_server;
// Hand-off channel between the HTTP worker threads and the source.
request_queue_t m_queue;
std::size_t m_max_queue_size;
// 0 disables the limit.
std::size_t m_stop_after;
// Total rows emitted so far (sum of MessageMeta::count() over emitted messages).
std::size_t m_records_emitted{0};
// Optional task passed to make_output_message; null by default.
std::unique_ptr<control_message_task_t> m_task{nullptr};
};
/****** HttpServerSourceStageInterfaceProxy***********************/
/**
 * @brief Factory helpers that construct an `HttpServerSourceStage` inside an MRC segment.
 *
 * NOTE(review): presumably these are the entry points exposed to Python through pybind11; the bindings themselves
 * are not in this header. The parameters mirror the `HttpServerSourceStage` constructor, plus the segment builder
 * and the stage name.
 */
struct MORPHEUS_EXPORT HttpServerSourceStageInterfaceProxy
{
// Creates a stage that emits MessageMeta. `request_timeout` is an integer here while the stage constructor takes
// std::chrono::seconds; presumably it is a number of seconds, so confirm in the definition.
static std::shared_ptr<mrc::segment::Object<HttpServerSourceStage<MessageMeta>>> init_meta(
mrc::segment::Builder& builder,
const std::string& name,
std::string bind_address,
unsigned short port,
std::string endpoint,
std::string live_endpoint,
std::string ready_endpoint,
std::string method,
std::string live_method,
std::string ready_method,
unsigned accept_status,
float sleep_time,
long queue_timeout,
std::size_t max_queue_size,
unsigned short num_server_threads,
std::size_t max_payload_size,
int64_t request_timeout,
bool lines,
std::size_t stop_after);
// Creates a stage that emits ControlMessage. NOTE(review): `task_type` / `task_payload` are Python objects,
// presumably used to build the optional control_message_task_t attached to each message; confirm in the definition.
static std::shared_ptr<mrc::segment::Object<HttpServerSourceStage<ControlMessage>>> init_cm(
mrc::segment::Builder& builder,
const std::string& name,
std::string bind_address,
unsigned short port,
std::string endpoint,
std::string live_endpoint,
std::string ready_endpoint,
std::string method,
std::string live_method,
std::string ready_method,
unsigned accept_status,
float sleep_time,
long queue_timeout,
std::size_t max_queue_size,
unsigned short num_server_threads,
std::size_t max_payload_size,
int64_t request_timeout,
bool lines,
std::size_t stop_after,
const pybind11::object& task_type,
const pybind11::object& task_payload);
};
/**
 * Builds the stage: validates `accept_status`, creates the payload, liveness and readiness handlers, and constructs
 * (but does not start) the HTTP server. The server is started by the subscriber function returned from build().
 *
 * `sleep_time` is interpreted as a (possibly fractional) number of seconds, e.g. the default of 0.1.
 */
template <typename OutputT>
HttpServerSourceStage<OutputT>::HttpServerSourceStage(std::string bind_address,
                                                      unsigned short port,
                                                      std::string endpoint,
                                                      std::string live_endpoint,
                                                      std::string ready_endpoint,
                                                      std::string method,
                                                      std::string live_method,
                                                      std::string ready_method,
                                                      unsigned accept_status,
                                                      float sleep_time,
                                                      long queue_timeout,
                                                      std::size_t max_queue_size,
                                                      unsigned short num_server_threads,
                                                      std::size_t max_payload_size,
                                                      std::chrono::seconds request_timeout,
                                                      bool lines,
                                                      std::size_t stop_after,
                                                      std::unique_ptr<control_message_task_t> task) :
  base_t(build()),
  // Members are initialized in declaration order; the list below follows that order to avoid -Wreorder surprises.
  // Converting through a floating-point duration keeps the fractional part of `sleep_time`; casting it to an
  // integral count first would truncate the default of 0.1 to zero and make the empty-queue poll a busy loop.
  m_sleep_time{std::chrono::duration_cast<std::chrono::steady_clock::duration>(std::chrono::duration<float>(sleep_time))},
  m_queue_timeout{queue_timeout},
  m_queue{max_queue_size},
  m_max_queue_size{max_queue_size},
  m_stop_after{stop_after},
  m_task{std::move(task)}
{
    CHECK(boost::beast::http::int_to_status(accept_status) != boost::beast::http::status::unknown)
        << "Invalid HTTP status code: " << accept_status;

    // Payload endpoint: parse the body as JSON into a cudf table and push it onto the request queue.
    request_handler_fn_t parser = [this, accept_status, lines](const tcp_endpoint_t& tcp_endpoint,
                                                               const request_t& request) {
        // This function is called from one of the HttpServer's worker threads; avoid performing any work here beyond
        // what is strictly necessary to return a valid response to the client. The payload is parsed here so that an
        // appropriate error can be returned if it is invalid. Constructing a MessageMeta is deliberately avoided
        // since that would require grabbing the Python GIL; instead the libcudf table is pushed to the queue and the
        // subscriber handles the conversion to MessageMeta.
        table_t table{nullptr};
        try
        {
            std::string body{request.body()};
            cudf::io::source_info source{body.c_str(), body.size()};
            auto options     = cudf::io::json_reader_options::builder(source).lines(lines);
            auto cudf_table  = cudf::io::read_json(options.build());
            auto http_fields = request_headers_to_json(tcp_endpoint, request);
            table = std::make_unique<table_with_http_fields_t>(std::move(cudf_table), std::move(http_fields));
        } catch (const std::exception& e)
        {
            // We want to log the exception locally, but we don't want to include the exception message in the response
            // since that may leak sensitive information
            std::string error_msg = "Error occurred converting HTTP payload to Dataframe";
            LOG(ERROR) << error_msg << ": " << e.what();
            return std::make_tuple(400u, "text/plain", error_msg, nullptr);
        }

        try
        {
            // NOLINTNEXTLINE(clang-diagnostic-unused-value)
            DCHECK_NOTNULL(table);
            auto queue_status = m_queue.push_wait_for(std::move(table), m_queue_timeout);
            if (queue_status == boost::fibers::channel_op_status::success)
            {
                ++m_queue_cnt;
                return std::make_tuple(accept_status, "text/plain", std::string(), nullptr);
            }

            std::string error_msg = "HTTP payload queue is ";
            switch (queue_status)
            {
            case boost::fibers::channel_op_status::full:
            case boost::fibers::channel_op_status::timeout: {
                error_msg += "full";
                break;
            }
            case boost::fibers::channel_op_status::closed: {
                error_msg += "closed";
                break;
            }
            default: {
                error_msg += "in an unknown state";
                break;
            }
            }

            return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr);
        } catch (const std::exception& e)
        {
            // Refer to the comment above about not including exception messages in the response
            std::string error_msg = "Error occurred while pushing payload to queue";
            LOG(ERROR) << error_msg << ": " << e.what();
            return std::make_tuple(500u, "text/plain", error_msg, nullptr);
        }
    };

    // Liveness endpoint: healthy as long as the HTTP server reports that it is running.
    request_handler_fn_t live_parser = [this](const tcp_endpoint_t& /*tcp_endpoint*/, const request_t& /*request*/) {
        if (!m_server->is_running())
        {
            std::string error_msg = "Source server is not running";
            return std::make_tuple(500u, "text/plain", error_msg, nullptr);
        }

        return std::make_tuple(200u, "text/plain", std::string(), nullptr);
    };

    // Readiness endpoint: ready when the server is running and the queue has room for more payloads.
    request_handler_fn_t ready_parser = [this](const tcp_endpoint_t& /*tcp_endpoint*/, const request_t& /*request*/) {
        if (!m_server->is_running())
        {
            std::string error_msg = "Source server is not running";
            return std::make_tuple(500u, "text/plain", error_msg, nullptr);
        }

        // The payload handler increments m_queue_cnt only after a successful push, so the consumer may pop (and
        // decrement) first, leaving the counter briefly negative. A plain `int < size_t` comparison would convert a
        // negative count to a huge unsigned value and wrongly report the queue as full, so check the sign first.
        const int queued = m_queue_cnt.load();
        if (queued < 0 || static_cast<std::size_t>(queued) < m_max_queue_size)
        {
            return std::make_tuple(200u, "text/plain", std::string(), nullptr);
        }

        std::string error_msg = "HTTP payload queue is full or unavailable to accept new values";
        return std::make_tuple(503u, "text/plain", std::move(error_msg), nullptr);
    };

    std::vector<HttpEndpoint> endpoints;
    endpoints.emplace_back(parser, std::move(endpoint), std::move(method));
    endpoints.emplace_back(live_parser, std::move(live_endpoint), std::move(live_method));
    endpoints.emplace_back(ready_parser, std::move(ready_endpoint), std::move(ready_method));

    m_server = std::make_unique<HttpServer>(
        std::move(endpoints), std::move(bind_address), port, num_server_threads, max_payload_size, request_timeout);
}
template <typename OutputT>
HttpServerSourceStage<OutputT>::subscriber_fn_t HttpServerSourceStage<OutputT>::build()
{
return [this](rxcpp::subscriber<source_type_t> subscriber) -> void {
try
{
m_server->start();
this->source_generator(subscriber);
} catch (const SourceStageStopAfter& e)
{
DLOG(INFO) << "Completed after emitting " << m_records_emitted << " records";
} catch (const std::exception& e)
{
LOG(ERROR) << "Encountered error while listening for incoming HTTP requests: " << e.what() << std::endl;
subscriber.on_error(std::make_exception_ptr(e));
return;
}
subscriber.on_completed();
this->close();
};
}
/**
 * Drains the request queue, turning each parsed payload into an output message for `subscriber`.
 *
 * The server's running state is only consulted when the queue is empty, so every payload that was already queued is
 * emitted before shutting down. The loop ends when the subscriber unsubscribes, when the queue is empty and the server
 * has stopped, or when the queue has been closed. Throws SourceStageStopAfter once `m_stop_after` (if non-zero)
 * records have been emitted, and std::runtime_error on an unexpected channel status.
 */
template <typename OutputT>
void HttpServerSourceStage<OutputT>::source_generator(
    rxcpp::subscriber<HttpServerSourceStage::source_type_t> subscriber)
{
    bool keep_polling = true;
    while (subscriber.is_subscribed() && keep_polling)
    {
        table_t payload{nullptr};
        const auto status = m_queue.try_pop(payload);

        switch (status)
        {
        case boost::fibers::channel_op_status::success: {
            --m_queue_cnt;
            // NOLINTNEXTLINE(clang-diagnostic-unused-value)
            DCHECK_NOTNULL(payload);
            try
            {
                auto meta            = MessageMeta::create_from_cpp(std::move(payload->first), 0);
                const auto row_count = meta->count();

                // For OutputT == MessageMeta, make_output_message simply swaps the pointers
                std::shared_ptr<OutputT> output{nullptr};
                make_output_message(meta, m_task.get(), std::move(payload->second), output);
                subscriber.on_next(std::move(output));
                m_records_emitted += row_count;
            } catch (const std::exception& e)
            {
                LOG(ERROR) << "Error occurred converting HTTP payload to Dataframe: " << e.what();
            }

            const bool limit_reached = (m_stop_after > 0) && (m_records_emitted >= m_stop_after);
            if (limit_reached)
            {
                throw SourceStageStopAfter();
            }
            break;
        }
        case boost::fibers::channel_op_status::empty: {
            // Nothing queued: either the server has stopped (finish) or we back off before polling again.
            keep_polling = m_server->is_running();
            if (keep_polling)
            {
                boost::this_fiber::sleep_for(m_sleep_time);
            }
            break;
        }
        case boost::fibers::channel_op_status::closed: {
            keep_polling = false;
            break;
        }
        default: {
            const std::string error_msg{"Unknown queue status: " + std::to_string(static_cast<int>(status))};
            LOG(ERROR) << error_msg;
            throw std::runtime_error(error_msg);
        }
        }
    }
}
/**
 * Shuts the stage down: stops and releases the HTTP server (if still owned), then closes the request queue.
 * The null check lets close() run more than once; it is called both from the subscriber function and the destructor.
 */
template <typename OutputT>
void HttpServerSourceStage<OutputT>::close()
{
    if (m_server != nullptr)
    {
        // Stop before releasing; stop() is a no-op when the server is not running.
        m_server->stop();
        m_server.reset(nullptr);
    }

    // Close the hand-off channel; later push attempts by the payload handler report `closed`.
    m_queue.close();
}
// end of group
} // namespace morpheus