Source code for nv_ingest_client.client.client

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# pylint: disable=broad-except

import concurrent
import json
import logging
import math
import time
from collections import defaultdict
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from typing import Any, Type, Callable
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from nv_ingest_api.util.service_clients.client_base import MessageBrokerClientBase
from nv_ingest_api.util.service_clients.rest.rest_client import RestClient
from nv_ingest_client.primitives import BatchJobSpec
from nv_ingest_client.primitives import JobSpec
from nv_ingest_client.primitives.jobs import JobState
from nv_ingest_client.primitives.jobs import JobStateEnum
from nv_ingest_client.primitives.tasks import Task
from nv_ingest_client.primitives.tasks import TaskType
from nv_ingest_client.primitives.tasks import is_valid_task_type
from nv_ingest_client.primitives.tasks import task_factory
from nv_ingest_client.util.processing import handle_future_result, IngestJobFailure
from nv_ingest_client.util.util import create_job_specs_for_batch, check_ingest_result

logger = logging.getLogger(__name__)


class DataDecodeException(Exception):
    """
    Exception raised for errors in decoding data.

    Attributes
    ----------
    message : str
        Explanation of the error.
    data : Any, optional
        The data that failed to decode.
    """

    def __init__(self, message="Data decoding error", data=None):
        self.message = message
        self.data = data
        super().__init__(f"{message}: {data}")

    def __str__(self):
        return f"{self.__class__.__name__}({self.message}, Data={self.data})"

class _ConcurrentProcessor: """ Manages the asynchronous submission and result fetching of jobs using a client's public methods, mirroring the batching structure of the CLI path. This processor takes a list of pre-created job indices, submits them in batches via the client's `submit_job_async`, and then fetches results for each batch using `fetch_job_result_async`. It processes results as they become available within the batch using `as_completed`. Retries due to job readiness timeouts are handled by adding the job index to the next processing batch. """ def __init__( self, client: "NvIngestClient", job_indices: List[str], job_queue_id: Optional[str], batch_size: int, timeout: Tuple[int, Union[float, None]], max_job_retries: Optional[int], completion_callback: Optional[Callable[[Dict[str, Any], str], None]], fail_on_submit_error: bool, verbose: bool = False, ): """ Initializes the concurrent processor. Parameters ---------- client : NvIngestClient The client instance used for job operations. Requires methods: `_worker_pool`, `submit_job_async`, `fetch_job_result_async`, `_get_job_state_object`. job_indices : List[str] List of pre-created, unique job indices to process. job_queue_id : Optional[str] The ID of the job queue required for submitting new jobs via `submit_job_async`. batch_size : int Maximum number of jobs to include in each processing batch. timeout : Tuple[int, Union[float, None]] Timeout configuration potentially used by underlying client fetch operations. Its direct usage depends on the client implementation. max_job_retries : Optional[int] Maximum number of times to retry fetching a job result if the initial fetch attempt times out (indicating the job is not ready). `None` indicates infinite retries. completion_callback : Optional[Callable[[Dict[str, Any], str], None]] A callback function executed upon the successful completion and fetch of a job. It receives the full response dictionary and the job index. fail_on_submit_error : bool If True, the entire process will stop and raise an error if initiating job submission or fetching fails for a batch. verbose : bool, optional If True, enables detailed debug logging. Default is False. Raises ------ AttributeError If the provided client object is missing required methods or attributes. TypeError If the client's `_worker_pool` is not a `ThreadPoolExecutor`. """ self.client = client self.all_job_indices_list: List[str] = list(job_indices) self.job_queue_id = job_queue_id self.batch_size = batch_size self.timeout = timeout self.max_job_retries = max_job_retries self.completion_callback = completion_callback self.fail_on_submit_error = fail_on_submit_error self.verbose = verbose # State variables managed across batch cycles self.retry_job_ids: List[str] = [] self.retry_counts: Dict[str, int] = defaultdict(int) self.results: List[Dict[str, Any]] = [] # Stores successful results (full dicts) self.failures: List[Tuple[str, str]] = [] # (job_index, error_message) # --- Initial Checks --- if not self.job_queue_id: logger.warning("job_queue_id is not set; submission of new jobs will fail.") # -------------------------------------------------------------------------- # Private Methods # -------------------------------------------------------------------------- def _handle_processing_failure(self, job_index: str, error_msg: str, is_submission_failure: bool = False) -> None: """ Handles terminal failures during job initiation or processing. 
Logs the error, records the failure, cleans up retry counts, and attempts to update the job's state locally in the client. This method does not increment the overall processed count itself. Parameters ---------- job_index : str The unique identifier of the job that failed. error_msg : str A message describing the reason for the failure. is_submission_failure : bool, optional If True, indicates the failure occurred during the submission or fetch initiation phase, rather than during result processing. Default is False. """ log_prefix = "Initiation failed" if is_submission_failure else "Processing failed" # Log validation failures less prominently if they are noisy if "validation failed" in error_msg: logger.warning(f"{log_prefix} for {job_index}: {error_msg}") else: logger.error(f"{log_prefix} for {job_index}: {error_msg}") # Record failure only once per job index if not any(f[0] == job_index for f in self.failures): failed_job_spec = self.client._job_index_to_job_spec.get(job_index) self.failures.append((f"{job_index}:{failed_job_spec.source_id}", error_msg)) elif self.verbose: logger.debug(f"Failure already recorded for {job_index}") # Cleanup retry count if it exists for this job if job_index in self.retry_counts: del self.retry_counts[job_index] # Attempt to mark state as FAILED locally in the client (best effort) try: # Use a method assumed to safely get the state object job_state = self.client._get_job_state_object(job_index) # Check state exists and is not already terminal before updating if ( job_state and hasattr(job_state, "state") and job_state.state not in ["FAILED", "COMPLETED"] ): # Use actual Enum names/values if available job_state.state = "FAILED" # Use actual Enum value if self.verbose: logger.debug(f"Marked job {job_index} state as FAILED locally " f"after error.") except Exception as state_update_err: # Ignore errors during error handling state update, but log if verbose if self.verbose: logger.warning( f"Could not update state to FAILED for job {job_index} " f"after failure: {state_update_err}" ) def _handle_processing_success(self, job_index: str, result_data: Dict[str, Any], trace_id: Optional[str]) -> None: """ Handles the successful fetch and retrieval of a job result. Stores the result, cleans up retry counts, and triggers the completion callback if provided. This method does not increment the overall processed count itself. Parameters ---------- job_index : str The unique identifier of the successfully processed job. result_data : Dict[str, Any] The full response dictionary fetched for the job. trace_id : Optional[str] The trace identifier associated with the fetch operation, if available. 
""" if self.verbose: trace_info = f" (Trace: {trace_id})" if trace_id else "" logger.info(f"Successfully fetched result for job {job_index}{trace_info}") is_failed, description = check_ingest_result(result_data) if is_failed: failed_job_spec = self.client._job_index_to_job_spec.get(job_index) self.failures.append((f"{job_index}:{failed_job_spec.source_id}", description)) else: self.results.append(result_data.get("data")) # Cleanup retry count if it exists if job_index in self.retry_counts: del self.retry_counts[job_index] # Execute completion callback if provided if self.completion_callback: try: self.completion_callback(result_data, job_index) except Exception as cb_err: logger.error(f"Error in completion_callback for {job_index}: {cb_err}", exc_info=True) def _log_final_status(self, total_jobs: int) -> None: """ Logs the final processing summary and checks for count discrepancies. Parameters ---------- total_jobs : int The total number of jobs that were initially intended for processing. """ final_processed_count = len(self.results) + len(self.failures) logger.info( f"Batch processing finished. Success: {len(self.results)}, " f"Failures: {len(self.failures)}. " f"Total accounted for: {final_processed_count}/{total_jobs}" ) if final_processed_count != total_jobs: logger.warning( "Final accounted count doesn't match total jobs. " "Some jobs may have been lost or unaccounted for." ) # Attempt to identify potentially lost jobs (best effort) processed_indices = {f[0] for f in self.failures} # Assuming results contain the job index or can be mapped back # If result_data is dict and has 'jobIndex': try: result_indices = {r.get("jobIndex") for r in self.results if isinstance(r, dict) and "jobIndex" in r} # Filter out None if get returns None result_indices = {idx for idx in result_indices if idx is not None} processed_indices.update(result_indices) except Exception: logger.warning("Could not reliably extract job indices from results for final check.") initial_indices = set(self.all_job_indices_list) unaccounted_indices = initial_indices - processed_indices if unaccounted_indices: logger.warning(f"Potentially unaccounted for jobs: {unaccounted_indices}") # Optionally add them to failures # for idx in unaccounted_indices: # if not any(f[0] == idx for f in self.failures): # self.failures.append((idx, "Job lost or unaccounted for at exit")) # -------------------------------------------------------------------------- # Public Methods # -------------------------------------------------------------------------- def run(self) -> Tuple[List[Dict[str, Any]], List[Tuple[str, str]]]: """ Executes the main processing loop in batches. This method orchestrates the job processing by repeatedly determining a batch of jobs (including retries), initiating their submission (if new) and fetching, and then processing the results of that batch as they complete. Returns ------- Tuple[List[Dict[str, Any]], List[Tuple[str, str]]] A tuple containing two lists: 1. A list of successfully fetched job results (full dictionaries). 2. A list of tuples for failed jobs, where each tuple contains (job_index, error_message). Raises ------ ValueError If `submit_job_async` is required but `job_queue_id` was not provided. RuntimeError If `fail_on_submit_error` is True and a batch submission or fetch initiation error occurs. 
""" total_jobs = len(self.all_job_indices_list) # Tracks indices for which submission has been initiated at least once submitted_new_indices_count = 0 logger.info(f"Starting batch processing for {total_jobs} jobs with batch " f"size {self.batch_size}.") # Main loop: continues as long as there are new jobs to submit # or jobs waiting for retry. while (submitted_new_indices_count < total_jobs) or self.retry_job_ids: # --- Determine Jobs for Current Batch --- current_batch_job_indices: List[str] = [] # Add retries from the previous batch first if self.retry_job_ids: num_retries = len(self.retry_job_ids) current_batch_job_indices.extend(self.retry_job_ids) if self.verbose: logger.debug(f"Adding {num_retries} retry jobs to current batch.") # Clear the list; retries for *this* batch will be collected later self.retry_job_ids = [] # Determine and add new jobs to the batch num_already_in_batch = len(current_batch_job_indices) if (num_already_in_batch < self.batch_size) and (submitted_new_indices_count < total_jobs): num_new_to_add = min(self.batch_size - num_already_in_batch, total_jobs - submitted_new_indices_count) start_idx = submitted_new_indices_count end_idx = submitted_new_indices_count + num_new_to_add current_batch_new_job_indices = self.all_job_indices_list[start_idx:end_idx] if self.verbose: logger.debug(f"Adding {len(current_batch_new_job_indices)} new " f"jobs to current batch.") # Initiate async submission for ONLY the NEW jobs if current_batch_new_job_indices: if not self.job_queue_id: error_msg = "Cannot submit new jobs: job_queue_id is not set." logger.error(error_msg) # Fail these jobs immediately for job_index in current_batch_new_job_indices: self._handle_processing_failure(job_index, error_msg, is_submission_failure=True) # Mark as "submitted" (to prevent reprocessing) but failed submitted_new_indices_count += len(current_batch_new_job_indices) if self.fail_on_submit_error: raise ValueError(error_msg) else: try: # Fire-and-forget submission initiation _ = self.client.submit_job_async(current_batch_new_job_indices, self.job_queue_id) # Add successfully initiated jobs to the overall batch list current_batch_job_indices.extend(current_batch_new_job_indices) # Update count of total initiated jobs submitted_new_indices_count += len(current_batch_new_job_indices) except Exception as e: error_msg = ( f"Batch async submission initiation failed for " f"{len(current_batch_new_job_indices)} new jobs: {e}" ) logger.error(error_msg, exc_info=True) # Fail these jobs immediately for job_index in current_batch_new_job_indices: self._handle_processing_failure( job_index, f"Batch submission initiation error: {e}", is_submission_failure=True ) # Mark as "submitted" (to prevent reprocessing) but failed submitted_new_indices_count += len(current_batch_new_job_indices) if self.fail_on_submit_error: raise RuntimeError(error_msg) from e # If nothing ended up in the batch (e.g., only submission failures) if not current_batch_job_indices: if self.verbose: logger.debug("No jobs identified for fetching in this batch iteration.") # If there are no retries pending either, break the loop if not self.retry_job_ids and submitted_new_indices_count >= total_jobs: logger.debug("Exiting loop: No jobs to fetch and no retries pending.") break continue # Otherwise, proceed to next iteration # --- Initiate Fetching for the Current Batch --- try: if self.verbose: logger.debug( f"Calling fetch_job_result_async for " f"{len(current_batch_job_indices)} jobs in current batch." 
) # Use data_only=False to get full response for callback/results batch_futures_dict = self.client.fetch_job_result_async(current_batch_job_indices, data_only=False) # Check for discrepancies where client might not return all futures if len(batch_futures_dict) != len(current_batch_job_indices): returned_indices = set(batch_futures_dict.values()) missing_indices = [idx for idx in current_batch_job_indices if idx not in returned_indices] logger.error( f"fetch_job_result_async discrepancy: Expected " f"{len(current_batch_job_indices)}, got " f"{len(batch_futures_dict)}. Missing: {missing_indices}" ) # Fail the missing ones explicitly for missing_idx in missing_indices: self._handle_processing_failure( missing_idx, "Future not returned by fetch_job_result_async", is_submission_failure=True ) if self.fail_on_submit_error: raise RuntimeError("fetch_job_result_async failed to return all " "expected futures.") # Continue processing only the futures we received current_batch_job_indices = list(returned_indices) except Exception as fetch_init_err: error_msg = ( f"fetch_job_result_async failed for batch " f"({len(current_batch_job_indices)} jobs): {fetch_init_err}" ) logger.error(error_msg, exc_info=True) logger.warning( f"Marking all {len(current_batch_job_indices)} jobs in " f"failed fetch initiation batch as failed." ) # Fail all jobs intended for this batch for job_index in current_batch_job_indices: self._handle_processing_failure( job_index, f"Fetch initiation failed for batch: {fetch_init_err}", is_submission_failure=True ) if self.fail_on_submit_error: raise RuntimeError( f"Stopping due to fetch initiation failure: {fetch_init_err}" ) from fetch_init_err continue # Skip processing results for this failed batch # --- Process Results for the Current Batch --- if not batch_futures_dict: if self.verbose: logger.debug("No futures returned/available for processing in this batch.") continue # Skip processing if no futures batch_timeout = 600.0 # Timeout for waiting on the whole batch try: # Process futures as they complete within this batch for future in as_completed(batch_futures_dict.keys(), timeout=batch_timeout): job_index = batch_futures_dict[future] try: # Expect list with one tuple: [(data, index, trace)] result_list = future.result() if not isinstance(result_list, list) or len(result_list) != 1: raise ValueError(f"Expected list length 1, got {len(result_list)}") result_tuple = result_list[0] if not isinstance(result_tuple, (tuple, list)) or len(result_tuple) != 3: raise ValueError(f"Expected tuple/list length 3, got {len(result_tuple)}") full_response_dict, fetched_job_index, trace_id = result_tuple if fetched_job_index != job_index: logger.warning(f"Mismatch: Future for {job_index} returned " f"{fetched_job_index}") self._handle_processing_success(job_index, full_response_dict, trace_id) except TimeoutError: # Handle job not ready - check retry policy self.retry_counts[job_index] += 1 if self.max_job_retries is None or self.retry_counts[job_index] <= self.max_job_retries: if self.verbose: logger.info( f"Job {job_index} not ready, adding to next " f"batch's retry list (Attempt " f"{self.retry_counts[job_index]}/" f"{self.max_job_retries or 'inf'})." ) # Collect for the *next* batch self.retry_job_ids.append(job_index) else: error_msg = f"Exceeded max fetch retries " f"({self.max_job_retries}) for job {job_index}." 
logger.error(error_msg) self._handle_processing_failure(job_index, error_msg) except (ValueError, RuntimeError) as e: logger.error(f"Job {job_index} failed processing result: {e}", exc_info=self.verbose) self._handle_processing_failure(job_index, f"Error processing result: {e}") except Exception as e: logger.exception(f"Unhandled error processing future for job {job_index}: {e}") self._handle_processing_failure(job_index, f"Unhandled error processing future: {e}") # No finally block incrementing count here; tracking is batch-based except TimeoutError: # `as_completed` timed out waiting for remaining futures in batch logger.error( f"Batch processing timed out after {batch_timeout}s waiting " f"for futures. Some jobs in batch may be lost or incomplete." ) # Identify and fail remaining futures remaining_indices_in_batch = [] for f, idx in batch_futures_dict.items(): if not f.done(): remaining_indices_in_batch.append(idx) f.cancel() # Attempt to cancel underlying task logger.warning( f"Jobs potentially lost/cancelled due to batch timeout: " f"{remaining_indices_in_batch}" ) for idx in remaining_indices_in_batch: self._handle_processing_failure(idx, f"Batch processing timed out after {batch_timeout}s") # End of processing for this batch cycle # --- Final Logging --- self._log_final_status(total_jobs) return self.results, self.failures
class NvIngestClient:
    """
    A client class for interacting with the nv-ingest service, supporting custom client allocators.
    """

    def __init__(
        self,
        message_client_allocator: Type[MessageBrokerClientBase] = RestClient,
        message_client_hostname: Optional[str] = "localhost",
        message_client_port: Optional[int] = 7670,
        message_client_kwargs: Optional[Dict[str, Any]] = None,
        msg_counter_id: Optional[str] = "nv-ingest-message-id",
        worker_pool_size: int = 8,
    ) -> None:
        """
        Initialize the NvIngestClient.

        Parameters
        ----------
        message_client_allocator : Type[MessageBrokerClientBase], optional
            Callable that creates the message broker client. Defaults to RestClient.
        message_client_hostname : str, optional
            Hostname of the REST/message service. Defaults to "localhost".
        message_client_port : int, optional
            Port of the REST/message service. Defaults to 7670.
        message_client_kwargs : dict, optional
            Extra keyword arguments passed to the client allocator.
        msg_counter_id : str, optional
            Identifier for message counting. Defaults to "nv-ingest-message-id".
        worker_pool_size : int, optional
            Number of workers in the thread pool. Defaults to 8.

        Returns
        -------
        None
        """
        self._current_message_id = 0
        self._job_states = {}
        self._job_index_to_job_spec = {}
        self._message_client_hostname = message_client_hostname or "localhost"
        self._message_client_port = message_client_port or 7670
        self._message_counter_id = msg_counter_id or "nv-ingest-message-id"
        self._message_client_kwargs = message_client_kwargs or {}

        logger.debug("Instantiate NvIngestClient:\n%s", str(self))
        self._message_client = message_client_allocator(
            host=self._message_client_hostname,
            port=self._message_client_port,
            **self._message_client_kwargs,
        )

        # Initialize the worker pool with the specified size
        self._worker_pool = ThreadPoolExecutor(max_workers=worker_pool_size)

        self._telemetry = {}

    def __str__(self) -> str:
        """
        Returns a string representation of the NvIngestClient configuration and runtime state.

        Returns
        -------
        str
            A string representation of the client showing the message client configuration.
        """
        info = "NvIngestClient:\n"
        info += f" message_client_host: {self._message_client_hostname}\n"
        info += f" message_client_port: {self._message_client_port}\n"
        return info

    def _generate_job_index(self) -> str:
        """
        Generates a unique client-side job index from an internal monotonically
        increasing counter.

        Returns
        -------
        str
            A unique client-side job index.
        """
        job_index = str(self._current_message_id)
        self._current_message_id += 1

        return job_index

    def _pop_job_state(self, job_index: str) -> JobState:
        """
        Removes the job with the specified index from the job tracking dictionary and
        returns its state.

        Parameters
        ----------
        job_index : str
            The index of the job to remove.

        Returns
        -------
        JobState
            The state object for the removed job.
        """
        job_state = self._get_and_check_job_state(job_index)
        self._job_states.pop(job_index)

        return job_state

    def _get_and_check_job_state(
        self,
        job_index: str,
        required_state: Optional[Union[JobStateEnum, List[JobStateEnum]]] = None,
    ) -> JobState:
        """
        Retrieve and optionally validate the state of a job.

        Parameters
        ----------
        job_index : str
            The client-side identifier of the job.
        required_state : JobStateEnum or list of JobStateEnum, optional
            State or list of states the job must currently be in. If provided and the job
            is not in one of these states, an error is raised.

        Returns
        -------
        JobState
            The state object for the specified job.

        Raises
        ------
        ValueError
            If the job does not exist or is not in an allowed state.
        """
        if required_state and not isinstance(required_state, list):
            required_state = [required_state]
        job_state = self._job_states.get(job_index)
        if not job_state:
            raise ValueError(f"Job with ID {job_index} does not exist in JobStates: {self._job_states}")
        if required_state and (job_state.state not in required_state):
            raise ValueError(
                f"Job with ID {job_state.job_spec.job_id} has invalid state "
                f"{job_state.state}, expected {required_state}"
            )

        return job_state

    def job_count(self) -> int:
        """
        Get the number of jobs currently tracked by the client.

        Returns
        -------
        int
            The total count of jobs in internal state tracking.
        """
        return len(self._job_states)

    def _add_single_job(self, job_spec: JobSpec) -> str:
        """
        Add a single job specification to internal tracking.

        Parameters
        ----------
        job_spec : JobSpec
            The specification object describing the job.

        Returns
        -------
        str
            The newly generated job index.
        """
        job_index = self._generate_job_index()

        self._job_states[job_index] = JobState(job_spec=job_spec)

        return job_index

    def add_job(self, job_spec: Union[BatchJobSpec, JobSpec]) -> Union[str, List[str]]:
        """
        Add one or more jobs to the client for later processing.

        Parameters
        ----------
        job_spec : JobSpec or BatchJobSpec
            A single job specification or a batch containing multiple specs.

        Returns
        -------
        str or list of str
            The job index for a single spec, or a list of indices for a batch.

        Raises
        ------
        ValueError
            If an unsupported type is provided.
        """
        if isinstance(job_spec, JobSpec):
            job_index = self._add_single_job(job_spec)
            self._job_index_to_job_spec[job_index] = job_spec

            return job_index
        elif isinstance(job_spec, BatchJobSpec):
            job_indexes = []
            for _, job_specs in job_spec.job_specs.items():
                for job in job_specs:
                    job_index = self._add_single_job(job)
                    job_indexes.append(job_index)
                    self._job_index_to_job_spec[job_index] = job
            return job_indexes
        else:
            raise ValueError(f"Unexpected type: {type(job_spec)}")

    def create_job(
        self,
        payload: Dict[str, Any],
        source_id: str,
        source_name: str,
        document_type: Optional[str] = None,
        tasks: Optional[List[Task]] = None,
        extended_options: Optional[Dict[str, Any]] = None,
    ) -> str:
        """
        Construct and register a new job from provided metadata.

        Parameters
        ----------
        payload : dict
            The data payload for the job.
        source_id : str
            Identifier of the data source.
        source_name : str
            Human-readable name for the source.
        document_type : str, optional
            Type of document (inferred from the source_name extension if omitted).
        tasks : list of Task, optional
            Initial set of processing tasks to attach.
        extended_options : dict, optional
            Extra parameters for advanced configuration.

        Returns
        -------
        str
            The client-side job index.

        Raises
        ------
        ValueError
            If job creation parameters are invalid.
        """
        document_type = document_type or source_name.split(".")[-1]
        job_spec = JobSpec(
            payload=payload or {},
            tasks=tasks,
            document_type=document_type,
            source_id=source_id,
            source_name=source_name,
            extended_options=extended_options,
        )

        job_id = self.add_job(job_spec)
        return job_id

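A short usage sketch (editorial addition, not part of the original source); the payload shape and source names are placeholders, and `document_type` is inferred from the file extension when omitted:

>>> client = NvIngestClient()
>>> job_idx = client.create_job(
...     payload={"content": "..."},    # placeholder payload
...     source_id="report-001",
...     source_name="report-001.pdf",
... )
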
    def add_task(self, job_index: str, task: Task) -> None:
        """
        Attach an existing Task object to a pending job.

        Parameters
        ----------
        job_index : str
            The client-side identifier of the target job.
        task : Task
            The task instance to add.
        """
        job_state = self._get_and_check_job_state(job_index, required_state=JobStateEnum.PENDING)

        job_state.job_spec.add_task(task)

    def create_task(
        self,
        job_index: Union[str, int],
        task_type: TaskType,
        task_params: Optional[Dict[str, Any]] = None,
    ) -> None:
        """
        Create and attach a new task to a pending job by type and parameters.

        Parameters
        ----------
        job_index : str or int
            Identifier of the job to modify.
        task_type : TaskType
            Enum specifying the kind of task to create.
        task_params : dict, optional
            Parameters for the new task.

        Raises
        ------
        ValueError
            If the job does not exist or is not pending.
        """
        task_params = task_params or {}

        return self.add_task(job_index, task_factory(task_type, **task_params))

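An illustrative follow-on (editorial addition): attaching a task to the still-pending job from the earlier sketch. `TaskType.SPLIT` is assumed here to exist as an enum member, mirroring the "split" task named elsewhere in this module; substitute whichever task types your deployment defines.

>>> client.create_task(job_idx, TaskType.SPLIT, task_params={})
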
    def _fetch_job_result(
        self,
        job_index: str,
        timeout: Tuple[int, Optional[float]] = (100, None),
        data_only: bool = False,
    ) -> Tuple[Any, str, Optional[str]]:
        """
        Retrieve the result of a submitted job, handling status codes.

        Parameters
        ----------
        job_index : str
            Client-side job identifier.
        timeout : tuple
            Timeouts (connect, read) for the fetch operation.
        data_only : bool, optional
            If True, return only the 'data' portion of the payload.

        Returns
        -------
        result_data : any
            Parsed job result or full JSON payload.
        job_index : str
            Echoes the client-side job ID.
        trace_id : str or None
            Trace identifier from the message client.

        Raises
        ------
        TimeoutError
            If the job is not yet ready (HTTP 202).
        RuntimeError
            For terminal server errors (HTTP 404/500, etc.).
        ValueError
            On JSON decoding errors or missing state.
        Exception
            For unexpected issues.
        """
        try:
            # Get job state using the client-side index
            job_state = self._get_and_check_job_state(
                job_index, required_state=[JobStateEnum.SUBMITTED, JobStateEnum.SUBMITTED_ASYNC]
            )

            # Validate server_job_id before making the call
            server_job_id = job_state.job_id
            if not server_job_id:
                error_msg = (
                    f"Cannot fetch job index {job_index}: Server Job ID is missing or invalid in state"
                    f" {job_state.state}."
                )
                logger.error(error_msg)
                job_state.state = JobStateEnum.FAILED
                raise ValueError(error_msg)

            # Fetch using the *server-assigned* job ID
            response = self._message_client.fetch_message(server_job_id, timeout)
            job_state.trace_id = response.trace_id  # Store trace ID from this fetch attempt

            # --- Handle ResponseSchema Code ---
            if response.response_code == 0:  # Success (e.g., HTTP 200)
                try:
                    # Don't change state here yet, only after successful processing
                    logger.debug(
                        f"Received successful response for job index {job_index} (Server ID: {server_job_id}). "
                        f"Decoding JSON."
                    )
                    response_json = json.loads(response.response)
                    result_data = response_json.get("data") if data_only else response_json

                    # Mark state as PROCESSING *after* successful decode, just before returning
                    job_state.state = JobStateEnum.PROCESSING
                    # Pop state *only* after successful processing is complete
                    self._pop_job_state(job_index)
                    logger.debug(
                        f"Successfully processed and removed job index {job_index} (Server ID: {server_job_id})"
                    )

                    return result_data, job_index, job_state.trace_id

                except json.JSONDecodeError as err:
                    logger.error(
                        f"Failed to decode JSON response for job index {job_index} (Server ID: {server_job_id}):"
                        f" {err}. Response text: {response.response[:500]}"
                    )
                    job_state.state = JobStateEnum.FAILED  # Mark as failed due to decode error
                    raise ValueError(f"Error decoding job result JSON: {err}") from err

                except Exception as e:
                    # Catch other potential errors during processing of successful response
                    logger.exception(
                        f"Error processing successful response for job index {job_index} (Server ID: {server_job_id}):"
                        f" {e}"
                    )
                    job_state.state = JobStateEnum.FAILED
                    raise  # Re-raise unexpected errors

            elif response.response_code == 2:  # Job Not Ready (e.g., HTTP 202)
                # Raise TimeoutError to signal the calling retry loop in fetch_job_result
                logger.debug(
                    f"Job index {job_index} (Server ID: {server_job_id}) not ready (Response Code: 2). "
                    f"Signaling retry."
                )
                # Do not change job state here, remains SUBMITTED
                raise TimeoutError(f"Job not ready: {response.response_reason}")

            else:
                # Failure from RestClient (response_code == 1, including 404, 400, 500, conn errors)
                # Log the failure reason from the ResponseSchema
                error_msg = (
                    f"Terminal failure fetching result for client index {job_index} (Server ID: {server_job_id}). "
                    f"Code: {response.response_code}, Reason: {response.response_reason}"
                )
                logger.error(error_msg)
                job_state.state = JobStateEnum.FAILED  # Mark job as failed in the client
                # Do NOT pop the state for failed jobs here
                # Raise RuntimeError to indicate a terminal failure for this fetch attempt
                raise RuntimeError(error_msg)

        except (TimeoutError, ValueError, RuntimeError):
            # Re-raise specific handled exceptions
            raise
        except Exception as err:
            # Catch unexpected errors during the process (e.g., in _get_and_check_job_state)
            logger.exception(f"Unexpected error during fetch process for job index {job_index}: {err}")
            # Attempt to mark state as FAILED if possible and state object exists
            if "job_state" in locals() and hasattr(job_state, "state"):
                job_state.state = JobStateEnum.FAILED
            raise  # Re-raise the original exception

    def fetch_job_result_cli(
        self,
        job_ids: Union[str, List[str]],
        data_only: bool = False,
    ) -> List[Tuple[Any, str, Optional[str]]]:
        """
        Fetch job results via CLI semantics (synchronous list return).

        Parameters
        ----------
        job_ids : str or list of str
            Single or multiple client-side job identifiers.
        data_only : bool, optional
            If True, extract only the 'data' field. Default is False.

        Returns
        -------
        list of (result_data, job_index, trace_id)
            List of tuples for each fetched job.
        """
        if isinstance(job_ids, str):
            job_ids = [job_ids]

        return [self._fetch_job_result(job_id, data_only=data_only) for job_id in job_ids]

    def process_jobs_concurrently(
        self,
        job_indices: Union[str, List[str]],
        job_queue_id: Optional[str] = None,
        concurrency_limit: int = 64,
        timeout: int = 100,
        max_job_retries: Optional[int] = None,
        retry_delay: float = 5.0,
        fail_on_submit_error: bool = False,
        completion_callback: Optional[Callable[[Any, str], None]] = None,
        return_failures: bool = False,
        data_only: bool = True,
        verbose: bool = False,
    ) -> Union[List[Any], Tuple[List[Any], List[Tuple[str, str]]]]:
        """
        Submit and fetch multiple jobs concurrently.

        Parameters
        ----------
        job_indices : str or list of str
            Single or multiple job indices to process.
        job_queue_id : str, optional
            Queue identifier for submission.
        concurrency_limit : int, optional
            Maximum number of simultaneous in-flight jobs (used as the processing batch size).
            Default is 64.
        timeout : int, optional
            Timeout in seconds per fetch attempt. Default is 100.
        max_job_retries : int, optional
            Max retries for 'not ready' jobs. None for infinite. Default is None.
        retry_delay : float, optional
            Delay in seconds between retry cycles. Default is 5.0. Note that the batch-based
            processor folds retries into the next batch rather than sleeping, so this value
            is currently not applied.
        fail_on_submit_error : bool, optional
            If True, abort on submission error. Default is False.
        completion_callback : callable, optional
            Called on each successful fetch as (result_data, job_index).
        return_failures : bool, optional
            If True, return (results, failures). Default is False.
        data_only : bool, optional
            If True, return only payload 'data'. Default is True.
        verbose : bool, optional
            If True, enable debug logging. Default is False.

        Returns
        -------
        results : list
            List of successful job results when `return_failures` is False.
        results, failures : tuple
            Tuple of (successful results, failure tuples) when `return_failures` is True.

        Raises
        ------
        RuntimeError
            If `fail_on_submit_error` is True and a submission fails.
        """
        # Normalize single index to list
        if isinstance(job_indices, str):
            job_indices = [job_indices]

        # Handle empty input
        if not job_indices:
            return ([], []) if return_failures else []

        # Prepare timeout tuple for fetch calls
        effective_timeout: Tuple[int, None] = (timeout, None)

        # Delegate to the concurrent processor
        processor = _ConcurrentProcessor(
            client=self,
            batch_size=concurrency_limit,
            job_indices=job_indices,
            job_queue_id=job_queue_id,
            timeout=effective_timeout,
            max_job_retries=max_job_retries,
            completion_callback=completion_callback,
            fail_on_submit_error=fail_on_submit_error,
            verbose=verbose,
        )

        results, failures = processor.run()

        if return_failures:
            return results, failures

        if failures:
            logger.warning(f"{len(failures)} job(s) failed during concurrent processing. Check logs for details.")

        return results

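An end-to-end sketch of the concurrent path (editorial addition); the queue name is hypothetical and depends on your deployment:

>>> results, failures = client.process_jobs_concurrently(
...     [job_idx],
...     job_queue_id="ingest_task_queue",  # hypothetical queue name
...     max_job_retries=10,
...     return_failures=True,
... )
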
    def _ensure_submitted(self, job_ids: Union[str, List[str]]) -> None:
        """
        Block until all specified jobs have been marked submitted.

        Parameters
        ----------
        job_ids : str or list of str
            One or more job indices expected to reach a SUBMITTED state.
        """
        if isinstance(job_ids, str):
            job_ids = [job_ids]  # Ensure job_ids is always a list

        submission_futures = {}
        for job_id in job_ids:
            job_state = self._get_and_check_job_state(
                job_id,
                required_state=[JobStateEnum.SUBMITTED, JobStateEnum.SUBMITTED_ASYNC],
            )
            if job_state.state == JobStateEnum.SUBMITTED_ASYNC:
                submission_futures[job_state.future] = job_state

        for future in as_completed(submission_futures.keys()):
            job_state = submission_futures[future]
            job_state.state = JobStateEnum.SUBMITTED
            job_state.trace_id = future.result()[0]  # Trace_id from `submit_job` endpoint submission
            job_state.future = None

    def fetch_job_result_async(self, job_ids: Union[str, List[str]], data_only: bool = True) -> Dict[Future, str]:
        """
        Fetches job results for a list or a single job ID asynchronously and returns a mapping of
        futures to job IDs.

        Parameters
        ----------
        job_ids : str or list of str
            A single job ID or a list of job IDs.
        data_only : bool
            Whether to return only the data part of each job result.

        Returns
        -------
        Dict[Future, str]
            A dictionary mapping each future to its corresponding job ID.
        """
        if isinstance(job_ids, str):
            job_ids = [job_ids]  # Ensure job_ids is always a list

        # Make sure all jobs have actually been submitted before launching fetches.
        self._ensure_submitted(job_ids)

        future_to_job_id = {}
        for job_id in job_ids:
            job_state = self._get_and_check_job_state(job_id)
            future = self._worker_pool.submit(self.fetch_job_result_cli, job_id, data_only)
            job_state.future = future
            future_to_job_id[future] = job_id

        return future_to_job_id

    def _submit_job(
        self,
        job_index: str,
        job_queue_id: str,
    ) -> Optional[str]:
        """
        Submits a job to a specified job queue.

        Parameters
        ----------
        job_index : str
            The unique identifier of the job to be submitted.
        job_queue_id : str
            The ID of the job queue where the job will be submitted.

        Returns
        -------
        Optional[str]
            The trace ID returned by the submission endpoint, if available.

        Raises
        ------
        Exception
            If submitting the job fails.
        """
        job_state = self._get_and_check_job_state(
            job_index, required_state=[JobStateEnum.PENDING, JobStateEnum.SUBMITTED_ASYNC]
        )

        try:
            message = json.dumps(job_state.job_spec.to_dict())
            response = self._message_client.submit_message(job_queue_id, message, for_nv_ingest=True)
            x_trace_id = response.trace_id
            transaction_id = response.transaction_id
            job_id = "" if transaction_id is None else transaction_id.replace('"', "")
            logger.debug(f"Submitted job {job_index} to queue {job_queue_id} and got back job ID {job_id}")
            job_state.state = JobStateEnum.SUBMITTED
            job_state.job_id = job_id

            # Free up memory -- payload should never be used again, and we don't want to keep it around.
            job_state.job_spec.payload = None

            return x_trace_id
        except Exception as err:
            err_msg = f"Failed to submit job {job_index} to queue {job_queue_id}: {err}"
            logger.exception(err_msg)
            job_state.state = JobStateEnum.FAILED

            raise

    def submit_job(
        self,
        job_indices: Union[str, List[str]],
        job_queue_id: str,
        batch_size: int = 10,
    ) -> List[str]:
        """
        Submit one or more jobs in batches.

        Parameters
        ----------
        job_indices : str or list of str
            Job indices to submit.
        job_queue_id : str
            Queue identifier for submission.
        batch_size : int, optional
            Maximum number of jobs per batch. Default is 10.

        Returns
        -------
        list of str
            Trace identifiers for each successfully submitted job.

        Raises
        ------
        Exception
            Propagates the first submission error encountered after all jobs have been
            attempted, noting how many additional errors occurred.
        """
        if isinstance(job_indices, str):
            job_indices = [job_indices]

        results = []
        total_batches = math.ceil(len(job_indices) / batch_size)

        submission_errors = []
        for batch_num in range(total_batches):
            batch_start = batch_num * batch_size
            batch_end = batch_start + batch_size
            batch = job_indices[batch_start:batch_end]

            # Submit each batch of jobs
            for job_id in batch:
                try:
                    x_trace_id = self._submit_job(job_id, job_queue_id)
                except Exception as e:
                    # Even if one fails, we should continue with the rest of the batch.
                    submission_errors.append(e)
                    continue
                results.append(x_trace_id)

        if submission_errors:
            error_msg = str(submission_errors[0])
            if len(submission_errors) > 1:
                error_msg += f"... [{len(submission_errors) - 1} more messages truncated]"
            raise type(submission_errors[0])(error_msg)

        return results

    def submit_job_async(self, job_indices: Union[str, List[str]], job_queue_id: str) -> Dict[Future, str]:
        """
        Asynchronously submits one or more jobs to a specified job queue using a thread pool.
        This method handles both a single job ID and a list of job IDs.

        Parameters
        ----------
        job_indices : Union[str, List[str]]
            A single job ID or a list of job IDs to be submitted.
        job_queue_id : str
            The ID of the job queue where the jobs will be submitted.

        Returns
        -------
        Dict[Future, str]
            A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.

        Notes
        -----
        - This method queues the jobs for asynchronous submission and returns a mapping of futures
          to job IDs.
        - It does not wait for any of the jobs to complete.
        - Ensure that each job is in the proper state before submission.
        """
        if isinstance(job_indices, str):
            job_indices = [job_indices]  # Convert single job_id to a list

        future_to_job_index = {}
        for job_index in job_indices:
            job_state = self._get_and_check_job_state(job_index, JobStateEnum.PENDING)
            job_state.state = JobStateEnum.SUBMITTED_ASYNC

            future = self._worker_pool.submit(self.submit_job, job_index, job_queue_id)
            job_state.future = future
            future_to_job_index[future] = job_index

        return future_to_job_index

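A sketch of the asynchronous submit/fetch pairing (editorial addition); it assumes `job_idx` refers to a PENDING job and uses a hypothetical queue name:

>>> submit_futures = client.submit_job_async(job_idx, "ingest_task_queue")
>>> fetch_futures = client.fetch_job_result_async(job_idx, data_only=True)
>>> for future, idx in fetch_futures.items():
...     data, _, trace_id = future.result()[0]
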
    def fetch_job_result(
        self,
        job_ids: Union[str, List[str]],
        timeout: float = 100,
        max_retries: Optional[int] = None,
        retry_delay: float = 1,
        verbose: bool = False,
        completion_callback: Optional[Callable[[Dict, str], None]] = None,
        return_failures: bool = False,
    ) -> Union[List[Any], Tuple[List[Any], List[Tuple[str, str]]]]:
        """
        Fetches job results for multiple job IDs concurrently, with individual timeouts and retry logic.

        Parameters
        ----------
        job_ids : str or list of str
            A job ID or list of job IDs to fetch results for.
        timeout : float
            Timeout for each fetch operation, in seconds.
        max_retries : int, optional
            Maximum number of retries for jobs that are not ready yet. None for infinite retries.
        retry_delay : float
            Delay between retry attempts, in seconds.
        verbose : bool
            If True, logs additional information.
        completion_callback : callable, optional
            A callback executed each time a job result is successfully fetched. It receives two
            arguments: the job result (a dict) and the job ID.
        return_failures : bool
            If True, also return a separate list of failed jobs.

        Returns
        -------
        If `return_failures` is False:
            A list of successfully fetched result payloads (the 'data' portion of each response).
        If `return_failures` is True:
            A tuple of (successful result payloads, failures), where each failure is a tuple of the
            job's source ID and an error message.

        Raises
        ------
        ValueError
            If there is an error decoding a job result.
        TimeoutError
            If a fetch operation times out.
        Exception
            For all other unexpected issues.
        """
        if isinstance(job_ids, str):
            job_ids = [job_ids]

        results = []
        failures = []

        def fetch_with_retries(job_id: str):
            retries = 0
            while (max_retries is None) or (retries < max_retries):
                try:
                    # Attempt to fetch the job result
                    result = self._fetch_job_result(job_id, timeout, data_only=False)
                    return result, job_id
                except TimeoutError:
                    if verbose:
                        logger.info(
                            f"Job {job_id} is not ready. Retrying "
                            f"{retries + 1}/{max_retries if max_retries else '∞'} "
                            f"after {retry_delay} seconds."
                        )
                    retries += 1
                    time.sleep(retry_delay)  # Wait before retrying
                except Exception as err:
                    logger.error(f"Error while fetching result for job ID {job_id}: {err}")
                    return None, job_id
            logger.error(f"Max retries exceeded for job {job_id}.")
            return None, job_id

        # Use ThreadPoolExecutor to fetch results concurrently
        with ThreadPoolExecutor() as executor:
            futures = {executor.submit(fetch_with_retries, job_id): job_id for job_id in job_ids}

            # Collect results as futures complete
            for future in as_completed(futures):
                job_id = futures[future]
                try:
                    result, _ = handle_future_result(future, timeout=timeout)
                    # Append the 'data' portion of the fetched result payload.
                    results.append(result.get("data"))
                    # Run the callback if provided and the result is valid
                    if completion_callback and result:
                        completion_callback(result, job_id)
                except concurrent.futures.TimeoutError as e:
                    error_msg = (
                        f"Timeout while fetching result for job ID {job_id}: "
                        f"{self._job_index_to_job_spec[job_id].source_id}"
                    )
                    logger.error(error_msg)
                    failures.append((self._job_index_to_job_spec[job_id].source_id, str(e)))
                except json.JSONDecodeError as e:
                    error_msg = (
                        f"Decoding error while processing job ID {job_id}: "
                        f"{self._job_index_to_job_spec[job_id].source_id}\n{e}"
                    )
                    logger.error(error_msg)
                    failures.append((self._job_index_to_job_spec[job_id].source_id, str(e)))
                except RuntimeError as e:
                    error_msg = (
                        f"Error while processing job ID {job_id}: "
                        f"{self._job_index_to_job_spec[job_id].source_id}\n{e}"
                    )
                    logger.error(error_msg)
                    failures.append((self._job_index_to_job_spec[job_id].source_id, str(e)))
                except IngestJobFailure as e:
                    error_msg = (
                        f"Error while processing job ID {job_id}: "
                        f"{self._job_index_to_job_spec[job_id].source_id}\n{e.description}"
                    )
                    logger.error(error_msg)
                    failures.append((self._job_index_to_job_spec[job_id].source_id, e.annotations))
                except Exception as e:
                    error_msg = (
                        f"Error while fetching result for job ID {job_id}: "
                        f"{self._job_index_to_job_spec[job_id].source_id}\n{e}"
                    )
                    logger.error(error_msg)
                    failures.append((self._job_index_to_job_spec[job_id].source_id, str(e)))
                finally:
                    # Clean up the job spec mapping
                    del self._job_index_to_job_spec[job_id]

        if return_failures:
            return results, failures

        return results

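A synchronous counterpart sketch (editorial addition): register, submit, then block on the result with retries; the queue name is hypothetical and the payload is a placeholder.

>>> idx = client.create_job(payload={"content": "..."}, source_id="doc-2", source_name="doc-2.txt")
>>> client.submit_job(idx, "ingest_task_queue")
>>> payloads = client.fetch_job_result(idx, timeout=120, max_retries=10)
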
    def create_jobs_for_batch(self, files_batch: List[str], tasks: Dict[str, Any]) -> List[str]:
        """
        Create and register job specifications (JobSpecs) for a batch of files, returning the job IDs.

        This function takes a batch of files, processes each file to extract its content and type,
        creates a job specification (JobSpec) for each file, and adds tasks from the provided task
        list. It then registers the jobs with the client and collects their job IDs.

        Parameters
        ----------
        files_batch : List[str]
            A list of file paths to be processed. Each file is assumed to be in a format compatible
            with the `extract_file_content` function, which extracts the file's content and type.
        tasks : Dict[str, Any]
            A dictionary of tasks to be added to each job. The keys represent task names, and the
            values represent task specifications or configurations. Standard tasks include "split",
            "extract", "store", "caption", "dedup", "filter", "embed".

        Returns
        -------
        List[str]
            A list of job IDs corresponding to the registered jobs. Each job ID is returned by the
            client's `add_job` method.

        Raises
        ------
        ValueError
            If there is an error extracting the file content or type from any of the files, a
            ValueError will be logged, and the corresponding file will be skipped.

        Notes
        -----
        - The function assumes that a utility function `extract_file_content` is defined elsewhere,
          which extracts the content and type from the provided file paths.
        - For each file, a `JobSpec` is created with relevant metadata, including document type and
          file content. Various tasks are conditionally added based on the provided `tasks` dictionary.
        - The job specification includes tracing options with a timestamp (in nanoseconds) for
          diagnostic purposes.

        Examples
        --------
        Suppose you have a batch of files and tasks to process:

        >>> files_batch = ["file1.txt", "file2.pdf"]
        >>> tasks = {"split": ..., "extract_txt": ..., "store": ...}
        >>> client = NvIngestClient()
        >>> job_ids = client.create_jobs_for_batch(files_batch, tasks)
        >>> print(job_ids)
        ['job_12345', 'job_67890']

        In this example, jobs are created and registered for the files in `files_batch`, with the
        tasks in `tasks` being added to each job specification. The returned job IDs are then printed.

        See Also
        --------
        create_job_specs_for_batch : Function that creates job specifications for a batch of files.
        JobSpec : The class representing a job specification.
        """
        if not isinstance(tasks, dict):
            raise ValueError("`tasks` must be a dictionary of task names -> task specifications.")

        job_specs = create_job_specs_for_batch(files_batch)

        job_ids = []
        for job_spec in job_specs:
            logger.debug(f"Tasks: {tasks.keys()}")
            for task in tasks:
                logger.debug(f"Task: {task}")

            file_type = job_spec.document_type

            seen_tasks = set()  # For tracking tasks and rejecting duplicate tasks.

            for task_name, task_config in tasks.items():
                if task_name.lower().startswith("extract_"):
                    task_file_type = task_name.split("_", 1)[1]
                    if file_type.lower() != task_file_type.lower():
                        continue
                elif not is_valid_task_type(task_name.upper()):
                    raise ValueError(f"Invalid task type: '{task_name}'")

                if str(task_config) in seen_tasks:
                    raise ValueError(f"Duplicate task detected: {task_name} with config {task_config}")

                job_spec.add_task(task_config)

                seen_tasks.add(str(task_config))

            job_id = self.add_job(job_spec)
            job_ids.append(job_id)

        return job_ids