nv_ingest_client.client package

Submodules

nv_ingest_client.client.client module

exception nv_ingest_client.client.client.DataDecodeException(message='Data decoding error', data=None)[source]#

Bases: Exception

Exception raised for errors in decoding data.

Parameters:
  • message (str) – Explanation of the error.

  • data (optional) – The data that failed to decode.

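The following is a minimal sketch of how calling code might raise and handle this exception. The class is a local stand-in that mirrors the documented constructor so the example runs without the package installed, and decode_payload is a hypothetical helper, not part of the library.

```python
import json


class DataDecodeException(Exception):
    """Local stand-in mirroring the documented (message, data) constructor."""

    def __init__(self, message="Data decoding error", data=None):
        super().__init__(message)
        self.message = message
        self.data = data


def decode_payload(raw):
    """Hypothetical helper: parse JSON text, wrapping failures in DataDecodeException."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise DataDecodeException(f"Data decoding error: {exc.msg}", data=raw) from exc


try:
    decode_payload("{not json")
except DataDecodeException as err:
    # Keep the raw input that could not be decoded for later inspection.
    failed_input = err.data
```

Carrying the offending input on the exception lets a caller log or re-queue it without re-reading the source.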
class nv_ingest_client.client.client.NvIngestClient(
message_client_allocator: ~typing.Type[~nv_ingest_api.util.service_clients.client_base.MessageBrokerClientBase] = <class 'nv_ingest_api.util.service_clients.rest.rest_client.RestClient'>,
message_client_hostname: str | None = 'localhost',
message_client_port: int | None = 7670,
message_client_kwargs: ~typing.Dict[str, ~typing.Any] | None = None,
msg_counter_id: str | None = 'nv-ingest-message-id',
worker_pool_size: int = 8,
)[source]#

Bases: object

A client class for interacting with the nv-ingest service, supporting custom client allocators.

add_job(
job_spec: BatchJobSpec | JobSpec,
) str | List[str][source]#

Add one or more jobs to the client for later processing.

Parameters:

job_spec (JobSpec or BatchJobSpec) – A single job specification or a batch containing multiple specs.

Returns:

The job index for a single spec, or a list of indices for a batch.

Return type:

str or list of str

Raises:

ValueError – If an unsupported type is provided.

add_task(
job_index: str,
task: Task,
) None[source]#

Attach an existing Task object to a pending job.

Parameters:
  • job_index (str) – The client-side identifier of the target job.

  • task (Task) – The task instance to add.

create_job(
payload: Dict[str, Any],
source_id: str,
source_name: str,
document_type: str | None = None,
tasks: List[Task] | None = None,
extended_options: Dict[str, Any] | None = None,
) str[source]#

Construct and register a new job from provided metadata.

Parameters:
  • payload (dict) – The data payload for the job.

  • source_id (str) – Identifier of the data source.

  • source_name (str) – Human-readable name for the source.

  • document_type (str, optional) – Type of document (inferred from source_name if omitted).

  • tasks (list of Task, optional) – Initial set of processing tasks to attach.

  • extended_options (dict, optional) – Extra parameters for advanced configuration.

Returns:

The client-side job index.

Return type:

str

Raises:

ValueError – If job creation parameters are invalid.
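As a rough sketch of the call shape, the helper below registers one job using only the documented create_job keywords. The helper name register_job, the payload layout, and the stub client are assumptions made for illustration; the real payload schema is defined by the service.

```python
import os
from typing import Any, Dict


def register_job(client: Any, payload: Dict[str, Any], path: str) -> str:
    """Register one job and return the client-side job index."""
    return client.create_job(
        payload=payload,
        source_id=path,
        source_name=os.path.basename(path),
        # document_type is omitted so that it is inferred from source_name.
    )


class _StubClient:
    """Stand-in with the documented create_job keywords; not the real client."""

    def __init__(self) -> None:
        self.jobs = []

    def create_job(self, payload, source_id, source_name,
                   document_type=None, tasks=None, extended_options=None):
        self.jobs.append((source_id, source_name, document_type))
        return str(len(self.jobs) - 1)


stub = _StubClient()
# Hypothetical payload layout, used only to exercise the call.
job_index = register_job(stub, {"content": "hello"}, "/data/report.pdf")
```

With a real NvIngestClient in place of the stub, the returned index is what add_task, create_task, and submit_job expect.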

create_jobs_for_batch(
files_batch: List[str],
tasks: Dict[str, Any],
) List[str][source]#

Create and submit job specifications (JobSpecs) for a batch of files, returning the job IDs. This function takes a batch of files, processes each file to extract its content and type, creates a job specification (JobSpec) for each file, and adds tasks from the provided task list. It then submits the jobs to the client and collects their job IDs.

Parameters:
  • files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.

  • tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names, and the values represent task specifications or configurations. Standard tasks include “split”, “extract”, “store”, “caption”, “dedup”, “filter”, “embed”.

Returns:

A list of job IDs corresponding to the submitted jobs. Each job ID is returned by the client’s add_job method.

Return type:

List[str]

Raises:

ValueError – If the content or type of a file cannot be extracted. The error is logged and the corresponding file is skipped.

Notes

  • The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.

  • For each file, a JobSpec is created with relevant metadata, including document type and file content. Various tasks are conditionally added based on the provided tasks dictionary.

  • The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.

Examples

Suppose you have a batch of files and tasks to process:

>>> files_batch = ["file1.txt", "file2.pdf"]
>>> tasks = {"split": ..., "extract_txt": ..., "store": ...}
>>> client = NvIngestClient()
>>> job_ids = client.create_jobs_for_batch(files_batch, tasks)
>>> print(job_ids)
['job_12345', 'job_67890']

In this example, jobs are created and submitted for the files in files_batch, with the tasks in tasks being added to each job specification. The returned job IDs are then printed.

See also

create_job_specs_for_batch

Function that creates job specifications for a batch of files.

JobSpec

The class representing a job specification.

create_task(
job_index: str | int,
task_type: TaskType,
task_params: Dict[str, Any] | None = None,
) None[source]#

Create and attach a new task to a pending job by type and parameters.

Parameters:
  • job_index (str or int) – Identifier of the job to modify.

  • task_type (TaskType) – Enum specifying the kind of task to create.

  • task_params (dict, optional) – Parameters for the new task.

Raises:

ValueError – If the job does not exist or is not pending.

fetch_job_result(
job_ids: str | List[str],
timeout: float = 100,
max_retries: int | None = None,
retry_delay: float = 1,
verbose: bool = False,
completion_callback: Callable[[Dict, str], None] | None = None,
return_failures: bool = False,
) List[Tuple[Dict | None, str]] | Tuple[List[Tuple[Dict | None, str]], List[Tuple[str, str]]][source]#

Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.

Parameters:
  • job_ids (Union[str, List[str]]) – A job ID or list of job IDs to fetch results for.

  • timeout (float) – Timeout for each fetch operation, in seconds.

  • max_retries (Optional[int]) – Maximum number of retries for jobs that are not ready yet.

  • retry_delay (float) – Delay between retry attempts, in seconds.

  • verbose (bool) – If True, logs additional information.

  • completion_callback (Optional[Callable[[Dict, str], None]]) – A callback function that is executed each time a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.

  • return_failures (bool) – If True, returns a separate list of failed jobs.

Returns:

  • If return_failures=False: a list of tuples, each containing the job result (or None on failure) and the job ID.

  • If return_failures=True: a tuple of two lists, the successful job results and the failures, where each failure contains the job ID and an error message.

Return type:

List[Tuple[Optional[Dict], str]], or Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]] when return_failures=True

Raises:
  • ValueError – If there is an error in decoding the job result.

  • TimeoutError – If the fetch operation times out.

  • Exception – For all other unexpected issues.
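The two return shapes described above can be unpacked as in the sketch below. It assumes the tuple layouts stated in the Returns section; fetch_and_partition and the stub client are hypothetical names used so the example runs without a service.

```python
from typing import Any, Dict, List, Optional, Tuple


def fetch_and_partition(
    client: Any, job_ids: List[str]
) -> Tuple[Dict[str, Optional[dict]], Dict[str, str]]:
    """Fetch results and index both successes and failures by job ID."""
    results, failures = client.fetch_job_result(
        job_ids, timeout=100, max_retries=10, retry_delay=1, return_failures=True
    )
    # Successes are (result, job_id) pairs; failures are (job_id, message) pairs.
    by_job = {job_id: result for result, job_id in results}
    errors = {job_id: message for job_id, message in failures}
    return by_job, errors


class _StubClient:
    """Stand-in that mimics the documented return shapes; not the real client."""

    def fetch_job_result(self, job_ids, timeout=100, max_retries=None, retry_delay=1,
                         verbose=False, completion_callback=None, return_failures=False):
        ok = [({"data": []}, jid) for jid in job_ids if jid != "bad"]
        failed = [(jid, "timed out") for jid in job_ids if jid == "bad"]
        return (ok, failed) if return_failures else ok


by_job, errors = fetch_and_partition(_StubClient(), ["0", "bad"])
```

Note the differing tuple order: successes carry the job ID second, failures carry it first.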

fetch_job_result_async(
job_ids: str | List[str],
data_only: bool = True,
) Dict[Future, str][source]#

Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.

Parameters:
  • job_ids (Union[str, List[str]]) – A single job ID or a list of job IDs.

  • timeout (float) – Timeout (connect, read) for fetching each job result, in seconds.

  • data_only (bool) – Whether to return only the data part of the job result.

Returns:

A dictionary mapping each future to its corresponding job ID.

Return type:

Dict[Future, str]
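A mapping of futures to job IDs is typically consumed with concurrent.futures.as_completed. The sketch below shows that pattern; drain_futures is a hypothetical helper, and the demo mapping is built with a local thread pool as a stand-in for the dictionary this method returns.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any, Dict, Tuple


def drain_futures(future_to_job: Dict[Future, str]) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Collect results as futures finish, keyed by job ID."""
    done: Dict[str, Any] = {}
    failed: Dict[str, str] = {}
    for future in as_completed(future_to_job):
        job_id = future_to_job[future]
        try:
            done[job_id] = future.result()
        except Exception as exc:  # any error raised inside the worker
            failed[job_id] = str(exc)
    return done, failed


def _boom():
    raise TimeoutError("not ready")


# Stand-in for the mapping returned by fetch_job_result_async.
with ThreadPoolExecutor(max_workers=2) as pool:
    demo = {pool.submit(lambda: {"status": "ok"}): "0", pool.submit(_boom): "1"}
    done, failed = drain_futures(demo)
```

The same helper applies to the mapping returned by submit_job_async, since both methods return Dict[Future, str].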

fetch_job_result_cli(
job_ids: str | List[str],
data_only: bool = False,
) List[Tuple[Any, str, str | None]][source]#

Fetch job results via CLI semantics (synchronous list return).

Parameters:
  • job_ids (str or list of str) – Single or multiple client-side job identifiers.

  • data_only (bool, optional) – If True, extract only the ‘data’ field. Default is False.

Returns:

List of tuples for each fetched job.

Return type:

list of (result_data, job_index, trace_id)

job_count() int[source]#

Get the number of jobs currently tracked by the client.

Returns:

The total count of jobs in internal state tracking.

Return type:

int

process_jobs_concurrently(
job_indices: str | List[str],
job_queue_id: str | None = None,
concurrency_limit: int = 64,
timeout: int = 100,
max_job_retries: int | None = None,
retry_delay: float = 5.0,
fail_on_submit_error: bool = False,
completion_callback: Callable[[Any, str], None] | None = None,
return_failures: bool = False,
data_only: bool = True,
verbose: bool = False,
) List[Any] | Tuple[List[Any], List[Tuple[str, str]]][source]#

Submit and fetch multiple jobs concurrently.

Parameters:
  • job_indices (str or list of str) – Single or multiple job indices to process.

  • job_queue_id (str, optional) – Queue identifier for submission.

  • concurrency_limit (int, optional) – Max number of simultaneous in-flight jobs. Default is 64.

  • timeout (int, optional) – Timeout in seconds per fetch attempt. Default is 100.

  • max_job_retries (int, optional) – Max retries for ‘not ready’ jobs. None for infinite. Default is None.

  • retry_delay (float, optional) – Delay in seconds between retry cycles. Default is 5.0.

  • fail_on_submit_error (bool, optional) – If True, abort on submission error. Default is False.

  • completion_callback (callable, optional) – Called on each successful fetch as (result_data, job_index).

  • return_failures (bool, optional) – If True, return (results, failures). Default is False.

  • data_only (bool, optional) – If True, return only payload ‘data’. Default is True.

  • verbose (bool, optional) – If True, enable debug logging. Default is False.

Returns:

  • results (list) – List of successful job results when return_failures is False.

  • results, failures (tuple) – Tuple of (successful results, failure tuples) when return_failures is True.

Raises:

RuntimeError – If fail_on_submit_error is True and a submission fails.
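The sketch below illustrates how completion_callback and return_failures might be combined. It assumes the callback is invoked as (result_data, job_index), as documented above; run_jobs and the stub client are hypothetical and exist only to make the example runnable offline.

```python
from typing import Any, List, Tuple


def run_jobs(client: Any, job_indices: List[str]) -> Tuple[List[Any], List[Tuple[str, str]], List[str]]:
    """Process jobs and record the order in which they complete."""
    finished: List[str] = []

    def on_done(result_data: Any, job_index: str) -> None:
        finished.append(job_index)

    results, failures = client.process_jobs_concurrently(
        job_indices,
        concurrency_limit=16,
        max_job_retries=20,
        retry_delay=5.0,
        completion_callback=on_done,
        return_failures=True,
    )
    return results, failures, finished


class _StubClient:
    """Stand-in that fails job "2" and succeeds otherwise; not the real client."""

    def process_jobs_concurrently(self, job_indices, completion_callback=None,
                                  return_failures=False, **kwargs):
        results, failures = [], []
        for idx in job_indices:
            if idx == "2":
                failures.append((idx, "submission failed"))
                continue
            data = [{"job": idx}]
            results.append(data)
            if completion_callback:
                completion_callback(data, idx)
        return (results, failures) if return_failures else results


results, failures, finished = run_jobs(_StubClient(), ["0", "1", "2"])
```

Setting a finite max_job_retries avoids waiting indefinitely, since None means infinite retries.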

submit_job(
job_indices: str | List[str],
job_queue_id: str,
batch_size: int = 10,
) List[str][source]#

Submit one or more jobs in batches.

Parameters:
  • job_indices (str or list of str) – Job indices to submit.

  • job_queue_id (str) – Queue identifier for submission.

  • batch_size (int, optional) – Maximum number of jobs per batch. Default is 10.

Returns:

Trace identifiers for each submitted job.

Return type:

list of str

Raises:

Exception – Propagates first error if any job in a batch fails.
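To make the batch_size parameter concrete, the sketch below shows how a list of job indices splits into groups of at most batch_size. It illustrates the grouping only and is not the library's implementation; batched is a hypothetical helper.

```python
from typing import Iterator, List, Sequence


def batched(job_indices: Sequence[str], batch_size: int = 10) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size job indices."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(job_indices), batch_size):
        yield list(job_indices[start:start + batch_size])


# 25 jobs with the default batch size form batches of 10, 10, and 5.
sizes = [len(batch) for batch in batched([str(i) for i in range(25)], batch_size=10)]
```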

submit_job_async(
job_indices: str | List[str],
job_queue_id: str,
) Dict[Future, str][source]#

Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method accepts either a single job ID or a list of job IDs.

Parameters:
  • job_indices (Union[str, List[str]]) – A single job ID or a list of job IDs to be submitted.

  • job_queue_id (str) – The ID of the job queue where the jobs will be submitted.

Returns:

A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.

Return type:

Dict[Future, str]

Notes

  • This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.

  • It does not wait for any of the jobs to complete.

  • Ensure that each job is in the proper state before submission.

nv_ingest_client.client.interface module

class nv_ingest_client.client.interface.Ingestor(
documents: List[str] | None = None,
client: NvIngestClient | None = None,
job_queue_id: str = 'ingest_task_queue',
**kwargs,
)[source]#

Bases: object

Ingestor provides an interface for building, managing, and running data ingestion jobs through NvIngestClient, allowing for chainable task additions and job state tracking.

Parameters:
  • documents (List[str]) – List of document paths to be processed.

  • client (Optional[NvIngestClient], optional) – An instance of NvIngestClient. If not provided, a client is created.

  • job_queue_id (str, optional) – The ID of the job queue for job submission, default is “ingest_task_queue”.
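Because each task method returns the Ingestor itself, a pipeline can be built as a single chained expression. The sketch below shows the pattern against a recording stub; build_pipeline and the stub are hypothetical, and the task methods are called without keyword arguments because their accepted parameters are not listed here.

```python
from typing import Any, List


def build_pipeline(ingestor: Any, paths: List[str]) -> Any:
    """Chain document registration and task additions; returns the ingestor."""
    return (
        ingestor.files(paths)
        .extract()
        .split()
        .embed()
    )


class _StubIngestor:
    """Stand-in that records the call order; not the real Ingestor."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def _record(self, name: str) -> "_StubIngestor":
        self.calls.append(name)
        return self  # returning self is what makes chaining work

    def files(self, documents):
        return self._record("files")

    def extract(self, **kwargs):
        return self._record("extract")

    def split(self, **kwargs):
        return self._record("split")

    def embed(self, **kwargs):
        return self._record("embed")


pipeline = build_pipeline(_StubIngestor(), ["report.pdf"])
```

With a real Ingestor, calling ingest() on the returned object would then run the configured jobs.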

all_tasks() Ingestor[source]#

Adds a default set of tasks to the batch job specification.

The default tasks cover extraction of text, tables, charts, and images, as well as deduplication, filtering, splitting, and embedding.

Returns:

Returns self for chaining.

Return type:

Ingestor

cancelled_jobs() int[source]#

Counts the jobs that have been cancelled.

Returns:

Number of jobs in the CANCELLED state.

Return type:

int

caption(
**kwargs: Any,
) Ingestor[source]#

Adds a CaptionTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the CaptionTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

completed_jobs() int[source]#

Counts the jobs that have completed successfully.

Returns:

Number of jobs in the COMPLETED state.

Return type:

int

dedup(
**kwargs: Any,
) Ingestor[source]#

Adds a DedupTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the DedupTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

embed(
**kwargs: Any,
) Ingestor[source]#

Adds an EmbedTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the EmbedTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

extract(
**kwargs: Any,
) Ingestor[source]#

Adds an ExtractTask for each document type to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the ExtractTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

failed_jobs() int[source]#

Counts the jobs that have failed.

Returns:

Number of jobs in the FAILED state.

Return type:

int

files(
documents: str | List[str],
) Ingestor[source]#

Add documents (local paths, globs, or remote URIs) for processing.

Remote URIs will force _all_local=False. Local globs that match nothing are fine. Explicit local paths that don’t exist cause _all_local=False.
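The three rules above can be read as a small decision procedure. The sketch below is an illustration of those rules only, not the library's code; looks_all_local is a hypothetical function, and treating any entry containing "://" as remote is an assumption.

```python
import os
from typing import List

GLOB_CHARS = "*?["


def looks_all_local(documents: List[str]) -> bool:
    """Return False as soon as one entry would need a download."""
    for doc in documents:
        if "://" in doc:
            # Assumption: a URI scheme marks the entry as remote.
            return False
        if any(ch in doc for ch in GLOB_CHARS):
            # Glob patterns are accepted even when they match nothing.
            continue
        if not os.path.exists(doc):
            # An explicit path that is missing locally.
            return False
    return True
```

When the flag ends up False, load() is the method that fetches local copies before processing.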

filter(
**kwargs: Any,
) Ingestor[source]#

Adds a FilterTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the FilterTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

ingest(
show_progress: bool = False,
return_failures: bool = False,
**kwargs: Any,
) List[Dict[str, Any]] | Tuple[List[Dict[str, Any]], List[Tuple[str, str]]][source]#

Ingest documents by submitting jobs and fetching results concurrently.

Parameters:
  • show_progress (bool, optional) – Whether to display a progress bar. Default is False.

  • return_failures (bool, optional) – If True, return a tuple (results, failures); otherwise, return only results. Default is False.

  • **kwargs (Any) – Additional keyword arguments for the underlying client methods. Supported keys: ‘concurrency_limit’, ‘timeout’, ‘max_job_retries’, ‘retry_delay’, ‘data_only’, ‘verbose’. Unrecognized keys are passed through to process_jobs_concurrently.

Returns:

  • results (list of dict) – List of successful job results when return_failures is False.

  • results, failures (tuple (list of dict, list of tuple of str)) – Tuple containing successful results and failure information when return_failures is True.
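Since the return shape depends on return_failures, calling code that accepts either form may want to normalize it. The helper below is a hypothetical sketch of one way to do that; it relies only on the two shapes documented above.

```python
from typing import Any, Dict, List, Tuple, Union

Results = List[Dict[str, Any]]
Failures = List[Tuple[str, str]]


def normalize_outcome(outcome: Union[Results, Tuple[Results, Failures]]) -> Tuple[Results, Failures]:
    """Always return (results, failures), whichever shape ingest() produced."""
    if isinstance(outcome, tuple):
        results, failures = outcome
        return list(results), list(failures)
    # A plain list means return_failures was False, so no failure info exists.
    return list(outcome), []


only_results = normalize_outcome([{"document_type": "text"}])
with_failures = normalize_outcome(([{"document_type": "text"}], [("doc2.pdf", "timed out")]))
```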

ingest_async(
**kwargs: Any,
) Future[source]#

Asynchronously submits jobs and returns a single future that completes when all jobs have finished.

Parameters:

kwargs (dict) – Additional parameters for the submit_job_async method.

Returns:

A future that completes when all submitted jobs have reached a terminal state.

Return type:

Future
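A caller will usually bound how long it waits on the returned future. The sketch below uses the standard Future.result(timeout=...) call; wait_for_ingest is a hypothetical helper, and the demo futures are created locally rather than by ingest_async.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeout
from typing import Any, Optional


def wait_for_ingest(future: Future, timeout: Optional[float] = None) -> Any:
    """Return the future's result, or None if it is not ready within timeout seconds."""
    try:
        return future.result(timeout=timeout)
    except FutureTimeout:
        return None


# Local futures standing in for the one returned by ingest_async.
ready: Future = Future()
ready.set_result("done")
finished = wait_for_ingest(ready, timeout=1.0)
pending = wait_for_ingest(Future(), timeout=0.01)
```

Returning None on timeout leaves the future intact, so the caller can wait on it again later.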

load(**kwargs) Ingestor[source]#

Ensure all document files are accessible locally, downloading if necessary.

For each document in _documents, checks if the file exists locally. If not, attempts to download the file to a temporary directory using fsspec. Updates _documents with paths to local copies, initializes _job_specs, and sets _all_local to True upon successful loading.

Parameters:

kwargs (dict) – Additional keyword arguments for remote file access via fsspec.

Returns:

Returns self for chaining after ensuring all files are accessible locally.

Return type:

Ingestor

remaining_jobs() int[source]#

Counts the jobs that are not in a terminal state.

Returns:

Number of jobs that are neither completed, failed, nor cancelled.

Return type:

int
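The four counters can be combined into a single status snapshot, for example while polling a long-running ingest. The sketch below is a hypothetical helper built only on the documented counter methods, exercised against a stub.

```python
from typing import Any, Dict


def job_summary(ingestor: Any) -> Dict[str, int]:
    """Snapshot of job counts by state."""
    return {
        "completed": ingestor.completed_jobs(),
        "failed": ingestor.failed_jobs(),
        "cancelled": ingestor.cancelled_jobs(),
        "remaining": ingestor.remaining_jobs(),
    }


class _StubIngestor:
    """Stand-in returning fixed counts; not the real Ingestor."""

    def completed_jobs(self) -> int:
        return 3

    def failed_jobs(self) -> int:
        return 1

    def cancelled_jobs(self) -> int:
        return 0

    def remaining_jobs(self) -> int:
        return 2


summary = job_summary(_StubIngestor())
```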

split(
**kwargs: Any,
) Ingestor[source]#

Adds a SplitTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the SplitTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

store(
**kwargs: Any,
) Ingestor[source]#

Adds a StoreTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the StoreTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

store_embed(
**kwargs: Any,
) Ingestor[source]#

Adds a StoreEmbedTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the StoreEmbedTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

vdb_upload(
**kwargs: Any,
) Ingestor[source]#

Adds a VdbUploadTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the VdbUploadTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

nv_ingest_client.client.interface.ensure_job_specs(func)[source]#

Decorator to ensure _job_specs is initialized before calling task methods.

Module contents

class nv_ingest_client.client.Ingestor(
documents: List[str] | None = None,
client: NvIngestClient | None = None,
job_queue_id: str = 'ingest_task_queue',
**kwargs,
)[source]#

Bases: object

Ingestor provides an interface for building, managing, and running data ingestion jobs through NvIngestClient, allowing for chainable task additions and job state tracking.

Parameters:
  • documents (List[str]) – List of document paths to be processed.

  • client (Optional[NvIngestClient], optional) – An instance of NvIngestClient. If not provided, a client is created.

  • job_queue_id (str, optional) – The ID of the job queue for job submission, default is “ingest_task_queue”.

all_tasks() Ingestor[source]#

Adds a default set of tasks to the batch job specification.

The default tasks cover extraction of text, tables, charts, and images, as well as deduplication, filtering, splitting, and embedding.

Returns:

Returns self for chaining.

Return type:

Ingestor

cancelled_jobs() int[source]#

Counts the jobs that have been cancelled.

Returns:

Number of jobs in the CANCELLED state.

Return type:

int

caption(
**kwargs: Any,
) Ingestor[source]#

Adds a CaptionTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the CaptionTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

completed_jobs() int[source]#

Counts the jobs that have completed successfully.

Returns:

Number of jobs in the COMPLETED state.

Return type:

int

dedup(
**kwargs: Any,
) Ingestor[source]#

Adds a DedupTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the DedupTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

embed(
**kwargs: Any,
) Ingestor[source]#

Adds an EmbedTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the EmbedTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

extract(
**kwargs: Any,
) Ingestor[source]#

Adds an ExtractTask for each document type to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the ExtractTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

failed_jobs() int[source]#

Counts the jobs that have failed.

Returns:

Number of jobs in the FAILED state.

Return type:

int

files(
documents: str | List[str],
) Ingestor[source]#

Add documents (local paths, globs, or remote URIs) for processing.

Remote URIs will force _all_local=False. Local globs that match nothing are fine. Explicit local paths that don’t exist cause _all_local=False.

filter(
**kwargs: Any,
) Ingestor[source]#

Adds a FilterTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the FilterTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

ingest(
show_progress: bool = False,
return_failures: bool = False,
**kwargs: Any,
) List[Dict[str, Any]] | Tuple[List[Dict[str, Any]], List[Tuple[str, str]]][source]#

Ingest documents by submitting jobs and fetching results concurrently.

Parameters:
  • show_progress (bool, optional) – Whether to display a progress bar. Default is False.

  • return_failures (bool, optional) – If True, return a tuple (results, failures); otherwise, return only results. Default is False.

  • **kwargs (Any) – Additional keyword arguments for the underlying client methods. Supported keys: ‘concurrency_limit’, ‘timeout’, ‘max_job_retries’, ‘retry_delay’, ‘data_only’, ‘verbose’. Unrecognized keys are passed through to process_jobs_concurrently.

Returns:

  • results (list of dict) – List of successful job results when return_failures is False.

  • results, failures (tuple (list of dict, list of tuple of str)) – Tuple containing successful results and failure information when return_failures is True.

ingest_async(
**kwargs: Any,
) Future[source]#

Asynchronously submits jobs and returns a single future that completes when all jobs have finished.

Parameters:

kwargs (dict) – Additional parameters for the submit_job_async method.

Returns:

A future that completes when all submitted jobs have reached a terminal state.

Return type:

Future

load(**kwargs) Ingestor[source]#

Ensure all document files are accessible locally, downloading if necessary.

For each document in _documents, checks if the file exists locally. If not, attempts to download the file to a temporary directory using fsspec. Updates _documents with paths to local copies, initializes _job_specs, and sets _all_local to True upon successful loading.

Parameters:

kwargs (dict) – Additional keyword arguments for remote file access via fsspec.

Returns:

Returns self for chaining after ensuring all files are accessible locally.

Return type:

Ingestor

remaining_jobs() int[source]#

Counts the jobs that are not in a terminal state.

Returns:

Number of jobs that are neither completed, failed, nor cancelled.

Return type:

int

split(
**kwargs: Any,
) Ingestor[source]#

Adds a SplitTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the SplitTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

store(
**kwargs: Any,
) Ingestor[source]#

Adds a StoreTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the StoreTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

store_embed(
**kwargs: Any,
) Ingestor[source]#

Adds a StoreEmbedTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the StoreEmbedTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

vdb_upload(
**kwargs: Any,
) Ingestor[source]#

Adds a VdbUploadTask to the batch job specification.

Parameters:

kwargs (dict) – Parameters specific to the VdbUploadTask.

Returns:

Returns self for chaining.

Return type:

Ingestor

class nv_ingest_client.client.NvIngestClient(
message_client_allocator: ~typing.Type[~nv_ingest_api.util.service_clients.client_base.MessageBrokerClientBase] = <class 'nv_ingest_api.util.service_clients.rest.rest_client.RestClient'>,
message_client_hostname: str | None = 'localhost',
message_client_port: int | None = 7670,
message_client_kwargs: ~typing.Dict[str, ~typing.Any] | None = None,
msg_counter_id: str | None = 'nv-ingest-message-id',
worker_pool_size: int = 8,
)[source]#

Bases: object

A client class for interacting with the nv-ingest service, supporting custom client allocators.

add_job(
job_spec: BatchJobSpec | JobSpec,
) str | List[str][source]#

Add one or more jobs to the client for later processing.

Parameters:

job_spec (JobSpec or BatchJobSpec) – A single job specification or a batch containing multiple specs.

Returns:

The job index for a single spec, or a list of indices for a batch.

Return type:

str or list of str

Raises:

ValueError – If an unsupported type is provided.

add_task(
job_index: str,
task: Task,
) None[source]#

Attach an existing Task object to a pending job.

Parameters:
  • job_index (str) – The client-side identifier of the target job.

  • task (Task) – The task instance to add.

create_job(
payload: Dict[str, Any],
source_id: str,
source_name: str,
document_type: str | None = None,
tasks: List[Task] | None = None,
extended_options: Dict[str, Any] | None = None,
) str[source]#

Construct and register a new job from provided metadata.

Parameters:
  • payload (dict) – The data payload for the job.

  • source_id (str) – Identifier of the data source.

  • source_name (str) – Human-readable name for the source.

  • document_type (str, optional) – Type of document (inferred from source_name if omitted).

  • tasks (list of Task, optional) – Initial set of processing tasks to attach.

  • extended_options (dict, optional) – Extra parameters for advanced configuration.

Returns:

The client-side job index.

Return type:

str

Raises:

ValueError – If job creation parameters are invalid.

create_jobs_for_batch(
files_batch: List[str],
tasks: Dict[str, Any],
) List[str][source]#

Create and submit job specifications (JobSpecs) for a batch of files, returning the job IDs. This function takes a batch of files, processes each file to extract its content and type, creates a job specification (JobSpec) for each file, and adds tasks from the provided task list. It then submits the jobs to the client and collects their job IDs.

Parameters:
  • files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.

  • tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names, and the values represent task specifications or configurations. Standard tasks include “split”, “extract”, “store”, “caption”, “dedup”, “filter”, “embed”.

Returns:

A list of job IDs corresponding to the submitted jobs. Each job ID is returned by the client’s add_job method.

Return type:

List[str]

Raises:

ValueError – If the content or type of a file cannot be extracted. The error is logged and the corresponding file is skipped.

Notes

  • The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.

  • For each file, a JobSpec is created with relevant metadata, including document type and file content. Various tasks are conditionally added based on the provided tasks dictionary.

  • The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.

Examples

Suppose you have a batch of files and tasks to process:

>>> files_batch = ["file1.txt", "file2.pdf"]
>>> tasks = {"split": ..., "extract_txt": ..., "store": ...}
>>> client = NvIngestClient()
>>> job_ids = client.create_jobs_for_batch(files_batch, tasks)
>>> print(job_ids)
['job_12345', 'job_67890']

In this example, jobs are created and submitted for the files in files_batch, with the tasks in tasks being added to each job specification. The returned job IDs are then printed.

See also

create_job_specs_for_batch

Function that creates job specifications for a batch of files.

JobSpec

The class representing a job specification.

create_task(
job_index: str | int,
task_type: TaskType,
task_params: Dict[str, Any] | None = None,
) None[source]#

Create and attach a new task to a pending job by type and parameters.

Parameters:
  • job_index (str or int) – Identifier of the job to modify.

  • task_type (TaskType) – Enum specifying the kind of task to create.

  • task_params (dict, optional) – Parameters for the new task.

Raises:

ValueError – If the job does not exist or is not pending.

fetch_job_result(
job_ids: str | List[str],
timeout: float = 100,
max_retries: int | None = None,
retry_delay: float = 1,
verbose: bool = False,
completion_callback: Callable[[Dict, str], None] | None = None,
return_failures: bool = False,
) List[Tuple[Dict | None, str]] | Tuple[List[Tuple[Dict | None, str]], List[Tuple[str, str]]][source]#

Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.

Parameters:
  • job_ids (Union[str, List[str]]) – A job ID or list of job IDs to fetch results for.

  • timeout (float) – Timeout for each fetch operation, in seconds.

  • max_retries (Optional[int]) – Maximum number of retries for jobs that are not ready yet.

  • retry_delay (float) – Delay between retry attempts, in seconds.

  • verbose (bool) – If True, logs additional information.

  • completion_callback (Optional[Callable[[Dict, str], None]]) – A callback function that is executed each time a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.

  • return_failures (bool) – If True, returns a separate list of failed jobs.

Returns:

  • If return_failures=False: a list of tuples, each containing the job result (or None on failure) and the job ID.

  • If return_failures=True: a tuple of two lists, the successful job results and the failures, where each failure contains the job ID and an error message.

Return type:

List[Tuple[Optional[Dict], str]], or Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]] when return_failures=True

Raises:
  • ValueError – If there is an error in decoding the job result.

  • TimeoutError – If the fetch operation times out.

  • Exception – For all other unexpected issues.

fetch_job_result_async(
job_ids: str | List[str],
data_only: bool = True,
) Dict[Future, str][source]#

Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.

Parameters:
  • job_ids (Union[str, List[str]]) – A single job ID or a list of job IDs.

  • timeout (float) – Timeout (connect, read) for fetching each job result, in seconds.

  • data_only (bool) – Whether to return only the data part of the job result.

Returns:

A dictionary mapping each future to its corresponding job ID.

Return type:

Dict[Future, str]

fetch_job_result_cli(
job_ids: str | List[str],
data_only: bool = False,
) List[Tuple[Any, str, str | None]][source]#

Fetch job results via CLI semantics (synchronous list return).

Parameters:
  • job_ids (str or list of str) – Single or multiple client-side job identifiers.

  • data_only (bool, optional) – If True, extract only the ‘data’ field. Default is False.

Returns:

List of tuples for each fetched job.

Return type:

list of (result_data, job_index, trace_id)

job_count() int[source]#

Get the number of jobs currently tracked by the client.

Returns:

The total count of jobs in internal state tracking.

Return type:

int

process_jobs_concurrently(
job_indices: str | List[str],
job_queue_id: str | None = None,
concurrency_limit: int = 64,
timeout: int = 100,
max_job_retries: int | None = None,
retry_delay: float = 5.0,
fail_on_submit_error: bool = False,
completion_callback: Callable[[Any, str], None] | None = None,
return_failures: bool = False,
data_only: bool = True,
verbose: bool = False,
) List[Any] | Tuple[List[Any], List[Tuple[str, str]]][source]#

Submit and fetch multiple jobs concurrently.

Parameters:
  • job_indices (str or list of str) – Single or multiple job indices to process.

  • job_queue_id (str, optional) – Queue identifier for submission.

  • concurrency_limit (int, optional) – Max number of simultaneous in-flight jobs. Default is 64.

  • timeout (int, optional) – Timeout in seconds per fetch attempt. Default is 100.

  • max_job_retries (int, optional) – Max retries for ‘not ready’ jobs. None for infinite. Default is None.

  • retry_delay (float, optional) – Delay in seconds between retry cycles. Default is 5.0.

  • fail_on_submit_error (bool, optional) – If True, abort on submission error. Default is False.

  • completion_callback (callable, optional) – Called on each successful fetch as (result_data, job_index).

  • return_failures (bool, optional) – If True, return (results, failures). Default is False.

  • data_only (bool, optional) – If True, return only payload ‘data’. Default is True.

  • verbose (bool, optional) – If True, enable debug logging. Default is False.

Returns:

  • results (list) – List of successful job results when return_failures is False.

  • results, failures (tuple) – Tuple of (successful results, failure tuples) when return_failures is True.

Raises:

RuntimeError – If fail_on_submit_error is True and a submission fails.

submit_job(
job_indices: str | List[str],
job_queue_id: str,
batch_size: int = 10,
) List[str][source]#

Submit one or more jobs in batches.

Parameters:
  • job_indices (str or list of str) – Job indices to submit.

  • job_queue_id (str) – Queue identifier for submission.

  • batch_size (int, optional) – Maximum number of jobs per batch. Default is 10.

Returns:

Trace identifiers for each submitted job.

Return type:

list of str

Raises:

Exception – Propagates first error if any job in a batch fails.

submit_job_async(
job_indices: str | List[str],
job_queue_id: str,
) Dict[Future, str][source]#

Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method accepts either a single job ID or a list of job IDs.

Parameters:
  • job_indices (Union[str, List[str]]) – A single job ID or a list of job IDs to be submitted.

  • job_queue_id (str) – The ID of the job queue where the jobs will be submitted.

Returns:

A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.

Return type:

Dict[Future, str]

Notes

  • This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.

  • It does not wait for any of the jobs to complete.

  • Ensure that each job is in the proper state before submission.