nv_ingest_client.util package#

Submodules#

nv_ingest_client.util.dataset module#

nv_ingest_client.util.dataset.get_dataset_files(
dataset_bytes: BytesIO,
shuffle: bool = False,
) → list[source]#

Extracts and optionally shuffles the list of files contained in a dataset.

Parameters:
  • dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.

  • shuffle (bool, optional) – Whether to shuffle the list of files before returning. Defaults to False.

Returns:

The list of files from the dataset, possibly shuffled.

Return type:

list

nv_ingest_client.util.dataset.get_dataset_statistics(dataset_bytes: BytesIO) → str[source]#

Reads a dataset specification from a BytesIO object, computes statistics about the dataset, and returns a formatted string.

Parameters:

dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.

Returns:

A formatted string containing statistics about the dataset.

Return type:

str
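
Example

A minimal sketch of loading a dataset specification and inspecting it; "dataset.json" is a hypothetical path to a dataset spec in the expected JSON format:

>>> from io import BytesIO
>>> from nv_ingest_client.util.dataset import get_dataset_files, get_dataset_statistics
>>> with open("dataset.json", "rb") as f:  # hypothetical dataset spec file
...     dataset_bytes = BytesIO(f.read())
>>> print(get_dataset_statistics(dataset_bytes))
>>> _ = dataset_bytes.seek(0)  # rewind so the spec can be read again
>>> files = get_dataset_files(dataset_bytes, shuffle=True)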

nv_ingest_client.util.milvus module#

class nv_ingest_client.util.milvus.MilvusOperator(
collection_name: str | Dict = 'nv_ingest_collection',
milvus_uri: str = 'http://localhost:19530',
sparse: bool = False,
recreate: bool = True,
gpu_index: bool = True,
gpu_search: bool = True,
dense_dim: int = 2048,
minio_endpoint: str = 'localhost:9000',
enable_text: bool = True,
enable_charts: bool = True,
enable_tables: bool = True,
enable_images: bool = True,
enable_infographics: bool = True,
bm25_save_path: str = 'bm25_model.json',
compute_bm25_stats: bool = True,
access_key: str = 'minioadmin',
secret_key: str = 'minioadmin',
bucket_name: str = 'a-bucket',
**kwargs,
)[source]#

Bases: object

get_connection_params()[source]#
get_write_params()[source]#
run(records)[source]#
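
Example

A sketch of using MilvusOperator as the vector-database destination for ingestion results; records is a hypothetical list of nv-ingest result chunks:

>>> from nv_ingest_client.util.milvus import MilvusOperator
>>> op = MilvusOperator(
...     collection_name="my_collection",
...     milvus_uri="http://localhost:19530",
...     sparse=True,
...     gpu_index=False,  # assume a CPU-only deployment
... )
>>> op.run(records)  # writes the records to the configured collection
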
nv_ingest_client.util.milvus.bulk_insert_milvus(
collection_name: str,
writer: RemoteBulkWriter,
milvus_uri: str = 'http://localhost:19530',
)[source]#

This function initializes the bulk ingest of all MinIO-uploaded records and checks for Milvus task completion. Once the function completes, all records have been uploaded to the Milvus collection.

Parameters:
  • collection_name (str) – Name of the Milvus collection.

  • writer (RemoteBulkWriter) – The Milvus RemoteBulkWriter instance that was created with the necessary params to access the MinIO instance corresponding to Milvus.

  • milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.

nv_ingest_client.util.milvus.create_bm25_model(
records,
enable_text: bool = True,
enable_charts: bool = True,
enable_tables: bool = True,
enable_images: bool = True,
enable_infographics: bool = True,
enable_audio: bool = True,
) → BM25EmbeddingFunction[source]#

This function takes the input records, creates a corpus that factors in the enabled filters (i.e., text, charts, tables), and fits a BM25 model with that information. If the user sets the log level to INFO, any record that fails ingestion is reported to the user.

Parameters:
  • records (List) – List of chunks with attached metadata

  • enable_text (bool, optional) – When true, ensure all text type records are used.

  • enable_charts (bool, optional) – When true, ensure all chart type records are used.

  • enable_tables (bool, optional) – When true, ensure all table type records are used.

  • enable_images (bool, optional) – When true, ensure all image type records are used.

  • enable_infographics (bool, optional) – When true, ensure all infographic type records are used.

  • enable_audio (bool, optional) – When true, ensure all audio transcript type records are used.

Returns:

Returns the model fitted to the selected corpus.

Return type:

BM25EmbeddingFunction
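
Example

A sketch of fitting a BM25 model over ingested records; records is a hypothetical list of nv-ingest chunks, and the encode_queries/save calls assume the standard BM25EmbeddingFunction interface:

>>> from nv_ingest_client.util.milvus import create_bm25_model
>>> bm25_ef = create_bm25_model(records, enable_images=False)
>>> sparse_query = bm25_ef.encode_queries(["what is nv-ingest?"])
>>> bm25_ef.save("bm25_model.json")  # persist for later retrieval runs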

nv_ingest_client.util.milvus.create_collection(
client: MilvusClient,
collection_name: str,
schema: CollectionSchema,
index_params: IndexParams | None = None,
recreate=True,
)[source]#

Creates a Milvus collection with the supplied name and schema. Within that collection, this function ensures that the desired indexes are created based on the IndexParams supplied.

Parameters:
  • client (MilvusClient) – Client connected to the Milvus instance.

  • collection_name (str) – Name of the collection to be created.

  • schema (CollectionSchema) – Schema that identifies the fields of data that will be available in the collection.

  • index_params (IndexParams, optional) – The parameters used to create the index(es) for the associated collection fields.

  • recreate (bool, optional) – If true, and the collection is detected, it will be dropped before being created again with the provided information (schema, index_params).
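
Example

A sketch that builds an nv-ingest schema and index params and creates the collection with them, assuming a local Milvus at the default URI:

>>> from pymilvus import MilvusClient
>>> from nv_ingest_client.util.milvus import (
...     create_collection,
...     create_nvingest_index_params,
...     create_nvingest_schema,
... )
>>> client = MilvusClient("http://localhost:19530")
>>> schema = create_nvingest_schema(dense_dim=2048, sparse=True)
>>> index_params = create_nvingest_index_params(sparse=True, gpu_index=False)
>>> create_collection(client, "my_collection", schema, index_params, recreate=True)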

nv_ingest_client.util.milvus.create_meta_collection(
schema: CollectionSchema,
milvus_uri: str = 'http://localhost:19530',
collection_name: str = 'meta',
recreate=False,
)[source]#
nv_ingest_client.util.milvus.create_nvingest_collection(
collection_name: str,
milvus_uri: str = 'http://localhost:19530',
sparse: bool = False,
recreate: bool = True,
gpu_index: bool = True,
gpu_search: bool = True,
dense_dim: int = 2048,
recreate_meta: bool = False,
) → CollectionSchema[source]#

Creates a Milvus collection with an nv-ingest compatible schema under the target name.

Parameters:
  • collection_name (str) – Name of the collection to be created.

  • milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.

  • sparse (bool, optional) – When set to true, this adds a Sparse index to the IndexParams, usually activated for hybrid search.

  • recreate (bool, optional) – If true, and the collection is detected, it will be dropped before being created again with the provided information (schema, index_params).

  • gpu_index (bool, optional) – If true, creates a GPU_CAGRA index for dense embeddings.

  • gpu_search (bool, optional) – When set to true, if using a GPU index, the search will be conducted using the GPU. Otherwise the search will be conducted on the CPU (the index will be converted to HNSW).

  • dense_dim (int, optional) – Sets the dimension size for the dense embedding in the Milvus schema.

Returns:

Returns a Milvus collection schema that represents the fields in the created collection.

Return type:

CollectionSchema
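
Example

A one-call sketch that creates a hybrid-search-ready collection, assuming a local CPU-only Milvus:

>>> from nv_ingest_client.util.milvus import create_nvingest_collection
>>> schema = create_nvingest_collection(
...     "my_collection",
...     milvus_uri="http://localhost:19530",
...     sparse=True,
...     gpu_index=False,
...     gpu_search=False,
... )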

nv_ingest_client.util.milvus.create_nvingest_index_params(
sparse: bool = False,
gpu_index: bool = True,
gpu_search: bool = True,
local_index: bool = True,
) → IndexParams[source]#

Creates index params necessary to create an index for a collection. At a minimum, this function will create a dense embedding index but can also create a sparse embedding index (BM25) for hybrid search.

Parameters:
  • sparse (bool, optional) – When set to true, this adds a Sparse index to the IndexParams, usually activated for hybrid search.

  • gpu_index (bool, optional) – When set to true, creates an index on the GPU. The index is GPU_CAGRA.

  • gpu_search (bool, optional) – When set to true, if using a gpu index, the search will be conducted using the GPU. Otherwise the search will be conducted on the CPU (index will be turned into HNSW).

Returns:

Returns index params set up for a dense embedding index and, if specified, a sparse embedding index.

Return type:

IndexParams

nv_ingest_client.util.milvus.create_nvingest_meta_schema()[source]#
nv_ingest_client.util.milvus.create_nvingest_schema(
dense_dim: int = 1024,
sparse: bool = False,
local_index: bool = False,
) → CollectionSchema[source]#

Creates a schema for the nv-ingest produced data. This is currently set up to follow the default expected schema fields in nv-ingest. You can see more about the declared fields in the nv_ingest.schemas.vdb_task_sink_schema.build_default_milvus_config function. This schema should have, at a minimum, the fields declared in that function to ensure proper data propagation to Milvus.

Parameters:
  • dense_dim (int, optional) – The size of the embedding dimension.

  • sparse (bool, optional) – When set to true, this adds a Sparse field to the schema, usually activated for hybrid search.

Returns:

Returns a Milvus collection schema with the minimum required nv-ingest fields and, if specified by the user, extra fields (sparse).

Return type:

CollectionSchema

nv_ingest_client.util.milvus.dense_retrieval(
queries,
collection_name: str,
client: MilvusClient,
dense_model,
top_k: int,
dense_field: str = 'vector',
output_fields: List[str] = ['text'],
)[source]#

This function takes the input queries, conducts a dense embedding search against the dense vectors, and returns the top_k nearest records in the collection.

Parameters:
  • queries (List) – List of queries

  • collection_name (str) – Name of the Milvus collection to search against.

  • client (MilvusClient) – Client connected to the Milvus instance.

  • dense_model (NVIDIAEmbedding) – Dense model to generate dense embeddings for queries.

  • top_k (int) – Number of search results to return per query.

  • dense_field (str) – The name of the anns_field that holds the dense embedding vector in the collection.

Returns:

Nested list of top_k results per query.

Return type:

List

nv_ingest_client.util.milvus.grab_meta_collection_info(
collection_name: str,
meta_collection_name: str = 'meta',
timestamp: str | None = None,
embedding_model: str | None = None,
embedding_dim: int | None = None,
milvus_uri: str = 'http://localhost:19530',
)[source]#
nv_ingest_client.util.milvus.hybrid_retrieval(
queries,
collection_name: str,
client: MilvusClient,
dense_model,
sparse_model,
top_k: int,
dense_field: str = 'vector',
sparse_field: str = 'sparse',
output_fields: List[str] = ['text'],
gpu_search: bool = True,
local_index: bool = False,
)[source]#

This function takes the input queries and conducts a hybrid embedding search against the dense and sparse vectors, returning the top_k nearest records in the collection.

Parameters:
  • queries (List) – List of queries

  • collection_name (str) – Name of the Milvus collection to search against.

  • client (MilvusClient) – Client connected to the Milvus instance.

  • dense_model (NVIDIAEmbedding) – Dense model to generate dense embeddings for queries.

  • sparse_model (model) – Sparse model used to generate sparse embeddings in the form of scipy.sparse.csr_array.

  • top_k (int) – Number of search results to return per query.

  • dense_field (str) – The name of the anns_field that holds the dense embedding vector in the collection.

  • sparse_field (str) – The name of the anns_field that holds the sparse embedding vector in the collection.

Returns:

Nested list of top_k results per query.

Return type:

List

nv_ingest_client.util.milvus.log_new_meta_collection(
collection_name: str,
fields: List[str],
milvus_uri: str = 'http://localhost:19530',
creation_timestamp: str | None = None,
dense_index: str | None = None,
dense_dim: int | None = None,
sparse_index: str | None = None,
embedding_model: str | None = None,
sparse_model: str | None = None,
meta_collection_name: str = 'meta',
recreate: bool = False,
)[source]#
nv_ingest_client.util.milvus.nv_rerank(
query,
candidates,
reranker_endpoint: str | None = None,
model_name: str | None = None,
nvidia_api_key: str | None = None,
truncate: str = 'END',
max_batch_size: int = 64,
topk: int = 5,
)[source]#

This function allows a user to rerank a set of candidates using the NVIDIA reranker NIM.

Parameters:
  • query (str) – Query the candidates are supposed to answer.

  • candidates (list) – List of the candidates to rerank.

  • reranker_endpoint (str) – The endpoint of the NVIDIA reranker.

  • model_name (str) – The name of the model hosted in the NVIDIA reranker.

  • nvidia_api_key (str) – The NVIDIA reranker API key, necessary when using a non-local asset.

  • truncate (str [END, NONE]) – Truncate the incoming texts if the length is longer than the model allows.

  • max_batch_size (int) – Max size for the number of candidates to rerank.

  • topk (int) – The number of candidates to return after reranking.

Returns:

Dictionary with top_k reranked candidates.

Return type:

Dict
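
Example

A sketch of reranking retrieval candidates; the endpoint URL is a hypothetical local reranker NIM deployment, and the candidate dictionaries are assumed to carry the retrieved text:

>>> from nv_ingest_client.util.milvus import nv_rerank
>>> candidates = [
...     {"text": "The warranty period is two years."},   # hypothetical candidate shape
...     {"text": "Shipping takes five business days."},
... ]
>>> reranked = nv_rerank(
...     "How long is the warranty?",
...     candidates,
...     reranker_endpoint="http://localhost:8012/v1/ranking",  # hypothetical endpoint
...     model_name="nvidia/llama-3.2-nv-rerankqa-1b-v2",
...     topk=1,
... )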

nv_ingest_client.util.milvus.nvingest_retrieval(
queries,
collection_name: str,
milvus_uri: str = 'http://localhost:19530',
top_k: int = 5,
hybrid: bool = False,
dense_field: str = 'vector',
sparse_field: str = 'sparse',
embedding_endpoint=None,
sparse_model_filepath: str = 'bm25_model.json',
model_name: str | None = None,
output_fields: List[str] = ['text', 'source', 'content_metadata'],
gpu_search: bool = True,
nv_ranker: bool = False,
nv_ranker_endpoint: str | None = None,
nv_ranker_model_name: str | None = None,
nv_ranker_nvidia_api_key: str | None = None,
nv_ranker_truncate: str = 'END',
nv_ranker_top_k: int = 5,
nv_ranker_max_batch_size: int = 64,
nv_ranker_candidate_multiple: int = 10,
)[source]#

This function takes the input queries and conducts a hybrid/dense embedding search against the vectors, returning the top_k nearest records in the collection.

Parameters:
  • queries (List) – List of queries

  • collection_name (str) – Name of the Milvus collection to search against.

  • milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.

  • top_k (int) – Number of search results to return per query.

  • hybrid (bool, optional) – If True, will calculate distances for both dense and sparse embeddings.

  • dense_field (str, optional) – The name of the anns_field that holds the dense embedding vector in the collection.

  • sparse_field (str, optional) – The name of the anns_field that holds the sparse embedding vector in the collection.

  • embedding_endpoint (str, optional) – The endpoint of the NIM embedding service used to generate dense embeddings for the queries.

  • sparse_model_filepath (str, optional) – The path from which the sparse model is loaded.

  • model_name (str, optional) – The name of the dense embedding model available in the NIM embedding endpoint.

  • nv_ranker (bool) – Set to True to use the NVIDIA reranker.

  • nv_ranker_endpoint (str) – The endpoint of the NVIDIA reranker.

  • nv_ranker_model_name (str) – The name of the model hosted in the NVIDIA reranker.

  • nv_ranker_nvidia_api_key (str) – The NVIDIA reranker API key, necessary when using a non-local asset.

  • nv_ranker_truncate (str [END, NONE]) – Truncate the incoming texts if the length is longer than the model allows.

  • nv_ranker_max_batch_size (int) – Max size for the number of candidates to rerank.

  • nv_ranker_top_k (int) – The number of candidates to return after reranking.

  • nv_ranker_candidate_multiple (int) – Multiplier on top_k that determines how many candidates are retrieved for reranking.

Returns:

Nested list of top_k results per query.

Return type:

List
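
Example

A sketch of a hybrid query against an nv-ingest collection, assuming the BM25 model saved during ingestion is on disk:

>>> from nv_ingest_client.util.milvus import nvingest_retrieval
>>> hits = nvingest_retrieval(
...     ["What does the quarterly report say about revenue?"],
...     "nv_ingest_collection",
...     milvus_uri="http://localhost:19530",
...     hybrid=True,
...     sparse_model_filepath="bm25_model.json",
...     top_k=5,
... )
>>> for hit in hits[0]:  # results are nested per query
...     print(hit)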

nv_ingest_client.util.milvus.reconstruct_pages(anchor_record, records_list, page_signum: int = 0)[source]#

This function allows a user to reconstruct the pages for a retrieved chunk.

Parameters:
  • anchor_record (dict) – The retrieved record whose page(s) should be reconstructed.

  • records_list (list) – List of records (chunks) from which the page content is pulled.

  • page_signum (int) – The number of pages before or after the anchor record's page to include in the reconstruction.

Returns:

Full page(s) corresponding to anchor record.

Return type:

String

nv_ingest_client.util.milvus.remove_records(
source_name: str,
collection_name: str,
milvus_uri: str = 'http://localhost:19530',
)[source]#

This function allows a user to remove chunks associated with an ingested file. Supply the full path of the file you would like to remove and this function will remove all the chunks associated with that file in the target collection.

Parameters:
  • source_name (str) – The full file path of the file you would like to remove from the collection.

  • collection_name (str) – Milvus Collection to query against

  • milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.

Returns:

Dictionary with one key, delete_cnt. The value represents the number of entities removed.

Return type:

Dict
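
Example

A sketch of removing all chunks ingested from a single file; the file path is hypothetical:

>>> from nv_ingest_client.util.milvus import remove_records
>>> result = remove_records("/data/reports/q3.pdf", "nv_ingest_collection")
>>> print(result["delete_cnt"])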

nv_ingest_client.util.milvus.stream_insert_milvus(
records,
client: MilvusClient,
collection_name: str,
sparse_model=None,
enable_text: bool = True,
enable_charts: bool = True,
enable_tables: bool = True,
enable_images: bool = True,
enable_infographics: bool = True,
enable_audio: bool = True,
record_func=<function _record_dict>,
)[source]#

This function streams the supplied records into the target Milvus collection, factoring in filters (i.e., text, charts, tables) via the enable flags. If a sparse model is supplied, it is used to generate sparse embeddings for hybrid search. If the user sets the log level to INFO, any record that fails ingestion is reported to the user.

Parameters:
  • records (List) – List of chunks with attached metadata

  • collection_name (str) – Name of the Milvus collection to insert the records into.

  • sparse_model (model) – Sparse model used to generate sparse embeddings in the form of scipy.sparse.csr_array.

  • enable_text (bool, optional) – When true, ensure all text type records are used.

  • enable_charts (bool, optional) – When true, ensure all chart type records are used.

  • enable_tables (bool, optional) – When true, ensure all table type records are used.

  • enable_images (bool, optional) – When true, ensure all image type records are used.

  • enable_infographics (bool, optional) – When true, ensure all infographic type records are used.

  • enable_audio (bool, optional) – When true, ensure all audio transcript type records are used.

  • record_func (function, optional) – This function will be used to parse the records for necessary information.

nv_ingest_client.util.milvus.verify_embedding(element)[source]#
nv_ingest_client.util.milvus.write_meta_collection(
collection_name: str,
fields: List[str],
milvus_uri: str = 'http://localhost:19530',
creation_timestamp: str | None = None,
dense_index: str | None = None,
dense_dim: int | None = None,
sparse_index: str | None = None,
embedding_model: str | None = None,
sparse_model: str | None = None,
meta_collection_name: str = 'meta',
)[source]#
nv_ingest_client.util.milvus.write_records_minio(
records,
writer: RemoteBulkWriter,
sparse_model=None,
enable_text: bool = True,
enable_charts: bool = True,
enable_tables: bool = True,
enable_images: bool = True,
enable_infographics: bool = True,
enable_audio: bool = True,
record_func=<function _record_dict>,
) → RemoteBulkWriter[source]#

Writes the supplied records to MinIO using the supplied writer, staging them for bulk insertion into Milvus. If a sparse model is supplied, it will be used to generate sparse embeddings to allow for hybrid search. Records are filtered based on type, depending on which types are enabled via the boolean parameters. If the user sets the log level to INFO, any record that fails ingestion is reported to the user.

Parameters:
  • records (List) – List of chunks with attached metadata

  • writer (RemoteBulkWriter) – The Milvus RemoteBulkWriter instance that was created with the necessary params to access the MinIO instance corresponding to Milvus.

  • sparse_model (model) – Sparse model used to generate sparse embeddings in the form of scipy.sparse.csr_array.

  • enable_text (bool, optional) – When true, ensure all text type records are used.

  • enable_charts (bool, optional) – When true, ensure all chart type records are used.

  • enable_tables (bool, optional) – When true, ensure all table type records are used.

  • enable_images (bool, optional) – When true, ensure all image type records are used.

  • enable_infographics (bool, optional) – When true, ensure all infographic type records are used.

  • enable_audio (bool, optional) – When true, ensure all audio transcript type records are used.

  • record_func (function, optional) – This function will be used to parse the records for necessary information.

Returns:

Returns the supplied writer, with information about the records uploaded to MinIO.

Return type:

RemoteBulkWriter

nv_ingest_client.util.milvus.write_to_nvingest_collection(
records,
collection_name: str,
milvus_uri: str = 'http://localhost:19530',
minio_endpoint: str = 'localhost:9000',
sparse: bool = True,
enable_text: bool = True,
enable_charts: bool = True,
enable_tables: bool = True,
enable_images: bool = True,
enable_infographics: bool = True,
bm25_save_path: str = 'bm25_model.json',
compute_bm25_stats: bool = True,
access_key: str = 'minioadmin',
secret_key: str = 'minioadmin',
bucket_name: str = 'a-bucket',
threshold: int = 10,
)[source]#

This function writes the supplied records to an nv-ingest Milvus collection, factoring in filters (i.e., text, charts, tables). When sparse is enabled, it creates a corpus from the records and fits a BM25 model with that information for hybrid search.

Parameters:
  • records (List) – List of chunks with attached metadata

  • collection_name (str) – Name of the Milvus collection to write the records to.

  • milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.

  • minio_endpoint (str) – Endpoint for the MinIO instance attached to your Milvus.

  • enable_text (bool, optional) – When true, ensure all text type records are used.

  • enable_charts (bool, optional) – When true, ensure all chart type records are used.

  • enable_tables (bool, optional) – When true, ensure all table type records are used.

  • enable_images (bool, optional) – When true, ensure all image type records are used.

  • enable_infographics (bool, optional) – When true, ensure all infographic type records are used.

  • sparse (bool, optional) – When true, incorporates sparse embedding representations for records.

  • bm25_save_path (str, optional) – The desired filepath for the sparse model if sparse is True.

  • access_key (str, optional) – MinIO access key.

  • secret_key (str, optional) – MinIO secret key.

  • bucket_name (str, optional) – MinIO bucket name.
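
Example

A sketch of the end-to-end write path: create a compatible collection, then write records into it; records is a hypothetical list of nv-ingest result chunks:

>>> from nv_ingest_client.util.milvus import (
...     create_nvingest_collection,
...     write_to_nvingest_collection,
... )
>>> create_nvingest_collection("my_collection", sparse=True, gpu_index=False)
>>> write_to_nvingest_collection(
...     records,
...     "my_collection",
...     milvus_uri="http://localhost:19530",
...     minio_endpoint="localhost:9000",
...     sparse=True,
... )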

nv_ingest_client.util.process_json_files module#

nv_ingest_client.util.process_json_files.ingest_json_results_to_blob(result_content)[source]#

Parse a JSON string or BytesIO object, combine and sort entries, and create a blob string.

Returns:

The generated blob string.

Return type:

str
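
Example

A sketch of flattening job results into a single text blob; result_json is a hypothetical JSON string holding the results of an ingestion job:

>>> from nv_ingest_client.util.process_json_files import ingest_json_results_to_blob
>>> blob = ingest_json_results_to_blob(result_json)
>>> print(blob[:200])  # preview the combined, sorted text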

nv_ingest_client.util.processing module#

exception nv_ingest_client.util.processing.IngestJobFailure(
message: str,
description: str,
annotations: Dict[str, Any],
)[source]#

Bases: Exception

Custom exception to handle failed job ingestion results.

nv_ingest_client.util.processing.handle_future_result(
future: Future,
timeout: int | None = None,
) → Tuple[Dict[str, Any], str][source]#

Handle the result of a completed future job and process annotations.

This function processes the result of a future, extracts annotations (if any), logs them, and checks the validity of the ingest result. If the result indicates a failure, an IngestJobFailure is raised with a description of the failure.

Parameters:
  • future (concurrent.futures.Future) – A future object representing an asynchronous job. The result of this job will be processed once it completes.

  • timeout (Optional[int], default=None) – Maximum time to wait for the future result before timing out.

Returns:

  • The result of the job as a dictionary, after processing and validation.

  • The trace_id returned by the submission endpoint

Return type:

Tuple[Dict[str, Any], str]

Raises:
  • IngestJobFailure – If the job result is invalid, this exception is raised with the failure description and the full result for further inspection.

  • Exception – For all other unexpected errors.

Notes

  • The future.result() is assumed to return a tuple where the first element is the actual result (as a dictionary), and the second element (if present) can be ignored.

  • Annotations in the result (if any) are logged for debugging purposes.

  • The check_ingest_result function (assumed to be defined elsewhere) is used to validate the result. If the result is invalid, an IngestJobFailure is raised.

Examples

Suppose we have a future object representing a submitted job:

>>> future = concurrent.futures.Future()
>>> result, trace_id = handle_future_result(future, timeout=60)

In this example, the function processes the completed job and returns the result dictionary. If the job fails, it raises an IngestJobFailure.

See also

check_ingest_result

Function to validate the result of the job.

nv_ingest_client.util.util module#

class nv_ingest_client.util.util.ClientConfigSchema[source]#

Bases: object

nv_ingest_client.util.util.check_ingest_result(
json_payload: Dict,
) → Tuple[bool, str][source]#

Check the ingest result to determine if the process failed and extract a description.

This function examines the provided JSON payload to check whether the ingest operation has failed. If it has failed and failure annotations are present, the function augments the failure description with information about the specific event that caused the failure.

Parameters:

json_payload (Dict) – A dictionary containing the result of the ingest operation. The dictionary should have at least the following fields:

  • ‘status’ (str): The status of the ingest operation. A status of “failed” indicates a failure.

  • ‘description’ (str): A textual description of the result.

  • ‘annotations’ (Dict): (optional) A dictionary of annotations, where each annotation may contain details about the failure.

Returns:

A tuple containing:

  • A boolean indicating whether the ingest operation failed (True if it failed, False otherwise).

  • A string containing the description of the result or the failure, augmented with details from the annotations if available.

Return type:

Tuple[bool, str]

Notes

  • The function logs the ‘status’ and ‘description’ fields from the payload for debugging.

  • If the ‘status’ field contains “failed” and the ‘annotations’ field contains entries indicating failure, the function updates the description with the annotation details.

  • The function breaks after processing the first relevant annotation.

Examples

Suppose the JSON payload contains the following structure:

>>> json_payload = {
...     "status": "failed",
...     "description": "Ingest operation failed",
...     "annotations": {
...         "task1": {"task_result": "FAILURE", "message": "Network error"},
...         "task2": {"task_result": "SUCCESS"}
...     }
... }
>>> is_failed, description = check_ingest_result(json_payload)
>>> print(is_failed)
True
>>> print(description)
↪ Event that caused this failure: task1 -> Network error

In this example, the function detects a failure and augments the description with the message from the failing task.

nv_ingest_client.util.util.count_pages_for_documents(
file_path: str,
document_type: DocumentTypeEnum,
) → int[source]#
nv_ingest_client.util.util.count_pages_for_text(file_path: str) → int[source]#

Estimates the page count for text files based on word count, using the detect_encoding_and_read_text_file function for reading.

nv_ingest_client.util.util.create_job_specs_for_batch(
files_batch: List[str],
) → List[JobSpec][source]#

Create job specifications (JobSpecs) for a batch of files. This function takes a batch of files, processes each file to extract its content and type, and creates a job specification (JobSpec) for each file.

Parameters:

files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.

Returns:

A list of JobSpecs.

Return type:

List[JobSpec]

Raises:

ValueError – If there is an error extracting the file content or type from any of the files, a ValueError will be logged, and the corresponding file will be skipped.

Notes

  • The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.

  • For each file, a JobSpec is created with relevant metadata, including document type and file content.

  • The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.

Examples

Suppose you have a batch of files to process:

>>> files_batch = ["file1.txt", "file2.pdf"]
>>> job_specs = create_job_specs_for_batch(files_batch)
>>> print(job_specs)
[<nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb468bb0>, <nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb469270>]

See also

extract_file_content

Function that extracts the content and type of a file.

JobSpec

The class representing a job specification.

nv_ingest_client.util.util.estimate_page_count(file_path: str) → int[source]#
nv_ingest_client.util.util.filter_function_kwargs(func, **kwargs)[source]#

Filters and returns keyword arguments that match the parameters of a given function.

This function inspects the signature of func and extracts any keyword arguments from kwargs that correspond to the function’s parameters. It returns a dictionary containing only those arguments that func accepts, allowing for safe, dynamic parameter passing.

Parameters:
  • func (Callable) – The function whose parameters will be used to filter kwargs.

  • kwargs (dict) – A dictionary of keyword arguments, which may include extra keys not accepted by func.

Returns:

A dictionary of keyword arguments filtered to include only those parameters accepted by func.

Return type:

dict

Example

>>> def example_function(a, b, c):
...     pass
>>> filtered_kwargs = filter_function_kwargs(example_function, a=1, b=2, d=4)
>>> print(filtered_kwargs)
{'a': 1, 'b': 2}
nv_ingest_client.util.util.generate_matching_files(file_sources)[source]#

Generates a list of file paths that match the given patterns specified in file_sources.

Parameters:

file_sources (list of str) – A list containing the file source patterns to match against.

Returns:

A generator yielding paths to files that match the specified patterns.

Return type:

generator

Notes

This function utilizes glob pattern matching to find files that match the specified patterns. It yields each matching file path, allowing for efficient processing of potentially large sets of files.
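
Example

A sketch using a hypothetical glob pattern:

>>> from nv_ingest_client.util.util import generate_matching_files
>>> for path in generate_matching_files(["./data/*.pdf"]):
...     print(path)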

nv_ingest_client.util.util.get_content(results: List[any])[source]#

Extracts the text and table text content from the results of an NV-Ingest Python client job.

Parameters:

results (List[Any]) – The results of an NV-Ingest Python client job that contain the desired text and table content.

Returns:

A dictionary containing the extracted text content and the extracted table content

Return type:

Dict
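
Example

A sketch of extracting content from completed job results; results is a hypothetical list of NV-Ingest job results:

>>> from nv_ingest_client.util.util import get_content
>>> content = get_content(results)
>>> print(content)  # dictionary of extracted text and table content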

nv_ingest_client.util.util.load_data_from_path(path: str) → Dict[source]#

Loads data from a specified file path, preparing it for processing.

Parameters:

path (str) – The path to the file from which data should be loaded.

Returns:

A dictionary containing keys ‘file_name’, ‘id’, ‘content’, and ‘document_type’, each of which maps to a list that includes the respective details for the processed file.

Return type:

dict

Raises:
  • FileNotFoundError – If the specified path does not exist.

  • ValueError – If the specified path is not a file.

Notes

This function is designed to load and prepare file data for further processing, packaging the loaded data along with metadata such as file name and document type.
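
Example

A sketch with a hypothetical file path; the returned keys are those documented above:

>>> from nv_ingest_client.util.util import load_data_from_path
>>> data = load_data_from_path("./data/report.pdf")
>>> print(data["document_type"])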

nv_ingest_client.util.zipkin module#

class nv_ingest_client.util.zipkin.AsyncZipkinClient(
host: str,
port: int,
concurrent_requests: int,
max_retries: int = 10,
retry_delay: int = 5,
)[source]#

Bases: object

async fetch(
sem,
trace_id: str,
url: str,
) → Dict[str, str][source]#

Perform a GET request to the given URL with retry logic for 404 status codes.

Parameters:
  • sem – Semaphore to ensure only X concurrent requests are in flight.

  • trace_id – The trace_id for the request.

  • url – The complete URL for the GET request.

Returns:

Dict[str, str] containing the trace_id and the JSON string response.

Raises:

RuntimeError – If the maximum number of retries is exceeded.

async get_metrics(trace_ids: List[str])[source]#
nv_ingest_client.util.zipkin.collect_traces_from_zipkin(
zipkin_host: str,
zipkin_port: int,
trace_id_map: Dict[str, str],
concurrent_requests: int | None = 1,
) → Dict[str, str][source]#
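
Example

A sketch of collecting traces for completed jobs from a local Zipkin instance; the map values are hypothetical trace ids:

>>> from nv_ingest_client.util.zipkin import collect_traces_from_zipkin
>>> trace_map = {"file1.pdf": "0123456789abcdef"}  # hypothetical job -> trace id map
>>> traces = collect_traces_from_zipkin("localhost", 9411, trace_map, concurrent_requests=4)
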
nv_ingest_client.util.zipkin.write_results_to_output_directory(
output_directory: str,
trace_responses: List[Dict[str, str]],
) → None[source]#

Module contents#