nv_ingest_client.util package#
Subpackages#
- nv_ingest_client.util.file_processing package
- nv_ingest_client.util.vdb package
- Submodules
- nv_ingest_client.util.vdb.adt_vdb module
- nv_ingest_client.util.vdb.lancedb module
- nv_ingest_client.util.vdb.milvus module
Milvus, add_metadata(), bulk_insert_milvus(), cleanup_records(), create_bm25_model(), create_collection(), create_meta_collection(), create_nvingest_collection(), create_nvingest_index_params(), create_nvingest_meta_schema(), create_nvingest_schema(), dense_retrieval(), embed_index_collection(), get_embeddings(), grab_meta_collection_info(), hybrid_retrieval(), log_new_meta_collection(), nv_rerank(), nvingest_retrieval(), pandas_file_reader(), pull_all_milvus(), reconstruct_pages(), recreate_elements(), reindex_collection(), remove_records(), stream_insert_milvus(), verify_embedding(), wait_for_index(), write_meta_collection(), write_records_minio(), write_to_nvingest_collection()
- nv_ingest_client.util.vdb.opensearch module
- Module contents
Submodules#
nv_ingest_client.util.dataset module#
- nv_ingest_client.util.dataset.get_dataset_files(
- dataset_bytes: BytesIO,
- shuffle: bool = False,
Extracts and optionally shuffles the list of files contained in a dataset.
- Parameters:
dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.
shuffle (bool, optional) – Whether to shuffle the list of files before returning. Defaults to False.
- Returns:
The list of files from the dataset, possibly shuffled.
- Return type:
list
- nv_ingest_client.util.dataset.get_dataset_statistics(dataset_bytes: BytesIO) → str[source]#
Reads a dataset specification from a BytesIO object, computes statistics about the dataset, and returns a formatted string.
- Parameters:
dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.
- Returns:
A formatted string containing statistics about the dataset.
- Return type:
str
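As a rough illustration of the behavior described above, the sketch below uses only the standard library. The top-level "files" key in the dataset JSON is a hypothetical placeholder, not the documented nv-ingest dataset schema, and the function name is illustrative.

```python
import json
import random
from io import BytesIO
from typing import List

def get_dataset_files_sketch(dataset_bytes: BytesIO, shuffle: bool = False) -> List[str]:
    """Minimal sketch: read a JSON dataset spec and return its file list."""
    dataset = json.load(dataset_bytes)      # parse the JSON payload
    files = list(dataset.get("files", []))  # hypothetical top-level key
    if shuffle:
        random.shuffle(files)               # in-place shuffle when requested
    return files

# Usage with an in-memory dataset specification
spec = BytesIO(json.dumps({"files": ["a.pdf", "b.pdf", "c.pdf"]}).encode())
print(sorted(get_dataset_files_sketch(spec, shuffle=True)))  # → ['a.pdf', 'b.pdf', 'c.pdf']
```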
nv_ingest_client.util.document_analysis module#
Utility functions for analyzing document-level chunk composition from nv-ingest results.
This module provides analysis capabilities for understanding the distribution and types of extracted content elements across individual documents. It enables customers to gain visibility into their document composition for performance optimization and capacity planning decisions.
- nv_ingest_client.util.document_analysis.analyze_document_chunks(
- results: List[List[Dict[str, Any]]] | List[Dict[str, Any]],
Analyze ingestor results to count elements by type and page for each document.
This function processes results from nv-ingest ingestion and provides a per-document, per-page breakdown of extracted content types, enabling customers to understand document composition and page-level distribution for optimization and planning purposes.
- Parameters:
results (Union[List[List[Dict[str, Any]]], List[Dict[str, Any]]]) – Ingestor results from ingestor.ingest() in standard List[List[Dict]] format, or flattened List[Dict] format. Handles both regular lists and LazyLoadedList objects automatically.
- Returns:
Dictionary mapping document names to page-level element type counts, with structure:
{
  "document1.pdf": {
    "total": {"text": 7, "charts": 1, "tables": 1, "unstructured_images": 0, "infographics": 0, "page_images": 0},
    "1": {"text": 3, "charts": 1, "tables": 0, "unstructured_images": 0, "infographics": 0, "page_images": 0},
    "2": {"text": 4, "charts": 0, "tables": 1, "unstructured_images": 0, "infographics": 0, "page_images": 0}
  },
  "document2.pdf": {…}
}
- Return type:
Dict[str, Dict[str, Dict[str, int]]]
Notes
Requires purge_results_after_upload=False in vdb_upload() configuration
Automatically handles LazyLoadedList objects from nv-ingest client
Returns zero counts for missing element types
Assumes valid nv-ingest output format with guaranteed metadata structure
Examples
>>> from nv_ingest_client.util.document_analysis import analyze_document_chunks
>>>
>>> # After running ingestion
>>> results, failures = ingestor.ingest(show_progress=True, return_failures=True)
>>>
>>> # Analyze document composition by page
>>> breakdown = analyze_document_chunks(results)
>>>
>>> for doc_name, pages in breakdown.items():
...     total_counts = pages["total"]
...     total_elements = sum(total_counts.values())
...     page_count = len(pages) - 1  # Subtract 1 for "total" key
...     print(f"{doc_name}: {total_elements} elements across {page_count} pages")
...     print(f"  total: {total_elements} elements ({total_counts['text']} text, {total_counts['charts']} charts)")
...     for page_name, counts in pages.items():
...         if page_name != "total":  # Skip total when listing pages
...             page_total = sum(counts.values())
...             print(
...                 f"  page {page_name}: {page_total} elements "
...                 f"({counts['text']} text, {counts['charts']} charts)"
...             )
nv_ingest_client.util.image_disk_utils module#
Utility functions for saving images from ingestion results to disk as actual image files.
This module provides comprehensive utilities for extracting and saving base64-encoded images from nv-ingest results to the local filesystem. Features include:
- Configurable filtering by image type (charts, tables, infographics, etc.)
- Descriptive filename generation with source and page information
- Organized directory structure by image type
- Detailed image counting and statistics
Typical use cases:
- Debugging and visual inspection of extracted content
- Quality assessment of the image extraction pipeline
- nv_ingest_client.util.image_disk_utils.save_images_from_ingestor_results(
- results: List[List[Dict[str, Any]]],
- output_directory: str,
- **kwargs,
Save images from Ingestor.ingest() results.
- Parameters:
results (List[List[Dict[str, Any]]]) – Results from Ingestor.ingest(), where each inner list contains document results for one source file. Can also handle LazyLoadedList objects when save_to_disk=True is used.
output_directory (str) – Directory where images will be saved.
**kwargs – Additional arguments passed to save_images_to_disk(). Includes output_format (“auto”, “png”, or “jpeg”) and other filtering options.
- Returns:
Dictionary with counts of images saved by type.
- Return type:
Dict[str, int]
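A minimal sketch of the decode-and-save loop this helper performs, assuming a hypothetical result layout in which each element carries a document_type and a base64 payload under metadata["content"]; the actual nv-ingest metadata schema, filename scheme, and type names may differ.

```python
import base64
import os
from typing import Any, Dict, List

def save_images_sketch(results: List[List[Dict[str, Any]]], output_directory: str) -> Dict[str, int]:
    """Decode base64 image payloads and write them to disk, counting by subtype."""
    counts: Dict[str, int] = {}
    os.makedirs(output_directory, exist_ok=True)
    for doc_results in results:                       # one inner list per source file
        for i, element in enumerate(doc_results):
            meta = element.get("metadata", {})
            content = meta.get("content")             # hypothetical base64 field
            subtype = element.get("document_type", "image")
            if not content:
                continue
            counts[subtype] = counts.get(subtype, 0) + 1
            path = os.path.join(output_directory, f"{subtype}_{i}.png")
            with open(path, "wb") as f:
                f.write(base64.b64decode(content))    # raw bytes, no format conversion
    return counts
```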
- nv_ingest_client.util.image_disk_utils.save_images_from_response(
- response: Dict[str, Any],
- output_directory: str,
- **kwargs,
Convenience function to save images from a full API response.
- Parameters:
response (Dict[str, Any]) – Full API response containing a “data” field with document results.
output_directory (str) – Directory where images will be saved.
**kwargs – Additional arguments passed to save_images_to_disk(). Includes output_format (“auto”, “png”, or “jpeg”) and other filtering options.
- Returns:
Dictionary with counts of images saved by type.
- Return type:
Dict[str, int]
- nv_ingest_client.util.image_disk_utils.save_images_to_disk(
- response_data: List[Dict[str, Any]],
- output_directory: str,
- save_charts: bool = True,
- save_tables: bool = True,
- save_infographics: bool = True,
- save_page_images: bool = False,
- save_raw_images: bool = False,
- organize_by_type: bool = True,
- output_format: str = 'auto',
Save base64-encoded images from ingestion results to disk as actual image files.
This utility extracts images from ingestion response data and saves them to disk with descriptive filenames that include the image subtype and page information. It provides granular control over which types of images to save.
- Parameters:
response_data (List[Dict[str, Any]]) – List of document results from ingestion, each containing metadata with base64 images.
output_directory (str) – Base directory where images will be saved.
save_charts (bool, optional) – Whether to save chart images. Default is True.
save_tables (bool, optional) – Whether to save table images. Default is True.
save_infographics (bool, optional) – Whether to save infographic images. Default is True.
save_page_images (bool, optional) – Whether to save page-as-image files. Default is False.
save_raw_images (bool, optional) – Whether to save raw/natural images. Default is False.
organize_by_type (bool, optional) – Whether to organize images into subdirectories by type. Default is True.
output_format (str, optional) – Output image format for saved files. Default is “auto”.
- “auto”: Preserve original format (fastest, no conversion)
- “jpeg”: Convert to JPEG (smaller files, good compression)
- “png”: Convert to PNG (lossless quality)
Use “auto” for maximum speed by avoiding format conversion.
- Returns:
Dictionary with counts of images saved by type.
- Return type:
Dict[str, int]
- Raises:
ValueError – If output_format is not supported.
Examples
>>> from nv_ingest_client.util.image_disk_utils import save_images_to_disk
>>>
>>> # Save only charts and tables
>>> counts = save_images_to_disk(
...     response_data,
...     "./output/images",
...     save_charts=True,
...     save_tables=True,
...     save_page_images=False
... )
>>> print(f"Saved {counts['chart']} charts and {counts['table']} tables")
nv_ingest_client.util.milvus module#
nv_ingest_client.util.process_json_files module#
nv_ingest_client.util.processing module#
- exception nv_ingest_client.util.processing.IngestJobFailure(
- message: str,
- description: str,
- annotations: Dict[str, Any],
Bases:
Exception
Custom exception to handle failed job ingestion results.
- nv_ingest_client.util.processing.check_schema(
- schema: Type[BaseModel],
- options: dict,
- task_id: str,
- original_str: str,
Validates the provided options against the given schema and returns a schema instance.
- Parameters:
schema (Type[BaseModel]) – A Pydantic model class used for validating the options.
options (dict) – The options dictionary to validate.
task_id (str) – The identifier of the task associated with the options.
original_str (str) – The original JSON string representation of the options.
- Returns:
An instance of the validated schema populated with the provided options.
- Return type:
BaseModel
- Raises:
ValueError – If validation fails, a ValueError is raised with a formatted error message highlighting the problematic parts of the original JSON string.
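The validation pattern can be illustrated without Pydantic; the stand-in below checks required and unknown keys by hand and raises a ValueError that references the task and the original JSON string. The name, signature, and error format are illustrative only, not the actual check_schema implementation.

```python
def check_schema_sketch(required, allowed, options, task_id, original_str):
    """Hand-rolled stand-in for Pydantic validation: verify required keys are
    present and no unknown keys appear; raise ValueError on failure."""
    missing = [k for k in required if k not in options]
    extra = [k for k in options if k not in allowed]
    if missing or extra:
        raise ValueError(
            f"Invalid options for task '{task_id}': "
            f"missing={missing}, extra={extra}, in: {original_str}"
        )
    return dict(options)  # a validated copy stands in for the model instance

# A valid payload passes through unchanged.
print(check_schema_sketch({"method"}, {"method", "depth"},
                          {"method": "pdfium"}, "extract", '{"method": "pdfium"}'))
```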
- nv_ingest_client.util.processing.format_validation_error(
- e: ValidationError,
- task_id: str,
- original_str: str,
Formats validation errors with appropriate highlights and returns a detailed error message.
- Parameters:
e (ValidationError) – The validation error raised by the schema.
task_id (str) – The identifier of the task for which the error occurred.
original_str (str) – The original JSON string that caused the validation error.
- Returns:
A detailed error message with highlighted problematic fields.
- Return type:
str
- nv_ingest_client.util.processing.handle_future_result(
- future: Future,
- timeout=10,
Handle the result of a completed future job and process annotations.
This function processes the result of a future, extracts annotations (if any), logs them, and checks the validity of the ingest result. If the result indicates a failure, a RuntimeError is raised with a description of the failure.
- Parameters:
future (concurrent.futures.Future) – A future object representing an asynchronous job. The result of this job will be processed once it completes.
timeout (int, optional, default=10) – Maximum time in seconds to wait for the future result before timing out.
- Returns:
The result of the job as a dictionary, after processing and validation.
The trace_id returned by the submission endpoint
- Return type:
Tuple[Dict[str, Any], str]
- Raises:
IngestJobFailure – If the job result is invalid, this exception is raised with the failure description and the full result for further inspection.
Exception – For all other unexpected errors.
Notes
The future.result() is assumed to return a tuple where the first element is the actual result (as a dictionary), and the second element (if present) can be ignored.
Annotations in the result (if any) are logged for debugging purposes.
The check_ingest_result function (assumed to be defined elsewhere) is used to validate the result. If the result is invalid, a RuntimeError is raised.
Examples
Suppose we have a future object representing a submitted job:
>>> future = concurrent.futures.Future()
>>> result, trace_id = handle_future_result(future, timeout=60)
In this example, the function processes the completed job and returns the result dictionary. If the job fails, it raises a RuntimeError.
See also
check_ingest_resultFunction to validate the result of the job.
- nv_ingest_client.util.processing.highlight_error_in_original(
- original_str: str,
- task_name: str,
- error_detail: Dict[str, Any],
Highlights the error-causing text in the original JSON string based on the error type.
This function identifies the problematic portion of the JSON string by inspecting the provided error details. For errors due to extra fields, it highlights the extra field (using blue and bold formatting). For errors due to missing fields or insufficient string length, it appends a clear message indicating the missing field and its location.
- Parameters:
original_str (str) – The original JSON string that caused the error. This string will be modified to highlight the problematic field.
task_name (str) – The name of the task associated with the error. This is used in the error message when highlighting missing fields.
error_detail (Dict[str, Any]) – A dictionary containing details about the error. Expected keys are:
- ‘type’ (str): The type of error (e.g., “value_error.extra”, “value_error.missing”, “value_error.any_str.min_length”).
- ‘loc’ (List[Any]): A list representing the path to the error-causing field in the JSON structure.
- Returns:
The modified JSON string with the error-causing field highlighted or a message appended indicating the missing field.
- Return type:
str
Notes
The function uses the style function to apply formatting to the error-causing text.
If the error detail does not include the expected keys, a fallback is used and the original string is returned.
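A simplified sketch of this highlighting logic, using plain text markers in place of terminal styling; the function name, marker format, and handled error types are illustrative only.

```python
from typing import Any, Dict

def highlight_error_sketch(original_str: str, task_name: str,
                           error_detail: Dict[str, Any]) -> str:
    """Mark an extra field inline, or append a note for a missing field."""
    err_type = error_detail.get("type", "")
    loc = error_detail.get("loc", [])
    field = str(loc[-1]) if loc else ""
    if err_type == "value_error.extra" and field in original_str:
        # Bracket the offending field name (stands in for blue/bold styling).
        return original_str.replace(f'"{field}"', f'>>>"{field}"<<<', 1)
    if err_type == "value_error.missing":
        return original_str + f'  [task "{task_name}": missing field "{field}"]'
    return original_str  # fallback when expected keys are absent
```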
nv_ingest_client.util.system module#
- nv_ingest_client.util.system.ensure_directory_with_permissions(directory_path: str)[source]#
Ensures that a directory exists and the current user has read/write permissions. If the directory does not exist, attempts to create it after checking the parent directory for write permission.
- Parameters:
directory_path (str) – The path to the directory to check or create.
- Returns:
True if the directory exists and has the correct permissions, or if it was successfully created. False if the directory cannot be created or does not have the correct permissions.
- Return type:
bool
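The check-then-create flow described above can be sketched with os.access and os.makedirs; this is an assumption-laden approximation with an illustrative name, not the actual implementation.

```python
import os

def ensure_directory_sketch(directory_path: str) -> bool:
    """Return True if directory_path exists (or was created) with rw access."""
    if os.path.exists(directory_path):
        # Existing path: just verify read/write access.
        return os.access(directory_path, os.R_OK | os.W_OK)
    parent = os.path.dirname(os.path.abspath(directory_path))
    if not os.access(parent, os.W_OK):
        return False  # cannot create a child under an unwritable parent
    os.makedirs(directory_path, exist_ok=True)
    return os.access(directory_path, os.R_OK | os.W_OK)
```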
- nv_ingest_client.util.system.has_permissions(
- path: str,
- read: bool = False,
- write: bool = False,
Checks if the current user has specified permissions on a path.
- Parameters:
path (str) – The filesystem path to check permissions on.
read (bool, optional) – Whether to check for read permission.
write (bool, optional) – Whether to check for write permission.
- Returns:
True if the path has the specified permissions, False otherwise.
- Return type:
bool
nv_ingest_client.util.transport module#
- nv_ingest_client.util.transport.infer_microservice(
- data,
- model_name: str | None = None,
- embedding_endpoint: str | None = None,
- nvidia_api_key: str | None = None,
- input_type: str = 'passage',
- truncate: str = 'END',
- batch_size: int = 8191,
- grpc: bool = False,
- input_names: list = ['text'],
- output_names: list = ['embeddings'],
- dtypes: list = ['BYTES'],
Embeds the input data using the NVIDIA embedding microservice and returns the resulting list of embeddings.
nv_ingest_client.util.util module#
- nv_ingest_client.util.util.apply_pdf_split_config_to_job_specs(
- job_specs: List[JobSpec],
- pages_per_chunk: int,
Apply PDF split configuration to a list of JobSpec objects.
Modifies job specs in-place by adding pdf_config to extended_options for PDF files only.
- Parameters:
job_specs (List[JobSpec]) – List of job specifications to potentially modify
pages_per_chunk (int) – Number of pages per PDF chunk (will be stored as-is; server performs clamping)
Notes
Only modifies job specs with document_type == “pdf” (case-insensitive)
Modifies job specs in-place
Safe to call on mixed document types (only PDFs are affected)
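The in-place modification can be illustrated with plain dicts standing in for JobSpec objects; the extended_options / pdf_config key names follow the description above, and the sketch is illustrative rather than the actual implementation.

```python
from typing import Any, Dict, List

def apply_pdf_split_sketch(job_specs: List[Dict[str, Any]], pages_per_chunk: int) -> None:
    """Add a pdf_config entry to extended_options for PDF jobs only, in place."""
    for spec in job_specs:  # plain dicts stand in for JobSpec objects
        if spec.get("document_type", "").lower() == "pdf":  # case-insensitive
            spec.setdefault("extended_options", {})["pdf_config"] = {
                "pages_per_chunk": pages_per_chunk  # stored as-is; server clamps
            }

jobs = [{"document_type": "PDF"}, {"document_type": "txt"}]
apply_pdf_split_sketch(jobs, 4)
print(jobs[0]["extended_options"])  # → {'pdf_config': {'pages_per_chunk': 4}}
```

Note that the non-PDF job is left untouched, which is what makes the call safe on mixed document types.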
- nv_ingest_client.util.util.balanced_groups_flat_order(
- file_paths,
- group_size=16,
- weight_fn=<function getsize>,
- nv_ingest_client.util.util.check_ingest_result(
- json_payload: Dict,
Check the ingest result to determine if the process failed and extract a description.
This function examines the provided JSON payload to check whether the ingest operation has failed. If it has failed and failure annotations are present, the function augments the failure description with information about the specific event that caused the failure.
- Parameters:
json_payload (Dict) – A dictionary containing the result of the ingest operation. It should have at least the following fields:
- ‘status’ (str): The status of the ingest operation. A status of “failed” indicates a failure.
- ‘description’ (str): A textual description of the result.
- ‘annotations’ (Dict, optional): A dictionary of annotations, where each annotation may contain details about the failure.
- Returns:
A tuple containing:
- A boolean indicating whether the ingest operation failed (True if it failed, False otherwise).
- A string containing the description of the result or the failure, augmented with details from the annotations if available.
- Return type:
Tuple[bool, str]
Notes
The function logs the ‘status’ and ‘description’ fields from the payload for debugging.
If the ‘status’ field contains “failed” and the ‘annotations’ field contains entries indicating failure, the function updates the description with the annotation details.
The function breaks after processing the first relevant annotation.
Examples
Suppose the JSON payload contains the following structure:
>>> json_payload = {
...     "status": "failed",
...     "description": "Ingest operation failed",
...     "annotations": {
...         "task1": {"task_result": "FAILURE", "message": "Network error"},
...         "task2": {"task_result": "SUCCESS"}
...     }
... }
>>> is_failed, description = check_ingest_result(json_payload)
>>> print(is_failed)
True
>>> print(description)
↪ Event that caused this failure: task1 -> Network error
In this example, the function detects a failure and augments the description with the message from the failing task.
- nv_ingest_client.util.util.create_job_specs_for_batch(
- files_batch: List[str],
Create job specifications (JobSpecs) for a batch of files. This function takes a batch of files, processes each file to extract its content and type, and creates a job specification (JobSpec) for each file.
- Parameters:
files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.
- Returns:
A list of JobSpecs.
- Return type:
List[JobSpec]
- Raises:
ValueError – If there is an error extracting the file content or type from any of the files, a ValueError will be logged, and the corresponding file will be skipped.
Notes
The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.
For each file, a JobSpec is created with relevant metadata, including document type and file content.
The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.
Examples
Suppose you have a batch of files to process:
>>> files_batch = ["file1.txt", "file2.pdf"]
>>> client = NvIngestClient()
>>> job_specs = create_job_specs_for_batch(files_batch)
>>> print(job_specs)
[<nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb468bb0>, <nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb469270>]
See also
extract_file_contentFunction that extracts the content and type of a file.
JobSpecThe class representing a job specification.
- nv_ingest_client.util.util.create_job_specs_for_buffers(
- buffers: List[Tuple[str, BytesIO]],
Create job specifications (JobSpecs) for a list of buffers. This function takes a list of buffers, processes each buffer to extract its content and type, and creates a job specification (JobSpec) for each buffer.
- Parameters:
buffers (List[Tuple[str, BytesIO]]) – A list of tuples containing the name of the buffer and the BytesIO object.
- Returns:
A list of JobSpecs.
- Return type:
List[JobSpec]
- nv_ingest_client.util.util.filter_function_kwargs(func, **kwargs)[source]#
Filters and returns keyword arguments that match the parameters of a given function.
This function inspects the signature of func and extracts any keyword arguments from kwargs that correspond to the function’s parameters. It returns a dictionary containing only those arguments that func accepts, allowing for safe, dynamic parameter passing.
- Parameters:
func (Callable) – The function whose parameters will be used to filter kwargs.
kwargs (dict) – A dictionary of keyword arguments, which may include extra keys not accepted by func.
- Returns:
A dictionary of keyword arguments filtered to include only those parameters accepted by func.
- Return type:
dict
Example
>>> def example_function(a, b, c):
...     pass
>>> filtered_kwargs = filter_function_kwargs(example_function, a=1, b=2, d=4)
>>> print(filtered_kwargs)
{'a': 1, 'b': 2}
- nv_ingest_client.util.util.generate_matching_files(file_sources)[source]#
Generates a list of file paths that match the given patterns specified in file_sources.
- Parameters:
file_sources (list of str) – A list containing the file source patterns to match against.
- Returns:
A generator yielding paths to files that match the specified patterns.
- Return type:
generator
Notes
This function utilizes glob pattern matching to find files that match the specified patterns. It yields each matching file path, allowing for efficient processing of potentially large sets of files.
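A generator with equivalent glob-based behavior can be sketched as follows; the name and the files-only filter are illustrative assumptions.

```python
import glob
import os
import tempfile

def generate_matching_files_sketch(file_sources):
    """Lazily yield paths of regular files matching each glob pattern."""
    for pattern in file_sources:
        for path in glob.glob(pattern, recursive=True):
            if os.path.isfile(path):  # skip directories that happen to match
                yield path

# Usage against a throwaway directory
with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "a.txt"), "w").close()
    open(os.path.join(d, "b.md"), "w").close()
    matches = list(generate_matching_files_sketch([os.path.join(d, "*.txt")]))
    print(len(matches))  # → 1
```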
- nv_ingest_client.util.util.get_content(results: List[any])[source]#
Extracts the text and table text content from the results of an NV-Ingest python client job
- Parameters:
results (List[Any]) – The results of NV-Ingest python client job that contains the desired text and table content
- Returns:
A dictionary containing the extracted text content and the extracted table content
- Return type:
Dict
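A sketch of the extraction loop, assuming a hypothetical layout in which text elements carry metadata["content"] and table elements carry metadata["table_metadata"]["table_content"]; the real nv-ingest schema may differ, and the function name is illustrative.

```python
from typing import Any, Dict, List

def get_content_sketch(results: List[List[Dict[str, Any]]]) -> Dict[str, list]:
    """Collect text and table content from nested per-document results."""
    out = {"text": [], "tables": []}
    for doc_results in results:
        for element in doc_results:
            meta = element.get("metadata", {})
            if element.get("document_type") == "text":
                out["text"].append(meta.get("content", ""))
            elif element.get("document_type") == "structured":
                table_meta = meta.get("table_metadata", {})
                out["tables"].append(table_meta.get("table_content", ""))
    return out
```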
- nv_ingest_client.util.util.load_data_from_path(path: str) → Dict[source]#
Loads data from a specified file path, preparing it for processing.
- Parameters:
path (str) – The path to the file from which data should be loaded.
- Returns:
A dictionary containing keys ‘file_name’, ‘id’, ‘content’, and ‘document_type’, each of which maps to a list that includes the respective details for the processed file.
- Return type:
dict
- Raises:
FileNotFoundError – If the specified path does not exist.
ValueError – If the specified path is not a file.
Notes
This function is designed to load and prepare file data for further processing, packaging the loaded data along with metadata such as file name and document type.
nv_ingest_client.util.zipkin module#
- class nv_ingest_client.util.zipkin.AsyncZipkinClient(
- host: str,
- port: int,
- concurrent_requests: int,
- max_retries: int = 10,
- retry_delay: int = 5,
Bases:
object
- async fetch(
- sem,
- trace_id: str,
- url: str,
Perform a GET request to the given URL with retry logic for 404 status codes.
- Parameters:
sem – Semaphore ensuring only a bounded number of concurrent requests are in flight.
trace_id – The trace_id for the request.
url – The complete URL to make the GET request to.
- Returns:
Dict[str, str] containing the trace_id and the JSON string response.
- Raises:
RuntimeError – If the maximum number of retries is exceeded.
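The retry-under-semaphore pattern that fetch implements can be sketched with asyncio alone, stubbing out the HTTP call so no Zipkin server is needed; the names, the fake request, and the (status, body) return shape are all illustrative assumptions.

```python
import asyncio

async def fetch_with_retry(sem, trace_id, request_fn, max_retries=10, retry_delay=0.01):
    """Retry a GET-like coroutine while it reports 404, under a semaphore."""
    async with sem:  # cap the number of concurrent in-flight requests
        for _ in range(max_retries):
            status, body = await request_fn(trace_id)
            if status != 404:
                return {"trace_id": trace_id, "response": body}
            await asyncio.sleep(retry_delay)  # trace may not be indexed yet
    raise RuntimeError(f"max retries exceeded for trace {trace_id}")

# Usage with a stubbed request that succeeds on the second attempt
async def main():
    attempts = {"n": 0}
    async def fake_request(trace_id):
        attempts["n"] += 1
        return (404, "") if attempts["n"] < 2 else (200, "{}")
    sem = asyncio.Semaphore(2)
    return await fetch_with_retry(sem, "abc123", fake_request)

print(asyncio.run(main()))  # → {'trace_id': 'abc123', 'response': '{}'}
```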