mdx.mtmc.stream.kafka_message_broker module

class KafkaMessageBroker(config: AppConfig)

Bases: object

Module for Kafka message broker

Parameters:

config (AppConfig) – configuration for the app

kafka_message_broker = KafkaMessageBroker(config)
get_consumed_notification_messages_and_count(consumer: kafka.KafkaConsumer) Tuple[List[kafka.consumer.fetcher.ConsumerRecord], int]

Consumes messages and count from the mdx-notification topic

Parameters:

consumer (KafkaConsumer) – Kafka consumer

Returns:

Kafka consumer records and the count of messages

Return type:

Tuple[List[KafkaConsumerRecord],int]

messages, message_count = kafka_message_broker.get_consumed_notification_messages_and_count(consumer)
get_consumed_raw_messages(consumer: kafka.KafkaConsumer) Dict[kafka.TopicPartition, List[kafka.consumer.fetcher.ConsumerRecord]]

Consumes partitioned messages from the mdx-raw topic

Parameters:

consumer (KafkaConsumer) – Kafka consumer

Return type:

Dict[TopicPartition,List[KafkaConsumerRecord]]

partitioned_messages = kafka_message_broker.get_consumed_raw_messages(consumer)
get_consumed_raw_messages_and_count(consumer: kafka.KafkaConsumer) Tuple[List[Any], int]

Consumes messages and count from the mdx-raw topic

Parameters:

consumer (KafkaConsumer) – Kafka consumer

Return type:

Tuple[List[Any],int]

messages, message_count = kafka_message_broker.get_consumed_raw_messages_and_count(consumer)
get_consumer(topic: str, group_id: str) kafka.KafkaConsumer

Creates Kafka consumer

Parameters:
  • topic (str) – Kafka topic

  • group_id (str) – group ID

Returns:

Kafka consumer

Return type:

KafkaConsumer

consumer = kafka_message_broker.get_consumer(topic, group_id)
get_producer() kafka.KafkaProducer

Creates Kafka producer

Returns:

Kafka producer

Return type:

KafkaProducer

producer = kafka_message_broker.get_producer()
produce_calibration_request(producer: kafka.KafkaProducer) None

Sends a message requesting calibration to mdx-notification

Parameters:

producer (KafkaProducer) – Kafka producer

Returns:

None

kafka_message_broker.produce_calibration_request(producer)
produce_config_request(producer: kafka.KafkaProducer) None

Sends a message requesting MTMC analytics config

Parameters:

producer (KafkaProducer) – Kafka producer

Returns:

None

kafka_message_broker.produce_config_request(producer)
produce_init_config(producer: kafka.KafkaProducer) None

Sends a message containing init MTMC analytics config

Parameters:

producer (KafkaProducer) – Kafka producer

Returns:

None

kafka_message_broker.produce_init_config(producer)
produce_mtmc_messages(producer: kafka.KafkaProducer, mtmc_objects: List[MTMCObject]) None

Sends messages containing MTMC objects to mdx-mtmc

Parameters:
  • producer (KafkaProducer) – Kafka producer

  • mtmc_objects (List[MTMCObject]) – list of MTMC objects

Returns:

None

kafka_message_broker.produce_mtmc_messages(producer, mtmc_objects)
produce_mtmc_plus_messages(producer: kafka.KafkaProducer, mtmc_objects_plus: MTMCObjectsPlus) None

Sends messages containing MTMC objects plus locations to mdx-rtls

Parameters:
  • producer (KafkaProducer) – Kafka producer

  • mtmc_objects_plus (MTMCObjectsPlus) – MTMC objects plus locations

Returns:

None

kafka_message_broker.produce_mtmc_plus_messages(producer, mtmc_objects_plus)
set_sensor_state_objects(sensor_state_objects: Dict[str, SensorStateObject]) None

Sets sensor state objects

Parameters:

sensor_state_objects (Dict[str,SensorStateObject]) – map from sensor IDs to sensor state objects

Returns:

None

kafka_message_broker.set_sensor_state_objects(sensor_state_objects)
json_serializer(object_instance)