nv_ingest.framework.util.service.impl.ingest package

Submodules

nv_ingest.framework.util.service.impl.ingest.redis_ingest_service module

class nv_ingest.framework.util.service.impl.ingest.redis_ingest_service.RedisIngestService(
redis_hostname: str,
redis_port: int,
redis_task_queue: str,
fetch_mode: FetchMode,
result_data_ttl_seconds: int | None,
state_ttl_seconds: int | None,
cache_config: Dict[str, Any] | None,
use_ssl: bool,
)

Bases: IngestServiceMeta

Submits jobs to Redis and fetches their results, supporting multiple fetch modes and job state management with TTLs. The public methods are asynchronous; calls to the synchronous Redis client are dispatched through asyncio.to_thread.
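The asyncio.to_thread approach mentioned above can be illustrated with a minimal sketch. BlockingClient and fetch_async are hypothetical stand-ins invented for this example and are not part of nv_ingest; the sleep simulates a blocking Redis round trip.

```python
import asyncio
import time


class BlockingClient:
    """Hypothetical stand-in for a synchronous Redis client."""

    def fetch(self, key: str) -> dict:
        time.sleep(0.05)  # simulate blocking network I/O
        return {"key": key, "status": "done"}


async def fetch_async(client: BlockingClient, key: str) -> dict:
    # Run the blocking call in a worker thread so the event loop stays free.
    return await asyncio.to_thread(client.fetch, key)


async def main() -> list:
    client = BlockingClient()
    # The two calls overlap because each one runs in its own worker thread.
    return await asyncio.gather(
        fetch_async(client, "job-1"),
        fetch_async(client, "job-2"),
    )


results = asyncio.run(main())
```

asyncio.to_thread requires Python 3.9 or later. It suits I/O-bound calls like this one, where the worker thread spends most of its time waiting.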

async fetch_job(job_id: str) → Dict | None

Fetches the job result using the fetch mode and timeout configured on the RedisClient. The synchronous client call is executed off the event loop so that it can be awaited.

Parameters:

job_id (str) – The unique identifier of the job.

Returns:

The job result message.

Return type:

dict or None

Raises:

TimeoutError, RedisError, ConnectionError, ValueError, RuntimeError – If the fetch operation fails.
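Because fetch_job can raise TimeoutError when a result is not yet available, a caller may want to retry. The sketch below shows one possible retry loop. FakeIngestService and fetch_with_retry are hypothetical names made up for illustration, and the retry policy is an assumption rather than anything the service prescribes.

```python
import asyncio
from typing import Optional


class FakeIngestService:
    """Hypothetical stand-in exposing an async fetch_job like the one documented."""

    def __init__(self, ready_after: int) -> None:
        self._calls = 0
        self._ready_after = ready_after

    async def fetch_job(self, job_id: str) -> Optional[dict]:
        self._calls += 1
        if self._calls < self._ready_after:
            raise TimeoutError(f"job {job_id} not ready")
        return {"job_id": job_id, "data": []}


async def fetch_with_retry(
    service, job_id: str, attempts: int = 3, delay: float = 0.01
) -> Optional[dict]:
    for attempt in range(attempts):
        try:
            return await service.fetch_job(job_id)
        except TimeoutError:
            # Re-raise on the final attempt; otherwise wait and try again.
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(delay)
    return None


result = asyncio.run(fetch_with_retry(FakeIngestService(ready_after=2), "abc"))
```

Only TimeoutError is retried here. The other documented exceptions (RedisError, ConnectionError, ValueError, RuntimeError) propagate to the caller unchanged.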

async get_fetch_mode() → FetchMode

Returns the configured fetch mode for the service.

Returns:

The current fetch mode.

Return type:

FetchMode

static get_instance() → RedisIngestService

Static access method implementing the Singleton pattern.

Returns:

The singleton instance of the RedisIngestService.

Return type:

RedisIngestService
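A static accessor of this kind is commonly written as a lazily initialized class attribute. The following is a generic sketch of that pattern, not the actual RedisIngestService code. SingletonService is a hypothetical class, and the hard-coded hostname and port are placeholders, since this page does not say where the real instance gets its configuration.

```python
from typing import Optional


class SingletonService:
    """Hypothetical class showing a lazily created singleton."""

    _instance: Optional["SingletonService"] = None

    def __init__(self, hostname: str, port: int) -> None:
        self.hostname = hostname
        self.port = port

    @staticmethod
    def get_instance() -> "SingletonService":
        # Create the instance on first access, then reuse it.
        if SingletonService._instance is None:
            # Placeholder configuration; the real source of these values is not documented here.
            SingletonService._instance = SingletonService("localhost", 6379)
        return SingletonService._instance


first = SingletonService.get_instance()
second = SingletonService.get_instance()
```

Both names refer to the same object, so state set through one is visible through the other.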

async get_job_state(job_id: str) → str | None

Retrieves the explicit state of a job.

Parameters:

job_id (str) – The unique identifier of the job.

Returns:

The state of the job, or None if not found or upon error.

Return type:

str or None

async get_processing_cache(
job_id: str,
) → List[ProcessingJob]

Retrieves processing jobs data from the simple key-value cache.

Parameters:

job_id (str) – The unique identifier of the job.

Returns:

A list of processing jobs, or an empty list if not found or upon error.

Return type:

list of ProcessingJob

async set_job_state(job_id: str, state: str) → None

Sets the explicit state of a job and refreshes its TTL.

Parameters:
  • job_id (str) – The unique identifier of the job.

  • state (str) – The state to be assigned to the job.

Return type:

None
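To illustrate what refreshing the TTL on each state write means, here is a minimal in-memory sketch. StateStore is a hypothetical stand-in for the Redis-backed store, the state names are invented, and the injectable clock exists only to make the example deterministic.

```python
import time
from typing import Callable, Dict, Optional, Tuple


class StateStore:
    """Hypothetical in-memory stand-in for a key-value store with per-key expiry."""

    def __init__(
        self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._data: Dict[str, Tuple[str, float]] = {}

    def set_state(self, job_id: str, state: str) -> None:
        # Every write stores a fresh expiry, which restarts the TTL window.
        self._data[job_id] = (state, self._clock() + self._ttl)

    def get_state(self, job_id: str) -> Optional[str]:
        entry = self._data.get(job_id)
        if entry is None:
            return None
        state, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[job_id]  # expired entries behave as "not found"
            return None
        return state


now = [0.0]  # fake clock value, advanced manually below
store = StateStore(ttl_seconds=10, clock=lambda: now[0])
store.set_state("job-1", "SUBMITTED")   # expires at t=10
now[0] = 8.0
store.set_state("job-1", "PROCESSING")  # expiry moves to t=18
now[0] = 15.0
state_at_15 = store.get_state("job-1")  # still present thanks to the refresh
now[0] = 20.0
state_at_20 = store.get_state("job-1")  # expired
```

Without the second write, the entry would have expired at t=10 and the lookup at t=15 would have returned None.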

async set_processing_cache(
job_id: str,
jobs_data: List[ProcessingJob],
) → None

Stores processing jobs data in a simple key-value cache.

Parameters:
  • job_id (str) – The unique identifier of the job.

  • jobs_data (list of ProcessingJob) – The processing job data to be cached.

Return type:

None

async submit_job(
job_spec_wrapper: MessageWrapper,
trace_id: str,
) → str

Validates, prepares, and submits a job specification to the Redis task queue. Sets result data TTL if configured for NON_DESTRUCTIVE mode.

Parameters:
  • job_spec_wrapper (MessageWrapper) – A wrapper containing the job specification payload.

  • trace_id (str) – A unique identifier for the job.

Returns:

The job trace_id.

Return type:

str

Raises:
  • ValueError – If the payload is missing or invalid.

  • JSONDecodeError, TypeError – For payload parsing errors.

  • RedisError, ConnectionError – For Redis-related errors.
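The validate-and-prepare step can be pictured roughly as follows. This is a sketch under assumptions, not the service's actual logic: prepare_job is a hypothetical helper, the payload is assumed to be a JSON string, the job_id key is invented, and a Python list stands in for the Redis task queue.

```python
import json
from typing import Any, List, Optional


def prepare_job(payload: Optional[str], trace_id: str) -> str:
    """Hypothetical helper: validate a JSON payload and tag it with the trace ID."""
    if not payload:
        raise ValueError("payload is missing")
    spec: Any = json.loads(payload)  # raises json.JSONDecodeError on malformed JSON
    if not isinstance(spec, dict):
        raise ValueError("payload must decode to a JSON object")
    spec["job_id"] = trace_id  # assumed key name, for illustration only
    return json.dumps(spec)


queue: List[str] = []  # stands in for the Redis task queue
queue.append(prepare_job('{"tasks": []}', "trace-123"))
```

Note that json.JSONDecodeError is a subclass of ValueError, so a caller that wants to tell the two apart needs to catch JSONDecodeError first.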

nv_ingest.framework.util.service.impl.ingest.redis_ingest_service.get_fetch_mode_from_env() → FetchMode

Retrieves the fetch mode from the environment variable FETCH_MODE.

Returns:

The fetch mode as specified by the environment variable, or NON_DESTRUCTIVE by default.

Return type:

FetchMode
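An environment lookup with a default might look like the sketch below. Mode is a hypothetical stand-in for FetchMode: only NON_DESTRUCTIVE is named on this page, so the DESTRUCTIVE member is an assumption. The case-insensitive parsing and the fallback on unrecognized values are also assumptions.

```python
import os
from enum import Enum


class Mode(Enum):
    """Hypothetical stand-in for FetchMode."""

    NON_DESTRUCTIVE = "non_destructive"
    DESTRUCTIVE = "destructive"  # assumed member, not confirmed by this page


def mode_from_env(var: str = "FETCH_MODE") -> Mode:
    # Normalize the raw value so "destructive" and "DESTRUCTIVE" both match.
    raw = os.getenv(var, "").strip().upper()
    try:
        return Mode[raw]
    except KeyError:
        # Unset or unrecognized values fall back to the default.
        return Mode.NON_DESTRUCTIVE


os.environ["FETCH_MODE"] = "destructive"
current = mode_from_env()
```

Falling back silently keeps startup robust, but it can hide a misspelled setting. Logging a warning in the except branch would be a reasonable addition.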

Module contents