Class KafkaSourceStage
Defined in File kafka_source.hpp
Base Type
public mrc::pymrc::PythonSource< std::shared_ptr< MessageMeta > >
-
class KafkaSourceStage : public mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
This class loads messages from the Kafka cluster by serving as a Kafka consumer.
Public Types
- using base_t = mrc::pymrc::PythonSource<std::shared_ptr<MessageMeta>>
Public Functions
-
KafkaSourceStage(TensorIndex max_batch_size, std::string topic, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit = false, bool disable_pre_filtering = false, std::size_t stop_after = 0, bool async_commits = true, std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr)
Construct a new Kafka Source Stage object.
- Parameters
max_batch_size – : The maximum batch size for the messages batch.
topic – : Input kafka topic.
batch_timeout_ms – : Frequency of the poll in ms.
config – : Kafka consumer configuration.
disable_commit – : Enabling this option will skip committing messages as they are pulled off the server. This is only useful for debugging, allowing the user to process the same messages multiple times
disable_pre_filtering – : Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be valid json.
stop_after – : Stops ingesting after emitting
stop_after
records (rows in the table). Useful for testing. Disabled if0
async_commits – : Asynchronously acknowledge consuming Kafka messages
-
KafkaSourceStage(TensorIndex max_batch_size, std::vector<std::string> topics, uint32_t batch_timeout_ms, std::map<std::string, std::string> config, bool disable_commit = false, bool disable_pre_filtering = false, std::size_t stop_after = 0, bool async_commits = true, std::unique_ptr<KafkaOAuthCallback> oauth_callback = nullptr)
Construct a new Kafka Source Stage object.
- Parameters
max_batch_size – : The maximum batch size for the messages batch.
topics – : Input kafka topics.
batch_timeout_ms – : Frequency of the poll in ms.
config – : Kafka consumer configuration.
disable_commit – : Enabling this option will skip committing messages as they are pulled off the server. This is only useful for debugging, allowing the user to process the same messages multiple times
disable_pre_filtering – : Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be valid json.
stop_after – : Stops ingesting after emitting
stop_after
records (rows in the table). Useful for testing. Disabled if0
async_commits – : Asynchronously acknowledge consuming Kafka messages
- ~KafkaSourceStage() override = default
-
TensorIndex max_batch_size()
- Returns
maximum batch size for KafkaSource.
-
uint32_t batch_timeout_ms()
- Returns
batch timeout in ms.