NVIDIA Morpheus (24.06)
(Latest Version)

Program Listing for File kafka_source.cpp

Return to documentation for file (morpheus/_lib/src/stages/kafka_source.cpp)

Copy
Copied!
            

/*
 * SPDX-FileCopyrightText: Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 * SPDX-License-Identifier: Apache-2.0
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "morpheus/stages/kafka_source.hpp"

#include "mrc/segment/object.hpp"
#include "pymrc/utilities/function_wrappers.hpp"  // for PyFuncWrapper

#include "morpheus/messages/meta.hpp"
#include "morpheus/utilities/stage_util.hpp"
#include "morpheus/utilities/string_util.hpp"

#include <boost/fiber/operations.hpp>  // for sleep_for, yield
#include <boost/fiber/recursive_mutex.hpp>
#include <cudf/io/json.hpp>
#include <glog/logging.h>
#include <librdkafka/rdkafkacpp.h>
#include <mrc/runnable/context.hpp>
#include <mrc/segment/builder.hpp>
#include <mrc/types.hpp>  // for SharedFuture
#include <nlohmann/json.hpp>
#include <pybind11/pybind11.h>
#include <pybind11/pytypes.h>
#include <pymrc/node.hpp>

#include <algorithm>  // for find, min, transform
#include <chrono>
#include <compare>
#include <cstdint>
#include <exception>
#include <functional>
#include <iterator>  // for back_insert_iterator, back_inserter
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <tuple>
#include <type_traits>
#include <utility>
// IWYU thinks we need atomic for vector.emplace_back of a unique_ptr
// and __alloc_traits<>::value_type for vector assignments
// IWYU pragma: no_include <atomic>
// IWYU pragma: no_include <ext/alloc_traits.h>

#if !defined(DOXYGEN_SHOULD_SKIP_THIS)
// CHECK_KAFKA(command, expected, msg)
//   Evaluates `command` (an RdKafka::ErrorCode expression) exactly once and, if it differs from `expected`,
//   logs an ERROR containing `msg`, the expected/received codes and librdkafka's description of the received
//   code. It only logs; it never aborts or throws.
//   Wrapped in do { ... } while (false) so that `CHECK_KAFKA(...);` behaves as a single statement (safe inside
//   an unbraced if/else). Local names avoid reserved identifiers (names containing a double underscore).
#define CHECK_KAFKA(command, expected, msg)                                                            \
    do                                                                                                 \
    {                                                                                                  \
        const RdKafka::ErrorCode check_kafka_code_     = (command);                                    \
        const RdKafka::ErrorCode check_kafka_expected_ = (expected);                                   \
        if (check_kafka_code_ != check_kafka_expected_)                                                \
        {                                                                                              \
            LOG(ERROR) << (msg) << ". Received unexpected ErrorCode. Expected: " << #expected << "("   \
                       << check_kafka_expected_ << "), Received: " << check_kafka_code_                \
                       << ", Msg: " << RdKafka::err2str(check_kafka_code_);                            \
        }                                                                                              \
    } while (false)
#endif  // DOXYGEN_SHOULD_SKIP_THIS

namespace morpheus {

/**
 * Stores the user-supplied function that produces OAuth token information.
 *
 * @param oauth_callback Function returning a string->string map; see oauthbearer_token_refresh_cb for the keys read.
 */
KafkaOAuthCallback::KafkaOAuthCallback(const std::function<std::map<std::string, std::string>()>& oauth_callback) :
  m_oauth_callback(oauth_callback)
{}

/**
 * Refreshes the OAUTHBEARER token on `handle` using the user-supplied callback.
 *
 * The callback's response must contain the keys "token" and "token_expiration_in_epoch". The expiration is parsed
 * with std::stoll and passed to librdkafka as the token lifetime in milliseconds
 * (NOTE(review): librdkafka expects an absolute time in ms since the epoch; confirm the callback returns that unit).
 * SASL extensions are not supported and are always passed empty; the principal name is always "kafka".
 *
 * Failure handling: a missing key, an unparsable expiration or any other std::exception is reported via
 * LOG(FATAL); a non-success result from librdkafka trips a CHECK that includes librdkafka's error string.
 *
 * @param handle             librdkafka handle whose token is being refreshed
 * @param oauthbearer_config Configuration string supplied by librdkafka (unused here)
 */
void KafkaOAuthCallback::oauthbearer_token_refresh_cb(RdKafka::Handle* handle, const std::string& oauthbearer_config)
{
    try
    {
        auto response = m_oauth_callback();

        // Look up a required key. Unlike operator[], this never inserts an empty value for a missing key, so a
        // malformed callback response produces a descriptive error instead of an empty token.
        auto require_key = [&response](const std::string& key) -> const std::string& {
            auto it = response.find(key);
            if (it == response.end())
            {
                throw std::runtime_error("OAuth callback response is missing required key '" + key + "'");
            }
            return it->second;
        };

        // Build parameters to pass to librdkafka
        const std::string& token        = require_key("token");
        const int64_t token_lifetime_ms = std::stoll(require_key("token_expiration_in_epoch"));
        std::list<std::string> extensions;  // currently not supported
        std::string errstr;

        auto result = handle->oauthbearer_set_token(token, token_lifetime_ms, "kafka", extensions, errstr);

        // Include librdkafka's own explanation so the failure is diagnosable
        CHECK(result == RdKafka::ErrorCode::ERR_NO_ERROR)
            << "Error occurred while setting the oauthbearer token" << ": " << errstr;
    } catch (const std::exception& ex)  // by reference: avoids slicing away the derived exception's what()
    {
        LOG(FATAL) << "Exception occured oauth refresh: " << ex.what();
    }
}

// Component-private classes.
// ************ KafkaSourceStage__UnsubscribedException**************// class KafkaSourceStageUnsubscribedException : public std::exception {}; class KafkaSourceStageStopAfter : public std::exception {}; // ************ KafkaSourceStage__Rebalancer *************************// class KafkaSourceStage__Rebalancer : public RdKafka::RebalanceCb // NOLINT { public: KafkaSourceStage__Rebalancer(std::function<uint32_t()> batch_timeout_fn, std::function<TensorIndex()> max_batch_size_fn, std::function<std::string(std::string)> display_str_fn, std::function<bool(std::vector<std::unique_ptr<RdKafka::Message>>&)> process_fn); void rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions) override; void rebalance_loop(RdKafka::KafkaConsumer* consumer); bool is_rebalanced(); std::vector<std::unique_ptr<RdKafka::Message>> partition_progress_step(RdKafka::KafkaConsumer* consumer) { // auto batch_timeout = std::chrono::milliseconds(m_parent.batch_timeout_ms()); auto batch_timeout = std::chrono::milliseconds(m_batch_timeout_fn()); std::vector<std::unique_ptr<RdKafka::Message>> messages; auto now = std::chrono::high_resolution_clock::now(); auto batch_end = now + batch_timeout; do { auto remaining_ms = std::chrono::duration_cast<std::chrono::milliseconds>(batch_end - now).count(); DCHECK(remaining_ms >= 0) << "Cant have negative reminaing time"; std::unique_ptr<RdKafka::Message> msg{consumer->consume(std::min(10L, remaining_ms))}; switch (msg->err()) { case RdKafka::ERR__TIMED_OUT: // Yield on a timeout boost::this_fiber::yield(); break; case RdKafka::ERR_NO_ERROR: messages.emplace_back(std::move(msg)); break; case RdKafka::ERR__PARTITION_EOF: VLOG_EVERY_N(10, 10) << "Hit EOF for partition"; // Hit the end, sleep for 100 ms boost::this_fiber::sleep_for(std::chrono::milliseconds(100)); break; default: /* Errors */ LOG(ERROR) << "Consume failed: " << msg->errstr(); } // Update now now = 
std::chrono::high_resolution_clock::now(); } while (messages.size() < m_max_batch_size_fn() && now < batch_end); return std::move(messages); } bool process_messages(std::vector<std::unique_ptr<RdKafka::Message>>& messages) { return m_process_fn(messages); } private: bool m_is_rebalanced{false}; std::function<uint32_t()> m_batch_timeout_fn; std::function<TensorIndex()> m_max_batch_size_fn; std::function<std::string(std::string)> m_display_str_fn; std::function<bool(std::vector<std::unique_ptr<RdKafka::Message>>&)> m_process_fn; boost::fibers::recursive_mutex m_mutex; mrc::SharedFuture<bool> m_partition_future; }; KafkaSourceStage__Rebalancer::KafkaSourceStage__Rebalancer( std::function<uint32_t()> batch_timeout_fn, std::function<TensorIndex()> max_batch_size_fn, std::function<std::string(std::string)> display_str_fn, std::function<bool(std::vector<std::unique_ptr<RdKafka::Message>>&)> process_fn) : m_batch_timeout_fn(std::move(batch_timeout_fn)), m_max_batch_size_fn(std::move(max_batch_size_fn)), m_display_str_fn(std::move(display_str_fn)), m_process_fn(std::move(process_fn)) {} void KafkaSourceStage__Rebalancer::rebalance_cb(RdKafka::KafkaConsumer* consumer, RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*>& partitions) { std::unique_lock<boost::fibers::recursive_mutex> lock(m_mutex); std::vector<RdKafka::TopicPartition*> current_assignment; CHECK_KAFKA(consumer->assignment(current_assignment), RdKafka::ERR_NO_ERROR, "Error retrieving current assignment"); auto old_partition_ids = foreach_map(current_assignment, [](const auto& x) { return x->partition(); }); auto new_partition_ids = foreach_map(partitions, [](const auto& x) { return x->partition(); }); if (err == RdKafka::ERR__ASSIGN_PARTITIONS) { VLOG(10) << m_display_str_fn(MORPHEUS_CONCAT_STR( "Rebalance: Assign Partitions. Current Partitions: << " << StringUtil::array_to_str(old_partition_ids.begin(), old_partition_ids.end()) << ". 
Assigning: " << StringUtil::array_to_str(new_partition_ids.begin(), new_partition_ids.end()))); // application may load offets from arbitrary external storage here and update \p partitions if (consumer->rebalance_protocol() == "COOPERATIVE") { CHECK_KAFKA(std::unique_ptr<RdKafka::Error>(consumer->incremental_assign(partitions))->code(), RdKafka::ERR_NO_ERROR, "Error during incremental assign"); } else { CHECK_KAFKA(consumer->assign(partitions), RdKafka::ERR_NO_ERROR, "Error during assign"); } } else if (err == RdKafka::ERR__REVOKE_PARTITIONS) { VLOG(10) << m_display_str_fn(MORPHEUS_CONCAT_STR( "Rebalance: Revoke Partitions. Current Partitions: << " << StringUtil::array_to_str(old_partition_ids.begin(), old_partition_ids.end()) << ". Revoking: " << StringUtil::array_to_str(new_partition_ids.begin(), new_partition_ids.end()))); // Application may commit offsets manually here if auto.commit.enable=false if (consumer->rebalance_protocol() == "COOPERATIVE") { CHECK_KAFKA(std::unique_ptr<RdKafka::Error>(consumer->incremental_unassign(partitions))->code(), RdKafka::ERR_NO_ERROR, "Error during incremental unassign"); } else { CHECK_KAFKA(consumer->unassign(), RdKafka::ERR_NO_ERROR, "Error during unassign"); } } else { LOG(ERROR) << "Rebalancing error: " << RdKafka::err2str(err) << std::endl; CHECK_KAFKA(consumer->unassign(), RdKafka::ERR_NO_ERROR, "Error during unassign"); } } void KafkaSourceStage__Rebalancer::rebalance_loop(RdKafka::KafkaConsumer* consumer) { do { // Poll until we are rebalanced while (!this->is_rebalanced()) { VLOG(10) << m_display_str_fn("Rebalance: Calling poll to trigger rebalance"); consumer->poll(500); } } while (m_partition_future.get()); } bool KafkaSourceStage__Rebalancer::is_rebalanced() { std::unique_lock<boost::fibers::recursive_mutex> lock(m_mutex); return m_is_rebalanced; } class KafkaRebalancer : public RdKafka::RebalanceCb { private: std::unique_ptr<RdKafka::KafkaConsumer> m_consumer; }; // Component public implementations // ************ 
KafkaStage ************************* // KafkaSourceStage::KafkaSourceStage(TensorIndex max_batch_size, std::string topic, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit, bool disable_pre_filtering, std::size_t stop_after, bool async_commits, std::unique_ptr<KafkaOAuthCallback> oauth_callback) : PythonSource(build()), m_max_batch_size(max_batch_size), m_topics(std::vector<std::string>{std::move(topic)}), m_batch_timeout_ms(batch_timeout_ms), m_config(std::move(config)), m_disable_commit(disable_commit), m_disable_pre_filtering(disable_pre_filtering), m_stop_after{stop_after}, m_async_commits(async_commits), m_oauth_callback(std::move(oauth_callback)) {} KafkaSourceStage::KafkaSourceStage(TensorIndex max_batch_size, std::vector<std::string> topics, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit, bool disable_pre_filtering, std::size_t stop_after, bool async_commits, std::unique_ptr<KafkaOAuthCallback> oauth_callback) : PythonSource(build()), m_max_batch_size(max_batch_size), m_topics(std::move(topics)), m_batch_timeout_ms(batch_timeout_ms), m_config(std::move(config)), m_disable_commit(disable_commit), m_disable_pre_filtering(disable_pre_filtering), m_stop_after{stop_after}, m_async_commits(async_commits), m_oauth_callback(std::move(oauth_callback)) {} KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build() { return [this](rxcpp::subscriber<source_type_t> sub) -> void { std::size_t records_emitted = 0; // Build rebalancer KafkaSourceStage__Rebalancer rebalancer( [this]() { return this->batch_timeout_ms(); }, [this]() { return this->max_batch_size(); }, [this](const std::string str_to_display) { auto& ctx = mrc::runnable::Context::get_runtime_context(); return MORPHEUS_CONCAT_STR(ctx.info() << " " << str_to_display); }, [sub, &records_emitted, this](std::vector<std::unique_ptr<RdKafka::Message>>& message_batch) { // If we are unsubscribed, throw an error to break the loops if 
(!sub.is_subscribed()) { throw KafkaSourceStageUnsubscribedException(); } else if (m_stop_after > 0 && records_emitted >= m_stop_after) { throw KafkaSourceStageStopAfter(); } if (message_batch.empty()) { return false; } std::shared_ptr<morpheus::MessageMeta> batch; try { batch = std::move(this->process_batch(std::move(message_batch))); } catch (std::exception& ex) { LOG(ERROR) << "Exception in process_batch. Msg: " << ex.what(); return false; } auto num_records = batch->count(); sub.on_next(std::move(batch)); records_emitted += num_records; return m_requires_commit; }); auto& context = mrc::runnable::Context::get_runtime_context(); // Build consumer auto consumer = this->create_consumer(rebalancer); // Wait for all to connect context.barrier(); try { while (sub.is_subscribed()) { std::vector<std::unique_ptr<RdKafka::Message>> message_batch = rebalancer.partition_progress_step(consumer.get()); // Process the messages. Returns true if we need to commit auto should_commit = rebalancer.process_messages(message_batch); if (should_commit) { if (m_async_commits) { CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync"); } else { CHECK_KAFKA(consumer->commitSync(), RdKafka::ERR_NO_ERROR, "Error during commit"); } } } } catch (KafkaSourceStageStopAfter) { DLOG(INFO) << "Completed after emitting " << records_emitted << " records"; } catch (std::exception& ex) { LOG(ERROR) << "Exception in rebalance_loop. 
Msg: " << ex.what(); } consumer->unsubscribe(); consumer->close(); consumer.reset(); m_rebalancer = nullptr; sub.on_completed(); }; } TensorIndex KafkaSourceStage::max_batch_size() { return m_max_batch_size; } uint32_t KafkaSourceStage::batch_timeout_ms() { return m_batch_timeout_ms; } std::unique_ptr<RdKafka::Conf> KafkaSourceStage::build_kafka_conf(const std::map<std::string, std::string>& config_in) { // Copy the config std::map<std::string, std::string> config_out(config_in); std::map<std::string, std::string> defaults{{"session.timeout.ms", "60000"}, {"enable.auto.commit", "false"}, {"auto.offset.reset", "latest"}, {"enable.partition.eof", "true"}}; // Set some defaults if they dont exist config_out.merge(defaults); m_requires_commit = config_out["enable.auto.commit"] == "false"; if (m_requires_commit && m_disable_commit) { LOG(WARNING) << "KafkaSourceStage: Commits have been disabled for this Kafka consumer. This should only be " "used in a debug environment"; m_requires_commit = false; } else if (!m_requires_commit && m_disable_commit) { // User has auto-commit on and disable commit at same time LOG(WARNING) << "KafkaSourceStage: The config option 'enable.auto.commit' was set to True but commits have " "been disabled for this Kafka consumer. This should only be used in a debug environment"; } // Make the kafka_conf and set all properties auto kafka_conf = std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); for (auto const& key_value : config_out) { std::string error_string; if (RdKafka::Conf::ConfResult::CONF_OK != kafka_conf->set(key_value.first, key_value.second, error_string)) { LOG(ERROR) << "Error occurred while setting Kafka configuration. 
Error: " << error_string; } } return std::move(kafka_conf); } std::unique_ptr<RdKafka::KafkaConsumer> KafkaSourceStage::create_consumer(RdKafka::RebalanceCb& rebalancer) { auto kafka_conf = this->build_kafka_conf(m_config); std::string errstr; if (RdKafka::Conf::ConfResult::CONF_OK != kafka_conf->set("rebalance_cb", &rebalancer, errstr)) { LOG(FATAL) << "Error occurred while setting Kafka rebalance function. Error: " << errstr; } if (m_oauth_callback != nullptr) { if (RdKafka::Conf::ConfResult::CONF_OK != kafka_conf->set("oauthbearer_token_refresh_cb", m_oauth_callback.get(), errstr)) { LOG(FATAL) << "Error occurred while setting Kafka OAuth Callback function. Error: " << errstr; } } auto consumer = std::unique_ptr<RdKafka::KafkaConsumer>(RdKafka::KafkaConsumer::create(kafka_conf.get(), errstr)); if (!consumer) { LOG(FATAL) << "Error occurred creating Kafka consumer. Error: " << errstr; } // Subscribe to the topics. Uses the default rebalancer CHECK_KAFKA(consumer->subscribe(m_topics), RdKafka::ERR_NO_ERROR, "Error subscribing to topics"); // Create a vector of topic objects std::vector<std::unique_ptr<RdKafka::Topic>> topicObjs; // Create a topic object for each topic name and add it to the vector for (const auto& m_topic : m_topics) { auto topicObj = std::unique_ptr<RdKafka::Topic>(RdKafka::Topic::create(consumer.get(), m_topic, nullptr, errstr)); if (!topicObj) { throw std::runtime_error("Failed to create Kafka topic object"); } topicObjs.push_back(std::move(topicObj)); } RdKafka::Metadata* md; for (unsigned short i = 0; i < 5; ++i) { auto err_code = consumer->metadata(topicObjs.empty(), topicObjs.empty() ? nullptr : topicObjs[0].get(), &md, 1000); if (err_code == RdKafka::ERR_NO_ERROR && md != nullptr) { break; } LOG(WARNING) << "Timed out while trying to list topics from the Kafka broker. Attempt #" << i + 1 << "/5. 
Error message: " << RdKafka::err2str(err_code); } if (md == nullptr) { throw std::runtime_error("Failed to list topics in Kafka broker after 5 attempts"); } std::map<std::string, std::vector<int32_t>> topic_parts; auto& ctx = mrc::runnable::Context::get_runtime_context(); VLOG(10) << ctx.info() << MORPHEUS_CONCAT_STR(" Subscribed to " << md->topics()->size() << " topics:"); for (auto const& topic : *(md->topics())) { auto& part_ids = topic_parts[topic->topic()]; auto const& parts = *(topic->partitions()); std::transform(parts.cbegin(), parts.cend(), std::back_inserter(part_ids), [](auto const& part) { return part->id(); }); auto toppar_list = foreach_map(parts, [&topic](const auto& part) { return std::unique_ptr<RdKafka::TopicPartition>{ RdKafka::TopicPartition::create(topic->topic(), part->id())}; }); std::vector<RdKafka::TopicPartition*> toppar_ptrs = foreach_map(toppar_list, [](const std::unique_ptr<RdKafka::TopicPartition>& x) { return x.get(); }); // Query Kafka to populate the TopicPartitions with the desired offsets CHECK_KAFKA( consumer->committed(toppar_ptrs, 2000), RdKafka::ERR_NO_ERROR, "Failed retrieve Kafka committed offsets"); auto committed = foreach_map(toppar_list, [](const std::unique_ptr<RdKafka::TopicPartition>& x) { return x->offset(); }); // Query Kafka to populate the TopicPartitions with the desired offsets CHECK_KAFKA(consumer->position(toppar_ptrs), RdKafka::ERR_NO_ERROR, "Failed retrieve Kafka positions"); auto positions = foreach_map(toppar_list, [](const std::unique_ptr<RdKafka::TopicPartition>& x) { return x->offset(); }); auto watermarks = foreach_map(toppar_list, [&consumer](const std::unique_ptr<RdKafka::TopicPartition>& x) { int64_t low; int64_t high; CHECK_KAFKA(consumer->query_watermark_offsets(x->topic(), x->partition(), &low, &high, 1000), RdKafka::ERR_NO_ERROR, "Failed retrieve Kafka watermark offsets"); return std::make_tuple(low, high); }); auto watermark_strs = foreach_map(watermarks, [](const auto& x) { return 
MORPHEUS_CONCAT_STR("(" << std::get<0>(x) << ", " << std::get<1>(x) << ")"); }); auto& ctx = mrc::runnable::Context::get_runtime_context(); VLOG(10) << ctx.info() << MORPHEUS_CONCAT_STR( " Topic: '" << topic->topic() << "', Parts: " << StringUtil::array_to_str(part_ids.begin(), part_ids.end()) << ", Committed: " << StringUtil::array_to_str(committed.begin(), committed.end()) << ", Positions: " << StringUtil::array_to_str(positions.begin(), positions.end()) << ", Watermarks: " << StringUtil::array_to_str(watermark_strs.begin(), watermark_strs.end())); } return std::move(consumer); } cudf::io::table_with_metadata KafkaSourceStage::load_table(const std::string& buffer) { auto options = cudf::io::json_reader_options::builder(cudf::io::source_info(buffer.c_str(), buffer.size())).lines(true); return cudf::io::read_json(options.build()); } template <bool EnableFilter> std::string concat_message_batch(std::vector<std::unique_ptr<RdKafka::Message>> const& message_batch) { std::ostringstream buffer; for (auto& msg : message_batch) { auto s = static_cast<char*>(msg->payload()); if constexpr (EnableFilter) { if (!nlohmann::json::accept(s)) { LOG(ERROR) << "Failed to parse kafka message as json: " << s; continue; } } buffer << s << "\n"; } return buffer.str(); } std::shared_ptr<morpheus::MessageMeta> KafkaSourceStage::process_batch( std::vector<std::unique_ptr<RdKafka::Message>>&& message_batch) { // concat the kafka json messages auto json_lines = !this->m_disable_pre_filtering ? concat_message_batch<true>(message_batch) : concat_message_batch<false>(message_batch); // parse the json auto data_table = this->load_table(json_lines); // Next, create the message metadata. 
This gets reused for repeats return MessageMeta::create_from_cpp(std::move(data_table), 0); } // ************ KafkaStageInterfaceProxy ************ // std::shared_ptr<mrc::segment::Object<KafkaSourceStage>> KafkaSourceStageInterfaceProxy::init_with_single_topic( mrc::segment::Builder& builder, const std::string& name, TensorIndex max_batch_size, std::string topic, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit, bool disable_pre_filtering, std::size_t stop_after, bool async_commits, std::optional<pybind11::function> oauth_callback) { auto oauth_callback_cpp = KafkaSourceStageInterfaceProxy::make_kafka_oauth_callback(std::move(oauth_callback)); auto stage = builder.construct_object<KafkaSourceStage>(name, max_batch_size, topic, batch_timeout_ms, config, disable_commit, disable_pre_filtering, stop_after, async_commits, std::move(oauth_callback_cpp)); return stage; } std::shared_ptr<mrc::segment::Object<KafkaSourceStage>> KafkaSourceStageInterfaceProxy::init_with_multiple_topics( mrc::segment::Builder& builder, const std::string& name, TensorIndex max_batch_size, std::vector<std::string> topics, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit, bool disable_pre_filtering, std::size_t stop_after, bool async_commits, std::optional<pybind11::function> oauth_callback) { auto oauth_callback_cpp = KafkaSourceStageInterfaceProxy::make_kafka_oauth_callback(std::move(oauth_callback)); auto stage = builder.construct_object<KafkaSourceStage>(name, max_batch_size, topics, batch_timeout_ms, config, disable_commit, disable_pre_filtering, stop_after, async_commits, std::move(oauth_callback_cpp)); return stage; } std::unique_ptr<KafkaOAuthCallback> KafkaSourceStageInterfaceProxy::make_kafka_oauth_callback( std::optional<pybind11::function>&& oauth_callback) { if (oauth_callback == std::nullopt) { return static_cast<std::unique_ptr<KafkaOAuthCallback>>(nullptr); } auto oauth_callback_wrapped = 
mrc::pymrc::PyFuncWrapper(std::move(oauth_callback.value())); return std::make_unique<KafkaOAuthCallback>([oauth_callback_wrapped = std::move(oauth_callback_wrapped)]() { auto kvp_cpp = std::map<std::string, std::string>(); auto kvp = oauth_callback_wrapped.operator()<pybind11::dict>(); for (auto [key, value] : kvp) { kvp_cpp[key.cast<std::string>()] = value.cast<std::string>(); } return kvp_cpp; }); } } // namespace morpheus

© Copyright 2024, NVIDIA. Last updated on Jul 8, 2024.