Source code for nv_ingest_api.util.service_clients.client_base
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from abc import ABC
from abc import abstractmethod
from enum import Enum, auto
from typing import Tuple
from nv_ingest_api.internal.schemas.message_brokers.response_schema import ResponseSchema
[docs]
class FetchMode(Enum):
DESTRUCTIVE = auto() # Read and delete immediately (current behavior)
NON_DESTRUCTIVE = auto() # Read without deleting (requires TTL on Redis data)
CACHE_BEFORE_DELETE = auto() # Read, write to local cache, then delete from Redis
[docs]
class MessageBrokerClientBase(ABC):
"""
Abstract base class for a messaging client to interface with various messaging systems.
Provides a standard interface for sending and receiving messages with connection management
and retry logic.
"""
@abstractmethod
def __init__(
self,
host: str,
port: int,
db: int = 0,
max_retries: int = 0,
max_backoff: int = 32,
connection_timeout: int = 300,
max_pool_size: int = 128,
use_ssl: bool = False,
):
"""
Initialize the messaging client with connection parameters.
"""
[docs]
@abstractmethod
def get_client(self):
"""
Returns the client instance, reconnecting if necessary.
Returns:
The client instance.
"""
[docs]
@abstractmethod
def ping(self) -> bool:
"""
Checks if the server is responsive.
Returns:
True if the server responds to a ping, False otherwise.
"""
[docs]
@abstractmethod
def fetch_message(
self, job_index: str, timeout: Tuple[int, float] = (100, None), override_fetch_mode: FetchMode = None
) -> ResponseSchema:
"""
Fetches a message from the specified queue with retries on failure.
Parameters:
job_index (str): The index of the job to fetch the message for.
timeout (float): The timeout in seconds for blocking until a message is available.
override_fetch_mode: Optional; overrides the default fetch mode.
Returns:
The fetched message, or None if no message could be fetched.
"""
[docs]
@abstractmethod
def submit_message(self, channel_name: str, message: str, for_nv_ingest=False) -> ResponseSchema:
"""
Submits a message to a specified queue with retries on failure.
Parameters:
channel_name (str): The name of the queue to submit the message to.
message (str): The message to submit.
for_nv_ingest (bool): Whether the message is for NV Ingest.
"""