nv_ingest_client.util package#
Subpackages#
Submodules#
nv_ingest_client.util.dataset module#
- nv_ingest_client.util.dataset.get_dataset_files(
- dataset_bytes: BytesIO,
- shuffle: bool = False,
Extracts and optionally shuffles the list of files contained in a dataset.
- Parameters:
dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.
shuffle (bool, optional) – Whether to shuffle the list of files before returning. Defaults to False.
- Returns:
The list of files from the dataset, possibly shuffled.
- Return type:
list
- nv_ingest_client.util.dataset.get_dataset_statistics(dataset_bytes: BytesIO) → str [source]#
Reads a dataset specification from a BytesIO object, computes statistics about the dataset, and returns a formatted string.
- Parameters:
dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.
- Returns:
A formatted string containing statistics about the dataset.
- Return type:
str
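Example
A minimal usage sketch (the dataset file name is an illustrative assumption; any JSON dataset specification works):
>>> from io import BytesIO
>>> from nv_ingest_client.util.dataset import get_dataset_files, get_dataset_statistics
>>> with open("dataset.json", "rb") as f:  # hypothetical dataset spec file
...     dataset_bytes = BytesIO(f.read())
>>> print(get_dataset_statistics(dataset_bytes))
>>> dataset_bytes.seek(0)  # rewind before re-reading the same buffer
>>> files = get_dataset_files(dataset_bytes, shuffle=True)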
nv_ingest_client.util.milvus module#
- class nv_ingest_client.util.milvus.MilvusOperator(
- collection_name: str | Dict = 'nv_ingest_collection',
- milvus_uri: str = 'http://localhost:19530',
- sparse: bool = False,
- recreate: bool = True,
- gpu_index: bool = True,
- gpu_search: bool = True,
- dense_dim: int = 2048,
- minio_endpoint: str = 'localhost:9000',
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- bm25_save_path: str = 'bm25_model.json',
- compute_bm25_stats: bool = True,
- access_key: str = 'minioadmin',
- secret_key: str = 'minioadmin',
- bucket_name: str = 'a-bucket',
- meta_dataframe: str | DataFrame | None = None,
- meta_source_field: str | None = None,
- meta_fields: list[str] | None = None,
- stream: bool = False,
- **kwargs,
Bases:
object
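Example
A minimal construction sketch using only the documented constructor parameters (the collection name and URI are illustrative placeholders):
>>> from nv_ingest_client.util.milvus import MilvusOperator
>>> op = MilvusOperator(
...     collection_name="my_collection",        # illustrative name
...     milvus_uri="http://localhost:19530",
...     sparse=True,
...     dense_dim=2048,
... )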
- nv_ingest_client.util.milvus.add_metadata(
- element,
- meta_dataframe,
- meta_source_field,
- meta_data_fields,
- nv_ingest_client.util.milvus.bulk_insert_milvus(
- collection_name: str,
- writer: RemoteBulkWriter,
- milvus_uri: str = 'http://localhost:19530',
This function initializes the bulk ingest of all MinIO-uploaded records and checks for Milvus task completion. Once the function completes, all records have been uploaded to the Milvus collection.
- Parameters:
collection_name (str) – Name of the milvus collection.
writer (RemoteBulkWriter) – The Milvus Remote BulkWriter instance that was created with necessary params to access the minio instance corresponding to milvus.
milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.
- nv_ingest_client.util.milvus.create_bm25_model(
- records,
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- enable_audio: bool = True,
This function takes the input records and creates a corpus, factoring in filters (i.e. texts, charts, tables) and fits a BM25 model with that information. If the user sets the log level to info, any time a record fails ingestion, it will be reported to the user.
- Parameters:
records (List) – List of chunks with attached metadata
enable_text (bool, optional) – When true, ensure all text type records are used.
enable_charts (bool, optional) – When true, ensure all chart type records are used.
enable_tables (bool, optional) – When true, ensure all table type records are used.
enable_images (bool, optional) – When true, ensure all image type records are used.
enable_infographics (bool, optional) – When true, ensure all infographic type records are used.
enable_audio (bool, optional) – When true, ensure all audio transcript type records are used.
- Returns:
Returns the model fitted to the selected corpus.
- Return type:
BM25EmbeddingFunction
- nv_ingest_client.util.milvus.create_collection(
- client: MilvusClient,
- collection_name: str,
- schema: CollectionSchema,
- index_params: IndexParams | None = None,
- recreate=True,
Creates a milvus collection with the supplied name and schema. Within that collection, this function ensures that the desired indexes are created based on the IndexParams supplied.
- Parameters:
client (MilvusClient) – Client connected to milvus instance.
collection_name (str) – Name of the collection to be created.
schema (CollectionSchema,) – Schema that identifies the fields of data that will be available in the collection.
index_params (IndexParams, optional) – The parameters used to create the index(es) for the associated collection fields.
recreate (bool, optional) – If true, and the collection is detected, it will be dropped before being created again with the provided information (schema, index_params).
- nv_ingest_client.util.milvus.create_meta_collection(
- schema: CollectionSchema,
- milvus_uri: str = 'http://localhost:19530',
- collection_name: str = 'meta',
- recreate=False,
- nv_ingest_client.util.milvus.create_nvingest_collection(
- collection_name: str,
- milvus_uri: str = 'http://localhost:19530',
- sparse: bool = False,
- recreate: bool = True,
- gpu_index: bool = True,
- gpu_search: bool = True,
- dense_dim: int = 2048,
- recreate_meta: bool = False,
Creates a milvus collection with an nv-ingest compatible schema under the target name.
- Parameters:
collection_name (str) – Name of the collection to be created.
milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.
sparse (bool, optional) – When set to true, this adds a Sparse index to the IndexParams, usually activated for hybrid search.
recreate (bool, optional) – If true, and the collection is detected, it will be dropped before being created again with the provided information (schema, index_params).
gpu_index (bool, optional) – If true, creates a GPU_CAGRA index for dense embeddings.
gpu_search (bool, optional) – If true, searches against the GPU index are run on the GPU; otherwise the search is conducted on the CPU (the index is converted to HNSW).
dense_dim (int, optional) – Sets the dimension size for the dense embedding in the milvus schema.
- Returns:
Returns a milvus collection schema that represents the fields in the created collection.
- Return type:
CollectionSchema
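Example
A minimal sketch creating a hybrid-ready collection (the collection name and URI are illustrative; a reachable Milvus instance is assumed):
>>> from nv_ingest_client.util.milvus import create_nvingest_collection
>>> schema = create_nvingest_collection(
...     "nv_ingest_collection",
...     milvus_uri="http://localhost:19530",
...     sparse=True,
...     gpu_index=False,   # CPU-only deployment assumed for this sketch
...     dense_dim=2048,
... )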
- nv_ingest_client.util.milvus.create_nvingest_index_params(
- sparse: bool = False,
- gpu_index: bool = True,
- gpu_search: bool = True,
- local_index: bool = True,
Creates index params necessary to create an index for a collection. At a minimum, this function will create a dense embedding index but can also create a sparse embedding index (BM25) for hybrid search.
- Parameters:
sparse (bool, optional) – When set to true, this adds a Sparse index to the IndexParams, usually activated for hybrid search.
gpu_index (bool, optional) – When set to true, creates an index on the GPU. The index is GPU_CAGRA.
gpu_search (bool, optional) – When set to true, if using a gpu index, the search will be conducted using the GPU. Otherwise the search will be conducted on the CPU (index will be turned into HNSW).
- Returns:
Returns index params setup for a dense embedding index and if specified, a sparse embedding index.
- Return type:
IndexParams
- nv_ingest_client.util.milvus.create_nvingest_schema(
- dense_dim: int = 1024,
- sparse: bool = False,
- local_index: bool = False,
Creates a schema for the nv-ingest produced data. This is currently set up to follow the default expected schema fields in nv-ingest. You can see more about the declared fields in the nv_ingest.schemas.vdb_task_sink_schema.build_default_milvus_config function. At a minimum, this schema should have the fields declared in that function to ensure proper data propagation to milvus.
- Parameters:
dense_dim (int, optional) – The size of the embedding dimension.
sparse (bool, optional) – When set to true, this adds a Sparse field to the schema, usually activated for hybrid search.
- Returns:
Returns a milvus collection schema, with the minimum required nv-ingest fields and extra fields (sparse), if specified by the user.
- Return type:
CollectionSchema
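Example
A minimal sketch that composes create_nvingest_schema, create_nvingest_index_params, and create_collection (a Milvus instance at the default URI is assumed; the collection name is illustrative):
>>> from pymilvus import MilvusClient
>>> from nv_ingest_client.util.milvus import (
...     create_collection,
...     create_nvingest_index_params,
...     create_nvingest_schema,
... )
>>> client = MilvusClient("http://localhost:19530")
>>> schema = create_nvingest_schema(dense_dim=1024, sparse=True)
>>> index_params = create_nvingest_index_params(sparse=True, gpu_index=False)
>>> create_collection(client, "nv_ingest_collection", schema, index_params=index_params)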
- nv_ingest_client.util.milvus.dense_retrieval(
- queries,
- collection_name: str,
- client: MilvusClient,
- dense_model,
- top_k: int,
- dense_field: str = 'vector',
- output_fields: List[str] = ['text'],
- _filter: str = '',
This function takes the input queries, conducts a dense embedding search against the dense vector field, and returns the top_k nearest records in the collection.
- Parameters:
queries (List) – List of queries
collection_name (str) – Name of the Milvus collection to search against.
client (MilvusClient) – Client connected to milvus instance.
dense_model (NVIDIAEmbedding) – Dense model to generate dense embeddings for queries.
top_k (int) – Number of search results to return per query.
dense_field (str) – The name of the anns_field that holds the dense embedding vector in the collection.
- Returns:
Nested list of top_k results per query.
- Return type:
List
- nv_ingest_client.util.milvus.embed_index_collection(
- data,
- collection_name,
- batch_size: int = 256,
- embedding_endpoint: str | None = None,
- model_name: str | None = None,
- nvidia_api_key: str | None = None,
- milvus_uri: str = 'http://localhost:19530',
- sparse: bool = False,
- recreate: bool = True,
- gpu_index: bool = True,
- gpu_search: bool = True,
- dense_dim: int = 2048,
- minio_endpoint: str = 'localhost:9000',
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- bm25_save_path: str = 'bm25_model.json',
- compute_bm25_stats: bool = True,
- access_key: str = 'minioadmin',
- secret_key: str = 'minioadmin',
- bucket_name: str = 'a-bucket',
- meta_dataframe: str | DataFrame | None = None,
- meta_source_field: str | None = None,
- meta_fields: list[str] | None = None,
- **kwargs,
This function takes the input data and creates a collection in Milvus. It embeds the records using the NVIDIA embedding model, stores them in the collection, and then runs the same ingestion process as the vdb_upload stage in the Ingestor pipeline.
- Parameters:
data (Union[str, List]) – The data to be ingested. Can be a list of records or a file path.
collection_name (Union[str, Dict]) – The name of the Milvus collection or a dictionary containing collection configuration.
embedding_endpoint (str, optional) – The endpoint for the NVIDIA embedding service. Defaults to None.
model_name (str, optional) – The name of the embedding model. Defaults to None.
nvidia_api_key (str, optional) – The API key for NVIDIA services. Defaults to None.
milvus_uri (str, optional) – The URI of the Milvus server. Defaults to “http://localhost:19530”.
sparse (bool, optional) – Whether to use sparse indexing. Defaults to False.
recreate (bool, optional) – Whether to recreate the collection if it already exists. Defaults to True.
gpu_index (bool, optional) – Whether to use GPU for indexing. Defaults to True.
gpu_search (bool, optional) – Whether to use GPU for search operations. Defaults to True.
dense_dim (int, optional) – The dimensionality of dense vectors. Defaults to 2048.
minio_endpoint (str, optional) – The endpoint for the MinIO server. Defaults to “localhost:9000”.
enable_text (bool, optional) – Whether to enable text data ingestion. Defaults to True.
enable_charts (bool, optional) – Whether to enable chart data ingestion. Defaults to True.
enable_tables (bool, optional) – Whether to enable table data ingestion. Defaults to True.
enable_images (bool, optional) – Whether to enable image data ingestion. Defaults to True.
enable_infographics (bool, optional) – Whether to enable infographic data ingestion. Defaults to True.
bm25_save_path (str, optional) – The file path to save the BM25 model. Defaults to “bm25_model.json”.
compute_bm25_stats (bool, optional) – Whether to compute BM25 statistics. Defaults to True.
access_key (str, optional) – The access key for MinIO authentication. Defaults to “minioadmin”.
secret_key (str, optional) – The secret key for MinIO authentication. Defaults to “minioadmin”.
bucket_name (str, optional) – The name of the MinIO bucket. Defaults to “a-bucket”.
meta_dataframe (Union[str, pd.DataFrame], optional) – A metadata DataFrame or the path to a CSV file containing metadata. Defaults to None.
meta_source_field (str, optional) – The field in the metadata that serves as the source identifier. Defaults to None.
meta_fields (list[str], optional) – A list of metadata fields to include. Defaults to None.
**kwargs – Additional keyword arguments for customization.
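Example
A minimal sketch (the data path, endpoint, and model name are illustrative assumptions; when not supplied, the embedding settings typically fall back to the environment):
>>> from nv_ingest_client.util.milvus import embed_index_collection
>>> embed_index_collection(
...     "records.json",                                   # hypothetical path to previously extracted records
...     "nv_ingest_collection",
...     embedding_endpoint="http://localhost:8012/v1",    # illustrative embedding NIM endpoint
...     model_name="nvidia/llama-3.2-nv-embedqa-1b-v2",   # illustrative model name
...     sparse=False,
... )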
- nv_ingest_client.util.milvus.get_embeddings(full_records, embedder, batch_size=256)[source]#
This function takes the input records and creates a list of embeddings. The default batch size is 256, but it can be adjusted based on the available resources, up to a maximum of 259, which is the limit set by the NVIDIA embedding microservice.
- nv_ingest_client.util.milvus.grab_meta_collection_info(
- collection_name: str,
- meta_collection_name: str = 'meta',
- timestamp: str | None = None,
- embedding_model: str | None = None,
- embedding_dim: int | None = None,
- milvus_uri: str = 'http://localhost:19530',
- nv_ingest_client.util.milvus.hybrid_retrieval(
- queries,
- collection_name: str,
- client: MilvusClient,
- dense_model,
- sparse_model,
- top_k: int,
- dense_field: str = 'vector',
- sparse_field: str = 'sparse',
- output_fields: List[str] = ['text'],
- gpu_search: bool = True,
- local_index: bool = False,
- _filter: str = '',
This function takes the input queries and conducts a hybrid embedding search against the dense and sparse vectors, returning the top_k nearest records in the collection.
- Parameters:
queries (List) – List of queries
collection_name (str) – Name of the Milvus collection to search against.
client (MilvusClient) – Client connected to milvus instance.
dense_model (NVIDIAEmbedding) – Dense model to generate dense embeddings for queries.
sparse_model (model) – Sparse model used to generate sparse embeddings in the form of scipy.sparse.csr_array.
top_k (int) – Number of search results to return per query.
dense_field (str) – The name of the anns_field that holds the dense embedding vector in the collection.
sparse_field (str) – The name of the anns_field that holds the sparse embedding vector in the collection.
- Returns:
Nested list of top_k results per query.
- Return type:
List
- nv_ingest_client.util.milvus.log_new_meta_collection(
- collection_name: str,
- fields: List[str],
- milvus_uri: str = 'http://localhost:19530',
- creation_timestamp: str | None = None,
- dense_index: str | None = None,
- dense_dim: int | None = None,
- sparse_index: str | None = None,
- embedding_model: str | None = None,
- sparse_model: str | None = None,
- meta_collection_name: str = 'meta',
- recreate: bool = False,
- nv_ingest_client.util.milvus.nv_rerank(
- query,
- candidates,
- reranker_endpoint: str | None = None,
- model_name: str | None = None,
- nvidia_api_key: str | None = None,
- truncate: str = 'END',
- max_batch_size: int = 64,
- topk: int = 5,
This function allows a user to rerank a set of candidates using the NVIDIA reranker NIM.
- Parameters:
query (str) – Query the candidates are supposed to answer.
candidates (list) – List of the candidates to rerank.
reranker_endpoint (str) – The endpoint of the NVIDIA reranker service.
model_name (str) – The name of the model hosted in the NVIDIA reranker.
nvidia_api_key (str) – The NVIDIA API key, necessary when using a non-local (hosted) reranker.
truncate (str [END, NONE]) – Truncate the incoming texts if length is longer than the model allows.
max_batch_size (int) – Max size for the number of candidates to rerank.
topk (int) – The number of candidates to return after reranking.
- Returns:
Dictionary with top_k reranked candidates.
- Return type:
Dict
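Example
A minimal sketch that reranks candidates returned by nvingest_retrieval (the query, endpoint, and model name are illustrative assumptions; a populated collection and a reachable reranker are assumed):
>>> from nv_ingest_client.util.milvus import nv_rerank, nvingest_retrieval
>>> candidates = nvingest_retrieval(["What is the refund policy?"], "nv_ingest_collection")[0]
>>> reranked = nv_rerank(
...     "What is the refund policy?",
...     candidates,
...     reranker_endpoint="http://localhost:8020/v1/ranking",   # illustrative reranker endpoint
...     model_name="nvidia/llama-3.2-nv-rerankqa-1b-v2",        # illustrative model name
...     topk=3,
... )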
- nv_ingest_client.util.milvus.nvingest_retrieval(
- queries,
- collection_name: str,
- milvus_uri: str = 'http://localhost:19530',
- top_k: int = 5,
- hybrid: bool = False,
- dense_field: str = 'vector',
- sparse_field: str = 'sparse',
- embedding_endpoint=None,
- sparse_model_filepath: str = 'bm25_model.json',
- model_name: str | None = None,
- output_fields: List[str] = ['text', 'source', 'content_metadata'],
- gpu_search: bool = True,
- nv_ranker: bool = False,
- nv_ranker_endpoint: str | None = None,
- nv_ranker_model_name: str | None = None,
- nv_ranker_nvidia_api_key: str | None = None,
- nv_ranker_truncate: str = 'END',
- nv_ranker_top_k: int = 50,
- nv_ranker_max_batch_size: int = 64,
- _filter: str = '',
This function takes the input queries and conducts a hybrid/dense embedding search against the vectors, returning the top_k nearest records in the collection.
- Parameters:
queries (List) – List of queries
collection_name (str) – Name of the Milvus collection to search against.
milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.
top_k (int) – Number of search results to return per query.
hybrid (bool, optional) – If True, will calculate distances for both dense and sparse embeddings.
dense_field (str, optional) – The name of the anns_field that holds the dense embedding vector in the collection.
sparse_field (str, optional) – The name of the anns_field that holds the sparse embedding vector in the collection.
embedding_endpoint (str, optional) – The endpoint of the embedding service used to embed the queries.
sparse_model_filepath (str, optional) – The path from which the sparse (BM25) model is loaded.
model_name (str, optional) – The name of the dense embedding model available in the NIM embedding endpoint.
nv_ranker (bool) – Set to True to use the NVIDIA reranker.
nv_ranker_endpoint (str) – The endpoint of the NVIDIA reranker service.
nv_ranker_model_name (str) – The name of the model hosted in the NVIDIA reranker.
nv_ranker_nvidia_api_key (str) – The NVIDIA API key, necessary when using a non-local (hosted) reranker.
nv_ranker_truncate (str [END, NONE]) – Truncate the incoming texts if length is longer than the model allows.
nv_ranker_max_batch_size (int) – Max size for the number of candidates to rerank.
nv_ranker_top_k (int) – The number of candidates to return after reranking.
- Returns:
Nested list of top_k results per query.
- Return type:
List
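Example
A minimal dense retrieval sketch (assumes the collection exists and the embedding service used at ingest time is reachable; the query and names are illustrative):
>>> from nv_ingest_client.util.milvus import nvingest_retrieval
>>> results = nvingest_retrieval(
...     ["What is the dress code policy?"],   # illustrative query
...     "nv_ingest_collection",
...     milvus_uri="http://localhost:19530",
...     hybrid=False,
...     top_k=5,
... )
>>> top_hits = results[0]   # nested list: one list of hits per query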
- nv_ingest_client.util.milvus.pull_all_milvus(
- collection_name: str,
- milvus_uri: str = 'http://localhost:19530',
- write_dir: str | None = None,
- batch_size: int = 1000,
This function takes the input collection name and pulls all the records from the collection. It will either return the records as a list of dictionaries or write them to a specified directory in JSON format.
- Parameters:
collection_name (str) – Milvus Collection to query against.
milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.
write_dir (str, optional) – Directory to write the records to. If None, the records will be returned as a list.
batch_size (int, optional) – The number of records to pull in each batch. Defaults to 1000.
- Returns:
List of records/files with records from the collection.
- Return type:
List
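Example
A minimal sketch (the collection name and output directory are illustrative; a Milvus instance at the default URI is assumed):
>>> from nv_ingest_client.util.milvus import pull_all_milvus
>>> records = pull_all_milvus("nv_ingest_collection")                            # returns a list of record dicts
>>> files = pull_all_milvus("nv_ingest_collection", write_dir="./milvus_dump")   # or writes JSON files instead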
- nv_ingest_client.util.milvus.reconstruct_pages(anchor_record, records_list, page_signum: int = 0)[source]#
This function allows a user to reconstruct the pages for a retrieved chunk.
- Parameters:
anchor_record (dict) – The retrieved chunk (record) whose source page(s) should be reconstructed.
records_list (list) – List of records from the same source document used to rebuild the page content.
page_signum (int) – Page offset controlling which neighboring page(s) around the anchor record are included in the reconstruction.
- Returns:
Full page(s) corresponding to anchor record.
- Return type:
String
- nv_ingest_client.util.milvus.recreate_elements(data)[source]#
This function takes the input data and creates a list of elements with the necessary metadata for ingestion.
- Parameters:
data (List) – List of chunks with attached metadata
- Returns:
List of elements with metadata.
- Return type:
List
- nv_ingest_client.util.milvus.reindex_collection(
- current_collection_name: str,
- new_collection_name: str | None = None,
- write_dir: str | None = None,
- embedding_endpoint: str | None = None,
- model_name: str | None = None,
- nvidia_api_key: str | None = None,
- milvus_uri: str = 'http://localhost:19530',
- sparse: bool = False,
- recreate: bool = True,
- gpu_index: bool = True,
- gpu_search: bool = True,
- dense_dim: int = 2048,
- minio_endpoint: str = 'localhost:9000',
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- bm25_save_path: str = 'bm25_model.json',
- compute_bm25_stats: bool = True,
- access_key: str = 'minioadmin',
- secret_key: str = 'minioadmin',
- bucket_name: str = 'a-bucket',
- meta_dataframe: str | DataFrame | None = None,
- meta_source_field: str | None = None,
- meta_fields: list[str] | None = None,
- embed_batch_size: int = 256,
- query_batch_size: int = 1000,
- **kwargs,
This function will reindex a collection in Milvus. It will pull all the records from the current collection, embed them using the NVIDIA embedding model, and store them in a new collection. After embedding the records, it will run the same ingestion process as the vdb_upload stage in the Ingestor pipeline. If embedding_endpoint, model_name, and nvidia_api_key are not explicitly set in the function call, their defaults are taken from environment variables.
- Parameters:
current_collection_name (str) – The name of the existing Milvus collection to reindex.
new_collection_name (str, optional) – The name of the new collection that will hold the reindexed records. Defaults to None.
write_dir (str, optional) – Directory to write the pulled records to before re-embedding. If None, records are kept in memory. Defaults to None.
embedding_endpoint (str, optional) – The endpoint for the NVIDIA embedding service. Defaults to None.
model_name (str, optional) – The name of the embedding model. Defaults to None.
nvidia_api_key (str, optional) – The API key for NVIDIA services. Defaults to None.
milvus_uri (str, optional) – The URI of the Milvus server. Defaults to “http://localhost:19530”.
sparse (bool, optional) – Whether to use sparse indexing. Defaults to False.
recreate (bool, optional) – Whether to recreate the collection if it already exists. Defaults to True.
gpu_index (bool, optional) – Whether to use GPU for indexing. Defaults to True.
gpu_search (bool, optional) – Whether to use GPU for search operations. Defaults to True.
dense_dim (int, optional) – The dimensionality of dense vectors. Defaults to 2048.
minio_endpoint (str, optional) – The endpoint for the MinIO server. Defaults to “localhost:9000”.
enable_text (bool, optional) – Whether to enable text data ingestion. Defaults to True.
enable_charts (bool, optional) – Whether to enable chart data ingestion. Defaults to True.
enable_tables (bool, optional) – Whether to enable table data ingestion. Defaults to True.
enable_images (bool, optional) – Whether to enable image data ingestion. Defaults to True.
enable_infographics (bool, optional) – Whether to enable infographic data ingestion. Defaults to True.
bm25_save_path (str, optional) – The file path to save the BM25 model. Defaults to “bm25_model.json”.
compute_bm25_stats (bool, optional) – Whether to compute BM25 statistics. Defaults to True.
access_key (str, optional) – The access key for MinIO authentication. Defaults to “minioadmin”.
secret_key (str, optional) – The secret key for MinIO authentication. Defaults to “minioadmin”.
bucket_name (str, optional) – The name of the MinIO bucket. Defaults to “a-bucket”.
meta_dataframe (Union[str, pd.DataFrame], optional) – A metadata DataFrame or the path to a CSV file containing metadata. Defaults to None.
meta_source_field (str, optional) – The field in the metadata that serves as the source identifier. Defaults to None.
meta_fields (list[str], optional) – A list of metadata fields to include. Defaults to None.
embed_batch_size (int, optional) – The batch size for embedding. Defaults to 256.
query_batch_size (int, optional) – The batch size for querying. Defaults to 1000.
**kwargs – Additional keyword arguments for customization.
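Example
A minimal sketch (the collection names are illustrative; embedding endpoint, model name, and API key fall back to environment variables when not supplied):
>>> from nv_ingest_client.util.milvus import reindex_collection
>>> reindex_collection(
...     "nv_ingest_collection",                          # existing collection to reindex
...     new_collection_name="nv_ingest_collection_v2",   # illustrative target name
...     dense_dim=2048,
...     sparse=False,
... )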
- nv_ingest_client.util.milvus.remove_records(
- source_name: str,
- collection_name: str,
- milvus_uri: str = 'http://localhost:19530',
This function allows a user to remove chunks associated with an ingested file. Supply the full path of the file you would like to remove and this function will remove all the chunks associated with that file in the target collection.
- Parameters:
source_name (str) – The full file path of the file you would like to remove from the collection.
collection_name (str) – Milvus Collection to query against
milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.
- Returns:
Dictionary with one key, delete_cnt. The value represents the number of entities removed.
- Return type:
Dict
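Example
A minimal sketch (the file path and collection name are illustrative):
>>> from nv_ingest_client.util.milvus import remove_records
>>> result = remove_records(
...     "/data/docs/report.pdf",          # hypothetical source file that was previously ingested
...     "nv_ingest_collection",
...     milvus_uri="http://localhost:19530",
... )
>>> result["delete_cnt"]                  # number of entities removed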
- nv_ingest_client.util.milvus.stream_insert_milvus(
- records,
- client: ~pymilvus.milvus_client.milvus_client.MilvusClient,
- collection_name: str,
- sparse_model=None,
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- enable_audio: bool = True,
- record_func=<function _record_dict>,
- meta_dataframe=None,
- meta_source_field=None,
- meta_fields=None,
This function streams the supplied records into the Milvus collection, filtering them by type (i.e. texts, charts, tables) according to the enabled flags. If a sparse model is supplied, it is used to generate sparse embeddings to allow for hybrid search. If the user sets the log level to info, any time a record fails ingestion, it will be reported to the user.
- Parameters:
records (List) – List of chunks with attached metadata
collection_name (str) – Milvus collection to insert the records into.
sparse_model (model) – Sparse model used to generate sparse embeddings in the form of scipy.sparse.csr_array.
enable_text (bool, optional) – When true, ensure all text type records are used.
enable_charts (bool, optional) – When true, ensure all chart type records are used.
enable_tables (bool, optional) – When true, ensure all table type records are used.
enable_images (bool, optional) – When true, ensure all image type records are used.
enable_infographics (bool, optional) – When true, ensure all infographic type records are used.
enable_audio (bool, optional) – When true, ensure all audio transcript type records are used.
record_func (function, optional) – This function will be used to parse the records for necessary information.
- nv_ingest_client.util.milvus.write_meta_collection(
- collection_name: str,
- fields: List[str],
- milvus_uri: str = 'http://localhost:19530',
- creation_timestamp: str | None = None,
- dense_index: str | None = None,
- dense_dim: int | None = None,
- sparse_index: str | None = None,
- embedding_model: str | None = None,
- sparse_model: str | None = None,
- meta_collection_name: str = 'meta',
- nv_ingest_client.util.milvus.write_records_minio(
- records,
- writer: ~pymilvus.bulk_writer.remote_bulk_writer.RemoteBulkWriter,
- sparse_model=None,
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- enable_audio: bool = True,
- record_func=<function _record_dict>,
- meta_dataframe=None,
- meta_source_field=None,
- meta_fields=None,
Writes the supplied records to MinIO using the supplied RemoteBulkWriter, for subsequent bulk insert into Milvus. If a sparse model is supplied, it will be used to generate sparse embeddings to allow for hybrid search. Records are filtered by type, depending on which types are enabled via the boolean parameters. If the user sets the log level to info, any time a record fails ingestion, it will be reported to the user.
- Parameters:
records (List) – List of chunks with attached metadata
writer (RemoteBulkWriter) – The Milvus Remote BulkWriter instance that was created with necessary params to access the minio instance corresponding to milvus.
sparse_model (model) – Sparse model used to generate sparse embeddings in the form of scipy.sparse.csr_array.
enable_text (bool, optional) – When true, ensure all text type records are used.
enable_charts (bool, optional) – When true, ensure all chart type records are used.
enable_tables (bool, optional) – When true, ensure all table type records are used.
enable_images (bool, optional) – When true, ensure all image type records are used.
enable_infographics (bool, optional) – When true, ensure all infographic type records are used.
enable_audio (bool, optional) – When true, ensure all audio transcript type records are used.
record_func (function, optional) – This function will be used to parse the records for necessary information.
- Returns:
Returns the writer supplied, with information related to minio records upload.
- Return type:
RemoteBulkWriter
- nv_ingest_client.util.milvus.write_to_nvingest_collection(
- records,
- collection_name: str,
- milvus_uri: str = 'http://localhost:19530',
- minio_endpoint: str = 'localhost:9000',
- sparse: bool = True,
- enable_text: bool = True,
- enable_charts: bool = True,
- enable_tables: bool = True,
- enable_images: bool = True,
- enable_infographics: bool = True,
- bm25_save_path: str = 'bm25_model.json',
- compute_bm25_stats: bool = True,
- access_key: str = 'minioadmin',
- secret_key: str = 'minioadmin',
- bucket_name: str = 'a-bucket',
- threshold: int = 1000,
- meta_dataframe=None,
- meta_source_field=None,
- meta_fields=None,
- stream: bool = False,
This function writes the input records to the target nv-ingest Milvus collection, factoring in filters (i.e. texts, charts, tables). When sparse is enabled and compute_bm25_stats is true, it also builds a corpus from the records and fits a BM25 model to support hybrid search.
- Parameters:
records (List) – List of chunks with attached metadata
collection_name (str) – Milvus collection to write the records to.
milvus_uri (str) – Milvus address with http(s) prefix and port. Can also be a file path, to activate milvus-lite.
minio_endpoint (str) – Endpoint for the minio instance attached to your milvus.
enable_text (bool, optional) – When true, ensure all text type records are used.
enable_charts (bool, optional) – When true, ensure all chart type records are used.
enable_tables (bool, optional) – When true, ensure all table type records are used.
enable_images (bool, optional) – When true, ensure all image type records are used.
enable_infographics (bool, optional) – When true, ensure all infographic type records are used.
sparse (bool, optional) – When true, incorporates sparse embedding representations for records.
bm25_save_path (str, optional) – The desired filepath for the sparse model if sparse is True.
access_key (str, optional) – Minio access key.
secret_key (str, optional) – Minio secret key.
bucket_name (str, optional) – Minio bucket name.
stream (bool, optional) – When true, the records will be inserted into milvus using the stream insert method.
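Example
A minimal sketch (the records are assumed to be the output of a prior nv-ingest extraction job and are not constructed here; the collection name and endpoints are illustrative):
>>> from nv_ingest_client.util.milvus import (
...     create_nvingest_collection,
...     write_to_nvingest_collection,
... )
>>> records = ...   # output chunks from a prior nv-ingest extraction job (assumed available)
>>> create_nvingest_collection("nv_ingest_collection", sparse=True)
>>> write_to_nvingest_collection(
...     records,
...     "nv_ingest_collection",
...     milvus_uri="http://localhost:19530",
...     minio_endpoint="localhost:9000",
...     sparse=True,
... )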
nv_ingest_client.util.process_json_files module#
nv_ingest_client.util.processing module#
- exception nv_ingest_client.util.processing.IngestJobFailure(
- message: str,
- description: str,
- annotations: Dict[str, Any],
Bases:
Exception
Custom exception to handle failed job ingestion results.
- nv_ingest_client.util.processing.check_schema(
- schema: Type[BaseModel],
- options: dict,
- task_id: str,
- original_str: str,
Validates the provided options against the given schema and returns a schema instance.
- Parameters:
schema (Type[BaseModel]) – A Pydantic model class used for validating the options.
options (dict) – The options dictionary to validate.
task_id (str) – The identifier of the task associated with the options.
original_str (str) – The original JSON string representation of the options.
- Returns:
An instance of the validated schema populated with the provided options.
- Return type:
BaseModel
- Raises:
ValueError – If validation fails, a ValueError is raised with a formatted error message highlighting the problematic parts of the original JSON string.
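Example
A minimal sketch using a small hypothetical Pydantic model (the real task schemas ship with nv-ingest; the model, options, and task id below are illustrative):
>>> import json
>>> from pydantic import BaseModel
>>> from nv_ingest_client.util.processing import check_schema
>>> class ExampleTaskSchema(BaseModel):   # hypothetical schema for illustration only
...     document_type: str
...     extract_text: bool = True
>>> options = {"document_type": "pdf"}
>>> task = check_schema(ExampleTaskSchema, options, "task-0", json.dumps(options))
>>> task.extract_text
True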
- nv_ingest_client.util.processing.format_validation_error(
- e: ValidationError,
- task_id: str,
- original_str: str,
Formats validation errors with appropriate highlights and returns a detailed error message.
- Parameters:
e (ValidationError) – The validation error raised by the schema.
task_id (str) – The identifier of the task for which the error occurred.
original_str (str) – The original JSON string that caused the validation error.
- Returns:
A detailed error message with highlighted problematic fields.
- Return type:
str
- nv_ingest_client.util.processing.handle_future_result(
- future: Future,
- timeout=10,
Handle the result of a completed future job and process annotations.
This function processes the result of a future, extracts annotations (if any), logs them, and checks the validity of the ingest result. If the result indicates a failure, an IngestJobFailure is raised with a description of the failure.
- Parameters:
future (concurrent.futures.Future) – A future object representing an asynchronous job. The result of this job will be processed once it completes.
timeout (Optional[int], default=10) – Maximum time to wait for the future result before timing out.
- Returns:
The result of the job as a dictionary, after processing and validation.
The trace_id returned by the submission endpoint
- Return type:
Tuple[Dict[str, Any], str]
- Raises:
IngestJobFailure – If the job result is invalid, this exception is raised with the failure description and the full result for further inspection.
Exception – For all other unexpected errors.
Notes
The future.result() is assumed to return a tuple where the first element is the actual result (as a dictionary), and the second element (if present) can be ignored.
Annotations in the result (if any) are logged for debugging purposes.
The check_ingest_result function (assumed to be defined elsewhere) is used to validate the result. If the result is invalid, an IngestJobFailure is raised.
Examples
Suppose we have a future object representing a job, a dictionary of futures to job IDs, and a directory for saving results:
>>> future = concurrent.futures.Future()
>>> result, trace_id = handle_future_result(future, timeout=60)
In this example, the function processes the completed job and returns the result dictionary. If the job fails, it raises an IngestJobFailure.
See also
check_ingest_result
Function to validate the result of the job.
- nv_ingest_client.util.processing.highlight_error_in_original(
- original_str: str,
- task_name: str,
- error_detail: Dict[str, Any],
Highlights the error-causing text in the original JSON string based on the error type.
This function identifies the problematic portion of the JSON string by inspecting the provided error details. For errors due to extra fields, it highlights the extra field (using blue and bold formatting). For errors due to missing fields or insufficient string length, it appends a clear message indicating the missing field and its location.
- Parameters:
original_str (str) – The original JSON string that caused the error. This string will be modified to highlight the problematic field.
task_name (str) – The name of the task associated with the error. This is used in the error message when highlighting missing fields.
error_detail (Dict[str, Any]) –
A dictionary containing details about the error. Expected keys are:
‘type’ (str): The type of error (e.g., “value_error.extra”, “value_error.missing”, “value_error.any_str.min_length”).
‘loc’ (List[Any]): A list representing the path to the error-causing field in the JSON structure.
- Returns:
The modified JSON string with the error-causing field highlighted or a message appended indicating the missing field.
- Return type:
str
Notes
The function uses the style function to apply formatting to the error-causing text.
If the error detail does not include the expected keys, a fallback is used and the original string is returned.
nv_ingest_client.util.util module#
- nv_ingest_client.util.util.check_ingest_result(
- json_payload: Dict,
Check the ingest result to determine if the process failed and extract a description.
This function examines the provided JSON payload to check whether the ingest operation has failed. If it has failed and failure annotations are present, the function augments the failure description with information about the specific event that caused the failure.
- Parameters:
json_payload (Dict) –
A dictionary containing the result of the ingest operation. The dictionary should have at least the following fields:
‘status’ (str): The status of the ingest operation. A status of “failed” indicates a failure.
‘description’ (str): A textual description of the result.
‘annotations’ (Dict, optional): A dictionary of annotations, where each annotation may contain details about the failure.
- Returns:
A tuple containing:
A boolean indicating whether the ingest operation failed (True if it failed, False otherwise).
A string containing the description of the result or the failure, augmented with details from the annotations if available.
- Return type:
Tuple[bool, str]
Notes
The function logs the ‘status’ and ‘description’ fields from the payload for debugging.
If the ‘status’ field contains “failed” and the ‘annotations’ field contains entries indicating failure, the function updates the description with the annotation details.
The function breaks after processing the first relevant annotation.
Examples
Suppose the JSON payload contains the following structure:
>>> json_payload = {
...     "status": "failed",
...     "description": "Ingest operation failed",
...     "annotations": {
...         "task1": {"task_result": "FAILURE", "message": "Network error"},
...         "task2": {"task_result": "SUCCESS"}
...     }
... }
>>> is_failed, description = check_ingest_result(json_payload)
>>> print(is_failed)
True
>>> print(description)
Ingest operation failed
↪ Event that caused this failure: task1 -> Network error
In this example, the function detects a failure and augments the description with the message from the failing task.
- nv_ingest_client.util.util.count_pages_for_documents(
- file_path: str,
- document_type: DocumentTypeEnum,
- nv_ingest_client.util.util.count_pages_for_text(file_path: str) → int [source]#
Estimates the page count for text files based on word count, using the detect_encoding_and_read_text_file function for reading.
- nv_ingest_client.util.util.create_job_specs_for_batch(
- files_batch: List[str],
Create job specifications (JobSpecs) for a batch of files. This function takes a batch of files, processes each file to extract its content and type, and creates a job specification (JobSpec) for each file.
- Parameters:
files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.
- Returns:
A list of JobSpecs.
- Return type:
List[JobSpec]
- Raises:
ValueError – If there is an error extracting the file content or type from any of the files, a ValueError will be logged, and the corresponding file will be skipped.
Notes
The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.
For each file, a JobSpec is created with relevant metadata, including document type and file content.
The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.
Examples
Suppose you have a batch of files and tasks to process:
>>> files_batch = ["file1.txt", "file2.pdf"]
>>> client = NvIngestClient()
>>> job_specs = create_job_specs_for_batch(files_batch)
>>> print(job_specs)
[<nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb468bb0>, <nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb469270>]
See also
extract_file_content
Function that extracts the content and type of a file.
JobSpec
The class representing a job specification.
- nv_ingest_client.util.util.filter_function_kwargs(func, **kwargs)[source]#
Filters and returns keyword arguments that match the parameters of a given function.
This function inspects the signature of func and extracts any keyword arguments from kwargs that correspond to the function’s parameters. It returns a dictionary containing only those arguments that func accepts, allowing for safe, dynamic parameter passing.
- Parameters:
func (Callable) – The function whose parameters will be used to filter kwargs.
kwargs (dict) – A dictionary of keyword arguments, which may include extra keys not accepted by func.
- Returns:
A dictionary of keyword arguments filtered to include only those parameters accepted by func.
- Return type:
dict
Example
>>> def example_function(a, b, c):
...     pass
>>> filtered_kwargs = filter_function_kwargs(example_function, a=1, b=2, d=4)
>>> print(filtered_kwargs)
{'a': 1, 'b': 2}
- nv_ingest_client.util.util.generate_matching_files(file_sources)[source]#
Generates a list of file paths that match the given patterns specified in file_sources.
- Parameters:
file_sources (list of str) – A list containing the file source patterns to match against.
- Returns:
A generator yielding paths to files that match the specified patterns.
- Return type:
generator
Notes
This function utilizes glob pattern matching to find files that match the specified patterns. It yields each matching file path, allowing for efficient processing of potentially large sets of files.
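Example
A minimal sketch (the glob patterns are illustrative):
>>> from nv_ingest_client.util.util import generate_matching_files
>>> for path in generate_matching_files(["./docs/*.pdf", "./notes/*.txt"]):   # hypothetical patterns
...     print(path)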
- nv_ingest_client.util.util.get_content(results: List[any])[source]#
Extracts the text and table text content from the results of an NV-Ingest python client job.
- Parameters:
results (List[Any]) – The results of NV-Ingest python client job that contains the desired text and table content
- Returns:
A dictionary containing the extracted text content and the extracted table content
- Return type:
Dict
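Example
A minimal sketch (the results are assumed to come from a completed NV-Ingest python client job and are not constructed here):
>>> from nv_ingest_client.util.util import get_content
>>> results = ...   # output of a prior NV-Ingest python client job (assumed available)
>>> content = get_content(results)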
- nv_ingest_client.util.util.load_data_from_path(path: str) → Dict [source]#
Loads data from a specified file path, preparing it for processing.
- Parameters:
path (str) – The path to the file from which data should be loaded.
- Returns:
A dictionary containing keys ‘file_name’, ‘id’, ‘content’, and ‘document_type’, each of which maps to a list that includes the respective details for the processed file.
- Return type:
dict
- Raises:
FileNotFoundError – If the specified path does not exist.
ValueError – If the specified path is not a file.
Notes
This function is designed to load and prepare file data for further processing, packaging the loaded data along with metadata such as file name and document type.
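Example
A minimal sketch (the file path is illustrative):
>>> from nv_ingest_client.util.util import load_data_from_path
>>> payload = load_data_from_path("./docs/report.pdf")   # hypothetical file path
>>> payload["document_type"], payload["file_name"]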
nv_ingest_client.util.zipkin module#
- class nv_ingest_client.util.zipkin.AsyncZipkinClient(
- host: str,
- port: int,
- concurrent_requests: int,
- max_retries: int = 10,
- retry_delay: int = 5,
Bases:
object
- async fetch(
- sem,
- trace_id: str,
- url: str,
Perform a GET request to the given URL with retry logic for 404 status codes.
- Parameters:
sem – Semaphore to ensure only X concurrent requests are in flight.
trace_id – The trace_id for the request.
url – The complete URL for the GET request.
- Returns:
Dict[str, str] containing the trace_id and the JSON response string.
- Raises:
RuntimeError – If the maximum retries are exceeded.
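Example
A minimal asyncio sketch based on the documented constructor and fetch signature (the host, port, trace id, and URL are illustrative assumptions):
>>> import asyncio
>>> from nv_ingest_client.util.zipkin import AsyncZipkinClient
>>> async def fetch_one_trace():
...     client = AsyncZipkinClient("localhost", 9411, concurrent_requests=2)   # illustrative Zipkin host/port
...     sem = asyncio.Semaphore(2)
...     url = "http://localhost:9411/api/v2/trace/abc123"   # hypothetical trace URL
...     return await client.fetch(sem, "abc123", url)
>>> asyncio.run(fetch_one_trace())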