nv_ingest_api.util.message_brokers.simple_message_broker package
Submodules
nv_ingest_api.util.message_brokers.simple_message_broker.broker module
- class nv_ingest_api.util.message_brokers.simple_message_broker.broker.SimpleMessageBroker(host: str, port: int, max_queue_size: int)
Bases: ThreadingMixIn, TCPServer
A thread-safe message broker server that manages multiple message queues and supports commands such as PUSH, POP, SIZE, and PING.
- allow_reuse_address = True
- class nv_ingest_api.util.message_brokers.simple_message_broker.broker.SimpleMessageBrokerHandler(request, client_address, server)
Bases: BaseRequestHandler
Handles incoming client requests for the SimpleMessageBroker server, processes commands such as PUSH, POP, SIZE, and PING, and manages message queues with thread-safe operations.
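The server and handler pairing described above can be illustrated with the standard library alone. The following is a minimal sketch, not the nv_ingest_api implementation: the class names `SketchBroker` and `SketchHandler`, the helper `send`, and the one-line text protocol (`PUSH <queue> <message>`, `POP <queue>`, `SIZE <queue>`, `PING`) are all assumptions made for illustration, and the real broker's wire format may differ. For brevity the handler derives from `StreamRequestHandler`, a convenience subclass of `BaseRequestHandler`.

```python
import queue
import socket
import socketserver
import threading


class SketchBroker(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """Hypothetical stand-in: a threaded TCP server holding named queues."""

    allow_reuse_address = True
    daemon_threads = True  # handler threads do not block interpreter exit

    def __init__(self, host: str, port: int, max_queue_size: int):
        super().__init__((host, port), SketchHandler)
        self.max_queue_size = max_queue_size
        self.lock = threading.Lock()  # guards creation of new queues
        self.queues = {}

    def get_queue(self, name: str) -> queue.Queue:
        """Return the queue called `name`, creating it on first use."""
        with self.lock:
            if name not in self.queues:
                self.queues[name] = queue.Queue(maxsize=self.max_queue_size)
            return self.queues[name]


class SketchHandler(socketserver.StreamRequestHandler):
    """Reads one command line per connection and writes one reply line."""

    def handle(self):
        # Assumed wire format: "<COMMAND> [<queue> [<message>]]\n"
        parts = self.rfile.readline().decode().strip().split(" ", 2)
        command = parts[0].upper()
        reply = "ERROR unknown command"
        if command == "PING":
            reply = "PONG"
        elif command == "PUSH" and len(parts) == 3:
            try:
                self.server.get_queue(parts[1]).put_nowait(parts[2])
                reply = "OK"
            except queue.Full:
                reply = "ERROR queue full"
        elif command == "POP" and len(parts) >= 2:
            try:
                reply = "OK " + self.server.get_queue(parts[1]).get_nowait()
            except queue.Empty:
                reply = "ERROR queue empty"
        elif command == "SIZE" and len(parts) >= 2:
            reply = "OK " + str(self.server.get_queue(parts[1]).qsize())
        self.wfile.write((reply + "\n").encode())


def send(address, line: str) -> str:
    """Open a fresh connection, send one command, and return the reply."""
    with socket.create_connection(address, timeout=5) as sock:
        sock.sendall((line + "\n").encode())
        with sock.makefile("r", encoding="utf-8") as reader:
            return reader.readline().strip()


if __name__ == "__main__":
    broker = SketchBroker("127.0.0.1", 0, max_queue_size=10)  # port 0: any free port
    threading.Thread(target=broker.serve_forever, daemon=True).start()
    print(send(broker.server_address, "PING"))
    broker.shutdown()
    broker.server_close()
```

`ThreadingMixIn` gives each connection its own thread, so shared state (here the dictionary of queues) needs a lock, while `queue.Queue` already synchronizes its own put and get operations.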
nv_ingest_api.util.message_brokers.simple_message_broker.ordered_message_queue module
nv_ingest_api.util.message_brokers.simple_message_broker.simple_client module
- class nv_ingest_api.util.message_brokers.simple_message_broker.simple_client.SimpleClient(host: str, port: int, db: int = 0, max_retries: int = 3, max_backoff: int = 32, connection_timeout: int = 300, max_pool_size: int = 128, use_ssl: bool = False)
Bases: MessageBrokerClientBase
A client for interfacing with SimpleMessageBroker, creating a new socket connection per request to ensure thread safety and robustness. Respects timeouts for all operations.
- fetch_message(queue_name: str, timeout: Tuple[int, float] | None = (100, None), override_fetch_mode: FetchMode | None = None)
Fetch a message from the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation. Defaults to (100, None).
- Returns:
The response containing the fetched message.
- Return type:
- get_client()
Retrieve the current client instance.
- Returns:
The current client instance.
- Return type:
- ping() → ResponseSchema
Ping the broker to check connectivity.
- Returns:
The response indicating the success of the ping operation.
- Return type:
ResponseSchema
- size(queue_name: str)
Fetch the size of the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
- Returns:
The response containing the queue size.
- Return type:
- submit_message(queue_name: str, message: str, timeout: Tuple[int, float] | None = (100, None), for_nv_ingest: bool = False)
Submit a message to the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
message (str) – The message to be submitted.
timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation. Defaults to (100, None).
for_nv_ingest (bool, optional) – Indicates whether the message is for NV ingest operations.
- Returns:
The response from the broker.
- Return type:
Module contents
- class nv_ingest_api.util.message_brokers.simple_message_broker.ResponseSchema(*, response_code: int, response_reason: str | None = 'OK', response: str | dict | None = None, trace_id: str | None = None, transaction_id: str | None = None)
Bases: BaseModel
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- response: str | dict | None
- response_code: int
- response_reason: str | None
- trace_id: str | None
- transaction_id: str | None
- class nv_ingest_api.util.message_brokers.simple_message_broker.SimpleClient(host: str, port: int, db: int = 0, max_retries: int = 3, max_backoff: int = 32, connection_timeout: int = 300, max_pool_size: int = 128, use_ssl: bool = False)
Bases: MessageBrokerClientBase
A client for interfacing with SimpleMessageBroker, creating a new socket connection per request to ensure thread safety and robustness. Respects timeouts for all operations.
- fetch_message(queue_name: str, timeout: Tuple[int, float] | None = (100, None), override_fetch_mode: FetchMode | None = None)
Fetch a message from the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation. Defaults to (100, None).
- Returns:
The response containing the fetched message.
- Return type:
- get_client()
Retrieve the current client instance.
- Returns:
The current client instance.
- Return type:
- ping() → ResponseSchema
Ping the broker to check connectivity.
- Returns:
The response indicating the success of the ping operation.
- Return type:
ResponseSchema
- size(queue_name: str)
Fetch the size of the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
- Returns:
The response containing the queue size.
- Return type:
- submit_message(queue_name: str, message: str, timeout: Tuple[int, float] | None = (100, None), for_nv_ingest: bool = False)
Submit a message to the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
message (str) – The message to be submitted.
timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation. Defaults to (100, None).
for_nv_ingest (bool, optional) – Indicates whether the message is for NV ingest operations.
- Returns:
The response from the broker.
- Return type:
- class nv_ingest_api.util.message_brokers.simple_message_broker.SimpleMessageBroker(host: str, port: int, max_queue_size: int)
Bases: ThreadingMixIn, TCPServer
A thread-safe message broker server that manages multiple message queues and supports commands such as PUSH, POP, SIZE, and PING.
- allow_reuse_address = True