nv_ingest_api.util.service_clients.redis package
Submodules
nv_ingest_api.util.service_clients.redis.redis_client module
- class nv_ingest_api.util.service_clients.redis.redis_client.RedisClient(host: str, port: int, db: int = 0, max_retries: int = 3, max_backoff: int = 32, connection_timeout: int = 300, max_pool_size: int = 128, use_ssl: bool = False, redis_allocator: Callable[..., redis.client.Redis] = redis.client.Redis, fetch_mode: FetchMode = None, cache_config: Dict[str, Any] | None = None, message_ttl_seconds: int | None = 600)
Bases: MessageBrokerClientBase
A client for interfacing with Redis. It provides message sending and receiving with retry logic, connection management, configurable fetch modes, and optional local caching.
Handles message fragmentation transparently during fetch operations.
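The max_retries and max_backoff constructor parameters suggest a capped exponential backoff between attempts. The sketch below shows that general pattern only. The helper name `backoff_delays` and the doubling schedule are assumptions for illustration and are not taken from the library.

```python
from typing import List


def backoff_delays(max_retries: int = 3, max_backoff: int = 32) -> List[int]:
    """Return the wait, in seconds, before each retry attempt.

    Assumed schedule (hypothetical): 1, 2, 4, ... seconds, capped at max_backoff.
    """
    return [min(2 ** attempt, max_backoff) for attempt in range(max_retries)]
```

With the defaults shown in the class signature, this schedule never reaches the 32-second cap; the cap only matters when max_retries is raised.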
- fetch_message(channel_name: str, timeout: float = 10, override_fetch_mode: FetchMode | None = None)
Fetches a complete message from Redis. It handles fragmentation according to the specified or configured fetch mode and retries on connection errors.
- Parameters:
channel_name (str) – The Redis channel key from which to fetch the message.
timeout (float, optional) – The timeout in seconds for fetching the message. Default is 10 seconds.
override_fetch_mode (FetchMode, optional) – If provided, overrides the configured fetch mode for this operation.
- Returns:
The final reconstructed message dictionary if successful, or None if not found.
- Return type:
dict or None
- Raises:
TimeoutError – If fetching times out.
ValueError – If non-retryable errors occur or max retries are exceeded.
RuntimeError – For other runtime errors.
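To illustrate what reconstructing a fragmented message can involve, here is a minimal sketch of reassembly. The fragment layout (`fragment`, `fragment_count`, and `data` keys) and the helper `reassemble` are assumptions made for this example; the actual wire format used by RedisClient is not described in this section.

```python
from typing import Any, Dict, List

# Keys assumed (hypothetically) to describe a fragment rather than the message.
_FRAGMENT_KEYS = ("fragment", "fragment_count", "data")


def reassemble(fragments: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge fragments into one message, ordered by fragment index.

    Assumed layout: {"fragment": <0-based index>, "fragment_count": <total>, "data": [...]}
    """
    if not fragments:
        raise ValueError("no fragments to reassemble")
    expected = fragments[0].get("fragment_count", 1)
    if len(fragments) != expected:
        raise ValueError(f"expected {expected} fragments, got {len(fragments)}")

    ordered = sorted(fragments, key=lambda frag: frag.get("fragment", 0))
    # Keep the first fragment's metadata, then concatenate every payload in order.
    merged = {k: v for k, v in ordered[0].items() if k not in _FRAGMENT_KEYS}
    merged["data"] = [item for frag in ordered for item in frag.get("data", [])]
    return merged
```

Callers of fetch_message() never see fragments directly, since the method returns the final reconstructed dictionary.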
- fetch_message_from_any(channel_names: List[str], timeout: float = 0)
Attempt to fetch a message from the first non-empty list among the provided channel names using Redis BLPOP. If the popped item represents a fragmented message, this method will continue popping from the same channel to reconstruct the full message.
- Parameters:
channel_names (List[str]) – Ordered list of Redis list keys to attempt in priority order.
timeout (float, optional) – Timeout in seconds to wait for any item across the provided lists. Redis supports integer-second timeouts; sub-second values will be truncated.
- Returns:
The reconstructed message dictionary if an item was fetched; otherwise None on timeout.
- Return type:
dict or None
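The priority behavior of BLPOP over several keys can be modeled without a Redis server. The sketch below is an in-memory stand-in, not the client's code: `pop_first_non_empty` and the `queues` dictionary are hypothetical, and the function returns immediately instead of blocking.

```python
import json
from collections import deque
from typing import Any, Deque, Dict, List, Optional


def pop_first_non_empty(
    queues: Dict[str, Deque[str]], channel_names: List[str]
) -> Optional[Dict[str, Any]]:
    """Pop and decode the head of the first non-empty queue, scanning in order."""
    for name in channel_names:
        queue = queues.get(name)
        if queue:  # an empty or missing queue is skipped
            return json.loads(queue.popleft())
    return None  # models a timeout: every list was empty
```

A lower-priority list is served only while every list ahead of it in channel_names is empty.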
- fetch_message_from_any_with_key(channel_names: List[str], timeout: float = 0)
Like fetch_message_from_any(), but returns the Redis list key together with the message. This is useful for higher-level schedulers that need to apply per-category quotas.
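As a sketch of how a scheduler might use the returned key for per-category quotas, the example below drains messages until each category has used its allowance. It assumes the fetch call yields a `(key, message)` tuple or None, which this section does not confirm; `drain_with_quotas` and the `fetch` callable are hypothetical.

```python
from collections import Counter
from typing import Any, Callable, Dict, List, Optional, Tuple

KeyedMessage = Tuple[str, Dict[str, Any]]
FetchWithKey = Callable[[List[str]], Optional[KeyedMessage]]


def drain_with_quotas(fetch: FetchWithKey, quotas: Dict[str, int]) -> List[KeyedMessage]:
    """Fetch until every category has used its quota or nothing is available."""
    taken: Counter = Counter()
    results: List[KeyedMessage] = []
    while True:
        # Poll only categories with quota left, keeping their priority order.
        eligible = [key for key in quotas if taken[key] < quotas[key]]
        if not eligible:
            break
        item = fetch(eligible)
        if item is None:  # assumed to signal a timeout
            break
        key, message = item
        taken[key] += 1
        results.append((key, message))
    return results
```

In real use, `fetch` would be a thin wrapper around fetch_message_from_any_with_key() that adapts its actual return value to the tuple assumed here.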
- get_client() → Redis
Returns a Redis client instance, attempting reconnection if the current client is invalid.
- Returns:
The active Redis client instance.
- Return type:
redis.Redis
- Raises:
RuntimeError – If no valid client can be established.
- property max_retries: int
Gets the maximum number of allowed retries for Redis operations.
- Returns:
The maximum number of retries.
- Return type:
int
- ping() → bool
Checks if the Redis client connection is alive by issuing a PING command.
- Returns:
True if the ping is successful, False otherwise.
- Return type:
bool
- submit_message(channel_name: str, message: str, ttl_seconds: int | None = None)
Submits a message to Redis using RPUSH and optionally sets a TTL on the channel key.
- Parameters:
channel_name (str) – The Redis list key (queue name) to which the message will be appended.
message (str) – The message payload as a JSON string.
ttl_seconds (int, optional) – Time-To-Live for the Redis key in seconds. If not provided, uses message_ttl_seconds.
- Return type:
None
- Raises:
ValueError – If maximum retry attempts are exceeded.
ConnectionError – If there is a connection error with Redis.
redis.RedisError – For other non-recoverable Redis errors.
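The RPUSH-then-TTL sequence described above can be sketched against a small in-memory stand-in. This is not the library's implementation: `submit`, `FakeRedis`, and the rule that a TTL is applied only when positive are assumptions for illustration. The stand-in mirrors the `rpush` and `expire` method names from redis-py.

```python
from typing import Any, Dict, List, Optional


def submit(client: Any, channel_name: str, message: str,
           ttl_seconds: Optional[int] = None,
           default_ttl: Optional[int] = 600) -> None:
    """Append message to the list at channel_name, then refresh the key's TTL."""
    client.rpush(channel_name, message)
    # Fall back to the client-wide default when no per-call TTL is given.
    ttl = ttl_seconds if ttl_seconds is not None else default_ttl
    if ttl is not None and ttl > 0:  # assumption: non-positive means "no TTL"
        client.expire(channel_name, ttl)


class FakeRedis:
    """In-memory stand-in exposing only the two calls used above."""

    def __init__(self) -> None:
        self.lists: Dict[str, List[str]] = {}
        self.ttls: Dict[str, int] = {}

    def rpush(self, name: str, *values: str) -> int:
        self.lists.setdefault(name, []).extend(values)
        return len(self.lists[name])

    def expire(self, name: str, time: int) -> bool:
        self.ttls[name] = time
        return True
```

Note that the TTL applies to the whole list key, not to an individual message, so each submission in this sketch resets the expiry of everything queued under channel_name.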