mdx.mtmc.stream.kafka_message_broker module
- class KafkaMessageBroker(config: AppConfig)
Bases: object
Module for Kafka message broker
- Parameters:
config (AppConfig) – configuration for the app
kafka_message_broker = KafkaMessageBroker(config)
- get_consumed_notification_messages_and_count(consumer: kafka.KafkaConsumer) → Tuple[List[kafka.consumer.fetcher.ConsumerRecord], int]
Consumes messages from the mdx-notification topic and returns them together with their count
- Parameters:
consumer (KafkaConsumer) – Kafka consumer
- Returns:
Kafka consumer records and the count of messages
- Return type:
Tuple[List[KafkaConsumerRecord],int]
messages, message_count = kafka_message_broker.get_consumed_notification_messages_and_count(consumer)
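The (records, count) return shape above can be illustrated with a minimal sketch. It assumes the method gathers the per-partition batches that kafka-python's KafkaConsumer.poll() returns and flattens them; the source does not confirm this. TopicPartition and Record below are local stand-ins for the kafka-python types, and flatten_and_count is a hypothetical helper, not part of this module.

```python
from collections import namedtuple

# Stand-ins for kafka.TopicPartition and ConsumerRecord so the sketch runs without a broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Record = namedtuple("Record", ["topic", "partition", "offset", "value"])


def flatten_and_count(partitioned):
    """Flatten a {TopicPartition: [records]} mapping into (records, count)."""
    records = [record for batch in partitioned.values() for record in batch]
    return records, len(records)


# Hand-built data in the shape that KafkaConsumer.poll() returns.
polled = {
    TopicPartition("mdx-notification", 0): [
        Record("mdx-notification", 0, 0, b"a"),
        Record("mdx-notification", 0, 1, b"b"),
    ],
    TopicPartition("mdx-notification", 1): [
        Record("mdx-notification", 1, 0, b"c"),
    ],
}
messages, message_count = flatten_and_count(polled)
```

Returning the count alongside the list lets a caller check for an empty poll without inspecting the records themselves.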
- get_consumed_raw_messages(consumer: kafka.KafkaConsumer) → Dict[kafka.TopicPartition, List[kafka.consumer.fetcher.ConsumerRecord]]
Consumes partitioned messages from the mdx-raw topic
- Parameters:
consumer (KafkaConsumer) – Kafka consumer
- Return type:
Dict[TopicPartition,List[KafkaConsumerRecord]]
partitioned_messages = kafka_message_broker.get_consumed_raw_messages(consumer)
- get_consumed_raw_messages_and_count(consumer: kafka.KafkaConsumer) → Tuple[List[Any], int]
Consumes messages from the mdx-raw topic and returns them together with their count
- Parameters:
consumer (KafkaConsumer) – Kafka consumer
- Return type:
Tuple[List[Any],int]
messages, message_count = kafka_message_broker.get_consumed_raw_messages_and_count(consumer)
- get_consumer(topic: str, group_id: str) → kafka.KafkaConsumer
Creates Kafka consumer
- Parameters:
topic (str) – Kafka topic
group_id (str) – group ID
- Returns:
Kafka consumer
- Return type:
KafkaConsumer
consumer = kafka_message_broker.get_consumer(topic, group_id)
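For orientation, here is a rough sketch of how a consumer for one topic and group could be built with kafka-python. The keyword arguments are real KafkaConsumer options, but the values, the broker address, and the helper names build_consumer_kwargs and create_consumer are assumptions for illustration. The settings this module actually reads from AppConfig are not documented here.

```python
import json


def build_consumer_kwargs(bootstrap_servers, group_id):
    """Collect keyword arguments for kafka.KafkaConsumer (values are illustrative)."""
    return {
        "bootstrap_servers": bootstrap_servers,
        "group_id": group_id,
        "auto_offset_reset": "latest",
        "enable_auto_commit": True,
        # Decode each message value from UTF-8 JSON bytes.
        "value_deserializer": lambda raw: json.loads(raw.decode("utf-8")),
    }


def create_consumer(topic, group_id, bootstrap_servers="localhost:9092"):
    """Create a consumer subscribed to one topic; needs kafka-python and a reachable broker."""
    from kafka import KafkaConsumer  # imported lazily so the helper above works without it

    return KafkaConsumer(topic, **build_consumer_kwargs(bootstrap_servers, group_id))


# The address and group name are placeholders.
kwargs = build_consumer_kwargs("localhost:9092", "mtmc-group")
```

Keeping the keyword arguments in a separate function makes the configuration easy to inspect or test without connecting to a broker.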
- get_producer() → kafka.KafkaProducer
Creates Kafka producer
- Returns:
Kafka producer
- Return type:
KafkaProducer
producer = kafka_message_broker.get_producer()
- produce_calibration_request(producer: kafka.KafkaProducer) → None
Sends a message requesting calibration to mdx-notification
- Parameters:
producer (KafkaProducer) – Kafka producer
- Returns:
None
kafka_message_broker.produce_calibration_request(producer)
- produce_config_request(producer: kafka.KafkaProducer) → None
Sends a message requesting MTMC analytics config
- Parameters:
producer (KafkaProducer) – Kafka producer
- Returns:
None
kafka_message_broker.produce_config_request(producer)
- produce_init_config(producer: kafka.KafkaProducer) → None
Sends a message containing the initial MTMC analytics config
- Parameters:
producer (KafkaProducer) – Kafka producer
- Returns:
None
kafka_message_broker.produce_init_config(producer)
- produce_mtmc_messages(producer: kafka.KafkaProducer, mtmc_objects: List[MTMCObject]) → None
Sends messages containing MTMC objects to mdx-mtmc
- Parameters:
producer (KafkaProducer) – Kafka producer
mtmc_objects (List[MTMCObject]) – list of MTMC objects
- Returns:
None
kafka_message_broker.produce_mtmc_messages(producer, mtmc_objects)
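A send-one-message-per-object pattern might look roughly like the sketch below. It is not the module's implementation. It assumes each object can be JSON-encoded directly, so plain dicts stand in for MTMCObject, and the globalId field is hypothetical. RecordingProducer is a test double exposing only the send() and flush() methods of kafka.KafkaProducer, so the example runs without a broker.

```python
import json


def send_objects(producer, topic, objects):
    """Serialize each object to JSON bytes, send it to `topic`, then flush."""
    for obj in objects:
        payload = json.dumps(obj).encode("utf-8")
        producer.send(topic, value=payload)
    # Block until all buffered messages have been handed to the broker.
    producer.flush()
    return len(objects)


class RecordingProducer:
    """Test double exposing the send()/flush() subset of kafka.KafkaProducer."""

    def __init__(self):
        self.sent = []
        self.flushed = False

    def send(self, topic, value=None, key=None):
        self.sent.append((topic, value))

    def flush(self):
        self.flushed = True


producer = RecordingProducer()
# Dicts with a hypothetical "globalId" field stand in for MTMC objects.
sent_count = send_objects(producer, "mdx-mtmc", [{"globalId": "1"}, {"globalId": "2"}])
```

With a real KafkaProducer, send() is asynchronous, which is why the sketch calls flush() once after the loop rather than after every message.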
- produce_mtmc_plus_messages(producer: kafka.KafkaProducer, mtmc_objects_plus: MTMCObjectsPlus) → None
Sends messages containing MTMC objects plus locations to mdx-rtls
- Parameters:
producer (KafkaProducer) – Kafka producer
mtmc_objects_plus (MTMCObjectsPlus) – MTMC objects plus locations
- Returns:
None
kafka_message_broker.produce_mtmc_plus_messages(producer, mtmc_objects_plus)
- set_sensor_state_objects(sensor_state_objects: Dict[str, SensorStateObject]) → None
Sets sensor state objects
- Parameters:
sensor_state_objects (Dict[str,SensorStateObject]) – map from sensor IDs to sensor state objects
- Returns:
None
kafka_message_broker.set_sensor_state_objects(sensor_state_objects)
- json_serializer(object_instance)
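No description is given for json_serializer. Its name and single argument suggest a fallback hook of the kind passed to json.dumps through the default parameter, but that is an assumption. The sketch below shows that general pattern only. json_serializer_sketch and Point are hypothetical names, and the module's real behavior may differ.

```python
import json
from datetime import date, datetime


def json_serializer_sketch(object_instance):
    """Fallback for json.dumps: ISO-format dates, otherwise use the instance's attributes."""
    if isinstance(object_instance, (datetime, date)):
        return object_instance.isoformat()
    if hasattr(object_instance, "__dict__"):
        return vars(object_instance)
    raise TypeError(f"{type(object_instance).__name__} is not JSON serializable")


class Point:
    """Hypothetical plain object used only to exercise the fallback."""

    def __init__(self, x, y):
        self.x = x
        self.y = y


encoded = json.dumps(
    {"at": datetime(2024, 1, 2, 3, 4, 5), "p": Point(1, 2)},
    default=json_serializer_sketch,
)
```

json.dumps calls the default hook only for values it cannot encode natively, so built-in types such as str, int, list, and dict never reach it.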