Program Listing for File kafka_source.hpp#

Return to documentation for file (python/morpheus/morpheus/_lib/include/morpheus/stages/kafka_source.hpp)

/*
 * SPDX-FileCopyrightText: Copyright (c) 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "morpheus/export.h"
#include "morpheus/messages/meta.hpp"
#include "morpheus/types.hpp"

#include <boost/fiber/context.hpp>
#include <cudf/io/types.hpp>
#include <librdkafka/rdkafkacpp.h>
#include <mrc/segment/builder.hpp>
#include <mrc/segment/object.hpp>
#include <pybind11/pytypes.h>
#include <pymrc/node.hpp>
#include <rxcpp/rx.hpp>  // for apply, make_subscriber, observable_member, is_on_error<>::not_void, is_on_next_of<>::not_void, trace_activity

#include <cstddef>  // for size_t
#include <cstdint>  // for uuint32_t
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <vector>

namespace morpheus {

/****** Component public implementations *******************/
/****** KafkaSourceStage************************************/

class MORPHEUS_EXPORT KafkaOAuthCallback : public RdKafka::OAuthBearerTokenRefreshCb
{
  public:
    KafkaOAuthCallback(const std::function<std::map<std::string, std::string>()>& oauth_callback);

    void oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config) override;

  private:
    const std::function<std::map<std::string, std::string>()>& m_oauth_callback;
};
class MORPHEUS_EXPORT KafkaSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
{
  public:
    using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>;
    using typename base_t::source_type_t;
    using typename base_t::subscriber_fn_t;

    KafkaSourceStage(TensorIndex max_batch_size,
                     std::string topic,
                     uint32_t batch_timeout_ms,
                     std::map<std::string, std::string> config,
                     bool disable_commit                                = false,
                     bool disable_pre_filtering                         = false,
                     std::size_t stop_after                             = 0,
                     bool async_commits                                 = true,
                     std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr);

    KafkaSourceStage(TensorIndex max_batch_size,
                     std::vector<std::string> topics,
                     uint32_t batch_timeout_ms,
                     std::map<std::string, std::string> config,
                     bool disable_commit                                = false,
                     bool disable_pre_filtering                         = false,
                     std::size_t stop_after                             = 0,
                     bool async_commits                                 = true,
                     std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr);

    ~KafkaSourceStage() override = default;

    TensorIndex max_batch_size();

    uint32_t batch_timeout_ms();

  private:
    subscriber_fn_t build();

    std::unique_ptr<RdKafka::Conf> build_kafka_conf(const std::map<std::string, std::string>& config_in);

    std::unique_ptr<RdKafka::KafkaConsumer> create_consumer(RdKafka::RebalanceCb& rebalancer);

    cudf::io::table_with_metadata load_table(const std::string& buffer);

    std::shared_ptr<morpheus::MessageMeta> process_batch(
        std::vector<std::unique_ptr<RdKafka::Message>>&& message_batch);

    TensorIndex m_max_batch_size{128};
    uint32_t m_batch_timeout_ms{100};

    std::vector<std::string> m_topics;
    std::map<std::string, std::string> m_config;

    bool m_disable_commit{false};
    bool m_disable_pre_filtering{false};
    bool m_requires_commit{false};  // Whether or not manual committing is required
    bool m_async_commits{true};
    std::size_t m_stop_after{0};

    void* m_rebalancer;

    std::unique_ptr<KafkaOAuthCallback> m_oauth_callback;
};

/****** KafkaSourceStageInferenceProxy**********************/
struct MORPHEUS_EXPORT KafkaSourceStageInterfaceProxy
{
    static std::shared_ptr<mrc::segment::Object<KafkaSourceStage>> init_with_single_topic(
        mrc::segment::Builder& builder,
        const std::string& name,
        TensorIndex max_batch_size,
        std::string topic,
        uint32_t batch_timeout_ms,
        std::map<std::string, std::string> config,
        bool disable_commit,
        bool disable_pre_filtering,
        std::size_t stop_after                           = 0,
        bool async_commits                               = true,
        std::optional<pybind11::function> oauth_callback = std::nullopt);

    static std::shared_ptr<mrc::segment::Object<KafkaSourceStage>> init_with_multiple_topics(
        mrc::segment::Builder& builder,
        const std::string& name,
        TensorIndex max_batch_size,
        std::vector<std::string> topics,
        uint32_t batch_timeout_ms,
        std::map<std::string, std::string> config,
        bool disable_commit,
        bool disable_pre_filtering,
        std::size_t stop_after                           = 0,
        bool async_commits                               = true,
        std::optional<pybind11::function> oauth_callback = std::nullopt);

  private:
    static std::unique_ptr<KafkaOAuthCallback> make_kafka_oauth_callback(
        std::optional<pybind11::function>&& oauth_callback);
};  // end of group
}  // namespace morpheus