Program Listing for File http_server_source_stage.cpp#

Return to documentation for file (python/morpheus/morpheus/_lib/src/stages/http_server_source_stage.cpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "morpheus/stages/http_server_source_stage.hpp"

#include <pybind11/pybind11.h>  // for cast
#include <pymrc/utils.hpp>      // for cast_from_pyobject

namespace morpheus {

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
                         control_message_task_t* task,
                         morpheus::utilities::json_t&& http_fields,
                         std::shared_ptr<MessageMeta>& out_message)
{
    DCHECK_EQ(task, nullptr) << "Tasks are not supported for MessageMeta";
    out_message.swap(incoming_message);
}

void make_output_message(std::shared_ptr<MessageMeta>& incoming_message,
                         control_message_task_t* task,
                         morpheus::utilities::json_t&& http_fields,
                         std::shared_ptr<ControlMessage>& out_message)
{
    utilities::json_t cm_config = {{"metadata", {{"http_fields", http_fields}}}};
    auto cm_msg                 = std::make_shared<ControlMessage>(cm_config);
    cm_msg->payload(incoming_message);
    if (task)
    {
        cm_msg->add_task(task->first, task->second);
    }
    out_message.swap(cm_msg);
}

// ************ HttpServerSourceStageInterfaceProxy ************ //
std::shared_ptr<mrc::segment::Object<HttpServerSourceStage<MessageMeta>>>
HttpServerSourceStageInterfaceProxy::init_meta(mrc::segment::Builder& builder,
                                               const std::string& name,
                                               std::string bind_address,
                                               unsigned short port,
                                               std::string endpoint,
                                               std::string live_endpoint,
                                               std::string ready_endpoint,
                                               std::string method,
                                               std::string live_method,
                                               std::string ready_method,
                                               unsigned accept_status,
                                               float sleep_time,
                                               long queue_timeout,
                                               std::size_t max_queue_size,
                                               unsigned short num_server_threads,
                                               std::size_t max_payload_size,
                                               int64_t request_timeout,
                                               bool lines,
                                               std::size_t stop_after)
{
    return builder.construct_object<HttpServerSourceStage<MessageMeta>>(name,
                                                                        std::move(bind_address),
                                                                        port,
                                                                        std::move(endpoint),
                                                                        std::move(live_endpoint),
                                                                        std::move(ready_endpoint),
                                                                        std::move(method),
                                                                        std::move(live_method),
                                                                        std::move(ready_method),
                                                                        accept_status,
                                                                        sleep_time,
                                                                        queue_timeout,
                                                                        max_queue_size,
                                                                        num_server_threads,
                                                                        max_payload_size,
                                                                        std::chrono::seconds(request_timeout),
                                                                        lines,
                                                                        stop_after,
                                                                        nullptr);
}

std::shared_ptr<mrc::segment::Object<HttpServerSourceStage<ControlMessage>>>
HttpServerSourceStageInterfaceProxy::init_cm(mrc::segment::Builder& builder,
                                             const std::string& name,
                                             std::string bind_address,
                                             unsigned short port,
                                             std::string endpoint,
                                             std::string live_endpoint,
                                             std::string ready_endpoint,
                                             std::string method,
                                             std::string live_method,
                                             std::string ready_method,
                                             unsigned accept_status,
                                             float sleep_time,
                                             long queue_timeout,
                                             std::size_t max_queue_size,
                                             unsigned short num_server_threads,
                                             std::size_t max_payload_size,
                                             int64_t request_timeout,
                                             bool lines,
                                             std::size_t stop_after,
                                             const pybind11::object& task_type,
                                             const pybind11::object& task_payload)
{
    std::unique_ptr<control_message_task_t> task{nullptr};

    if (!task_type.is_none() && !task_payload.is_none())
    {
        task = std::make_unique<control_message_task_t>(pybind11::cast<std::string>(task_type),
                                                        mrc::pymrc::cast_from_pyobject(task_payload));
    }

    return builder.construct_object<HttpServerSourceStage<ControlMessage>>(name,
                                                                           std::move(bind_address),
                                                                           port,
                                                                           std::move(endpoint),
                                                                           std::move(live_endpoint),
                                                                           std::move(ready_endpoint),
                                                                           std::move(method),
                                                                           std::move(live_method),
                                                                           std::move(ready_method),
                                                                           accept_status,
                                                                           sleep_time,
                                                                           queue_timeout,
                                                                           max_queue_size,
                                                                           num_server_threads,
                                                                           max_payload_size,
                                                                           std::chrono::seconds(request_timeout),
                                                                           lines,
                                                                           stop_after,
                                                                           std::move(task));
}
}  // namespace morpheus