nv_ingest_client.client package#
Submodules#
nv_ingest_client.client.client module#
- exception nv_ingest_client.client.client.DataDecodeException(message='Data decoding error', data=None)[source]#
Bases:
Exception
Exception raised for errors in decoding data.
- message -- explanation of the error
- data -- the data that failed to decode, optionally
- class nv_ingest_client.client.client.NvIngestClient(
- message_client_allocator: ~typing.Type[~nv_ingest_api.util.service_clients.client_base.MessageBrokerClientBase] = <class 'nv_ingest_api.util.service_clients.rest.rest_client.RestClient'>,
- message_client_hostname: str | None = 'localhost',
- message_client_port: int | None = 7670,
- message_client_kwargs: ~typing.Dict[str,
- ~typing.Any] | None = None,
- msg_counter_id: str | None = 'nv-ingest-message-id',
- worker_pool_size: int = 8,
Bases:
object
A client class for interacting with the nv-ingest service, supporting custom client allocators.
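Example (a minimal construction sketch; assumes an nv-ingest REST endpoint is reachable at the documented defaults, which are shown explicitly here):
>>> from nv_ingest_client.client import NvIngestClient
>>> client = NvIngestClient(
...     message_client_hostname="localhost",  # documented default
...     message_client_port=7670,             # documented default
...     worker_pool_size=8,                   # documented default
... )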
- add_job(
- job_spec: BatchJobSpec | JobSpec,
Add one or more jobs to the client for later processing.
- Parameters:
job_spec (JobSpec or BatchJobSpec) – A single job specification or a batch containing multiple specs.
- Returns:
The job index for a single spec, or a list of indices for a batch.
- Return type:
str or list of str
- Raises:
ValueError – If an unsupported type is provided.
- add_task(
- job_index: str,
- task: Task,
Attach an existing Task object to a pending job.
- Parameters:
job_index (str) – The client-side identifier of the target job.
task (Task) – The task instance to add.
- consume_completed_parent_trace_ids() List[str][source]#
Return and clear the set of completed parent trace identifiers.
- create_job(
- payload: Dict[str, Any],
- source_id: str,
- source_name: str,
- document_type: str | None = None,
- tasks: List[Task] | None = None,
- extended_options: Dict[str, Any] | None = None,
Construct and register a new job from provided metadata.
- Parameters:
payload (dict) – The data payload for the job.
source_id (str) – Identifier of the data source.
source_name (str) – Human-readable name for the source.
document_type (str, optional) – Type of document (inferred from source_name if omitted).
tasks (list of Task, optional) – Initial set of processing tasks to attach.
extended_options (dict, optional) – Extra parameters for advanced configuration.
- Returns:
The client-side job index.
- Return type:
str
- Raises:
ValueError – If job creation parameters are invalid.
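Example (a hedged sketch of registering a job; the payload shape is illustrative only, not a documented schema, and some_task is a hypothetical pre-built Task instance):
>>> job_index = client.create_job(
...     payload={"content": "raw text to ingest"},  # illustrative payload
...     source_id="doc-001",
...     source_name="report.txt",
... )
>>> client.add_task(job_index, some_task)  # some_task: hypothetical Task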
- create_jobs_for_batch(
- files_batch: List[str],
- tasks: Dict[str, Any],
- pdf_split_page_count: int = None,
Create and submit job specifications (JobSpecs) for a batch of files, returning the job IDs. This function takes a batch of files, processes each file to extract its content and type, creates a job specification (JobSpec) for each file, and adds tasks from the provided task list. It then submits the jobs to the client and collects their job IDs.
- Parameters:
files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.
tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names, and the values represent task specifications or configurations. Standard tasks include “split”, “extract”, “store”, “caption”, “dedup”, “filter”, “embed”.
pdf_split_page_count (int, optional) – Number of pages per PDF chunk for splitting (1-128). If provided, this will be added to the job spec’s extended_options for PDF files.
- Returns:
A Tuple containing the list of JobSpecs and list of job IDs corresponding to the submitted jobs. Each job ID is returned by the client’s add_job method.
- Return type:
Tuple[List[JobSpec], List[str]]
- Raises:
ValueError – If there is an error extracting the file content or type from any of the files, a ValueError will be logged, and the corresponding file will be skipped.
Notes
The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.
For each file, a JobSpec is created with relevant metadata, including document type and file content. Various tasks are conditionally added based on the provided tasks dictionary.
The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.
Examples
Suppose you have a batch of files and tasks to process:
>>> files_batch = ["file1.txt", "file2.pdf"]
>>> tasks = {"split": ..., "extract_txt": ..., "store": ...}
>>> client = NvIngestClient()
>>> job_specs, job_ids = client.create_jobs_for_batch(files_batch, tasks)
>>> print(job_ids)
['job_12345', 'job_67890']
In this example, jobs are created and submitted for the files in files_batch, with the tasks in tasks being added to each job specification. The returned job IDs are then printed.
See also
create_job_specs_for_batch – Function that creates job specifications for a batch of files.
JobSpec – The class representing a job specification.
- create_task(
- job_index: str | int,
- task_type: TaskType,
- task_params: Dict[str, Any] | None = None,
Create and attach a new task to a pending job by type and parameters.
- Parameters:
job_index (str or int) – Identifier of the job to modify.
task_type (TaskType) – Enum specifying the kind of task to create.
task_params (dict, optional) – Parameters for the new task.
- Raises:
ValueError – If the job does not exist or is not pending.
- fetch_job_result(
- job_ids: str | List[str],
- timeout: float = 100,
- max_retries: int | None = None,
- retry_delay: float = 1,
- verbose: bool = False,
- completion_callback: Callable[[Dict, str], None] | None = None,
- return_failures: bool = False,
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
- Parameters:
job_ids (Union[str, List[str]]) – A job ID or list of job IDs to fetch results for.
timeout (float) – Timeout for each fetch operation, in seconds.
max_retries (Optional[int]) – Maximum number of retries for jobs that are not ready yet.
retry_delay (float) – Delay between retry attempts, in seconds.
verbose (bool) – If True, logs additional information.
completion_callback (Optional[Callable[[Dict, str], None]]) – A callback function that is executed each time a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.
return_failures (bool) – If True, returns a separate list of failed jobs.
- Returns:
If return_failures=False – List[Tuple[Optional[Dict], str]]: a list of tuples, each containing the job result (or None on failure) and the job ID.
If return_failures=True – Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]]: a tuple of the list of successful job results and the list of failures, each containing a job ID and an error message.
- Return type:
List[Tuple[Optional[Dict], str]] or Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]]
- Raises:
ValueError – If there is an error in decoding the job result.
TimeoutError – If the fetch operation times out.
Exception – For all other unexpected issues.
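Example (a hedged sketch; assumes job_index was returned by add_job or create_job and that the default "ingest_task_queue" queue is in use):
>>> client.submit_job(job_index, job_queue_id="ingest_task_queue")
>>> results = client.fetch_job_result(job_index, timeout=100, max_retries=10, retry_delay=1)
>>> result_data, fetched_job_id = results[0]  # result dict (or None on failure) and job ID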
- fetch_job_result_async(
- job_ids: str | List[str],
- data_only: bool = True,
- timeout: Tuple[int, float | None] | None = None,
Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.
- Parameters:
job_ids (Union[str, List[str]]) – A single job ID or a list of job IDs.
timeout (Tuple[int, float | None], optional) – (connect, read) timeout for fetching each job result, in seconds.
data_only (bool) – Whether to return only the data part of the job result.
- Returns:
A dictionary mapping each future to its corresponding job ID.
- Return type:
Dict[Future, str]
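Example (a hedged sketch; job_a and job_b are hypothetical job indices that have already been submitted):
>>> futures = client.fetch_job_result_async([job_a, job_b], data_only=True)
>>> for future, job_id in futures.items():
...     result = future.result()  # blocks until that job's result is available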
- fetch_job_result_cli(
- job_ids: str | List[str],
- data_only: bool = False,
- timeout: Tuple[int, float | None] | None = None,
Fetch job results via CLI semantics (synchronous list return).
- Parameters:
job_ids (str or list of str) – Single or multiple client-side job identifiers.
data_only (bool, optional) – If True, extract only the ‘data’ field. Default is False.
- Returns:
List of tuples for each fetched job.
- Return type:
list of (result_data, job_index, trace_id)
- job_count() int[source]#
Get the number of jobs currently tracked by the client.
- Returns:
The total count of jobs in internal state tracking.
- Return type:
int
- process_jobs_concurrently(
- job_indices: str | List[str],
- job_queue_id: str | None = None,
- batch_size: int | None = None,
- concurrency_limit: int = 64,
- timeout: int = 100,
- max_job_retries: int | None = None,
- retry_delay: float = 0.5,
- initial_fetch_delay: float = 0.3,
- fail_on_submit_error: bool = False,
- completion_callback: Callable[[Any, str], None] | None = None,
- return_failures: bool = False,
- data_only: bool = True,
- stream_to_callback_only: bool = False,
- return_full_response: bool = False,
- verbose: bool = False,
- return_traces: bool = False,
Submit and fetch multiple jobs concurrently.
- Parameters:
job_indices (str or list of str) – Single or multiple job indices to process.
job_queue_id (str, optional) – Queue identifier for submission.
batch_size (int, optional) – Maximum number of jobs to process in each internal batch. Higher values may improve throughput but increase memory usage. Must be >= 1. Defaults to None, in which case an internal default of 32 is used.
concurrency_limit (int, optional) – Max number of simultaneous in-flight jobs. Default is 64.
timeout (int, optional) – Timeout in seconds per fetch attempt. Default is 100.
max_job_retries (int, optional) – Max retries for ‘not ready’ jobs. None for infinite. Default is None.
retry_delay (float, optional) – Delay in seconds between retry cycles. Default is 0.5.
fail_on_submit_error (bool, optional) – If True, abort on submission error. Default is False.
completion_callback (callable, optional) – Called on each successful fetch as (result_data, job_index).
return_failures (bool, optional) – If True, return (results, failures). Default is False.
data_only (bool, optional) – If True, return only payload ‘data’. Default is True.
return_full_response (bool, optional) – If True, results contain the full response envelopes (including ‘trace’ and ‘annotations’). Ignored when stream_to_callback_only=True. Default is False.
verbose (bool, optional) – If True, enable debug logging. Default is False.
return_traces (bool, optional) – If True, parent-level aggregated trace metrics are extracted and returned. Default is False.
- Returns:
results (list) – List of successful job results when return_failures is False.
results, failures (tuple) – Tuple of (successful results, failure tuples) when return_failures is True.
results, failures, traces (tuple) – Tuple of (successful results, failure tuples, trace dicts) when both return_failures and return_traces are True.
- Raises:
RuntimeError – If fail_on_submit_error is True and a submission fails.
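Example (a hedged sketch; job_indices is a hypothetical list of indices returned by add_job):
>>> results, failures = client.process_jobs_concurrently(
...     job_indices,
...     job_queue_id="ingest_task_queue",
...     concurrency_limit=16,
...     return_failures=True,
... )
>>> for job_id, error in failures:
...     print(f"{job_id} failed: {error}")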
- process_jobs_concurrently_async(
- job_indices: str | List[str],
- job_queue_id: str | None = None,
- batch_size: int | None = None,
- timeout: int = 100,
- max_job_retries: int | None = None,
- retry_delay: float = 0.5,
- initial_fetch_delay: float = 0.3,
- fail_on_submit_error: bool = False,
- completion_callback: Callable[[Any, str], None] | None = None,
- stream_to_callback_only: bool = False,
- return_full_response: bool = False,
- verbose: bool = False,
- return_traces: bool = False,
Submit and fetch multiple jobs concurrently and asynchronously.
This method initializes the processing and returns a Future immediately. The Future will resolve with a fixed 3-part tuple (results, failures, traces) once all jobs have completed.
Parameters are identical to process_jobs_concurrently.
- Returns:
A future that completes when all jobs are done. Its result is a tuple containing (successful_results, failures, traces).
- Return type:
Future[Tuple[List[Any], List[Tuple[str, str]], List[Optional[Dict[str, Any]]]]]
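Example (a hedged sketch; the returned future always resolves to the fixed 3-part tuple described above):
>>> future = client.process_jobs_concurrently_async(
...     job_indices,                        # hypothetical list of job indices
...     job_queue_id="ingest_task_queue",
... )
>>> results, failures, traces = future.result()  # blocks until all jobs finish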
- register_parent_trace_id(trace_id: str | None) None[source]#
Record a parent trace identifier once its aggregation has completed.
- submit_job(
- job_indices: str | List[str],
- job_queue_id: str,
- batch_size: int = 10,
Submit one or more jobs in batches.
- Parameters:
job_indices (str or list of str) – Job indices to submit.
job_queue_id (str) – Queue identifier for submission.
batch_size (int, optional) – Maximum number of jobs per batch. Default is 10.
- Returns:
Trace identifiers for each submitted job.
- Return type:
list of str
- Raises:
Exception – Propagates first error if any job in a batch fails.
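Example (a hedged sketch; job_a and job_b are hypothetical job indices):
>>> trace_ids = client.submit_job(
...     [job_a, job_b],
...     job_queue_id="ingest_task_queue",
...     batch_size=10,
... )  # returns one trace identifier per submitted job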
- submit_job_async(
- job_indices: str | List[str],
- job_queue_id: str,
Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method accepts either a single job ID or a list of job IDs.
- Parameters:
job_indices (Union[str, List[str]]) – A single job ID or a list of job IDs to be submitted.
job_queue_id (str) – The ID of the job queue where the jobs will be submitted.
- Returns:
A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.
- Return type:
Dict[Future, str]
Notes
This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.
It does not wait for any of the jobs to complete.
Ensure that each job is in the proper state before submission.
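Example (a hedged sketch using the standard library to wait on the returned futures; job_a and job_b are hypothetical job indices):
>>> from concurrent.futures import as_completed
>>> future_map = client.submit_job_async([job_a, job_b], "ingest_task_queue")
>>> for future in as_completed(future_map):
...     print("submitted:", future_map[future])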
nv_ingest_client.client.ingest_job_handler module#
- class nv_ingest_client.client.ingest_job_handler.IngestJobHandler(
- client: Any,
- files: List[str],
- tasks: Dict[str, Any],
- output_directory: str,
- batch_size: int,
- fail_on_error: bool = False,
- save_images_separately: bool = False,
- show_progress: bool = True,
- show_telemetry: bool = False,
- job_queue_id: str = 'ingest_task_queue',
- pdf_split_page_count: int = None,
Bases:
object
A modular job handler that mirrors the CLI’s create_and_process_jobs flow, so the same proven scheduling/retry behavior can be reused by other entry points.
- Usage:
handler = IngestJobHandler(client, files, tasks, output_dir, batch_size)
total_files, trace_times, total_pages, trace_ids = handler.run()
nv_ingest_client.client.interface module#
- class nv_ingest_client.client.interface.Ingestor(
- documents: List[str] | None = None,
- client: NvIngestClient | None = None,
- job_queue_id: str = 'ingest_task_queue',
- **kwargs,
Bases:
object
Ingestor provides an interface for building, managing, and running data ingestion jobs through NvIngestClient, allowing for chainable task additions and job state tracking.
- Parameters:
documents (List[str]) – List of document paths to be processed.
client (Optional[NvIngestClient], optional) – An instance of NvIngestClient. If not provided, a client is created.
job_queue_id (str, optional) – The ID of the job queue for job submission, default is “ingest_task_queue”.
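Example (a minimal end-to-end sketch; the file name is illustrative, and a default NvIngestClient is created implicitly):
>>> from nv_ingest_client.client import Ingestor
>>> results = (
...     Ingestor(documents=["report.pdf"])
...     .extract()
...     .embed()
...     .ingest()
... )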
- all_tasks() Ingestor[source]#
Adds a default set of tasks to the batch job specification.
The default tasks include extracting text, tables, charts, images, deduplication, filtering, splitting, and embedding tasks.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- buffers(
- buffers: Tuple[str, BytesIO] | List[Tuple[str, BytesIO]],
Add buffers for processing.
- Parameters:
buffers (List[Tuple[str, BytesIO]]) – List of tuples containing the name of the buffer and the BytesIO object.
- cancelled_jobs() int[source]#
Counts the jobs that have been cancelled.
- Returns:
Number of jobs in the CANCELLED state.
- Return type:
int
- caption(
- **kwargs: Any,
Adds a CaptionTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the CaptionTask. Supports reasoning (bool), prompt (str), api_key (str), endpoint_url (str), and model_name (str).
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- completed_jobs() int[source]#
Counts the jobs that have completed successfully.
- Returns:
Number of jobs in the COMPLETED state.
- Return type:
int
- dedup(
- **kwargs: Any,
Adds a DedupTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the DedupTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- embed(
- **kwargs: Any,
Adds an EmbedTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the EmbedTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- extract(
- **kwargs: Any,
Adds an ExtractTask for each document type to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the ExtractTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- failed_jobs() int[source]#
Counts the jobs that have failed.
- Returns:
Number of jobs in the FAILED state.
- Return type:
int
- files(
- documents: str | List[str],
Add documents (local paths, globs, or remote URIs) for processing.
Remote URIs will force _all_local=False. Local globs that match nothing are fine. Explicit local paths that don’t exist cause _all_local=False.
- filter(
- **kwargs: Any,
Adds a FilterTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the FilterTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- get_status() Dict[str, str][source]#
Returns a dictionary mapping document identifiers to their current status in the pipeline.
This method is designed for use with async ingestion to poll the status of submitted jobs. For each document submitted to the ingestor, the method returns its current processing state.
- Returns:
A dictionary where keys are document identifiers (source names or source IDs) and values are status strings representing the current state:
"pending": Job created but not yet submitted
"submitted": Job submitted and waiting for processing
"processing": Job is currently being processed
"completed": Job finished successfully
"failed": Job encountered an error
"cancelled": Job was cancelled
"unknown": Job state could not be determined (initial state)
- Return type:
Dict[str, str]
Examples
>>> ingestor = Ingestor(documents=["doc1.pdf", "doc2.pdf"], client=client)
>>> ingestor.extract().embed()
>>> future = ingestor.ingest_async()
>>>
>>> # Poll status while processing
>>> status = ingestor.get_status()
>>> print(status)
{'doc1.pdf': 'processing', 'doc2.pdf': 'submitted'}
>>>
>>> # Check again after some time
>>> status = ingestor.get_status()
>>> print(status)
{'doc1.pdf': 'completed', 'doc2.pdf': 'processing'}
Notes
This method is most useful when called after ingest_async() to track progress
If called before any jobs are submitted, returns an empty dictionary or documents with “unknown” status
The method accesses internal job state from the client, so it reflects the most current known state
- ingest(
- show_progress: bool = False,
- return_failures: bool = False,
- save_to_disk: bool = False,
- return_traces: bool = False,
- **kwargs: Any,
Ingest documents by submitting jobs and fetching results concurrently.
- Parameters:
show_progress (bool, optional) – Whether to display a progress bar. Default is False.
return_failures (bool, optional) – If True, return a tuple (results, failures); otherwise, return only results. Default is False.
save_to_disk (bool, optional) – If True, save results to disk and return LazyLoadedList proxies. Default is False.
return_traces (bool, optional) – If True, return trace metrics alongside results. Default is False. Traces contain timing metrics (entry, exit, resident_time) for each stage.
**kwargs (Any) – Additional keyword arguments for the underlying client methods. Optional flags include include_parent_trace_ids=True to also return parent job trace identifiers (V2 API only).
- Returns:
Returns vary based on flags:
- Default: list of results
- return_failures=True: (results, failures)
- return_traces=True: (results, traces)
- return_failures=True, return_traces=True: (results, failures, traces)
- Additional combinations with the include_parent_trace_ids kwarg
- Return type:
list or tuple
Notes
Trace metrics include timing data for each processing stage. For detailed usage and examples, see src/nv_ingest/api/v2/README.md
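Example (a hedged sketch of the combined flags; assumes ingestor is an Ingestor configured as in the class-level example, and unpacking follows the Returns section above):
>>> results, failures, traces = ingestor.ingest(
...     show_progress=True,
...     return_failures=True,
...     return_traces=True,
... )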
- ingest_async(
- *,
- return_failures: bool = False,
- return_traces: bool = False,
- **kwargs: Any,
Asynchronously submits jobs and returns a single future that completes when all jobs have finished.
The return type of the future’s result is dynamic and mirrors the behavior of the synchronous ingest() method, controlled by the return_failures and return_traces flags. If a VDB upload is configured, the future will complete after the VDB upload finishes.
- Parameters:
return_failures (bool, optional) – If True, return a tuple containing failures; otherwise, only return results. Default is False.
return_traces (bool, optional) – If True, return trace metrics alongside results. Default is False.
kwargs (dict) – Additional parameters passed to the concurrent processor. Optional flags include include_parent_trace_ids=True to also return parent job trace identifiers (V2 API only).
- Returns:
A future that completes when all jobs and any subsequent VDB upload have finished. Its result will be one of the following:
- Default: list of results
- return_failures=True: (results, failures)
- return_traces=True: (results, traces)
- return_failures=True, return_traces=True: (results, failures, traces)
- Return type:
Future[Union[List[Any], Tuple[Any, …]]]
- load(**kwargs) Ingestor[source]#
Ensure all document files are accessible locally, downloading if necessary.
For each document in _documents, checks if the file exists locally. If not, attempts to download the file to a temporary directory using fsspec. Updates _documents with paths to local copies, initializes _job_specs, and sets _all_local to True upon successful loading.
- Parameters:
kwargs (dict) – Additional keyword arguments for remote file access via fsspec.
- Returns:
Returns self for chaining after ensuring all files are accessible locally.
- Return type:
Ingestor
- pdf_split_config(
- pages_per_chunk: int = 32,
Configure PDF splitting behavior for V2 API.
- Parameters:
pages_per_chunk (int, optional) – Number of pages per PDF chunk (default: 32). The server enforces boundaries: min=1, max=128.
- Returns:
Self for method chaining
- Return type:
Ingestor
Notes
Only affects V2 API endpoints with PDF splitting support
Server will clamp values outside [1, 128] range
Smaller chunks = more parallelism but more overhead
Larger chunks = less overhead but reduced concurrency
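Example (a hedged sketch; only meaningful against a V2 API endpoint with PDF splitting support, and the file name is illustrative):
>>> ingestor = (
...     Ingestor(documents=["large_document.pdf"])
...     .pdf_split_config(pages_per_chunk=16)
...     .extract()
... )
>>> results = ingestor.ingest()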
- remaining_jobs() int[source]#
Counts the jobs that are not in a terminal state.
- Returns:
Number of jobs that are neither completed, failed, nor cancelled.
- Return type:
int
- save_to_disk(
- output_directory: str | None = None,
- cleanup: bool = True,
- compression: str | None = 'gzip',
Configures the Ingestor to save results to disk instead of memory.
This method enables disk-based storage for ingestion results. When called, the ingest() method will write the output for each processed document to a separate JSONL file. The return value of ingest() will be a list of LazyLoadedList objects, which are memory-efficient proxies to these files.
The output directory can be specified directly, via an environment variable, or a temporary directory will be created automatically.
- Parameters:
output_directory (str, optional) – The path to the directory where result files (.jsonl) will be saved. If not provided, it defaults to the value of the environment variable NV_INGEST_CLIENT_SAVE_TO_DISK_OUTPUT_DIRECTORY. If the environment variable is also not set, a temporary directory will be created. Defaults to None.
cleanup (bool, optional) – If True, the entire output_directory will be recursively deleted when the Ingestor’s context is exited (i.e., when used in a with statement). Defaults to True.
compression (str, optional) – The compression algorithm to use for the saved result files. Currently, the only supported value is ‘gzip’. To disable compression, set this parameter to None. Defaults to ‘gzip’, which significantly reduces the disk space required for results. When enabled, files are saved with a .gz suffix (e.g., results.jsonl.gz).
- Returns:
Returns self for chaining.
- Return type:
Ingestor
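Example (a hedged sketch; relies on the context-manager behaviour described for cleanup, with the output directory chosen automatically):
>>> with Ingestor(documents=["report.pdf"]) as ingestor:
...     lazy_results = ingestor.extract().save_to_disk(compression="gzip").ingest()
...     first_document_rows = list(lazy_results[0])  # LazyLoadedList behaves like a Sequence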
- split(
- **kwargs: Any,
Adds a SplitTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the SplitTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- store(
- **kwargs: Any,
Adds a StoreTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the StoreTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- store_embed(
- **kwargs: Any,
Adds a StoreEmbedTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the StoreEmbedTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- udf(
- udf_function: str,
- udf_function_name: str | None = None,
- phase: PipelinePhase | int | str | None = None,
- target_stage: str | None = None,
- run_before: bool = False,
- run_after: bool = False,
Adds a UDFTask to the batch job specification.
- Parameters:
udf_function (str) – UDF specification. Supports three formats: 1. Inline function: ‘def my_func(control_message): …’ 2. Import path: ‘my_module.my_function’ 3. File path: ‘/path/to/file.py:function_name’
udf_function_name (str, optional) – Name of the function to execute from the UDF specification. If not provided, attempts to infer from udf_function.
phase (Union[PipelinePhase, int, str], optional) – Pipeline phase to execute UDF. Accepts phase names (‘extract’, ‘split’, ‘embed’, ‘response’) or numbers (1-4). Cannot be used with target_stage.
target_stage (str, optional) – Specific stage name to target for UDF execution. Cannot be used with phase.
run_before (bool, optional) – If True and target_stage is specified, run UDF before the target stage. Default: False.
run_after (bool, optional) – If True and target_stage is specified, run UDF after the target stage. Default: False.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- Raises:
ValueError – If udf_function_name cannot be inferred and is not provided explicitly, or if both phase and target_stage are specified, or if neither is specified.
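Example (a hedged sketch of an inline UDF; the function body is illustrative only, and ingestor is assumed to be an existing Ingestor instance):
>>> udf_source = '''
... def redact(control_message):
...     # hypothetical UDF body; must accept and return the control message
...     return control_message
... '''
>>> ingestor = ingestor.udf(udf_source, udf_function_name="redact", phase="extract")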
- vdb_upload(
- purge_results_after_upload: bool = True,
- **kwargs: Any,
Adds a VdbUploadTask to the batch job specification.
- Parameters:
purge_results_after_upload (bool, optional) – If True, the saved result files will be deleted from disk after a successful upload. This requires save_to_disk() to be active. Defaults to True
kwargs (dict) – Parameters specific to the VdbUploadTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
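Example (a hedged sketch; connection details for the target vector database are omitted and would be passed through kwargs, and save_to_disk() is enabled because purge_results_after_upload requires it):
>>> results = (
...     Ingestor(documents=["report.pdf"])
...     .extract()
...     .embed()
...     .save_to_disk()
...     .vdb_upload(purge_results_after_upload=True)
...     .ingest()
... )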
- class nv_ingest_client.client.interface.LazyLoadedList(
- filepath: str,
- expected_len: int | None = None,
- compression: str | None = None,
Bases:
Sequence
Module contents#
- class nv_ingest_client.client.Ingestor(
- documents: List[str] | None = None,
- client: NvIngestClient | None = None,
- job_queue_id: str = 'ingest_task_queue',
- **kwargs,
Bases:
object
Ingestor provides an interface for building, managing, and running data ingestion jobs through NvIngestClient, allowing for chainable task additions and job state tracking.
- Parameters:
documents (List[str]) – List of document paths to be processed.
client (Optional[NvIngestClient], optional) – An instance of NvIngestClient. If not provided, a client is created.
job_queue_id (str, optional) – The ID of the job queue for job submission, default is “ingest_task_queue”.
- all_tasks() Ingestor[source]#
Adds a default set of tasks to the batch job specification.
The default tasks include extracting text, tables, charts, images, deduplication, filtering, splitting, and embedding tasks.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- buffers(
- buffers: Tuple[str, BytesIO] | List[Tuple[str, BytesIO]],
Add buffers for processing.
- Parameters:
buffers (List[Tuple[str, BytesIO]]) – List of tuples containing the name of the buffer and the BytesIO object.
- cancelled_jobs() int[source]#
Counts the jobs that have been cancelled.
- Returns:
Number of jobs in the CANCELLED state.
- Return type:
int
- caption(
- **kwargs: Any,
Adds a CaptionTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the CaptionTask. Supports reasoning (bool), prompt (str), api_key (str), endpoint_url (str), and model_name (str).
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- completed_jobs() int[source]#
Counts the jobs that have completed successfully.
- Returns:
Number of jobs in the COMPLETED state.
- Return type:
int
- dedup(
- **kwargs: Any,
Adds a DedupTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the DedupTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- embed(
- **kwargs: Any,
Adds an EmbedTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the EmbedTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- extract(
- **kwargs: Any,
Adds an ExtractTask for each document type to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the ExtractTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- failed_jobs() int[source]#
Counts the jobs that have failed.
- Returns:
Number of jobs in the FAILED state.
- Return type:
int
- files(
- documents: str | List[str],
Add documents (local paths, globs, or remote URIs) for processing.
Remote URIs will force _all_local=False. Local globs that match nothing are fine. Explicit local paths that don’t exist cause _all_local=False.
- filter(
- **kwargs: Any,
Adds a FilterTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the FilterTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- get_status() Dict[str, str][source]#
Returns a dictionary mapping document identifiers to their current status in the pipeline.
This method is designed for use with async ingestion to poll the status of submitted jobs. For each document submitted to the ingestor, the method returns its current processing state.
- Returns:
A dictionary where keys are document identifiers (source names or source IDs) and values are status strings representing the current state:
"pending": Job created but not yet submitted
"submitted": Job submitted and waiting for processing
"processing": Job is currently being processed
"completed": Job finished successfully
"failed": Job encountered an error
"cancelled": Job was cancelled
"unknown": Job state could not be determined (initial state)
- Return type:
Dict[str, str]
Examples
>>> ingestor = Ingestor(documents=["doc1.pdf", "doc2.pdf"], client=client)
>>> ingestor.extract().embed()
>>> future = ingestor.ingest_async()
>>>
>>> # Poll status while processing
>>> status = ingestor.get_status()
>>> print(status)
{'doc1.pdf': 'processing', 'doc2.pdf': 'submitted'}
>>>
>>> # Check again after some time
>>> status = ingestor.get_status()
>>> print(status)
{'doc1.pdf': 'completed', 'doc2.pdf': 'processing'}
Notes
This method is most useful when called after ingest_async() to track progress
If called before any jobs are submitted, returns an empty dictionary or documents with “unknown” status
The method accesses internal job state from the client, so it reflects the most current known state
- ingest(
- show_progress: bool = False,
- return_failures: bool = False,
- save_to_disk: bool = False,
- return_traces: bool = False,
- **kwargs: Any,
Ingest documents by submitting jobs and fetching results concurrently.
- Parameters:
show_progress (bool, optional) – Whether to display a progress bar. Default is False.
return_failures (bool, optional) – If True, return a tuple (results, failures); otherwise, return only results. Default is False.
save_to_disk (bool, optional) – If True, save results to disk and return LazyLoadedList proxies. Default is False.
return_traces (bool, optional) – If True, return trace metrics alongside results. Default is False. Traces contain timing metrics (entry, exit, resident_time) for each stage.
**kwargs (Any) – Additional keyword arguments for the underlying client methods. Optional flags include include_parent_trace_ids=True to also return parent job trace identifiers (V2 API only).
- Returns:
Returns vary based on flags:
- Default: list of results
- return_failures=True: (results, failures)
- return_traces=True: (results, traces)
- return_failures=True, return_traces=True: (results, failures, traces)
- Additional combinations with the include_parent_trace_ids kwarg
- Return type:
list or tuple
Notes
Trace metrics include timing data for each processing stage. For detailed usage and examples, see src/nv_ingest/api/v2/README.md
- ingest_async(
- *,
- return_failures: bool = False,
- return_traces: bool = False,
- **kwargs: Any,
Asynchronously submits jobs and returns a single future that completes when all jobs have finished.
The return type of the future’s result is dynamic and mirrors the behavior of the synchronous ingest() method, controlled by the return_failures and return_traces flags. If a VDB upload is configured, the future will complete after the VDB upload finishes.
- Parameters:
return_failures (bool, optional) – If True, return a tuple containing failures; otherwise, only return results. Default is False.
return_traces (bool, optional) – If True, return trace metrics alongside results. Default is False.
kwargs (dict) – Additional parameters passed to the concurrent processor. Optional flags include include_parent_trace_ids=True to also return parent job trace identifiers (V2 API only).
- Returns:
A future that completes when all jobs and any subsequent VDB upload have finished. Its result will be one of the following:
- Default: list of results
- return_failures=True: (results, failures)
- return_traces=True: (results, traces)
- return_failures=True, return_traces=True: (results, failures, traces)
- Return type:
Future[Union[List[Any], Tuple[Any, …]]]
- load(**kwargs) Ingestor[source]#
Ensure all document files are accessible locally, downloading if necessary.
For each document in _documents, checks if the file exists locally. If not, attempts to download the file to a temporary directory using fsspec. Updates _documents with paths to local copies, initializes _job_specs, and sets _all_local to True upon successful loading.
- Parameters:
kwargs (dict) – Additional keyword arguments for remote file access via fsspec.
- Returns:
Returns self for chaining after ensuring all files are accessible locally.
- Return type:
Ingestor
- pdf_split_config(
- pages_per_chunk: int = 32,
Configure PDF splitting behavior for V2 API.
- Parameters:
pages_per_chunk (int, optional) – Number of pages per PDF chunk (default: 32). The server enforces boundaries: min=1, max=128.
- Returns:
Self for method chaining
- Return type:
Ingestor
Notes
Only affects V2 API endpoints with PDF splitting support
Server will clamp values outside [1, 128] range
Smaller chunks = more parallelism but more overhead
Larger chunks = less overhead but reduced concurrency
- remaining_jobs() int[source]#
Counts the jobs that are not in a terminal state.
- Returns:
Number of jobs that are neither completed, failed, nor cancelled.
- Return type:
int
- save_to_disk(
- output_directory: str | None = None,
- cleanup: bool = True,
- compression: str | None = 'gzip',
Configures the Ingestor to save results to disk instead of memory.
This method enables disk-based storage for ingestion results. When called, the ingest() method will write the output for each processed document to a separate JSONL file. The return value of ingest() will be a list of LazyLoadedList objects, which are memory-efficient proxies to these files.
The output directory can be specified directly, via an environment variable, or a temporary directory will be created automatically.
- Parameters:
output_directory (str, optional) – The path to the directory where result files (.jsonl) will be saved. If not provided, it defaults to the value of the environment variable NV_INGEST_CLIENT_SAVE_TO_DISK_OUTPUT_DIRECTORY. If the environment variable is also not set, a temporary directory will be created. Defaults to None.
cleanup (bool, optional) – If True, the entire output_directory will be recursively deleted when the Ingestor’s context is exited (i.e., when used in a with statement). Defaults to True.
compression (str, optional) – The compression algorithm to use for the saved result files. Currently, the only supported value is ‘gzip’. To disable compression, set this parameter to None. Defaults to ‘gzip’, which significantly reduces the disk space required for results. When enabled, files are saved with a .gz suffix (e.g., results.jsonl.gz).
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- split(
- **kwargs: Any,
Adds a SplitTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the SplitTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- store(
- **kwargs: Any,
Adds a StoreTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the StoreTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- store_embed(
- **kwargs: Any,
Adds a StoreEmbedTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the StoreEmbedTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- udf(
- udf_function: str,
- udf_function_name: str | None = None,
- phase: PipelinePhase | int | str | None = None,
- target_stage: str | None = None,
- run_before: bool = False,
- run_after: bool = False,
Adds a UDFTask to the batch job specification.
- Parameters:
udf_function (str) – UDF specification. Supports three formats: 1. Inline function: ‘def my_func(control_message): …’ 2. Import path: ‘my_module.my_function’ 3. File path: ‘/path/to/file.py:function_name’
udf_function_name (str, optional) – Name of the function to execute from the UDF specification. If not provided, attempts to infer from udf_function.
phase (Union[PipelinePhase, int, str], optional) – Pipeline phase to execute UDF. Accepts phase names (‘extract’, ‘split’, ‘embed’, ‘response’) or numbers (1-4). Cannot be used with target_stage.
target_stage (str, optional) – Specific stage name to target for UDF execution. Cannot be used with phase.
run_before (bool, optional) – If True and target_stage is specified, run UDF before the target stage. Default: False.
run_after (bool, optional) – If True and target_stage is specified, run UDF after the target stage. Default: False.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- Raises:
ValueError – If udf_function_name cannot be inferred and is not provided explicitly, or if both phase and target_stage are specified, or if neither is specified.
- vdb_upload(
- purge_results_after_upload: bool = True,
- **kwargs: Any,
Adds a VdbUploadTask to the batch job specification.
- Parameters:
purge_results_after_upload (bool, optional) – If True, the saved result files will be deleted from disk after a successful upload. This requires save_to_disk() to be active. Defaults to True
kwargs (dict) – Parameters specific to the VdbUploadTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- class nv_ingest_client.client.LazyLoadedList(
- filepath: str,
- expected_len: int | None = None,
- compression: str | None = None,
Bases:
Sequence
- class nv_ingest_client.client.NvIngestClient(
- message_client_allocator: ~typing.Type[~nv_ingest_api.util.service_clients.client_base.MessageBrokerClientBase] = <class 'nv_ingest_api.util.service_clients.rest.rest_client.RestClient'>,
- message_client_hostname: str | None = 'localhost',
- message_client_port: int | None = 7670,
- message_client_kwargs: ~typing.Dict[str,
- ~typing.Any] | None = None,
- msg_counter_id: str | None = 'nv-ingest-message-id',
- worker_pool_size: int = 8,
Bases:
object
A client class for interacting with the nv-ingest service, supporting custom client allocators.
- add_job(
- job_spec: BatchJobSpec | JobSpec,
Add one or more jobs to the client for later processing.
- Parameters:
job_spec (JobSpec or BatchJobSpec) – A single job specification or a batch containing multiple specs.
- Returns:
The job index for a single spec, or a list of indices for a batch.
- Return type:
str or list of str
- Raises:
ValueError – If an unsupported type is provided.
- add_task(
- job_index: str,
- task: Task,
Attach an existing Task object to a pending job.
- Parameters:
job_index (str) – The client-side identifier of the target job.
task (Task) – The task instance to add.
- consume_completed_parent_trace_ids() List[str][source]#
Return and clear the set of completed parent trace identifiers.
- create_job(
- payload: Dict[str, Any],
- source_id: str,
- source_name: str,
- document_type: str | None = None,
- tasks: List[Task] | None = None,
- extended_options: Dict[str, Any] | None = None,
Construct and register a new job from provided metadata.
- Parameters:
payload (dict) – The data payload for the job.
source_id (str) – Identifier of the data source.
source_name (str) – Human-readable name for the source.
document_type (str, optional) – Type of document (inferred from source_name if omitted).
tasks (list of Task, optional) – Initial set of processing tasks to attach.
extended_options (dict, optional) – Extra parameters for advanced configuration.
- Returns:
The client-side job index.
- Return type:
str
- Raises:
ValueError – If job creation parameters are invalid.
- create_jobs_for_batch(
- files_batch: List[str],
- tasks: Dict[str, Any],
- pdf_split_page_count: int = None,
Create and submit job specifications (JobSpecs) for a batch of files, returning the job IDs. This function takes a batch of files, processes each file to extract its content and type, creates a job specification (JobSpec) for each file, and adds tasks from the provided task list. It then submits the jobs to the client and collects their job IDs.
- Parameters:
files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.
tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names, and the values represent task specifications or configurations. Standard tasks include “split”, “extract”, “store”, “caption”, “dedup”, “filter”, “embed”.
pdf_split_page_count (int, optional) – Number of pages per PDF chunk for splitting (1-128). If provided, this will be added to the job spec’s extended_options for PDF files.
- Returns:
A Tuple containing the list of JobSpecs and list of job IDs corresponding to the submitted jobs. Each job ID is returned by the client’s add_job method.
- Return type:
Tuple[List[JobSpec], List[str]]
- Raises:
ValueError – If there is an error extracting the file content or type from any of the files, a ValueError will be logged, and the corresponding file will be skipped.
Notes
The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.
For each file, a JobSpec is created with relevant metadata, including document type and file content. Various tasks are conditionally added based on the provided tasks dictionary.
The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.
Examples
Suppose you have a batch of files and tasks to process:
>>> files_batch = ["file1.txt", "file2.pdf"]
>>> tasks = {"split": ..., "extract_txt": ..., "store": ...}
>>> client = NvIngestClient()
>>> job_specs, job_ids = client.create_jobs_for_batch(files_batch, tasks)
>>> print(job_ids)
['job_12345', 'job_67890']
In this example, jobs are created and submitted for the files in files_batch, with the tasks in tasks being added to each job specification. The returned job IDs are then printed.
See also
create_job_specs_for_batch – Function that creates job specifications for a batch of files.
JobSpec – The class representing a job specification.
- create_task(
- job_index: str | int,
- task_type: TaskType,
- task_params: Dict[str, Any] | None = None,
Create and attach a new task to a pending job by type and parameters.
- Parameters:
job_index (str or int) – Identifier of the job to modify.
task_type (TaskType) – Enum specifying the kind of task to create.
task_params (dict, optional) – Parameters for the new task.
- Raises:
ValueError – If the job does not exist or is not pending.
- fetch_job_result(
- job_ids: str | List[str],
- timeout: float = 100,
- max_retries: int | None = None,
- retry_delay: float = 1,
- verbose: bool = False,
- completion_callback: Callable[[Dict, str], None] | None = None,
- return_failures: bool = False,
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
- Parameters:
job_ids (Union[str, List[str]]) – A job ID or list of job IDs to fetch results for.
timeout (float) – Timeout for each fetch operation, in seconds.
max_retries (Optional[int]) – Maximum number of retries for jobs that are not ready yet.
retry_delay (float) – Delay between retry attempts, in seconds.
verbose (bool) – If True, logs additional information.
completion_callback (Optional[Callable[[Dict, str], None]]) – A callback function that is executed each time a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.
return_failures (bool) – If True, returns a separate list of failed jobs.
- Returns:
If return_failures=False – List[Tuple[Optional[Dict], str]]: a list of tuples, each containing the job result (or None on failure) and the job ID.
If return_failures=True – Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]]: a tuple of the list of successful job results and the list of failures, each containing a job ID and an error message.
- Return type:
List[Tuple[Optional[Dict], str]] or Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]]
- Raises:
ValueError – If there is an error in decoding the job result.
TimeoutError – If the fetch operation times out.
Exception – For all other unexpected issues.
- fetch_job_result_async(
- job_ids: str | List[str],
- data_only: bool = True,
- timeout: Tuple[int, float | None] | None = None,
Fetches job results for a list or a single job ID asynchronously and returns a mapping of futures to job IDs.
- Parameters:
job_ids (Union[str, List[str]]) – A single job ID or a list of job IDs.
timeout (Tuple[int, float | None], optional) – (connect, read) timeout for fetching each job result, in seconds.
data_only (bool) – Whether to return only the data part of the job result.
- Returns:
A dictionary mapping each future to its corresponding job ID.
- Return type:
Dict[Future, str]
- fetch_job_result_cli(
- job_ids: str | List[str],
- data_only: bool = False,
- timeout: Tuple[int, float | None] | None = None,
Fetch job results via CLI semantics (synchronous list return).
- Parameters:
job_ids (str or list of str) – Single or multiple client-side job identifiers.
data_only (bool, optional) – If True, extract only the ‘data’ field. Default is False.
- Returns:
List of tuples for each fetched job.
- Return type:
list of (result_data, job_index, trace_id)
- job_count() int[source]#
Get the number of jobs currently tracked by the client.
- Returns:
The total count of jobs in internal state tracking.
- Return type:
int
- process_jobs_concurrently(
- job_indices: str | List[str],
- job_queue_id: str | None = None,
- batch_size: int | None = None,
- concurrency_limit: int = 64,
- timeout: int = 100,
- max_job_retries: int | None = None,
- retry_delay: float = 0.5,
- initial_fetch_delay: float = 0.3,
- fail_on_submit_error: bool = False,
- completion_callback: Callable[[Any, str], None] | None = None,
- return_failures: bool = False,
- data_only: bool = True,
- stream_to_callback_only: bool = False,
- return_full_response: bool = False,
- verbose: bool = False,
- return_traces: bool = False,
Submit and fetch multiple jobs concurrently.
- Parameters:
job_indices (str or list of str) – Single or multiple job indices to process.
job_queue_id (str, optional) – Queue identifier for submission.
batch_size (int, optional) – Maximum number of jobs to process in each internal batch. Higher values may improve throughput but increase memory usage. Must be >= 1. Defaults to None, in which case an internal default of 32 is used.
concurrency_limit (int, optional) – Max number of simultaneous in-flight jobs. Default is 64.
timeout (int, optional) – Timeout in seconds per fetch attempt. Default is 100.
max_job_retries (int, optional) – Max retries for ‘not ready’ jobs. None for infinite. Default is None.
retry_delay (float, optional) – Delay in seconds between retry cycles. Default is 0.5.
fail_on_submit_error (bool, optional) – If True, abort on submission error. Default is False.
completion_callback (callable, optional) – Called on each successful fetch as (result_data, job_index).
return_failures (bool, optional) – If True, return (results, failures). Default is False.
data_only (bool, optional) – If True, return only payload ‘data’. Default is True.
return_full_response (bool, optional) – If True, results contain the full response envelopes (including ‘trace’ and ‘annotations’). Ignored when stream_to_callback_only=True. Default is False.
verbose (bool, optional) – If True, enable debug logging. Default is False.
return_traces (bool, optional) – If True, parent-level aggregated trace metrics are extracted and returned. Default is False.
- Returns:
results (list) – List of successful job results when return_failures is False.
results, failures (tuple) – Tuple of (successful results, failure tuples) when return_failures is True.
results, failures, traces (tuple) – Tuple of (successful results, failure tuples, trace dicts) when both return_failures and return_traces are True.
- Raises:
RuntimeError – If fail_on_submit_error is True and a submission fails.
- process_jobs_concurrently_async(
- job_indices: str | List[str],
- job_queue_id: str | None = None,
- batch_size: int | None = None,
- timeout: int = 100,
- max_job_retries: int | None = None,
- retry_delay: float = 0.5,
- initial_fetch_delay: float = 0.3,
- fail_on_submit_error: bool = False,
- completion_callback: Callable[[Any, str], None] | None = None,
- stream_to_callback_only: bool = False,
- return_full_response: bool = False,
- verbose: bool = False,
- return_traces: bool = False,
Submit and fetch multiple jobs concurrently and asynchronously.
This method initializes the processing and returns a Future immediately. The Future will resolve with a fixed 3-part tuple (results, failures, traces) once all jobs have completed.
Parameters are identical to process_jobs_concurrently.
- Returns:
A future that completes when all jobs are done. Its result is a tuple containing (successful_results, failures, traces).
- Return type:
Future[Tuple[List[Any], List[Tuple[str, str]], List[Optional[Dict[str, Any]]]]]
- register_parent_trace_id(trace_id: str | None) None[source]#
Record a parent trace identifier once its aggregation has completed.
- submit_job(
- job_indices: str | List[str],
- job_queue_id: str,
- batch_size: int = 10,
Submit one or more jobs in batches.
- Parameters:
job_indices (str or list of str) – Job indices to submit.
job_queue_id (str) – Queue identifier for submission.
batch_size (int, optional) – Maximum number of jobs per batch. Default is 10.
- Returns:
Trace identifiers for each submitted job.
- Return type:
list of str
- Raises:
Exception – Propagates first error if any job in a batch fails.
- submit_job_async(
- job_indices: str | List[str],
- job_queue_id: str,
Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method accepts either a single job ID or a list of job IDs.
- Parameters:
job_indices (Union[str, List[str]]) – A single job ID or a list of job IDs to be submitted.
job_queue_id (str) – The ID of the job queue where the jobs will be submitted.
- Returns:
A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.
- Return type:
Dict[Future, str]
Notes
This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.
It does not wait for any of the jobs to complete.
Ensure that each job is in the proper state before submission.