nv_ingest_client.cli.util package#

Submodules#

nv_ingest_client.cli.util.click module#

class nv_ingest_client.cli.util.click.ClientType(value)[source]#

Bases: str, Enum

Enumeration of supported client connection types.

KAFKA = 'KAFKA'#
REDIS = 'REDIS'#
REST = 'REST'#
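
Because ClientType mixes in str, its members compare equal to, and can be looked up by, their string values:

>>> ClientType.REST == "REST"
True
>>> ClientType("KAFKA") is ClientType.KAFKA
True
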
class nv_ingest_client.cli.util.click.LogLevel(value)[source]#

Bases: str, Enum

Enumeration of supported logging levels.

CRITICAL = 'CRITICAL'#
DEBUG = 'DEBUG'#
ERROR = 'ERROR'#
INFO = 'INFO'#
WARNING = 'WARNING'#
nv_ingest_client.cli.util.click.click_match_and_validate_files(ctx, param, value)[source]#

Matches and validates files based on the provided file source patterns.

Parameters:
  • ctx (click.Context) – The Click context for the current command invocation.

  • param (click.Parameter) – The Click parameter to which this callback is attached.

  • value (list of str) – A list containing file source patterns to match against.

Returns:

A list of matching file paths if any matches are found; otherwise, None.

Return type:

list of str or None
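
As a Click callback, it is typically attached to an option via the callback parameter; a minimal sketch (the --doc option name is illustrative):

>>> import click
>>> from nv_ingest_client.cli.util.click import click_match_and_validate_files
>>> @click.command()
... @click.option("--doc", multiple=True, callback=click_match_and_validate_files)
... def main(doc):
...     pass  # doc holds the matched file paths, or None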

nv_ingest_client.cli.util.click.click_validate_batch_size(ctx, param, value)[source]#
nv_ingest_client.cli.util.click.click_validate_file_exists(ctx, param, value)[source]#
nv_ingest_client.cli.util.click.click_validate_task(ctx, param, value)[source]#
nv_ingest_client.cli.util.click.debug_print_click_options(ctx)[source]#

Retrieves all options from the Click context and pretty-prints them.

Parameters:

ctx (click.Context) – The Click context object from which to retrieve the command options.
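
A minimal sketch of calling it inside a command body (the option shown is illustrative):

>>> import click
>>> @click.command()
... @click.option("--batch-size", default=10)
... @click.pass_context
... def main(ctx, batch_size):
...     debug_print_click_options(ctx)  # logs each resolved option and its value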

nv_ingest_client.cli.util.click.pre_process_dataset(dataset_json: str, shuffle_dataset: bool)[source]#

Loads a dataset from a JSON file and optionally shuffles the list of files contained within.

Parameters:
  • dataset_json (str) – The path to the dataset JSON file.

  • shuffle_dataset (bool) – Whether to shuffle the dataset's list of files before it is returned.

Returns:

The list of files from the dataset, possibly shuffled.

Return type:

list
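
A minimal usage sketch (dataset.json is a placeholder path):

>>> files = pre_process_dataset("dataset.json", shuffle_dataset=True)
>>> isinstance(files, list)
True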

nv_ingest_client.cli.util.processing module#

nv_ingest_client.cli.util.processing.check_schema(
schema: Type[BaseModel],
options: dict,
task_id: str,
original_str: str,
) → BaseModel[source]#
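
A usage sketch, assuming check_schema returns the validated model instance on success; the SplitOptions schema and its field are hypothetical:

>>> import json
>>> from pydantic import BaseModel
>>> from nv_ingest_client.cli.util.processing import check_schema
>>> class SplitOptions(BaseModel):  # hypothetical task options schema
...     chunk_size: int = 1024
>>> opts = {"chunk_size": 512}
>>> validated = check_schema(SplitOptions, opts, "split", json.dumps(opts))
>>> validated.chunk_size
512
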
nv_ingest_client.cli.util.processing.create_and_process_jobs(
files: List[str],
client: NvIngestClient,
tasks: Dict[str, Any],
output_directory: str,
batch_size: int,
timeout: int = 10,
fail_on_error: bool = False,
save_images_separately: bool = False,
) → Tuple[int, Dict[str, List[float]], int, Dict[str, str]][source]#

Process a list of files, creating and submitting jobs for each file, then fetch and handle the results.

This function creates job specifications (JobSpecs) for a list of files, submits the jobs to the client, and processes the results asynchronously. It handles job retries for timeouts, logs failures, and limits the number of JobSpecs in memory to batch_size * 2. Progress is reported on a per-file basis, including the pages processed per second.

Parameters:
  • files (List[str]) – A list of file paths to be processed. Each file is used to create a job, which is then submitted to the client.

  • client (NvIngestClient) – An instance of NvIngestClient used to submit jobs and fetch results asynchronously.

  • tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names, and the values represent task configurations. Tasks may include “split”, “extract”, “store”, “caption”, etc.

  • output_directory (str) – The directory path where the processed job results will be saved. If an empty string or None is provided, results will not be saved.

  • batch_size (int) – The number of jobs to process in each batch. Memory is limited to batch_size * 2 jobs at any time.

  • timeout (int, optional) – The timeout in seconds for each job to complete before it is retried. Default is 10 seconds.

  • fail_on_error (bool, optional) – If True, the function will raise an error and stop processing when encountering an unrecoverable error. If False, the function logs the error and continues processing other jobs. Default is False.

Returns:

A tuple containing:
  • total_files (int): The total number of files processed.

  • trace_times (Dict[str, List[float]]): A dictionary mapping job IDs to lists of trace times for diagnostic purposes.

  • total_pages_processed (int): The total number of pages processed from the files.

  • trace_ids (Dict[str, str]): A dictionary mapping each source file to its corresponding trace_id.

Return type:

Tuple[int, Dict[str, List[float]], int, Dict[str, str]]

Raises:

RuntimeError – If fail_on_error is True and an error occurs during job submission or processing, a RuntimeError is raised.

Notes

  • The function limits the number of JobSpecs in memory to batch_size * 2 for efficient resource management.

  • It manages job retries for timeouts and logs decoding or processing errors.

  • The progress bar reports progress on a per-file basis and shows the pages processed per second.

Examples

Suppose we have a list of files and tasks to process:

>>> files = ["file1.txt", "file2.pdf", "file3.docx"]
>>> client = NvIngestClient()
>>> tasks = {"split": ..., "extract": ..., "store": ...}
>>> output_directory = "/path/to/output"
>>> batch_size = 5
>>> total_files, trace_times, total_pages_processed, trace_ids = create_and_process_jobs(
...     files, client, tasks, output_directory, batch_size
... )
>>> print(f"Processed {total_files} files, {total_pages_processed} pages.")

In this example, the function processes the files, submits jobs, fetches results, handles retries for timeouts, and logs failures. The number of files and pages processed is printed.

See also

generate_job_batch_for_iteration

Function to generate job batches for processing.

handle_future_result

Function to process and handle the result of completed future jobs.

nv_ingest_client.cli.util.processing.format_validation_error(
e: ValidationError,
task_id,
original_str: str,
) → str[source]#
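
A sketch of producing a formatted message from a Pydantic ValidationError (the TaskOptions schema is hypothetical):

>>> from pydantic import BaseModel, ValidationError
>>> from nv_ingest_client.cli.util.processing import format_validation_error
>>> class TaskOptions(BaseModel):  # hypothetical schema
...     chunk_size: int
>>> original = '{"chunk_size": "not-a-number"}'
>>> try:
...     TaskOptions(chunk_size="not-a-number")
... except ValidationError as err:
...     message = format_validation_error(err, "split", original)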

Formats validation errors with appropriate highlights and returns a detailed error message.

nv_ingest_client.cli.util.processing.generate_job_batch_for_iteration(
client: Any,
pbar: Any,
files: List[str],
tasks: Dict,
processed: int,
batch_size: int,
retry_job_ids: List[str],
fail_on_error: bool = False,
) → Tuple[List[str], Dict[str, str], int][source]#

Generates a batch of job specifications for the current iteration of file processing. This function handles retrying failed jobs and creating new jobs for unprocessed files. The job specifications are then submitted for processing.

Parameters:
  • client (Any) – The client object used to submit jobs asynchronously.

  • pbar (Any) – The progress bar object to update the progress as jobs are processed.

  • files (List[str]) – The list of file paths to be processed.

  • tasks (Dict) – A dictionary of tasks to be executed as part of the job specifications.

  • processed (int) – The number of files that have been processed so far.

  • batch_size (int) – The maximum number of jobs to process in one batch.

  • retry_job_ids (List[str]) – A list of job IDs that need to be retried due to previous failures.

  • fail_on_error (bool, optional) – Whether to raise an error and stop processing if job specifications are missing, by default False.

Returns:

A tuple containing:
  • job_ids (List[str]): The list of job IDs created or retried in this iteration.

  • job_id_map_updates (Dict[str, str]): A dictionary mapping job IDs to their corresponding file names.

  • processed (int): The updated number of files processed.

Return type:

Tuple[List[str], Dict[str, str], int]

Raises:

RuntimeError – If fail_on_error is True and there are missing job specifications, a RuntimeError is raised.

nv_ingest_client.cli.util.processing.get_valid_filename(name)[source]#

Taken from django/django. Return the given string converted to a string that can be used for a clean filename. Remove leading and trailing spaces; convert other spaces to underscores; and remove anything that is not an alphanumeric, dash, underscore, or dot.

>>> get_valid_filename("john's portrait in 2004.jpg")
'johns_portrait_in_2004.jpg'

nv_ingest_client.cli.util.processing.highlight_error_in_original(
original_str: str,
task_name: str,
error_detail: Dict[str, Any],
) → str[source]#

Highlights the error-causing text in the original JSON string based on the error type.

This function identifies errors in the original JSON string and highlights the specific part of the string responsible for the error. It handles two main types of errors:
  • For ‘extra fields’ errors, it highlights the extra field that caused the issue.

  • For ‘missing fields’ errors, it appends a clear message indicating the missing field.

Parameters:
  • original_str (str) – The original JSON string that caused the error. The function will modify and return this string with the problematic fields highlighted.

  • task_name (str) – The name of the task associated with the error. This is included in the error message when highlighting missing fields.

  • error_detail (Dict[str, Any]) – A dictionary containing details about the error. The dictionary should contain the following keys:
    - ‘type’ (str): The type of error (e.g., “value_error.extra”, “value_error.missing”).
    - ‘loc’ (List[Any]): A list of keys/indices indicating the location of the error in the JSON structure.

Returns:

The original string with the error-causing field highlighted. If the error is related to extra fields, the problematic field is highlighted in blue and bold. If the error is due to missing fields, a message indicating the missing field is appended to the string.

Return type:

str

Notes

  • The function uses the style method (assumed to be defined elsewhere, likely from a terminal or text formatting library) to apply color and bold formatting to the highlighted text.

  • The ‘loc’ key in error_detail is a list that represents the path to the error-causing field in the JSON.

Examples

Suppose there is an error in the original JSON string due to an extra field:

>>> original_str = '{"name": "file1.txt", "extra_field": "some_value"}'
>>> task_name = "validate_document"
>>> error_detail = {
...     "type": "value_error.extra",
...     "loc": ["extra_field"]
... }
>>> highlighted_str = highlight_error_in_original(original_str, task_name, error_detail)
>>> print(highlighted_str)
{"name": "file1.txt", "extra_field": "some_value"}  # The 'extra_field' will be highlighted

In this case, the function will highlight the extra_field in blue and bold in the returned string.

See also

style

Function used to apply formatting to the error-causing text (e.g., coloring or bolding).

nv_ingest_client.cli.util.processing.organize_documents_by_type(
response_data: List[Dict[str, Any]],
) → Dict[str, List[Dict[str, Any]]][source]#

Organize documents by their content type.

This function takes a list of response documents, extracts the content type from each document’s metadata, and organizes the documents into a dictionary, where the keys are content types and the values are lists of documents belonging to each type.

Parameters:

response_data (List[Dict[str, Any]]) – A list of documents, where each document is represented as a dictionary. Each dictionary must contain a ‘metadata’ field that may be either a JSON string or a dictionary. The metadata is expected to have a “content_metadata” field containing the document’s type.

Returns:

A dictionary mapping document types (as strings) to lists of documents. Each key represents a document type, and the associated value is a list of documents that belong to that type.

Return type:

Dict[str, List[Dict[str, Any]]]

Notes

  • The ‘metadata’ field of each document can be either a JSON string or a dictionary. If it is a string, it will be parsed into a dictionary.

  • The function assumes that each document has a valid “content_metadata” field, which contains a “type” key that indicates the document’s type.

  • If a document type is encountered for the first time, a new entry is created in the result dictionary, and subsequent documents of the same type are added to the corresponding list.

Examples

Suppose response_data contains the following structure:

>>> response_data = [
...     {"metadata": {"content_metadata": {"type": "report"}}},
...     {"metadata": '{"content_metadata": {"type": "summary"}}'},
...     {"metadata": {"content_metadata": {"type": "report"}}}
... ]
>>> organize_documents_by_type(response_data)
{'report': [{'metadata': {'content_metadata': {'type': 'report'}}},
            {'metadata': {'content_metadata': {'type': 'report'}}}],
 'summary': [{'metadata': {'content_metadata': {'type': 'summary'}}}]}

In this example, the documents are organized into two types: “report” and “summary”.

See also

json.loads

Used to parse metadata if it is a JSON string.

nv_ingest_client.cli.util.processing.process_response(response, stage_elapsed_times)[source]#

Process the response to extract trace data and calculate elapsed time for each stage.

Parameters:
  • response (dict) – The response dictionary containing trace information for processing stages.

  • stage_elapsed_times (defaultdict(list)) – A defaultdict to accumulate elapsed times for each processing stage.

Notes

The function iterates over trace data in the response, identifying entry and exit times for each stage, and calculates the elapsed time, which is then appended to the respective stage in stage_elapsed_times.
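
A minimal sketch, assuming trace timestamps are nanosecond values keyed by stage entry/exit names; the exact key layout shown here is illustrative, not the library's guaranteed format:

>>> from collections import defaultdict
>>> stage_elapsed_times = defaultdict(list)
>>> response = {  # illustrative shape; the real trace key layout may differ
...     "trace": {
...         "trace::entry::pdf_extractor": 1_000_000_000,
...         "trace::exit::pdf_extractor": 3_500_000_000,
...     }
... }
>>> process_response(response, stage_elapsed_times)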

nv_ingest_client.cli.util.processing.report_overall_speed(
total_pages_processed: int,
start_time_ns: int,
total_files: int,
) → None[source]#

Reports the overall processing speed based on the number of pages and files processed.

Parameters:
  • total_pages_processed (int) – The total number of pages processed.

  • start_time_ns (int) – The nanosecond timestamp marking the start of processing.

  • total_files (int) – The total number of files processed.

Notes

This function calculates the total elapsed time from the start of processing and reports the throughput in terms of pages and files processed per second.
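
For example, with a start timestamp taken via time.time_ns():

>>> import time
>>> start_time_ns = time.time_ns()
>>> # ... create and process jobs ...
>>> report_overall_speed(120, start_time_ns, 10)  # 120 pages across 10 files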

nv_ingest_client.cli.util.processing.report_stage_statistics(
stage_elapsed_times: defaultdict,
total_trace_elapsed: float,
abs_elapsed: float,
) → None[source]#

Reports the statistics for each processing stage, including average, median, total time spent, and their respective percentages of the total processing time.

Parameters:
  • stage_elapsed_times (defaultdict(list)) – A defaultdict containing lists of elapsed times for each processing stage, in nanoseconds.

  • total_trace_elapsed (float) – The total elapsed time across all processing stages, in nanoseconds.

  • abs_elapsed (float) – The absolute elapsed time from the start to the end of processing, in nanoseconds.

Notes

This function logs the average, median, and total time for each stage, along with the percentage of total computation. It also calculates and logs the unresolved time, if any, that is not accounted for by the recorded stages.
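
A minimal sketch with a single recorded stage (the stage name is illustrative; all values are nanoseconds):

>>> from collections import defaultdict
>>> stage_elapsed_times = defaultdict(list)
>>> stage_elapsed_times["pdf_extractor"].append(2_500_000_000)  # 2.5 s in ns
>>> report_stage_statistics(stage_elapsed_times, total_trace_elapsed=2_500_000_000, abs_elapsed=3_000_000_000)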

nv_ingest_client.cli.util.processing.report_statistics(
start_time_ns: int,
stage_elapsed_times: defaultdict,
total_pages_processed: int,
total_files: int,
) → None[source]#

Aggregates and reports statistics for the entire processing session.

Parameters:
  • start_time_ns (int) – The nanosecond timestamp marking the start of the processing.

  • stage_elapsed_times (defaultdict(list)) – A defaultdict where each key is a processing stage and each value is a list of elapsed times in nanoseconds for that stage.

  • total_pages_processed (int) – The total number of pages processed during the session.

  • total_files (int) – The total number of files processed during the session.

Notes

This function calculates the absolute elapsed time from the start of processing to the current time and the total time taken by all stages.

nv_ingest_client.cli.util.processing.save_response_data(response, output_directory, images_to_disk=False)[source]#

Save the response data into categorized metadata JSON files and optionally save images to disk.

This function processes the response data, organizes it based on document types, and saves the organized data into a specified output directory as JSON files. If ‘images_to_disk’ is True and the document type is ‘image’, it decodes and writes base64 encoded images to disk.

Parameters:
  • response (dict) – A dictionary containing the API response data. It must contain a “data” field, which is expected to be a list of document entries. Each document entry should contain metadata, which includes information about the document’s source.

  • output_directory (str) – The path to the directory where the JSON metadata files should be saved. Subdirectories will be created based on the document types, and the metadata files will be stored within these subdirectories.

  • images_to_disk (bool, optional) – If True, base64 encoded images in the ‘metadata.content’ field will be decoded and saved to disk.

Returns:

This function does not return any values. It writes output to the filesystem.

Return type:

None

Notes

  • If ‘images_to_disk’ is True and ‘doc_type’ is ‘image’, images will be decoded and saved to disk with file types derived from ‘metadata.image_metadata.image_type’.
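
A minimal usage sketch, assuming response is a fetched result dict containing the expected “data” list (the output path is a placeholder):

>>> # response: a fetched result dict containing a "data" list of document entries
>>> save_response_data(response, "/path/to/output", images_to_disk=True)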

nv_ingest_client.cli.util.system module#

nv_ingest_client.cli.util.system.configure_logging(logger, log_level: str)[source]#

Configures the logging level based on a log_level string.

Parameters:
  • logger (logging.Logger) – The logger instance to configure.

  • log_level (str) – The logging level as a string, expected to be one of ‘DEBUG’, ‘INFO’, ‘WARNING’, ‘ERROR’, ‘CRITICAL’.
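
For example:

>>> import logging
>>> logger = logging.getLogger("nv_ingest_client")
>>> configure_logging(logger, "INFO")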

nv_ingest_client.cli.util.system.ensure_directory_with_permissions(directory_path: str)[source]#

Ensures that a directory exists and the current user has read/write permissions. If the directory does not exist, attempts to create it after checking the parent directory for write permission.

Parameters:

directory_path (str) – The path to the directory to check or create.

Returns:

True if the directory exists and has the correct permissions, or if it was successfully created. False if the directory cannot be created or does not have the correct permissions.

Return type:

bool
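
Because the function returns a boolean rather than raising, callers typically guard on the result; a sketch:

>>> if not ensure_directory_with_permissions("/path/to/output"):
...     raise OSError("output directory is missing or lacks read/write permissions")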

nv_ingest_client.cli.util.system.has_permissions(
path: str,
read: bool = False,
write: bool = False,
) → bool[source]#

Checks if the current user has specified permissions on a path.

Parameters:
  • path (str) – The filesystem path to check permissions on.

  • read (bool, optional) – Whether to check for read permission.

  • write (bool, optional) – Whether to check for write permission.

Returns:

True if the path has the specified permissions, False otherwise.

Return type:

bool
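
For example, on a typical Linux system where /etc/hosts is world-readable but writable only by root:

>>> has_permissions("/etc/hosts", read=True)
True
>>> has_permissions("/etc/hosts", write=True)  # typically False for non-root users
False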

nv_ingest_client.cli.util.tasks module#

Module contents#