nv_ingest_api.util.message_brokers.simple_message_broker package#

Submodules#

nv_ingest_api.util.message_brokers.simple_message_broker.broker module#

class nv_ingest_api.util.message_brokers.simple_message_broker.broker.SimpleMessageBroker(host: str, port: int, max_queue_size: int)[source]#

Bases: ThreadingMixIn, TCPServer

A thread-safe message broker server that manages multiple message queues and supports commands such as PUSH, POP, SIZE, and PING.

allow_reuse_address = True#
class nv_ingest_api.util.message_brokers.simple_message_broker.broker.SimpleMessageBrokerHandler(request, client_address, server)[source]#

Bases: BaseRequestHandler

Handles incoming client requests for the SimpleMessageBroker server, processes commands such as PUSH, POP, SIZE, and PING, and manages message queues with thread-safe operations.

handle()[source]#

Handles incoming client requests, validates the request data, and dispatches to the appropriate command handler.

Raises:
  • ValidationError – If the incoming request data fails schema validation.

  • Exception – If there is an unexpected error while processing the request.

nv_ingest_api.util.message_brokers.simple_message_broker.ordered_message_queue module#

class nv_ingest_api.util.message_brokers.simple_message_broker.ordered_message_queue.OrderedMessageQueue(maxsize=0)[source]#

Bases: object

acknowledge(transaction_id)[source]#

Acknowledge that a message has been processed.

can_push()[source]#

Check if the queue can accept more messages.

empty()[source]#

Check if the queue is empty.

full()[source]#

Check if the queue is full.

pop(transaction_id)[source]#

Pop a message from the queue and mark it as in-flight.

push(message)[source]#

Add a message to the queue after it has been acknowledged.

qsize()[source]#

Get the number of messages currently in the queue.

return_message(transaction_id)[source]#

Return an unacknowledged message back to the queue.

nv_ingest_api.util.message_brokers.simple_message_broker.simple_client module#

class nv_ingest_api.util.message_brokers.simple_message_broker.simple_client.SimpleClient(
host: str,
port: int,
db: int = 0,
max_retries: int = 3,
max_backoff: int = 32,
connection_timeout: int = 300,
max_pool_size: int = 128,
use_ssl: bool = False,
)[source]#

Bases: MessageBrokerClientBase

A client for interfacing with SimpleMessageBroker, creating a new socket connection per request to ensure thread safety and robustness. Respects timeouts for all operations.

fetch_message(
queue_name: str,
timeout: Tuple[int, float] | None = (100, None),
override_fetch_mode: FetchMode | None = None,
) -> ResponseSchema[source]#

Fetch a message from the specified queue.

Parameters:
  • queue_name (str) – The name of the queue.

  • timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation, given as a tuple. Defaults to (100, None).

Returns:

The response containing the fetched message.

Return type:

ResponseSchema

get_client()[source]#

Retrieve the current client instance.

Returns:

The current client instance.

Return type:

SimpleClient

ping() -> ResponseSchema[source]#

Ping the broker to check connectivity.

Returns:

The response indicating the success of the ping operation.

Return type:

ResponseSchema

size(
queue_name: str,
) -> ResponseSchema[source]#

Fetch the size of the specified queue.

Parameters:

queue_name (str) – The name of the queue.

Returns:

The response containing the queue size.

Return type:

ResponseSchema

submit_message(
queue_name: str,
message: str,
timeout: Tuple[int, float] | None = (100, None),
for_nv_ingest: bool = False,
) -> ResponseSchema[source]#

Submit a message to the specified queue.

Parameters:
  • queue_name (str) – The name of the queue.

  • message (str) – The message to be submitted.

  • timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation, given as a tuple. Defaults to (100, None).

  • for_nv_ingest (bool, optional) – Indicates whether the message is for NV ingest operations.

Returns:

The response from the broker.

Return type:

ResponseSchema

Module contents#

class nv_ingest_api.util.message_brokers.simple_message_broker.ResponseSchema(
*,
response_code: int,
response_reason: str | None = 'OK',
response: str | dict | None = None,
trace_id: str | None = None,
transaction_id: str | None = None,
)[source]#

Bases: BaseModel

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

response: str | dict | None#
response_code: int#
response_reason: str | None#
trace_id: str | None#
transaction_id: str | None#
class nv_ingest_api.util.message_brokers.simple_message_broker.SimpleClient(
host: str,
port: int,
db: int = 0,
max_retries: int = 3,
max_backoff: int = 32,
connection_timeout: int = 300,
max_pool_size: int = 128,
use_ssl: bool = False,
)[source]#

Bases: MessageBrokerClientBase

A client for interfacing with SimpleMessageBroker, creating a new socket connection per request to ensure thread safety and robustness. Respects timeouts for all operations.

fetch_message(
queue_name: str,
timeout: Tuple[int, float] | None = (100, None),
override_fetch_mode: FetchMode | None = None,
) -> ResponseSchema[source]#

Fetch a message from the specified queue.

Parameters:
  • queue_name (str) – The name of the queue.

  • timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation, given as a tuple. Defaults to (100, None).

Returns:

The response containing the fetched message.

Return type:

ResponseSchema

get_client()[source]#

Retrieve the current client instance.

Returns:

The current client instance.

Return type:

SimpleClient

ping() -> ResponseSchema[source]#

Ping the broker to check connectivity.

Returns:

The response indicating the success of the ping operation.

Return type:

ResponseSchema

size(
queue_name: str,
) -> ResponseSchema[source]#

Fetch the size of the specified queue.

Parameters:

queue_name (str) – The name of the queue.

Returns:

The response containing the queue size.

Return type:

ResponseSchema

submit_message(
queue_name: str,
message: str,
timeout: Tuple[int, float] | None = (100, None),
for_nv_ingest: bool = False,
) -> ResponseSchema[source]#

Submit a message to the specified queue.

Parameters:
  • queue_name (str) – The name of the queue.

  • message (str) – The message to be submitted.

  • timeout (Tuple[int, float] | None, optional) – Timeout in seconds for the operation, given as a tuple. Defaults to (100, None).

  • for_nv_ingest (bool, optional) – Indicates whether the message is for NV ingest operations.

Returns:

The response from the broker.

Return type:

ResponseSchema

class nv_ingest_api.util.message_brokers.simple_message_broker.SimpleMessageBroker(host: str, port: int, max_queue_size: int)[source]#

Bases: ThreadingMixIn, TCPServer

A thread-safe message broker server that manages multiple message queues and supports commands such as PUSH, POP, SIZE, and PING.

allow_reuse_address = True#