nv_ingest.framework.util.service.impl.ingest package#
Submodules#
nv_ingest.framework.util.service.impl.ingest.redis_ingest_service module#
- class nv_ingest.framework.util.service.impl.ingest.redis_ingest_service.RedisIngestService(
- redis_hostname: str,
- redis_port: int,
- redis_task_queue: str,
- fetch_mode: FetchMode,
- result_data_ttl_seconds: int | None,
- state_ttl_seconds: int | None,
- cache_config: Dict[str, Any] | None,
- use_ssl: bool,
Bases:
IngestServiceMeta
Submits jobs and fetches results via Redis, supporting multiple fetch modes and state management with TTLs. Operates asynchronously using asyncio.to_thread for synchronous Redis client operations.
- async fetch_job(job_id: str) Dict | None [source]#
Fetches the job result using the configured RedisClient fetch mode and timeout. Executes the synchronous client call asynchronously.
- Parameters:
job_id (str) – The unique identifier of the job.
- Returns:
The job result message.
- Return type:
dict or None
- Raises:
TimeoutError, RedisError, ConnectionError, ValueError, RuntimeError – If the fetch operation fails.
- async get_fetch_mode() FetchMode [source]#
Returns the configured fetch mode for the service.
- Returns:
The current fetch mode.
- Return type:
- static get_instance() RedisIngestService [source]#
Static access method implementing the Singleton pattern.
- Returns:
The singleton instance of the RedisIngestService.
- Return type:
- async get_job_state(job_id: str) str | None [source]#
Retrieves the explicit state of a job.
- Parameters:
job_id (str) – The unique identifier of the job.
- Returns:
The state of the job, or None if not found or upon error.
- Return type:
str or None
- async get_processing_cache(
- job_id: str,
Retrieves processing jobs data from the simple key-value cache.
- Parameters:
job_id (str) – The unique identifier of the job.
- Returns:
A list of processing jobs, or an empty list if not found or upon error.
- Return type:
list of ProcessingJob
- async set_job_state(job_id: str, state: str) None [source]#
Sets the explicit state of a job and refreshes its TTL.
- Parameters:
job_id (str) – The unique identifier of the job.
state (str) – The state to be assigned to the job.
- Return type:
None
- async set_processing_cache(
- job_id: str,
- jobs_data: List[ProcessingJob],
Stores processing jobs data in a simple key-value cache.
- Parameters:
job_id (str) – The unique identifier of the job.
jobs_data (list of ProcessingJob) – The processing job data to be cached.
- Return type:
None
- async submit_job(
- job_spec_wrapper: MessageWrapper,
- trace_id: str,
Validates, prepares, and submits a job specification to the Redis task queue. Sets result data TTL if configured for NON_DESTRUCTIVE mode.
- Parameters:
job_spec_wrapper (MessageWrapper) – A wrapper containing the job specification payload.
trace_id (str) – A unique identifier for the job.
- Returns:
The job trace_id.
- Return type:
str
- Raises:
ValueError – If the payload is missing or invalid.
JSONDecodeError, TypeError – For payload parsing errors.
RedisError, ConnectionError – For Redis-related errors.