nv_ingest_client.util package#
Subpackages#
Submodules#
nv_ingest_client.util.dataset module#
- nv_ingest_client.util.dataset.get_dataset_files(dataset_bytes: BytesIO, shuffle: bool = False)
Extracts and optionally shuffles the list of files contained in a dataset.
- Parameters:
dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.
shuffle (bool, optional) – Whether to shuffle the list of files before returning. Defaults to False.
- Returns:
The list of files from the dataset, possibly shuffled.
- Return type:
list
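The behaviour described above can be sketched as follows. This is a minimal illustration, not the library implementation: the function name `sketch_get_dataset_files` is hypothetical, and the JSON key `sampled_files` is an assumption about the dataset layout that may not match the real format.

```python
import json
import random
from io import BytesIO


def sketch_get_dataset_files(dataset_bytes: BytesIO, shuffle: bool = False) -> list:
    """Illustrative stand-in for get_dataset_files."""
    dataset_bytes.seek(0)  # always read from the start of the buffer
    dataset = json.load(dataset_bytes)
    # "sampled_files" is an assumed key name for the list of files.
    files = list(dataset.get("sampled_files", []))
    if shuffle:
        random.shuffle(files)  # in-place shuffle of the copied list
    return files


payload = BytesIO(json.dumps({"sampled_files": ["a.pdf", "b.txt", "c.docx"]}).encode("utf-8"))
files = sketch_get_dataset_files(payload)
shuffled = sketch_get_dataset_files(payload, shuffle=True)
```

With `shuffle=True` the same entries come back in a random order, so callers should not rely on positions.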
- nv_ingest_client.util.dataset.get_dataset_statistics(dataset_bytes: BytesIO) str [source]#
Reads a dataset specification from a BytesIO object, computes statistics about the dataset, and returns a formatted string.
- Parameters:
dataset_bytes (BytesIO) – The BytesIO object containing the dataset in JSON format.
- Returns:
A formatted string containing statistics about the dataset.
- Return type:
str
nv_ingest_client.util.milvus module#
nv_ingest_client.util.process_json_files module#
nv_ingest_client.util.processing module#
- exception nv_ingest_client.util.processing.IngestJobFailure(message: str, description: str, annotations: Dict[str, Any])
Bases: Exception
Custom exception to handle failed job ingestion results.
- nv_ingest_client.util.processing.check_schema(schema: Type[BaseModel], options: dict, task_id: str, original_str: str)
Validates the provided options against the given schema and returns a schema instance.
- Parameters:
schema (Type[BaseModel]) – A Pydantic model class used for validating the options.
options (dict) – The options dictionary to validate.
task_id (str) – The identifier of the task associated with the options.
original_str (str) – The original JSON string representation of the options.
- Returns:
An instance of the validated schema populated with the provided options.
- Return type:
BaseModel
- Raises:
ValueError – If validation fails, a ValueError is raised with a formatted error message highlighting the problematic parts of the original JSON string.
- nv_ingest_client.util.processing.format_validation_error(e: ValidationError, task_id: str, original_str: str)
Formats validation errors with appropriate highlights and returns a detailed error message.
- Parameters:
e (ValidationError) – The validation error raised by the schema.
task_id (str) – The identifier of the task for which the error occurred.
original_str (str) – The original JSON string that caused the validation error.
- Returns:
A detailed error message with highlighted problematic fields.
- Return type:
str
- nv_ingest_client.util.processing.handle_future_result(future: Future, timeout=10)
Handle the result of a completed future job and process annotations.
This function processes the result of a future, extracts annotations (if any), logs them, and checks the validity of the ingest result. If the result indicates a failure, an IngestJobFailure is raised with a description of the failure.
- Parameters:
future (concurrent.futures.Future) – A future object representing an asynchronous job. The result of this job will be processed once it completes.
timeout (Optional[int], default=10) – Maximum time to wait for the future result before timing out.
- Returns:
A tuple containing:
- The result of the job as a dictionary, after processing and validation.
- The trace_id returned by the submission endpoint.
- Return type:
Tuple[Dict[str, Any], str]
- Raises:
IngestJobFailure – If the job result is invalid, this exception is raised with the failure description and the full result for further inspection.
Exception – For all other unexpected errors.
Notes
The future.result() is assumed to return a tuple where the first element is the actual result (as a dictionary), and the second element (if present) can be ignored.
Annotations in the result (if any) are logged for debugging purposes.
The check_ingest_result function (assumed to be defined elsewhere) is used to validate the result. If the result is invalid, an IngestJobFailure is raised.
Examples
Suppose we have a future object representing a job:
>>> future = concurrent.futures.Future()
>>> result, trace_id = handle_future_result(future, timeout=60)
In this example, the function processes the completed job and returns the result dictionary together with the trace ID. If the job fails, it raises an IngestJobFailure.
See also
check_ingest_result
Function to validate the result of the job.
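The flow described for handle_future_result can be sketched with the standard library alone. Everything here is illustrative: `SketchJobFailure` and `sketch_handle_future_result` are hypothetical names, the future is assumed to resolve to a `(result_dict, trace_id)` tuple, and the validity check is reduced to inspecting the `status` field rather than calling the real check_ingest_result.

```python
from concurrent.futures import Future
from typing import Any, Dict, Tuple


class SketchJobFailure(Exception):
    """Hypothetical stand-in for IngestJobFailure."""

    def __init__(self, message: str, description: str, annotations: Dict[str, Any]):
        super().__init__(message)
        self.description = description
        self.annotations = annotations


def sketch_handle_future_result(future: Future, timeout: int = 10) -> Tuple[Dict[str, Any], str]:
    # Assumption: the future resolves to (result_dict, trace_id).
    result, trace_id = future.result(timeout=timeout)
    if result.get("status") == "failed":
        description = result.get("description", "")
        raise SketchJobFailure(
            f"Ingest job failed: {description}", description, result.get("annotations", {})
        )
    return result, trace_id


# A job that succeeded.
ok = Future()
ok.set_result(({"status": "success", "description": "done"}, "trace-1"))
result, trace_id = sketch_handle_future_result(ok, timeout=5)

# A job that failed: the sketch raises instead of returning.
bad = Future()
bad.set_result(({"status": "failed", "description": "boom", "annotations": {}}, "trace-2"))
try:
    sketch_handle_future_result(bad)
    failure = None
except SketchJobFailure as exc:
    failure = exc
```

Carrying the description and annotations on the exception lets callers inspect the failure without re-parsing the message string.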
- nv_ingest_client.util.processing.highlight_error_in_original(original_str: str, task_name: str, error_detail: Dict[str, Any])
Highlights the error-causing text in the original JSON string based on the error type.
This function identifies the problematic portion of the JSON string by inspecting the provided error details. For errors due to extra fields, it highlights the extra field (using blue and bold formatting). For errors due to missing fields or insufficient string length, it appends a clear message indicating the missing field and its location.
- Parameters:
original_str (str) – The original JSON string that caused the error. This string will be modified to highlight the problematic field.
task_name (str) – The name of the task associated with the error. This is used in the error message when highlighting missing fields.
error_detail (Dict[str, Any]) – A dictionary containing details about the error. Expected keys are:
- ‘type’ (str): The type of error (e.g., “value_error.extra”, “value_error.missing”, “value_error.any_str.min_length”).
- ‘loc’ (List[Any]): A list representing the path to the error-causing field in the JSON structure.
- Returns:
The modified JSON string with the error-causing field highlighted or a message appended indicating the missing field.
- Return type:
str
Notes
The function uses the style function to apply formatting to the error-causing text.
If the error detail does not include the expected keys, a fallback is used and the original string is returned.
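As a rough illustration of the three cases above (extra field, missing field or too-short string, and fallback), the sketch below uses raw ANSI escape codes in place of the style function mentioned in the notes. The name `sketch_highlight`, the exact wording of the appended message, and the choice to highlight only the first occurrence of the field are all assumptions.

```python
from typing import Any, Dict

BLUE_BOLD = "\033[1;34m"  # ANSI escape: bold + blue
RESET = "\033[0m"


def sketch_highlight(original_str: str, task_name: str, error_detail: Dict[str, Any]) -> str:
    error_type = error_detail.get("type")
    loc = error_detail.get("loc") or []
    if not error_type or not loc:
        return original_str  # fallback: expected keys are absent
    field = str(loc[-1])  # last path element names the offending field
    if error_type == "value_error.extra":
        # Wrap the first occurrence of the extra field name in colour codes.
        return original_str.replace(field, f"{BLUE_BOLD}{field}{RESET}", 1)
    if error_type in ("value_error.missing", "value_error.any_str.min_length"):
        # Nothing to highlight in the text, so append a message instead.
        return f"{original_str} (missing '{field}' in task '{task_name}')"
    return original_str


extra = sketch_highlight(
    '{"foo": 1, "bar": 2}', "extract", {"type": "value_error.extra", "loc": ["bar"]}
)
missing = sketch_highlight(
    '{"foo": 1}', "extract", {"type": "value_error.missing", "loc": ["document_type"]}
)
unchanged = sketch_highlight('{"foo": 1}', "extract", {})
```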
nv_ingest_client.util.util module#
- nv_ingest_client.util.util.check_ingest_result(json_payload: Dict)
Check the ingest result to determine if the process failed and extract a description.
This function examines the provided JSON payload to check whether the ingest operation has failed. If it has failed and failure annotations are present, the function augments the failure description with information about the specific event that caused the failure.
- Parameters:
json_payload (Dict) – A dictionary containing the result of the ingest operation. The dictionary should have at least the following fields:
- ‘status’ (str): The status of the ingest operation. A status of “failed” indicates a failure.
- ‘description’ (str): A textual description of the result.
- ‘annotations’ (Dict): (optional) A dictionary of annotations, where each annotation may contain details about the failure.
- Returns:
A tuple containing:
- A boolean indicating whether the ingest operation failed (True if it failed, False otherwise).
- A string containing the description of the result or the failure, augmented with details from the annotations if available.
- Return type:
Tuple[bool, str]
Notes
The function logs the ‘status’ and ‘description’ fields from the payload for debugging.
If the ‘status’ field contains “failed” and the ‘annotations’ field contains entries indicating failure, the function updates the description with the annotation details.
The function breaks after processing the first relevant annotation.
Examples
Suppose the JSON payload contains the following structure:
>>> json_payload = {
...     "status": "failed",
...     "description": "Ingest operation failed",
...     "annotations": {
...         "task1": {"task_result": "FAILURE", "message": "Network error"},
...         "task2": {"task_result": "SUCCESS"}
...     }
... }
>>> is_failed, description = check_ingest_result(json_payload)
>>> print(is_failed)
True
>>> print(description)
↪ Event that caused this failure: task1 -> Network error
In this example, the function detects a failure and augments the description with the message from the failing task.
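The logic in the notes above can be approximated in a few lines. This is a sketch under assumptions, not the library code: the name `sketch_check_ingest_result` is hypothetical, logging is omitted, and the annotation detail is assumed to be appended to the original description on a new line.

```python
from typing import Any, Dict, Tuple


def sketch_check_ingest_result(json_payload: Dict[str, Any]) -> Tuple[bool, str]:
    is_failed = json_payload.get("status") == "failed"
    description = json_payload.get("description", "")
    if is_failed:
        for name, annotation in json_payload.get("annotations", {}).items():
            if annotation.get("task_result") == "FAILURE":
                message = annotation.get("message", "")
                # Assumption: the detail is appended on its own line.
                description += f"\n↪ Event that caused this failure: {name} -> {message}"
                break  # only the first failing annotation is reported
    return is_failed, description


failed, failed_description = sketch_check_ingest_result(
    {
        "status": "failed",
        "description": "Ingest operation failed",
        "annotations": {
            "task1": {"task_result": "FAILURE", "message": "Network error"},
            "task2": {"task_result": "SUCCESS"},
        },
    }
)
succeeded = sketch_check_ingest_result({"status": "success", "description": "ok"})
```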
- nv_ingest_client.util.util.count_pages_for_documents(file_path: str, document_type: DocumentTypeEnum)
- nv_ingest_client.util.util.count_pages_for_text(file_path: str) int [source]#
Estimates the page count for text files based on word count, using the detect_encoding_and_read_text_file function for reading.
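A word-count estimate of this kind might look like the sketch below. The words-per-page constant of 300 is an assumed value, the file is read as plain UTF-8 instead of going through detect_encoding_and_read_text_file, and the name `sketch_count_pages_for_text` is hypothetical.

```python
import math
import os
import tempfile

WORDS_PER_PAGE = 300  # assumed value; the library's constant may differ


def sketch_count_pages_for_text(file_path: str) -> int:
    with open(file_path, encoding="utf-8") as handle:
        word_count = len(handle.read().split())
    # Round up, and report at least one page even for an empty file.
    return max(1, math.ceil(word_count / WORDS_PER_PAGE))


# Write a 650-word temporary file and estimate its page count.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8") as tmp:
    tmp.write("word " * 650)
pages = sketch_count_pages_for_text(tmp.name)
os.remove(tmp.name)
```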
- nv_ingest_client.util.util.create_job_specs_for_batch(files_batch: List[str])
Creates job specifications (JobSpecs) for a batch of files. This function takes a batch of files, processes each file to extract its content and type, and creates a job specification (JobSpec) for each file.
- Parameters:
files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.
- Returns:
A list of JobSpecs.
- Return type:
List[JobSpec]
- Raises:
ValueError – If there is an error extracting the file content or type from any of the files, a ValueError will be logged, and the corresponding file will be skipped.
Notes
The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.
For each file, a JobSpec is created with relevant metadata, including document type and file content.
The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.
Examples
Suppose you have a batch of files and tasks to process:
>>> files_batch = ["file1.txt", "file2.pdf"]
>>> client = NvIngestClient()
>>> job_specs = create_job_specs_for_batch(files_batch)
>>> print(job_specs)
[<nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb468bb0>, <nv_ingest_client.primitives.jobs.job_spec.JobSpec object at 0x743acb469270>]
See also
extract_file_content
Function that extracts the content and type of a file.
JobSpec
The class representing a job specification.
- nv_ingest_client.util.util.filter_function_kwargs(func, **kwargs)[source]#
Filters and returns keyword arguments that match the parameters of a given function.
This function inspects the signature of func and extracts any keyword arguments from kwargs that correspond to the function’s parameters. It returns a dictionary containing only those arguments that func accepts, allowing for safe, dynamic parameter passing.
- Parameters:
func (Callable) – The function whose parameters will be used to filter kwargs.
kwargs (dict) – A dictionary of keyword arguments, which may include extra keys not accepted by func.
- Returns:
A dictionary of keyword arguments filtered to include only those parameters accepted by func.
- Return type:
dict
Example
>>> def example_function(a, b, c):
...     pass
>>> filtered_kwargs = filter_function_kwargs(example_function, a=1, b=2, d=4)
>>> print(filtered_kwargs)
{'a': 1, 'b': 2}
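One way to implement this kind of filtering is with `inspect.signature` from the standard library. The sketch below is an assumption about the approach, not the library source; `sketch_filter_function_kwargs` and `connect` are hypothetical names used only for illustration.

```python
import inspect
from typing import Any, Callable, Dict


def sketch_filter_function_kwargs(func: Callable, **kwargs: Any) -> Dict[str, Any]:
    # Names of every parameter that func declares.
    accepted = inspect.signature(func).parameters
    return {name: value for name, value in kwargs.items() if name in accepted}


def connect(host, port=8080):
    return f"{host}:{port}"


# "verbose" is not a parameter of connect, so it is dropped before the call.
options = {"host": "localhost", "port": 7670, "verbose": True}
filtered = sketch_filter_function_kwargs(connect, **options)
address = connect(**filtered)
```

Filtering first means a shared options dictionary can be passed to several functions without triggering a TypeError for unexpected keyword arguments.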
- nv_ingest_client.util.util.generate_matching_files(file_sources)[source]#
Yields file paths that match the patterns specified in file_sources.
- Parameters:
file_sources (list of str) – A list containing the file source patterns to match against.
- Returns:
A generator yielding paths to files that match the specified patterns.
- Return type:
generator
Notes
This function utilizes glob pattern matching to find files that match the specified patterns. It yields each matching file path, allowing for efficient processing of potentially large sets of files.
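The generator pattern described in the notes can be sketched with the standard `glob` module. Skipping directories and enabling recursive patterns are assumptions of this sketch, and `sketch_generate_matching_files` is a hypothetical name.

```python
import glob
import os
import tempfile
from typing import Iterable, Iterator


def sketch_generate_matching_files(file_sources: Iterable[str]) -> Iterator[str]:
    for pattern in file_sources:
        for path in glob.glob(pattern, recursive=True):
            if os.path.isfile(path):  # assumption: directories are skipped
                yield path


# Create three files and match only the PDFs.
with tempfile.TemporaryDirectory() as root:
    for name in ("a.pdf", "b.pdf", "notes.txt"):
        open(os.path.join(root, name), "w").close()
    pattern = os.path.join(root, "*.pdf")
    matches = sorted(os.path.basename(p) for p in sketch_generate_matching_files([pattern]))
```

Because paths are yielded one at a time, a caller can start submitting files before the full set of matches has been enumerated.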
- nv_ingest_client.util.util.get_content(results: List[any])[source]#
Extracts the text and table text content from the results of an NV-Ingest Python client job.
- Parameters:
results (List[Any]) – The results of an NV-Ingest Python client job that contain the desired text and table content.
- Returns:
A dictionary containing the extracted text content and the extracted table content.
- Return type:
Dict
- nv_ingest_client.util.util.load_data_from_path(path: str) Dict [source]#
Loads data from a specified file path, preparing it for processing.
- Parameters:
path (str) – The path to the file from which data should be loaded.
- Returns:
A dictionary containing keys ‘file_name’, ‘id’, ‘content’, and ‘document_type’, each of which maps to a list that includes the respective details for the processed file.
- Return type:
dict
- Raises:
FileNotFoundError – If the specified path does not exist.
ValueError – If the specified path is not a file.
Notes
This function is designed to load and prepare file data for further processing, packaging the loaded data along with metadata such as file name and document type.
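The documented return shape and error cases can be illustrated as follows. Several details are assumptions made for the sketch: the content is base64-encoded, the path doubles as the id, and the document type is derived from the file extension. The name `sketch_load_data_from_path` is hypothetical.

```python
import base64
import os
import tempfile
from typing import Dict, List


def sketch_load_data_from_path(path: str) -> Dict[str, List[str]]:
    if not os.path.exists(path):
        raise FileNotFoundError(f"The path '{path}' does not exist.")
    if not os.path.isfile(path):
        raise ValueError(f"The path '{path}' is not a file.")
    with open(path, "rb") as handle:
        # Assumption: content is stored as base64 text.
        content = base64.b64encode(handle.read()).decode("utf-8")
    # Assumption: the document type comes from the file extension.
    document_type = os.path.splitext(path)[1].lstrip(".").lower()
    return {
        "file_name": [os.path.basename(path)],
        "id": [path],
        "content": [content],
        "document_type": [document_type],
    }


with tempfile.TemporaryDirectory() as root:
    sample = os.path.join(root, "report.TXT")
    with open(sample, "w", encoding="utf-8") as handle:
        handle.write("hello")
    data = sketch_load_data_from_path(sample)
    try:
        sketch_load_data_from_path(root)  # a directory, not a file
        directory_error = None
    except ValueError as exc:
        directory_error = exc
```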
nv_ingest_client.util.zipkin module#
- class nv_ingest_client.util.zipkin.AsyncZipkinClient(host: str, port: int, concurrent_requests: int, max_retries: int = 10, retry_delay: int = 5)
Bases: object
- async fetch(sem, trace_id: str, url: str)
Perform a GET request to the given URL with retry logic for 404 status codes.
- Parameters:
sem – Semaphore that limits the number of concurrent requests in flight.
trace_id – The trace_id for the request.
url – The complete URL for the GET request.
- Returns:
A dictionary containing the trace_id and the JSON response string.
- Return type:
Dict[str, str]
- Raises:
RuntimeError – If the maximum number of retries is exceeded.
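The semaphore-plus-retry pattern that fetch describes can be sketched with `asyncio` alone. This is not the client's implementation: the HTTP call is replaced by an injected coroutine `get` (a hypothetical parameter), the response keys `trace_id` and `json` are assumed names, and the URL is only an example value.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple


async def sketch_fetch(
    sem: asyncio.Semaphore,
    trace_id: str,
    url: str,
    get: Callable[[str], Awaitable[Tuple[int, str]]],
    max_retries: int = 10,
    retry_delay: float = 5,
) -> Dict[str, str]:
    async with sem:  # cap the number of in-flight requests
        for _ in range(max_retries):
            status, body = await get(url)
            if status == 404:
                # Retry after a pause; only 404 responses are retried.
                await asyncio.sleep(retry_delay)
                continue
            # Any other status is returned as-is in this sketch.
            return {"trace_id": trace_id, "json": body}
    raise RuntimeError(f"Max retries exceeded for {url}")


async def demo() -> Dict[str, str]:
    calls = {"count": 0}

    async def fake_get(url: str) -> Tuple[int, str]:
        # Hypothetical transport: two 404 responses, then success.
        calls["count"] += 1
        return (404, "") if calls["count"] < 3 else (200, '[{"id": "abc"}]')

    sem = asyncio.Semaphore(2)
    url = "http://localhost:9411/api/v2/trace/abc"  # example URL only
    return await sketch_fetch(sem, "abc", url, fake_get, retry_delay=0)


response = asyncio.run(demo())
```

Holding the semaphore across retries keeps the total number of outstanding requests bounded, at the cost of a slow trace occupying a slot while it waits.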