nv_ingest.util.message_brokers.redis package#

Submodules#

nv_ingest.util.message_brokers.redis.redis_client module#

class nv_ingest.util.message_brokers.redis.redis_client.RedisClient(
host: str,
port: int,
db: int = 0,
max_retries: int = 0,
max_backoff: int = 32,
connection_timeout: int = 300,
max_pool_size: int = 128,
use_ssl: bool = False,
redis_allocator: ~typing.Any = <class 'redis.client.Redis'>,
)[source]#

Bases: MessageBrokerClientBase

A client for interfacing with Redis, providing mechanisms for sending and receiving messages with retry logic and connection management.

Parameters:
  • host (str) – The hostname of the Redis server.

  • port (int) – The port number of the Redis server.

  • db (int, optional) – The database number to connect to. Default is 0.

  • max_retries (int, optional) – The maximum number of retry attempts for operations. Default is 0 (no retries).

  • max_backoff (int, optional) – The maximum backoff delay between retries in seconds. Default is 32 seconds.

  • connection_timeout (int, optional) – The timeout in seconds for connecting to the Redis server. Default is 300 seconds.

  • max_pool_size (int, optional) – The maximum number of connections in the Redis connection pool. Default is 128.

  • use_ssl (bool, optional) – Specifies if SSL should be used for the connection. Default is False.

  • redis_allocator (Any, optional) – The Redis client allocator, allowing for custom Redis client instances. Default is redis.Redis.
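The interaction between max_retries and max_backoff can be illustrated with a small sketch. The reference above does not state the retry schedule, so the doubling rule below is an assumption chosen for illustration (it is not confirmed as RedisClient's actual behaviour), and backoff_delays is a hypothetical helper name.

```python
from typing import List


def backoff_delays(max_retries: int, max_backoff: int = 32) -> List[int]:
    """Return the delay (in seconds) before each retry attempt.

    ASSUMPTION: delays double on each attempt (1, 2, 4, ...) and are
    capped at max_backoff. The documented API only states that
    max_backoff is the maximum delay between retries.
    """
    return [min(2 ** attempt, max_backoff) for attempt in range(max_retries)]


# With the documented default cap of 32 seconds and eight retries:
print(backoff_delays(8))  # [1, 2, 4, 8, 16, 32, 32, 32]

# With the documented default max_retries=0 there is nothing to wait for:
print(backoff_delays(0))  # []
```

Under this assumed schedule, the cap keeps the worst-case wait per attempt bounded no matter how large max_retries is.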

client#

The Redis client instance used for operations.

Type:

Any

fetch_message(
channel_name: str,
timeout: float = 10,
) str | Dict | None[source]#

Fetches a message from the specified queue, retrying on failure. If the message is fragmented, fetching continues until all fragments have been collected.

Parameters:
  • channel_name (str) – Channel to fetch the message from.

  • timeout (float) – The timeout in seconds to block while waiting for a message. If a multi-part message is received, the timeout is temporarily extended so that all fragments can be collected.

Returns:

The full fetched message, or None if no message could be fetched after retries.

Return type:

Optional[Union[str, Dict]]

Raises:

ValueError – If fetching the message fails after the specified number of retries or due to other critical errors.
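Because the signature allows fetch_message to return a str, a Dict, or None, callers may want to normalize the result before using it. The following is a minimal sketch of one way to do that. It assumes that string payloads are JSON-encoded, which the reference above does not state, and normalize_message is a hypothetical helper, not part of nv_ingest.

```python
import json
from typing import Any, Dict, Optional, Union


def normalize_message(
    raw: Union[str, Dict[str, Any], None],
) -> Optional[Dict[str, Any]]:
    """Convert a fetch_message() result into a dict, or None if empty.

    ASSUMPTION: string payloads are JSON objects. Adjust the decoding
    step if the queue carries a different wire format.
    """
    if raw is None:
        # Nothing was available within the timeout.
        return None
    if isinstance(raw, dict):
        # Already decoded; pass it through unchanged.
        return raw
    return json.loads(raw)


# Typical call site (client is a RedisClient instance; "jobs" is a
# hypothetical queue name):
#
#     try:
#         job = normalize_message(client.fetch_message("jobs", timeout=10))
#     except ValueError:
#         job = None  # fetching failed after the configured retries

print(normalize_message('{"id": 7, "status": "done"}'))
print(normalize_message(None))
```

Note that json.JSONDecodeError is a subclass of ValueError, so a single except ValueError clause at the call site would cover both a failed fetch and a malformed payload.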

get_client() Any[source]#

Returns a Redis client instance, reconnecting if necessary.

Returns:

The Redis client instance.

Return type:

Any

property max_retries: int#

ping() bool[source]#

Checks if the Redis server is responsive.

Returns:

True if the server responds to a ping, False otherwise.

Return type:

bool
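Since ping() returns a plain boolean, it lends itself to a startup readiness check. The sketch below polls any object that exposes ping() until it answers or the attempts run out. wait_until_ready and FlakyStub are hypothetical names introduced for illustration; the stub stands in for a real RedisClient so that the example runs without a Redis server.

```python
import time
from typing import Callable


def wait_until_ready(
    client,
    attempts: int = 5,
    delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll client.ping() up to `attempts` times, pausing between tries."""
    for attempt in range(attempts):
        if client.ping():
            return True
        if attempt < attempts - 1:
            # Do not sleep after the final failed attempt.
            sleep(delay)
    return False


class FlakyStub:
    """Hypothetical stand-in: ping() fails a fixed number of times first."""

    def __init__(self, failures: int) -> None:
        self.failures = failures

    def ping(self) -> bool:
        if self.failures > 0:
            self.failures -= 1
            return False
        return True


# A no-op sleep keeps the demonstration instantaneous.
print(wait_until_ready(FlakyStub(failures=2), sleep=lambda _: None))  # True
print(wait_until_ready(FlakyStub(failures=9), sleep=lambda _: None))  # False
```

Injecting the sleep function keeps the helper easy to test; in production code the default time.sleep would be used.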

submit_message(channel_name: str, message: str) None[source]#

Submits a message to a specified Redis queue with retries on failure.

Parameters:
  • channel_name (str) – The name of the queue to submit the message to.

  • message (str) – The message to submit.

Raises:

RedisError – If submitting the message fails after the specified number of retries.
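submit_message accepts the message as a str, so structured payloads need to be serialized first. A minimal sketch of that pattern follows. JSON is an assumed encoding, and submit_json and InMemoryBroker are hypothetical names: the in-memory class only mimics the documented submit_message signature so that the example runs without Redis.

```python
import json
from collections import defaultdict, deque
from typing import Any, Deque, Dict


class InMemoryBroker:
    """Hypothetical stand-in exposing the documented submit_message() shape."""

    def __init__(self) -> None:
        self.queues: Dict[str, Deque[str]] = defaultdict(deque)

    def submit_message(self, channel_name: str, message: str) -> None:
        self.queues[channel_name].append(message)


def submit_json(client, channel_name: str, payload: Dict[str, Any]) -> None:
    """Serialize payload to JSON (an assumed format) and submit it."""
    client.submit_message(channel_name, json.dumps(payload))


broker = InMemoryBroker()
submit_json(broker, "jobs", {"id": 1})
print(broker.queues["jobs"][0])  # {"id": 1}
```

With a real RedisClient, the call to submit_json would be wrapped in a handler for the RedisError documented above.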

Module contents#