NVIDIA Morpheus (24.06)
(Latest Version)

Class KafkaSourceStage

Base Type

  • public mrc::pymrc::PythonSource< std::shared_ptr< MessageMeta > >

class KafkaSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>

This class loads messages from the Kafka cluster by serving as a Kafka consumer.

Public Types

using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>

Public Functions

KafkaSourceStage(TensorIndex max_batch_size, std::string topic, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit = false, bool disable_pre_filtering = false, std::size_t stop_after = 0, bool async_commits = true, std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr)

Construct a new Kafka Source Stage object.

Parameters
  • max_batch_size – : The maximum batch size for the messages batch.

  • topic – : Input kafka topic.

  • batch_timeout_ms – : Frequency of the poll in ms.

  • config – : Kafka consumer configuration.

  • disable_commit – : Enabling this option will skip committing messages as they are pulled off the server. This is only useful for debugging, allowing the user to process the same messages multiple times

  • disable_pre_filtering – : Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be valid json.

  • stop_after – : Stops ingesting after emitting stop_after records (rows in the table). Useful for testing. Disabled if 0

  • async_commits – : Asynchronously acknowledge consuming Kafka messages

KafkaSourceStage(TensorIndex max_batch_size, std::vector<std::string> topics, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit = false, bool disable_pre_filtering = false, std::size_t stop_after = 0, bool async_commits = true, std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr)

Construct a new Kafka Source Stage object.

Parameters
  • max_batch_size – : The maximum batch size for the messages batch.

  • topics – : Input kafka topics.

  • batch_timeout_ms – : Frequency of the poll in ms.

  • config – : Kafka consumer configuration.

  • disable_commit – : Enabling this option will skip committing messages as they are pulled off the server. This is only useful for debugging, allowing the user to process the same messages multiple times

  • disable_pre_filtering – : Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be valid json.

  • stop_after – : Stops ingesting after emitting stop_after records (rows in the table). Useful for testing. Disabled if 0

  • async_commits – : Asynchronously acknowledge consuming Kafka messages

~KafkaSourceStage() override = default

TensorIndex max_batch_size()
Returns

maximum batch size for KafkaSource.

uint32_t batch_timeout_ms()
Returns

batch timeout in ms.

Previous Class KafkaRebalancer
Next Class KafkaSourceStage__Rebalancer
© Copyright 2024, NVIDIA. Last updated on Jul 8, 2024.