Class KafkaSourceStage

Inheritance Relationships

Base Type

  • public mrc::pymrc::PythonSource< std::shared_ptr< MessageMeta > >

Class Documentation

class KafkaSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>

This class loads messages from the Kafka cluster by serving as a Kafka consumer.

Public Types

using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>

Public Functions

KafkaSourceStage(
TensorIndex max_batch_size,
std::string topic,
uint32_t batch_timeout_ms,
std::map<std::string, std::string> config,
bool disable_commit = false,
bool disable_pre_filtering = false,
std::size_t stop_after = 0,
bool async_commits = true,
std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr
)

Construct a new Kafka Source Stage object that reads from a single topic.

Parameters:
  • max_batch_size – The maximum size of each batch of messages.

  • topic – Input Kafka topic.

  • batch_timeout_ms – Frequency of the poll, in milliseconds.

  • config – Kafka consumer configuration.

  • disable_commit – Enabling this option skips committing messages as they are pulled off the server. This is only useful for debugging, because it allows the same messages to be processed multiple times.

  • disable_pre_filtering – Enabling this option skips pre-filtering of JSON messages. This is only useful when inputs are known to be valid JSON.

  • stop_after – Stops ingesting after emitting stop_after records (rows in the table). Useful for testing. Disabled if 0.

  • async_commits – Asynchronously acknowledge consumed Kafka messages.
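The config parameter is a plain string-to-string map. The following is a minimal sketch of building one, assuming the entries are forwarded to the underlying Kafka client and use standard Kafka consumer property names (bootstrap.servers, group.id, auto.offset.reset). The helper make_consumer_config and all values shown are hypothetical and are not part of this class.

```cpp
#include <map>
#include <string>

// Hypothetical helper (not part of KafkaSourceStage): builds a map of the
// shape expected by the `config` constructor parameter.
// Assumption: keys are standard Kafka consumer property names.
std::map<std::string, std::string> make_consumer_config(const std::string& brokers,
                                                        const std::string& group_id)
{
    return {
        {"bootstrap.servers", brokers},     // comma-separated broker list
        {"group.id", group_id},             // consumer group to join
        {"auto.offset.reset", "earliest"},  // start from the oldest message when no offset is stored
    };
}

// With the stage's header available, the map would be passed as the fourth
// constructor argument. Illustrative values only:
//   KafkaSourceStage stage(128, "input_topic", 100,
//                          make_consumer_config("localhost:9092", "example_group"));
```

Keeping the configuration in a helper like this makes it easy to reuse the same settings for both the single-topic and the multi-topic constructor.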

KafkaSourceStage(
TensorIndex max_batch_size,
std::vector<std::string> topics,
uint32_t batch_timeout_ms,
std::map<std::string, std::string> config,
bool disable_commit = false,
bool disable_pre_filtering = false,
std::size_t stop_after = 0,
bool async_commits = true,
std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr
)

Construct a new Kafka Source Stage object that reads from multiple topics.

Parameters:
  • max_batch_size – The maximum size of each batch of messages.

  • topics – Input Kafka topics.

  • batch_timeout_ms – Frequency of the poll, in milliseconds.

  • config – Kafka consumer configuration.

  • disable_commit – Enabling this option skips committing messages as they are pulled off the server. This is only useful for debugging, because it allows the same messages to be processed multiple times.

  • disable_pre_filtering – Enabling this option skips pre-filtering of JSON messages. This is only useful when inputs are known to be valid JSON.

  • stop_after – Stops ingesting after emitting stop_after records (rows in the table). Useful for testing. Disabled if 0.

  • async_commits – Asynchronously acknowledge consumed Kafka messages.
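To illustrate how max_batch_size, batch_timeout_ms, and stop_after could interact, here is a simplified sketch of a batching loop. It is not the stage's actual implementation: poll_fn_t, collect_batch, and run_source are hypothetical names, std::size_t stands in for TensorIndex, and the sketch assumes the stop_after limit is checked between batches, so the emitted count may exceed stop_after by up to one batch.

```cpp
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical poll function: returns one message, or std::nullopt when
// nothing is available right now.
using poll_fn_t = std::function<std::optional<std::string>()>;

// Collect messages until the batch holds max_batch_size entries or
// batch_timeout_ms has elapsed, whichever comes first.
std::vector<std::string> collect_batch(const poll_fn_t& poll,
                                       std::size_t max_batch_size,
                                       std::uint32_t batch_timeout_ms)
{
    const auto deadline =
        std::chrono::steady_clock::now() + std::chrono::milliseconds(batch_timeout_ms);

    std::vector<std::string> batch;
    // Sketch only: this spins while no message is available; a real consumer
    // poll would normally block for a short interval instead.
    while (batch.size() < max_batch_size && std::chrono::steady_clock::now() < deadline)
    {
        if (auto msg = poll())
        {
            batch.push_back(std::move(*msg));
        }
    }
    return batch;
}

// Emit batches until stop_after records have been emitted (0 disables the
// limit) or a batch comes back empty. Returns the number of records emitted.
std::size_t run_source(const poll_fn_t& poll,
                       std::size_t max_batch_size,
                       std::uint32_t batch_timeout_ms,
                       std::size_t stop_after,
                       const std::function<void(const std::vector<std::string>&)>& emit)
{
    std::size_t emitted = 0;
    while (stop_after == 0 || emitted < stop_after)
    {
        auto batch = collect_batch(poll, max_batch_size, batch_timeout_ms);
        if (batch.empty())
        {
            break;  // sketch only: a long-running source would keep polling
        }
        emitted += batch.size();
        emit(batch);
    }
    return emitted;
}
```

In this sketch a larger batch_timeout_ms trades latency for fuller batches, while stop_after gives tests a deterministic way to end an otherwise unbounded source.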

~KafkaSourceStage() override = default

TensorIndex max_batch_size()
Returns:

Maximum batch size for KafkaSource.

uint32_t batch_timeout_ms()
Returns:

Batch timeout in ms.