Program Listing for File http_server_source_stage.hpp
↰ Return to documentation for file (morpheus/_lib/include/morpheus/stages/http_server_source_stage.hpp)
/*
 * SPDX-FileCopyrightText: Copyright (c) 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once
#include "morpheus/export.h" // for exporting symbols
#include "morpheus/messages/meta.hpp" // for MessageMeta
#include "morpheus/utilities/http_server.hpp" // for HttpServer
#include <boost/fiber/buffered_channel.hpp> // for buffered_channel
#include <boost/fiber/context.hpp> // for context
#include <cudf/io/types.hpp> // for table_with_metadata
#include <mrc/segment/builder.hpp> // for segment::Builder
#include <mrc/segment/object.hpp> // for segment::Object
#include <pymrc/node.hpp> // for PythonSource
#include <rxcpp/rx.hpp> // for subscriber
#include <atomic> // for atomic
#include <chrono> // for duration
#include <cstddef> // for size_t
#include <cstdint> // for int64_t
#include <memory> // for shared_ptr & unique_ptr
#include <string> // for string & to_string
// IWYU thinks we're using thread::operator<<
// IWYU pragma: no_include <thread>
namespace morpheus {
// Uniquely-owned cuDF table together with its metadata.
using table_t = std::unique_ptr<cudf::io::table_with_metadata>;
// Boost.Fiber buffered channel whose elements are `table_t` values; used as the type of
// `HttpServerSourceStage::m_queue`.
using request_queue_t = boost::fibers::buffered_channel<table_t>;
/****** Component public implementations *******************/
/****** HttpServerSourceStage *************************************/
// TODO(dagardner): optionally add headers to the dataframe
/**
 * @brief Source stage, declared as a `PythonSource` emitting `std::shared_ptr<MessageMeta>`, that is
 * configured with HTTP server settings.
 *
 * The stage holds an `HttpServer` (`m_server`) and a fiber channel of cuDF tables (`m_queue`). Judging by
 * the member and parameter names, incoming HTTP payloads are presumably converted to tables, queued, and
 * emitted downstream as `MessageMeta`; the implementation is not part of this header, so confirm against
 * the corresponding source file.
 *
 * NOTE(review): the braces and access specifiers (`public:` / `private:`) of the class body are not
 * present in this listing, so the visibility of each member cannot be determined from here.
 */
class MORPHEUS_EXPORT HttpServerSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
// Shorthand for the base class, plus the source/subscriber types it provides.
using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>;
using typename base_t::source_type_t;
using typename base_t::subscriber_fn_t;
/**
 * @brief Construct the stage. Every parameter has a default, so the stage is default-constructible.
 *
 * The meanings below are inferred from the parameter names unless stated otherwise; the constructor
 * body is not visible here.
 *
 * @param bind_address Presumably the address the HTTP server binds to (default: empty string).
 * @param port Presumably the TCP port the server listens on (default: 8080).
 * @param endpoint Presumably the path on which messages are accepted (default: "/message").
 * @param live_endpoint Presumably the path of a liveness check (default: "/live").
 * @param ready_endpoint Presumably the path of a readiness check (default: "/ready").
 * @param method Presumably the HTTP method accepted on `endpoint` (default: "POST").
 * @param live_method Presumably the HTTP method accepted on `live_endpoint` (default: "GET").
 * @param ready_method Presumably the HTTP method accepted on `ready_endpoint` (default: "GET").
 * @param accept_status Presumably the HTTP status code returned for an accepted request (default: 201).
 * @param sleep_time Floating-point sleep interval (default: 0.1); unit is not stated here, presumably
 * seconds -- TODO confirm. Likely the origin of `m_sleep_time`.
 * @param queue_timeout Integral timeout (default: 5); presumably seconds, matching `m_queue_timeout`.
 * @param max_queue_size Presumably the capacity of the request queue (default: 1024).
 * @param num_server_threads Presumably the number of threads used by the HTTP server (default: 1).
 * @param max_payload_size Presumably the largest request body accepted; defaults to
 * `DefaultMaxPayloadSize`, which is not declared in this header.
 * @param request_timeout Timeout expressed as `std::chrono::seconds` (default: 30 seconds).
 * @param lines Boolean flag (default: false); presumably selects a line-delimited payload format --
 * TODO confirm.
 * @param stop_after Count (default: 0); presumably the number of records after which the stage stops,
 * with 0 meaning no limit -- TODO confirm.
 */
HttpServerSourceStage(std::string bind_address = "",
unsigned short port = 8080,
std::string endpoint = "/message",
std::string live_endpoint = "/live",
std::string ready_endpoint = "/ready",
std::string method = "POST",
std::string live_method = "GET",
std::string ready_method = "GET",
unsigned accept_status = 201,
float sleep_time = 0.1f,
long queue_timeout = 5,
std::size_t max_queue_size = 1024,
unsigned short num_server_threads = 1,
std::size_t max_payload_size = DefaultMaxPayloadSize,
std::chrono::seconds request_timeout = std::chrono::seconds(30),
bool lines = false,
std::size_t stop_after = 0);
// Overrides the base-class destructor; defined out of line.
~HttpServerSourceStage() override;
// Defined out of line; presumably shuts down the server and/or the request queue -- confirm in the
// implementation.
void close();
// Returns a `subscriber_fn_t` (type provided by the base class); presumably the callable the base
// source runs -- confirm.
subscriber_fn_t build();
// Takes an rxcpp subscriber of `source_type_t`; presumably the loop that emits messages to it.
void source_generator(rxcpp::subscriber<source_type_t> subscriber);
// Atomic counter, initialised to 0; presumably the number of tables currently held in `m_queue`.
std::atomic<int> m_queue_cnt = 0;
// Sleep interval stored as a steady-clock duration.
std::chrono::steady_clock::duration m_sleep_time;
// Queue timeout; `std::chrono::duration<long>` uses the default period, i.e. whole seconds.
std::chrono::duration<long> m_queue_timeout;
// Uniquely-owned HTTP server instance.
std::unique_ptr<HttpServer> m_server;
// Channel of cuDF tables (see `request_queue_t`).
request_queue_t m_queue;
std::size_t m_max_queue_size;
std::size_t m_stop_after;
// NOTE(review): no default member initializer here (nor on the two members above); presumably set in
// the constructor -- confirm it is initialised before first use.
std::size_t m_records_emitted;
/****** HttpServerSourceStageInterfaceProxy***********************/
/**
 * @brief Proxy exposing a static factory for `HttpServerSourceStage`; presumably the entry point used
 * by the Python bindings -- confirm against the binding code.
 *
 * NOTE(review): the opening brace of the struct body is not present in this listing.
 */
struct MORPHEUS_EXPORT HttpServerSourceStageInterfaceProxy
/**
 * @brief Create a `HttpServerSourceStage` wrapped in an `mrc::segment::Object`.
 *
 * After `builder` and `name`, the parameters mirror those of the `HttpServerSourceStage` constructor in
 * the same order, but without default values.
 *
 * @param builder Segment builder, taken by non-const reference; presumably used to construct the
 * stage's node.
 * @param name Presumably the name given to the created node.
 * @param request_timeout Plain `int64_t` here, whereas the constructor takes `std::chrono::seconds`;
 * presumably a count of seconds -- TODO confirm.
 * @return Shared pointer to the segment object holding the stage.
 */
static std::shared_ptr<mrc::segment::Object<HttpServerSourceStage>> init(mrc::segment::Builder& builder,
const std::string& name,
std::string bind_address,
unsigned short port,
std::string endpoint,
std::string live_endpoint,
std::string ready_endpoint,
std::string method,
std::string live_method,
std::string ready_method,
unsigned accept_status,
float sleep_time,
long queue_timeout,
std::size_t max_queue_size,
unsigned short num_server_threads,
std::size_t max_payload_size,
int64_t request_timeout,
bool lines,
std::size_t stop_after);
}; // end of group
} // namespace morpheus