nv_ingest_client.cli.util package
Submodules
nv_ingest_client.cli.util.click module
- class nv_ingest_client.cli.util.click.ClientType(value)
Bases: str, Enum
Enum for specifying client types.
- REST = 'REST'
Represents a REST client. Type: str
- REDIS = 'REDIS'
Represents a Redis client. Type: str
- KAFKA = 'KAFKA'
Represents a Kafka client. Type: str
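Because ClientType derives from both str and Enum, its members can be looked up by value and compare equal to plain strings. The following is a minimal sketch of that behaviour; it uses a local stand-in class that mirrors the documented members instead of importing the package, so it only illustrates the str/Enum pattern.

```python
from enum import Enum


class ClientType(str, Enum):
    """Local stand-in mirroring the documented members (not imported from the package)."""

    REST = "REST"
    REDIS = "REDIS"
    KAFKA = "KAFKA"


# Lookup by value returns the matching member.
client_type = ClientType("REST")

# Members are also str instances, so they compare equal to plain strings.
is_rest = client_type == "REST"

# A value that is not a member raises ValueError.
try:
    ClientType("GRPC")
    unknown_accepted = True
except ValueError:
    unknown_accepted = False
```

The same pattern applies to any enum declared with str and Enum as bases.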
- class nv_ingest_client.cli.util.click.LogLevel(value)
Bases: str, Enum
Enum for specifying logging levels.
- DEBUG = 'DEBUG'
Debug logging level. Type: str
- INFO = 'INFO'
Informational logging level. Type: str
- WARNING = 'WARNING'
Warning logging level. Type: str
- ERROR = 'ERROR'
Error logging level. Type: str
- CRITICAL = 'CRITICAL'
Critical logging level. Type: str
- nv_ingest_client.cli.util.click.click_match_and_validate_files(ctx: Context, param: Parameter, value: List[str]) → List[str]
Matches and validates files based on the provided file source patterns.
- Parameters:
ctx (click.Context) – The Click context.
param (click.Parameter) – The parameter associated with the file matching option.
value (List[str]) – A list of file source patterns to match against.
- Returns:
A list of matching file paths. If no files match, an empty list is returned.
- Return type:
List[str]
- nv_ingest_client.cli.util.click.click_validate_batch_size(ctx: Context, param: Parameter, value: int) → int
Validates that the batch size is at least 1.
- Parameters:
ctx (click.Context) – The Click context.
param (click.Parameter) – The parameter associated with the batch size option.
value (int) – The batch size value provided.
- Returns:
The validated batch size.
- Return type:
int
- Raises:
click.BadParameter – If the batch size is less than 1.
- nv_ingest_client.cli.util.click.click_validate_file_exists(ctx: Context, param: Parameter, value: str | List[str] | None) → List[str]
Validates that the given file(s) exist.
- Parameters:
ctx (click.Context) – The Click context.
param (click.Parameter) – The parameter associated with the file option.
value (Union[str, List[str], None]) – A file path or a list of file paths.
- Returns:
A list of validated file paths.
- Return type:
List[str]
- Raises:
click.BadParameter – If any file path does not exist.
- nv_ingest_client.cli.util.click.click_validate_task(ctx: Context, param: Parameter, value: List[str]) → Dict[str, TaskType]
Validates and processes task definitions provided as strings.
Each task definition should be in the format “<task_id>:<json_options>”. If the separator ‘:’ is missing, an empty JSON options dictionary is assumed. The function uses a schema check (via check_schema) for validation and instantiates the corresponding task.
- Parameters:
ctx (click.Context) – The Click context.
param (click.Parameter) – The parameter associated with the task option.
value (List[str]) – A list of task strings to validate.
- Returns:
A dictionary mapping task IDs to their corresponding task objects.
- Return type:
Dict[str, TaskType]
- Raises:
click.BadParameter – If any task fails validation (including malformed JSON) or if duplicate tasks are detected.
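The "<task_id>:<json_options>" format can be sketched with str.partition, as below. This is an illustration only: split_task_definition is a hypothetical helper, the "document_type" option is made up for the example, and the schema check and task instantiation performed by the real function are left out.

```python
import json
from typing import Any, Dict, Tuple


def split_task_definition(definition: str) -> Tuple[str, Dict[str, Any]]:
    """Hypothetical helper: split "<task_id>:<json_options>" into its two parts."""
    # Split on the first ':' only, because the JSON options contain colons too.
    task_id, separator, options_str = definition.partition(":")
    # A missing separator means no options were given, so assume an empty dict.
    options = json.loads(options_str) if separator else {}
    return task_id, options


with_options = split_task_definition('extract:{"document_type": "pdf"}')
without_options = split_task_definition("split")
```

Splitting on the first separator only keeps nested JSON objects intact.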
- nv_ingest_client.cli.util.click.debug_print_click_options(ctx: Context) → None
Retrieves all options from the Click context and pretty prints them.
- Parameters:
ctx (click.Context) – The Click context object from which to retrieve the command options.
- nv_ingest_client.cli.util.click.parse_task_options(task_id: str, options_str: str) → Dict[str, Any]
Parse the task options string as JSON.
- Parameters:
task_id (str) – The identifier of the task for which options are being parsed.
options_str (str) – The string containing JSON options.
- Returns:
The parsed options as a dictionary.
- Return type:
Dict[str, Any]
- Raises:
ValueError – If the JSON string is not well formatted. The error message will indicate the task, the error details (e.g., expected property format), and show the input that was provided.
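One way to produce an error of the kind described, naming the task, the decoder's explanation, and the original input, is to wrap json.JSONDecodeError. The sketch below is a stand-in rather than the package's code; the function name and the exact message wording are assumptions.

```python
import json
from typing import Any, Dict


def parse_options_sketch(task_id: str, options_str: str) -> Dict[str, Any]:
    """Illustrative stand-in for parse_task_options; the message format is assumed."""
    try:
        return json.loads(options_str)
    except json.JSONDecodeError as err:
        # Re-raise with the task name, the decoder's explanation, and the raw input.
        raise ValueError(
            f"Invalid JSON options for task '{task_id}': {err.msg} "
            f"(line {err.lineno}, column {err.colno}). Input was: {options_str}"
        ) from err


try:
    # Single-quoted keys are not valid JSON, so this input is rejected.
    parse_options_sketch("extract", "{'document_type': 'pdf'}")
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```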
- nv_ingest_client.cli.util.click.pre_process_dataset(dataset_json: str, shuffle_dataset: bool) → List[str]
Loads a dataset from a JSON file and optionally shuffles the list of files.
- Parameters:
dataset_json (str) – The path to the dataset JSON file.
shuffle_dataset (bool) – Whether to shuffle the dataset before processing.
- Returns:
The list of file paths from the dataset. If ‘shuffle_dataset’ is True, the list will be shuffled.
- Return type:
List[str]
- Raises:
click.BadParameter – If the dataset file is not found or if its contents are not valid JSON.
nv_ingest_client.cli.util.processing module
- nv_ingest_client.cli.util.processing.create_and_process_jobs(files: List[str], client: Any, tasks: Dict[str, Any], output_directory: str, batch_size: int, fail_on_error: bool = False, save_images_separately: bool = False) → Tuple[int, Dict[str, List[float]], int, Dict[str, str]]
Process a list of files by creating and submitting jobs for each file, then fetching and handling the results asynchronously.
This function creates job specifications (JobSpecs) for the provided list of files, submits the jobs to the client, and processes the results asynchronously. It handles job retries for timeouts, logs failures, and limits the number of JobSpecs in memory to batch_size * 2. Progress is reported on a per-file basis, including the pages processed per second.
- Parameters:
files (List[str]) – A list of file paths to be processed. Each file is used to create a job which is then submitted to the client.
client (Any) – An instance of NvIngestClient used to submit jobs and fetch results asynchronously.
tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names (e.g., “split”, “extract”, “store”, “caption”, etc.) and the values represent task configurations.
output_directory (str) – The directory path where the processed job results will be saved. If an empty string or None is provided, results will not be saved.
batch_size (int) – The number of jobs to process in each batch. Memory is limited to batch_size * 2 jobs at any time.
fail_on_error (bool, optional) – If True, the function will raise an error and stop processing when encountering an unrecoverable error. If False, the function logs the error and continues processing other jobs. Default is False.
save_images_separately (bool, optional) – If True, images will be saved separately to disk. Default is False.
- Returns:
A tuple containing:
- total_files (int): The total number of files processed.
- trace_times (Dict[str, List[float]]): A dictionary mapping job IDs to a list of trace times for diagnostic purposes.
- total_pages_processed (int): The total number of pages processed from the files.
- trace_ids (Dict[str, str]): A dictionary mapping each source file to its corresponding trace_id.
- Return type:
Tuple[int, Dict[str, List[float]], int, Dict[str, str]]
- Raises:
RuntimeError – If fail_on_error is True and an error occurs during job submission or processing.
- nv_ingest_client.cli.util.processing.generate_job_batch_for_iteration(client: Any, pbar: Any, files: List[str], tasks: Dict[str, Any], processed: int, batch_size: int, retry_job_ids: List[str], fail_on_error: bool = False) → Tuple[List[str], Dict[str, str], int]
Generates a batch of job specifications for the current iteration of file processing.
This function handles retrying failed jobs and creating new jobs for unprocessed files. The job specifications are then submitted for processing.
- Parameters:
client (Any) – The client object used to submit jobs asynchronously.
pbar (Any) – The progress bar object used to update the progress as jobs are processed.
files (List[str]) – The list of file paths to be processed.
tasks (Dict[str, Any]) – A dictionary of tasks to be executed as part of the job specifications.
processed (int) – The number of files that have been processed so far.
batch_size (int) – The maximum number of jobs to process in one batch.
retry_job_ids (List[str]) – A list of job IDs that need to be retried due to previous failures.
fail_on_error (bool, optional) – Whether to raise an error and stop processing if job specifications are missing. Default is False.
- Returns:
A tuple containing:
- job_ids (List[str]): The list of job IDs created or retried in this iteration.
- job_id_map_updates (Dict[str, str]): A dictionary mapping job IDs to their corresponding file names.
- processed (int): The updated number of files processed.
- Return type:
Tuple[List[str], Dict[str, str], int]
- Raises:
RuntimeError – If fail_on_error is True and job specifications are missing.
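The interplay of processed, batch_size, and retry_job_ids can be pictured with the sketch below. It rests on an assumption the reference does not confirm: that retried jobs take batch slots first and new files fill whatever remains. plan_batch is a hypothetical helper that only plans a batch; it does not create job specifications or submit anything.

```python
from typing import List, Tuple


def plan_batch(
    files: List[str], processed: int, batch_size: int, retry_job_ids: List[str]
) -> Tuple[List[str], List[str], int]:
    """Hypothetical planner: retried jobs take slots first, new files fill the rest."""
    retries = retry_job_ids[:batch_size]
    free_slots = batch_size - len(retries)
    # Take the next unprocessed files; slicing never reads past the end of the list.
    new_files = files[processed : processed + free_slots]
    return retries, new_files, processed + len(new_files)


files = ["a.pdf", "b.pdf", "c.pdf", "d.pdf"]
retries, new_files, processed = plan_batch(
    files, processed=1, batch_size=3, retry_job_ids=["job-7"]
)
```

With one retry and a batch size of 3, two new files are picked up and the processed counter advances from 1 to 3.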
- nv_ingest_client.cli.util.processing.get_valid_filename(name: Any) → str
Return a sanitized version of the given filename.
This function, adapted from Django (django/django), converts the input string to a form that is safe to use as a filename. It trims leading and trailing spaces, replaces remaining spaces with underscores, and removes any characters that are not alphanumeric, dashes, underscores, or dots.
- Parameters:
name (Any) – The input value to be converted into a valid filename. It will be converted to a string.
- Returns:
A sanitized string that can be used as a filename.
- Return type:
str
- Raises:
ValueError – If a valid filename cannot be derived from the input.
Examples
>>> get_valid_filename("john's portrait in 2004.jpg")
'johns_portrait_in_2004.jpg'
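The sanitizing rules described for this function can be reproduced with a short re-implementation. The sketch below follows the documented steps and is not the package's own code; the function name and the error message text are assumptions.

```python
import re
from typing import Any


def get_valid_filename_sketch(name: Any) -> str:
    """Illustrative re-implementation of the documented rules."""
    # Trim surrounding whitespace, then turn the remaining spaces into underscores.
    s = str(name).strip().replace(" ", "_")
    # Drop everything except word characters (letters, digits, underscore), dashes, and dots.
    s = re.sub(r"[^-\w.]", "", s)
    # Names that end up empty or purely relative cannot be used as filenames.
    if s in {"", ".", ".."}:
        raise ValueError(f"Could not derive a file name from '{name}'")
    return s


sanitized = get_valid_filename_sketch("  john's portrait in 2004.jpg ")

try:
    get_valid_filename_sketch("??")
    rejected = False
except ValueError:
    rejected = True
```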
- nv_ingest_client.cli.util.processing.organize_documents_by_type(response_data: List[Dict[str, Any]]) → Dict[str, List[Dict[str, Any]]]
Organize documents by their content type.
This function takes a list of response documents, extracts the content type from each document’s metadata, and organizes the documents into a dictionary, where the keys are content types and the values are lists of documents belonging to each type.
- Parameters:
response_data (List[Dict[str, Any]]) – A list of documents, where each document is represented as a dictionary. Each dictionary must contain a ‘metadata’ field that may be either a JSON string or a dictionary. The metadata is expected to have a “content_metadata” field containing the document’s type.
- Returns:
A dictionary mapping document types (as strings) to lists of documents. Each key represents a document type, and the associated value is a list of documents that belong to that type.
- Return type:
Dict[str, List[Dict[str, Any]]]
Notes
If the ‘metadata’ field of a document is a string, it is parsed into a dictionary using json.loads.
The function assumes that each document’s metadata has a valid “content_metadata” field with a “type” key.
Documents are grouped by the value of the “type” key in their “content_metadata”.
Examples
>>> response_data = [
...     {"metadata": {"content_metadata": {"type": "report"}}},
...     {"metadata": '{"content_metadata": {"type": "summary"}}'},
...     {"metadata": {"content_metadata": {"type": "report"}}}
... ]
>>> organize_documents_by_type(response_data)
{'report': [{'metadata': {'content_metadata': {'type': 'report'}}}, {'metadata': {'content_metadata': {'type': 'report'}}}], 'summary': [{'metadata': {'content_metadata': {'type': 'summary'}}}]}
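A grouping of this kind can be sketched with collections.defaultdict. The version below is a stand-in written from the description. Storing the parsed metadata back on the document is an assumption, made so that the result matches the documented example output.

```python
import json
from collections import defaultdict
from typing import Any, Dict, List


def organize_by_type_sketch(
    response_data: List[Dict[str, Any]],
) -> Dict[str, List[Dict[str, Any]]]:
    """Illustrative grouping of documents by metadata.content_metadata.type."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for document in response_data:
        metadata = document["metadata"]
        # Metadata may arrive as a JSON string; parse it and keep the parsed form.
        if isinstance(metadata, str):
            metadata = json.loads(metadata)
            document["metadata"] = metadata
        grouped[metadata["content_metadata"]["type"]].append(document)
    return dict(grouped)


documents = [
    {"metadata": {"content_metadata": {"type": "report"}}},
    {"metadata": '{"content_metadata": {"type": "summary"}}'},
    {"metadata": {"content_metadata": {"type": "report"}}},
]
grouped_documents = organize_by_type_sketch(documents)
```

Note that this sketch mutates the input documents when it replaces string metadata with the parsed dictionary.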
- nv_ingest_client.cli.util.processing.process_response(response: Dict[str, Any], stage_elapsed_times: defaultdict)
Process the response to extract trace data and calculate elapsed time for each stage.
This function iterates over trace data in the response, identifies entry and exit times for each stage, calculates the elapsed time, and appends the elapsed time to the corresponding stage in the provided stage_elapsed_times dictionary.
- Parameters:
response (Dict[str, Any]) – The response dictionary containing trace information for processing stages.
stage_elapsed_times (defaultdict) – A defaultdict where keys are stage names (str) and values are lists of elapsed times (int, in nanoseconds).
Notes
The function expects trace keys to include “entry” and “exit” substrings. For each entry key, the corresponding exit key is determined by replacing “entry” with “exit”. The stage name is assumed to be the third element when splitting the key by “::”.
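The entry/exit pairing described in the notes can be sketched as follows. Two things here are assumptions: that the trace data sits under a "trace" key of the response, and the sample key names. collect_stage_times is a hypothetical stand-in for the real function.

```python
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List


def collect_stage_times(
    response: Dict[str, Any], stage_elapsed_times: DefaultDict[str, List[int]]
) -> None:
    """Illustrative sketch; assumes trace data lives under a "trace" key."""
    trace = response.get("trace", {})
    for key, entry_ns in trace.items():
        if "entry" not in key:
            continue
        # The matching exit key is the entry key with "entry" replaced by "exit".
        exit_key = key.replace("entry", "exit")
        if exit_key not in trace:
            continue
        # The stage name is the third element of the "::"-separated key.
        stage = key.split("::")[2]
        stage_elapsed_times[stage].append(trace[exit_key] - entry_ns)


times: DefaultDict[str, List[int]] = defaultdict(list)
collect_stage_times(
    {"trace": {"trace::entry::pdf_extractor": 1_000, "trace::exit::pdf_extractor": 4_500}},
    times,
)
```

Because the results are appended to the caller's defaultdict, repeated calls accumulate one elapsed time per stage per response.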
- nv_ingest_client.cli.util.processing.report_overall_speed(total_pages_processed: int, start_time_ns: int, total_files: int)
Report the overall processing speed based on the number of pages and files processed.
This function calculates the total elapsed time from the start of processing and reports the throughput in terms of pages and files processed per second.
- Parameters:
total_pages_processed (int) – The total number of pages processed.
start_time_ns (int) – The nanosecond timestamp marking the start of processing.
total_files (int) – The total number of files processed.
Notes
The function converts the elapsed time from nanoseconds to seconds and logs the overall throughput.
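The throughput arithmetic amounts to dividing counts by the elapsed time in seconds. The sketch below makes that explicit. The overall_speed helper and its end_time_ns parameter are assumptions added so the example is deterministic; the documented function takes only a start timestamp and logs its result.

```python
from typing import Tuple


def overall_speed(
    total_pages: int, total_files: int, start_time_ns: int, end_time_ns: int
) -> Tuple[float, float]:
    """Return (pages per second, files per second) for the elapsed interval."""
    # Convert the elapsed time from nanoseconds to seconds.
    elapsed_s = (end_time_ns - start_time_ns) / 1e9
    return total_pages / elapsed_s, total_files / elapsed_s


pages_per_s, files_per_s = overall_speed(
    total_pages=120, total_files=8, start_time_ns=0, end_time_ns=4_000_000_000
)
```

Here 120 pages and 8 files over 4 seconds give 30 pages per second and 2 files per second.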
- nv_ingest_client.cli.util.processing.report_stage_statistics(stage_elapsed_times: defaultdict, total_trace_elapsed: float, abs_elapsed: float)
Reports the statistics for each processing stage, including average, median, total time spent, and their respective percentages of the total processing time.
- Parameters:
stage_elapsed_times (defaultdict(list)) – A defaultdict containing lists of elapsed times for each processing stage, in nanoseconds.
total_trace_elapsed (float) – The total elapsed time across all processing stages, in nanoseconds.
abs_elapsed (float) – The absolute elapsed time from the start to the end of processing, in nanoseconds.
Notes
This function logs the average, median, and total time for each stage, along with the percentage of total computation. It also calculates and logs the unresolved time, if any, that is not accounted for by the recorded stages.
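The per-stage figures can be computed with the statistics module, as in the sketch below. It returns the numbers instead of logging them, and it makes two assumptions: percentages are taken relative to the summed trace time, and unresolved time is the absolute elapsed time minus that sum. summarize_stages is a hypothetical helper.

```python
from statistics import mean, median
from typing import Dict, List, Tuple


def summarize_stages(
    stage_elapsed_times: Dict[str, List[int]], abs_elapsed: float
) -> Tuple[Dict[str, Dict[str, float]], float]:
    """Return per-stage statistics and the unresolved time, all in nanoseconds."""
    total_trace_elapsed = sum(sum(times) for times in stage_elapsed_times.values())
    summary: Dict[str, Dict[str, float]] = {}
    for stage, times in stage_elapsed_times.items():
        stage_total = sum(times)
        summary[stage] = {
            "avg_ns": mean(times),
            "median_ns": median(times),
            "total_ns": stage_total,
            # Assumed definition: share of the summed trace time.
            "percent_of_trace": 100.0 * stage_total / total_trace_elapsed,
        }
    # Time not accounted for by any recorded stage.
    unresolved_ns = abs_elapsed - total_trace_elapsed
    return summary, unresolved_ns


stage_summary, unresolved_ns = summarize_stages(
    {"extract": [100, 300], "split": [100]}, abs_elapsed=600
)
```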
- nv_ingest_client.cli.util.processing.report_statistics(start_time_ns: int, stage_elapsed_times: defaultdict, total_pages_processed: int, total_files: int)
Aggregate and report statistics for the entire processing session.
This function calculates the absolute elapsed time from the start of processing to the current time and the total time taken by all stages. It then reports detailed stage statistics along with overall processing throughput.
- Parameters:
start_time_ns (int) – The nanosecond timestamp marking the start of the processing.
stage_elapsed_times (defaultdict) – A defaultdict where each key is a processing stage (str) and each value is a list of elapsed times (int, in nanoseconds) for that stage.
total_pages_processed (int) – The total number of pages processed during the session.
total_files (int) – The total number of files processed during the session.
Notes
The function calls report_stage_statistics to log detailed timing information per stage, then calls report_overall_speed to log the overall throughput.
- nv_ingest_client.cli.util.processing.save_response_data(response: Dict[str, Any], output_directory: str, images_to_disk: bool = False) → None
Save the response data into categorized metadata JSON files and optionally save images to disk.
This function processes the response data, organizes it based on document types, and saves the organized data into a specified output directory as JSON files. If ‘images_to_disk’ is True and the document type is ‘image’, it decodes and writes base64 encoded images to disk.
- Parameters:
response (Dict[str, Any]) – A dictionary containing the API response data. It must contain a “data” field, which is expected to be a list of document entries. Each document entry should contain metadata, which includes information about the document’s source.
output_directory (str) – The path to the directory where the JSON metadata files should be saved. Subdirectories will be created based on the document types, and the metadata files will be stored within these subdirectories.
images_to_disk (bool, optional) – If True, base64 encoded images in the ‘metadata.content’ field will be decoded and saved to disk. Default is False.
- Returns:
This function does not return any values. It writes output to the filesystem.
- Return type:
None
Notes
If ‘images_to_disk’ is True and ‘doc_type’ is ‘image’, images will be decoded and saved to disk with appropriate file types based on ‘metadata.image_metadata.image_type’.
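Decoding a base64 payload and writing it with an extension taken from the image type can be sketched as below. write_base64_image, the "page_0" file stem, and the directory layout are hypothetical; the reference does not specify how the real function names its output files.

```python
import base64
import os
import tempfile


def write_base64_image(encoded: str, image_type: str, directory: str, stem: str) -> str:
    """Hypothetical helper: decode a base64 payload and write it as <stem>.<image_type>."""
    os.makedirs(directory, exist_ok=True)
    path = os.path.join(directory, f"{stem}.{image_type}")
    with open(path, "wb") as handle:
        handle.write(base64.b64decode(encoded))
    return path


# A placeholder payload stands in for real image bytes.
payload = base64.b64encode(b"not-a-real-image").decode("ascii")
with tempfile.TemporaryDirectory() as tmp_dir:
    saved_path = write_base64_image(payload, "png", os.path.join(tmp_dir, "image"), "page_0")
    with open(saved_path, "rb") as handle:
        round_tripped = handle.read()
```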
nv_ingest_client.cli.util.system module
- nv_ingest_client.cli.util.system.configure_logging(logger, log_level: str)
Configures the logging level based on a log_level string.
- Parameters:
logger (logging.Logger) – The logger to configure.
log_level (str) – The logging level as a string, expected to be one of ‘DEBUG’, ‘INFO’, ‘WARNING’, ‘ERROR’, ‘CRITICAL’.
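Mapping a level name to the numeric constant in the standard logging module can be sketched as follows. The reference does not say how an unknown level name is handled, so raising ValueError here is an assumption, as is the function name.

```python
import logging


def configure_logging_sketch(logger: logging.Logger, log_level: str) -> None:
    """Illustrative stand-in: set the logger's level from a level name."""
    # logging exposes DEBUG, INFO, WARNING, ERROR, and CRITICAL as integer constants.
    level = getattr(logging, log_level.upper(), None)
    if not isinstance(level, int):
        # Assumed behaviour for unknown names.
        raise ValueError(f"Invalid log level: {log_level}")
    logger.setLevel(level)


demo_logger = logging.getLogger("nv_ingest_client_demo")
configure_logging_sketch(demo_logger, "WARNING")
```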
- nv_ingest_client.cli.util.system.ensure_directory_with_permissions(directory_path: str)
Ensures that a directory exists and the current user has read/write permissions. If the directory does not exist, attempts to create it after checking the parent directory for write permission.
- Parameters:
directory_path (str) – The path to the directory to check or create.
- Returns:
True if the directory exists and has the correct permissions, or if it was successfully created. False if the directory cannot be created or does not have the correct permissions.
- Return type:
bool
- nv_ingest_client.cli.util.system.has_permissions(path: str, read: bool = False, write: bool = False) → bool
Checks if the current user has specified permissions on a path.
- Parameters:
path (str) – The filesystem path to check permissions on.
read (bool, optional) – Whether to check for read permission.
write (bool, optional) – Whether to check for write permission.
- Returns:
True if the path has the specified permissions, False otherwise.
- Return type:
bool
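A permission check of this shape can be written with os.access. The sketch below is a stand-in built from the parameter descriptions, not the package's implementation. One consequence of this particular sketch is that it returns True when neither flag is requested.

```python
import os
import tempfile


def has_permissions_sketch(path: str, read: bool = False, write: bool = False) -> bool:
    """Illustrative stand-in: check only the permissions that were requested."""
    if read and not os.access(path, os.R_OK):
        return False
    if write and not os.access(path, os.W_OK):
        return False
    return True


with tempfile.TemporaryDirectory() as tmp_dir:
    readable_and_writable = has_permissions_sketch(tmp_dir, read=True, write=True)
    # os.access returns False for a path that does not exist.
    missing_path_readable = has_permissions_sketch(os.path.join(tmp_dir, "missing"), read=True)
```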