nv_ingest_client.client package
Submodules
nv_ingest_client.client.client module
- exception nv_ingest_client.client.client.DataDecodeException(message='Data decoding error', data=None)
Bases:
Exception
Exception raised for errors in decoding data.
- Parameters:
message (str) – Explanation of the error.
data (optional) – The data that failed to decode.
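A minimal sketch of raising and handling this exception while decoding a JSON response. To stay self-contained it defines a local stand-in class with the documented constructor (message, data) instead of importing nv_ingest_client, and decode_job_payload is a hypothetical helper.

```python
import json


class DataDecodeException(Exception):
    """Local stand-in mirroring the documented constructor (not the library class)."""

    def __init__(self, message="Data decoding error", data=None):
        self.message = message  # explanation of the error
        self.data = data        # the payload that failed to decode, if available
        super().__init__(f"{message}: {data!r}")


def decode_job_payload(raw: bytes) -> dict:
    """Hypothetical helper: decode a UTF-8 JSON payload or raise DataDecodeException."""
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Keep the original bytes so the caller can inspect what failed.
        raise DataDecodeException(f"Data decoding error ({exc.__class__.__name__})", raw) from exc


try:
    decode_job_payload(b"{not valid json")
except DataDecodeException as err:
    print(err.message)  # Data decoding error (JSONDecodeError)
```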
- class nv_ingest_client.client.client.NvIngestClient(
- message_client_allocator: ~typing.Type[~nv_ingest_client.message_clients.client_base.MessageBrokerClientBase] = <class 'nv_ingest_client.message_clients.rest.rest_client.RestClient'>,
- message_client_hostname: str | None = 'localhost',
- message_client_port: int | None = 7670,
- message_client_kwargs: ~typing.Dict | None = None,
- msg_counter_id: str | None = 'nv-ingest-message-id',
- worker_pool_size: int = 1,
)
Bases:
object
A client class for interacting with the nv-ingest service, supporting custom client allocators.
- add_job(
- job_spec: BatchJobSpec | JobSpec,
)
- create_job(
- payload: str,
- source_id: str,
- source_name: str,
- document_type: str | None = None,
- tasks: list | None = None,
- extended_options: dict | None = None,
)
Creates a new job with the specified parameters and adds it to the job tracking dictionary.
- Parameters:
payload (str) – The payload associated with the job.
source_id (str) – The source identifier for the job.
source_name (str) – The unique name of the job’s source data.
document_type (str, optional) – The type of document to be processed.
tasks (list, optional) – A list of tasks to be associated with the job.
extended_options (dict, optional) – Additional options for job creation.
- Returns:
The job ID as a string.
- Return type:
str
- Raises:
ValueError – If a job with the same job ID already exists.
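The bookkeeping described for create_job can be sketched as follows. JobTracker and its injectable id_factory are hypothetical; they only illustrate storing jobs in a dictionary keyed by a generated ID and raising ValueError on a collision, not the client's actual internals.

```python
import uuid
from typing import Callable, Dict, List, Optional


class JobTracker:
    """Hypothetical stand-in for a client's job tracking dictionary."""

    def __init__(self, id_factory: Callable[[], str] = lambda: str(uuid.uuid4())) -> None:
        self._id_factory = id_factory  # injectable so a collision can be demonstrated
        self._jobs: Dict[str, dict] = {}

    def create_job(self, payload: str, source_id: str, source_name: str,
                   document_type: Optional[str] = None, tasks: Optional[List] = None,
                   extended_options: Optional[dict] = None) -> str:
        job_id = self._id_factory()
        if job_id in self._jobs:
            raise ValueError(f"Job with ID {job_id} already exists")
        self._jobs[job_id] = {
            "payload": payload,
            "source_id": source_id,
            "source_name": source_name,
            "document_type": document_type,
            "tasks": list(tasks or []),  # copy so later caller mutations do not leak in
            "extended_options": dict(extended_options or {}),
        }
        return job_id
```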
- create_jobs_for_batch(
- files_batch: List[str],
- tasks: Dict[str, Any],
)
Creates job specifications (JobSpecs) for a batch of files, submits them, and returns both the job specifications and the job IDs. Each file is processed to extract its content and type, a JobSpec is created for it, and the tasks from the provided tasks dictionary are added. The jobs are then submitted to the client and their job IDs collected.
- Parameters:
files_batch (List[str]) – A list of file paths to be processed. Each file is assumed to be in a format compatible with the extract_file_content function, which extracts the file’s content and type.
tasks (Dict[str, Any]) – A dictionary of tasks to be added to each job. The keys represent task names, and the values represent task specifications or configurations. Standard tasks include “split”, “extract”, “store”, “caption”, “dedup”, “filter”, “embed”.
- Returns:
A tuple containing the list of JobSpecs and the list of job IDs for the submitted jobs. Each job ID is the value returned by the client’s add_job method.
- Return type:
Tuple[List[JobSpec], List[str]]
- Raises:
ValueError – If the content or type of a file cannot be extracted, the ValueError is logged and that file is skipped; the rest of the batch is still processed.
Notes
The function assumes that a utility function extract_file_content is defined elsewhere, which extracts the content and type from the provided file paths.
For each file, a JobSpec is created with relevant metadata, including document type and file content. Various tasks are conditionally added based on the provided tasks dictionary.
The job specification includes tracing options with a timestamp (in nanoseconds) for diagnostic purposes.
Examples
Suppose you have a batch of files and tasks to process:
>>> files_batch = ["file1.txt", "file2.pdf"]
>>> tasks = {"split": ..., "extract_txt": ..., "store": ...}
>>> client = NvIngestClient()
>>> job_specs, job_ids = client.create_jobs_for_batch(files_batch, tasks)
>>> print(job_ids)
['job_12345', 'job_67890']
In this example, a job is created and submitted for each file in files_batch, the tasks in tasks are added to each job specification, and the returned job IDs are printed.
See also
create_job_specs_for_batch
Function that creates job specifications for a batch of files.
JobSpec
The class representing a job specification.
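The per-file flow in the Notes can be sketched with the standard library. Everything here is hypothetical: extract_file_content base64-encodes the file and infers its type from the extension, job specs are plain dicts rather than JobSpec objects, a generated UUID stands in for the ID returned by add_job, and the tracing_options keys are assumptions. It shows the log-and-skip handling of ValueError and the nanosecond timestamp from time.time_ns().

```python
import base64
import logging
import os
import time
import uuid
from typing import Any, Dict, List, Tuple

logger = logging.getLogger(__name__)


def extract_file_content(path: str) -> Tuple[str, str]:
    """Hypothetical stand-in: return (base64 content, document type from the extension)."""
    ext = os.path.splitext(path)[1].lstrip(".").lower()
    if not ext:
        raise ValueError(f"Cannot determine document type for {path}")
    with open(path, "rb") as fh:
        return base64.b64encode(fh.read()).decode("ascii"), ext


def create_jobs_for_batch(files_batch: List[str],
                          tasks: Dict[str, Any]) -> Tuple[List[dict], List[str]]:
    job_specs: List[dict] = []
    job_ids: List[str] = []
    for path in files_batch:
        try:
            content, doc_type = extract_file_content(path)
        except ValueError as exc:
            logger.warning("Skipping %s: %s", path, exc)  # log and skip, as the Notes describe
            continue
        job_specs.append({
            "payload": content,
            "source_id": path,
            "source_name": path,
            "document_type": doc_type,
            # Assumption: a task is added only when the caller configured it (non-None).
            "tasks": {name: cfg for name, cfg in tasks.items() if cfg is not None},
            # Assumed key names; the timestamp is in nanoseconds.
            "extended_options": {"tracing_options": {"trace": True, "ts_send": time.time_ns()}},
        })
        job_ids.append(str(uuid.uuid4()))  # stand-in for the ID returned by add_job
    return job_specs, job_ids
```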
- create_task(
- job_index: str | int,
- task_type: TaskType,
- task_params: dict | None = None,
)
Creates a task of the specified type with given parameters and associates it with the existing job.
- Parameters:
job_index (Union[str, int]) – The unique identifier of the job to which the task will be added. This can be either a string or an integer.
task_type (TaskType) – The type of the task to be created, defined as an enum value.
task_params (dict, optional) – A dictionary containing parameters for the task.
- Raises:
ValueError – If no job with the specified job_index exists, or if an attempt is made to modify a job after it has been submitted.
- fetch_job_result(
- job_ids: str | List[str],
- timeout: float = 100,
- max_retries: int | None = None,
- retry_delay: float = 1,
- verbose: bool = False,
- completion_callback: Callable[[Dict, str], None] | None = None,
- return_failures: bool = False,
)
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
- Parameters:
job_ids (Union[str, List[str]]) – A job ID or list of job IDs to fetch results for.
timeout (float) – Timeout for each fetch operation, in seconds.
max_retries (Optional[int]) – Maximum number of retries for jobs that are not ready yet.
retry_delay (float) – Delay between retry attempts, in seconds.
verbose (bool) – If True, logs additional information.
completion_callback (Optional[Callable[[Dict, str], None]]) – A callback function that is executed each time a job result is successfully fetched. It receives two arguments: the job result (a dict) and the job ID.
return_failures (bool) – If True, returns a separate list of failed jobs.
- Returns:
If return_failures=False, a list of (result, job ID) tuples, where the result is None if the fetch failed.
If return_failures=True, a tuple of two lists: the successful (result, job ID) tuples, and the failures as (job ID, error message) tuples.
- Return type:
List[Tuple[Optional[Dict], str]] if return_failures=False; otherwise Tuple[List[Tuple[Optional[Dict], str]], List[Tuple[str, str]]]
- Raises:
ValueError – If there is an error in decoding the job result.
TimeoutError – If the fetch operation times out.
Exception – For all other unexpected issues.
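The retry-and-collect behaviour described above can be sketched with concurrent.futures. This is not the client's implementation: fetch_one and JobNotReadyError are hypothetical stand-ins for the transport call and its not-ready signal, and the sketch records every failure instead of raising, which is one reading of the Returns section.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional, Tuple, Union


class JobNotReadyError(Exception):
    """Hypothetical signal that a job's result is not available yet."""


def fetch_job_results(
    job_ids: Union[str, List[str]],
    fetch_one: Callable[[str, float], Dict],
    timeout: float = 100,
    max_retries: Optional[int] = None,
    retry_delay: float = 1,
    completion_callback: Optional[Callable[[Dict, str], None]] = None,
    return_failures: bool = False,
):
    """Fetch results concurrently; fetch_one(job_id, timeout) is a hypothetical transport call."""
    if isinstance(job_ids, str):
        job_ids = [job_ids]

    def fetch_with_retries(job_id: str) -> Dict:
        attempt = 0
        while True:
            try:
                return fetch_one(job_id, timeout)  # each attempt gets its own timeout
            except JobNotReadyError:
                attempt += 1
                # max_retries=None means keep retrying until the job is ready.
                if max_retries is not None and attempt > max_retries:
                    raise TimeoutError(f"job {job_id} not ready after {max_retries} retries")
                time.sleep(retry_delay)

    results: List[Tuple[Optional[Dict], str]] = []
    failures: List[Tuple[str, str]] = []
    with ThreadPoolExecutor() as pool:
        futures = {job_id: pool.submit(fetch_with_retries, job_id) for job_id in job_ids}
        for job_id, future in futures.items():
            try:
                result = future.result()
            except Exception as exc:  # record the failure instead of aborting the batch
                failures.append((job_id, str(exc)))
                if not return_failures:
                    results.append((None, job_id))
                continue
            if completion_callback is not None:
                completion_callback(result, job_id)
            results.append((result, job_id))
    return (results, failures) if return_failures else results
```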
- fetch_job_result_async(
- job_ids: str | List[str],
- timeout: float = 10,
- data_only: bool = True,
)
Asynchronously fetches job results for a single job ID or a list of job IDs and returns a mapping of futures to job IDs.
- Parameters:
job_ids (Union[str, List[str]]) – A single job ID or a list of job IDs.
timeout (float) – Timeout for fetching each job result, in seconds.
data_only (bool) – Whether to return only the data part of the job result.
- Returns:
A dictionary mapping each future to its corresponding job ID.
- Return type:
Dict[Future, str]
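The mapping returned here is typically drained with concurrent.futures.as_completed, looking up each finished future's job ID. In this sketch fake_fetch is a hypothetical stand-in for the network call, so it runs without an nv-ingest service; collect_results keeps a per-job exception instead of stopping at the first error.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Dict


def collect_results(future_to_job: Dict[Future, str]) -> Dict[str, object]:
    """Drain a {Future: job_id} mapping as results arrive; errors are kept per job."""
    results: Dict[str, object] = {}
    for future in as_completed(future_to_job):
        job_id = future_to_job[future]
        try:
            results[job_id] = future.result()
        except Exception as exc:
            results[job_id] = exc
    return results


def fake_fetch(job_id: str) -> dict:
    """Hypothetical stand-in for a remote fetch; fails for one job to show error handling."""
    if job_id == "job-2":
        raise TimeoutError("result not ready")
    return {"job_id": job_id, "data": []}


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = {pool.submit(fake_fetch, jid): jid for jid in ["job-1", "job-2"]}
    results = collect_results(futures)
```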
- fetch_job_result_cli(
- job_ids: str | List[str],
- timeout: float = 100,
- data_only: bool = True,
)
- submit_job(
- job_indices: str | List[str],
- job_queue_id: str,
- batch_size: int = 10,
)
- submit_job_async(
- job_indices: str | List[str],
- job_queue_id: str,
)
Asynchronously submits one or more jobs to the specified job queue using a thread pool. Accepts either a single job ID or a list of job IDs.
- Parameters:
job_indices (Union[str, List[str]]) – A single job ID or a list of job IDs to be submitted.
job_queue_id (str) – The ID of the job queue where the jobs will be submitted.
- Returns:
A dictionary mapping futures to their respective job IDs for later retrieval of outcomes.
- Return type:
Dict[Future, str]
Notes
This method queues the jobs for asynchronous submission and returns a mapping of futures to job IDs.
It does not wait for any of the jobs to complete.
Ensure that each job is in the proper state before submission.
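A sketch of the submit-and-return-immediately pattern, assuming a caller-supplied submit_one function (here the hypothetical fake_submit) in place of the real broker call. The caller gets the {future: job_id} mapping at once and decides later whether to block, here with concurrent.futures.wait.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Callable, Dict, List, Union


def submit_jobs_async(
    pool: ThreadPoolExecutor,
    job_indices: Union[str, List[str]],
    job_queue_id: str,
    submit_one: Callable[[str, str], str],
) -> Dict[Future, str]:
    """Queue each submission on the pool and return immediately with {future: job_id}."""
    if isinstance(job_indices, str):
        job_indices = [job_indices]  # accept a single ID as well as a list
    return {pool.submit(submit_one, job_id, job_queue_id): job_id for job_id in job_indices}


def fake_submit(job_id: str, queue: str) -> str:
    """Hypothetical stand-in for pushing a job onto a broker queue."""
    return f"{queue}:{job_id}"


with ThreadPoolExecutor(max_workers=4) as pool:
    pending = submit_jobs_async(pool, ["job-1", "job-2"], "morpheus_task_queue", fake_submit)
    done, not_done = wait(pending)  # the caller decides when (or whether) to block
    outcomes = {pending[f]: f.result() for f in done}
```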
nv_ingest_client.client.interface module
- class nv_ingest_client.client.interface.Ingestor(
- documents: List[str] | None = None,
- client: NvIngestClient | None = None,
- job_queue_id: str = 'morpheus_task_queue',
- **kwargs,
)
Bases:
object
Ingestor provides an interface for building, managing, and running data ingestion jobs through NvIngestClient, allowing for chainable task additions and job state tracking.
- Parameters:
documents (List[str], optional) – List of document paths to be processed.
client (Optional[NvIngestClient], optional) – An instance of NvIngestClient. If not provided, a client is created.
job_queue_id (str, optional) – The ID of the job queue for job submission, default is “morpheus_task_queue”.
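The chainable style works because every task-adding method returns self. The offline sketch below uses a hypothetical MiniIngestor rather than the real class, so it runs without a client or service.

```python
from typing import Any, Dict, List, Optional, Tuple


class MiniIngestor:
    """Hypothetical, offline stand-in illustrating the chainable-builder pattern."""

    def __init__(self, documents: Optional[List[str]] = None) -> None:
        self._documents = list(documents or [])
        self._tasks: List[Tuple[str, Dict[str, Any]]] = []

    def _add(self, name: str, **kwargs: Any) -> "MiniIngestor":
        self._tasks.append((name, kwargs))
        return self  # returning self is what makes chaining possible

    def extract(self, **kwargs: Any) -> "MiniIngestor":
        return self._add("extract", **kwargs)

    def split(self, **kwargs: Any) -> "MiniIngestor":
        return self._add("split", **kwargs)

    def embed(self, **kwargs: Any) -> "MiniIngestor":
        return self._add("embed", **kwargs)

    def task_names(self) -> List[str]:
        return [name for name, _ in self._tasks]


pipeline = MiniIngestor(documents=["report.pdf"]).extract().split().embed()
print(pipeline.task_names())  # ['extract', 'split', 'embed']
```

With the real class the same shape would read roughly Ingestor(documents=["report.pdf"]).extract().split().embed(), followed by ingest() to run the jobs; that last step needs a reachable nv-ingest service.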
- all_tasks() → Ingestor
Adds a default set of tasks to the batch job specification.
The default tasks include extracting text, tables, charts, images, deduplication, filtering, splitting, and embedding tasks.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- cancelled_jobs() → int
Counts the jobs that have been cancelled.
- Returns:
Number of jobs in the CANCELLED state.
- Return type:
int
- caption(
- **kwargs: Any,
)
Adds a CaptionTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the CaptionTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- completed_jobs() → int
Counts the jobs that have completed successfully.
- Returns:
Number of jobs in the COMPLETED state.
- Return type:
int
- dedup(
- **kwargs: Any,
)
Adds a DedupTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the DedupTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- embed(
- **kwargs: Any,
)
Adds an EmbedTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the EmbedTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- extract(
- **kwargs: Any,
)
Adds an ExtractTask for each document type to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the ExtractTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- failed_jobs() → int
Counts the jobs that have failed.
- Returns:
Number of jobs in the FAILED state.
- Return type:
int
- files(
- documents: str | List[str],
)
Adds documents to the manager for processing and checks whether they are all local.
- Parameters:
documents (Union[str, List[str]]) – A document path or pattern, or a list of them, to be processed.
- Returns:
Returns self for chaining. If all specified documents are local, _job_specs is initialized, and _all_local is set to True.
- Return type:
Ingestor
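One plausible reading of "paths or patterns" is glob-style patterns. The hypothetical expand_documents helper below expands them with the standard glob module and reports whether everything resolved to a local file, the condition under which files() is documented to set _all_local; unmatched entries such as remote URLs are kept so a later load() could fetch them.

```python
import glob
import os
from typing import List, Tuple, Union


def expand_documents(documents: Union[str, List[str]]) -> Tuple[List[str], bool]:
    """Hypothetical helper: expand glob patterns and report whether all paths are local files."""
    if isinstance(documents, str):
        documents = [documents]
    expanded: List[str] = []
    for entry in documents:
        matches = sorted(glob.glob(entry))
        # Keep unmatched entries (e.g. remote URLs) so they can be downloaded later.
        expanded.extend(matches if matches else [entry])
    all_local = all(os.path.isfile(path) for path in expanded)
    return expanded, all_local
```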
- filter(
- **kwargs: Any,
)
Adds a FilterTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the FilterTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- ingest(
- show_progress: bool = False,
- return_failures=False,
- **kwargs: Any,
)
Synchronously submits jobs to the NvIngestClient and fetches the results.
- Parameters:
show_progress (bool) – If True, displays a progress bar while fetching results.
kwargs (dict) – Additional parameters for the submit_job and fetch_job_result methods of NvIngestClient.
- Returns:
Result of each job after execution.
- Return type:
List[Dict]
- ingest_async(
- **kwargs: Any,
)
Asynchronously submits jobs and returns a single future that completes when all jobs have finished.
- Parameters:
kwargs (dict) – Additional parameters for the submit_job_async method.
- Returns:
A future that completes when all submitted jobs have reached a terminal state.
- Return type:
Future
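A single future that completes when all jobs finish can be built by attaching a done-callback to each job future. This is a sketch of that technique with the standard library, not the method's implementation; combine_futures is hypothetical, and it creates a bare Future directly, which the concurrent.futures documentation discourages outside of tests but which keeps the sketch small.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def combine_futures(futures: List[Future]) -> Future:
    """Return one Future that resolves to all results (in input order) once every input is done."""
    combined: Future = Future()
    remaining = len(futures)
    lock = threading.Lock()

    if remaining == 0:
        combined.set_result([])
        return combined

    def on_done(_: Future) -> None:
        nonlocal remaining
        with lock:
            remaining -= 1
            finished = remaining == 0
        if finished:
            # Every input is terminal now; an exception in any input fails the combined future.
            try:
                combined.set_result([f.result() for f in futures])
            except Exception as exc:
                combined.set_exception(exc)

    for f in futures:
        f.add_done_callback(on_done)  # runs immediately if f is already done
    return combined


with ThreadPoolExecutor(max_workers=3) as pool:
    jobs = [pool.submit(pow, 2, n) for n in range(3)]
    all_done = combine_futures(jobs)
    print(all_done.result(timeout=5))  # [1, 2, 4]
```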
- load(**kwargs) → Ingestor
Ensure all document files are accessible locally, downloading if necessary.
For each document in _documents, checks if the file exists locally. If not, attempts to download the file to a temporary directory using fsspec. Updates _documents with paths to local copies, initializes _job_specs, and sets _all_local to True upon successful loading.
- Parameters:
kwargs (dict) – Additional keyword arguments for remote file access via fsspec.
- Returns:
Returns self for chaining after ensuring all files are accessible locally.
- Return type:
Ingestor
- remaining_jobs() → int
Counts the jobs that are not in a terminal state.
- Returns:
Number of jobs that are neither completed, failed, nor cancelled.
- Return type:
int
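The four counters partition the tracked jobs, so completed + failed + cancelled + remaining always equals the total. The sketch below assumes a hypothetical JobState enum; only the COMPLETED, FAILED and CANCELLED states are named in this documentation, and PENDING and SUBMITTED are placeholders.

```python
from collections import Counter
from enum import Enum, auto
from typing import Dict


class JobState(Enum):
    """Hypothetical job states; only COMPLETED, FAILED and CANCELLED are terminal here."""
    PENDING = auto()
    SUBMITTED = auto()
    COMPLETED = auto()
    FAILED = auto()
    CANCELLED = auto()


TERMINAL = {JobState.COMPLETED, JobState.FAILED, JobState.CANCELLED}


def summarize(job_states: Dict[str, JobState]) -> Dict[str, int]:
    counts = Counter(job_states.values())  # missing states count as 0
    return {
        "completed": counts[JobState.COMPLETED],
        "failed": counts[JobState.FAILED],
        "cancelled": counts[JobState.CANCELLED],
        # remaining = every job not yet in a terminal state
        "remaining": sum(n for state, n in counts.items() if state not in TERMINAL),
    }


states = {"a": JobState.COMPLETED, "b": JobState.FAILED, "c": JobState.SUBMITTED, "d": JobState.PENDING}
print(summarize(states))  # {'completed': 1, 'failed': 1, 'cancelled': 0, 'remaining': 2}
```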
- split(
- **kwargs: Any,
)
Adds a SplitTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the SplitTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
- store(
- **kwargs: Any,
)
Adds a StoreTask to the batch job specification.
- Parameters:
kwargs (dict) – Parameters specific to the StoreTask.
- Returns:
Returns self for chaining.
- Return type:
Ingestor
Module contents
- class nv_ingest_client.client.Ingestor
Re-export of nv_ingest_client.client.interface.Ingestor; its documentation is identical to the entry above.
- class nv_ingest_client.client.NvIngestClient
Re-export of nv_ingest_client.client.client.NvIngestClient; its documentation is identical to the entry above.