Source code for nv_ingest_client.primitives.jobs.job_spec

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


import logging
from collections import defaultdict
from io import BytesIO
from typing import Dict
from typing import List
from typing import Optional
from typing import Union
from uuid import UUID

from nv_ingest_client.primitives.tasks import Task
from nv_ingest_client.primitives.tasks import ExtractTask
from nv_ingest_client.primitives.tasks.audio_extraction import AudioExtractionTask
from nv_ingest_client.primitives.tasks.table_extraction import TableExtractionTask
from nv_ingest_client.primitives.tasks.chart_extraction import ChartExtractionTask
from nv_ingest_client.primitives.tasks.infographic_extraction import InfographicExtractionTask
from nv_ingest_client.util.dataset import get_dataset_files
from nv_ingest_client.util.dataset import get_dataset_statistics

logger = logging.getLogger(__name__)


class JobSpec:
    """
    Specification for creating a job for submission to the nv-ingest microservice.

    Parameters
    ----------
    payload : Optional[str], optional
        The payload data for the job, by default None. It is stored as given
        and emitted under ``job_payload.content`` by :meth:`to_dict`.
    tasks : Optional[List], optional
        Initial tasks for the job, by default None. The list is copied, so
        later changes to the caller's list do not affect this job (and vice
        versa). Tasks supplied here are stored as given; only
        :meth:`add_task` validates tasks and appends follow-up extraction
        tasks.
    source_id : Optional[str], optional
        An identifier for the source of the job, by default None.
    source_name : Optional[str], optional
        A name for the source of the job, by default None.
    document_type : Optional[str], optional
        The document type of the payload. Falls back to ``"txt"`` when not
        provided (or empty).
    extended_options : Optional[Dict], optional
        Additional options for job processing, by default None. Only the
        ``"tracing_options"`` key is read by :meth:`to_dict`.

    Attributes
    ----------
    _payload : Optional[str]
        Storage for the payload data.
    _tasks : List
        Storage for the list of tasks.
    _source_id : Optional[str]
        Storage for the source identifier.
    _source_name : Optional[str]
        Storage for the source name.
    _document_type : str
        Storage for the document type.
    _job_id : Optional[UUID]
        Storage for the job's identifier. It is always ``None`` after
        construction and must be assigned through the ``job_id`` property.
    _extended_options : Dict
        Storage for the additional options.

    Methods
    -------
    to_dict() -> Dict:
        Converts the job specification to a dictionary.
    add_task(task):
        Adds a task to the job specification.
    """

    def __init__(
        self,
        payload: Optional[str] = None,
        tasks: Optional[List] = None,
        source_id: Optional[str] = None,
        source_name: Optional[str] = None,
        document_type: Optional[str] = None,
        extended_options: Optional[Dict] = None,
    ) -> None:
        self._document_type = document_type or "txt"
        self._extended_options = extended_options or {}
        self._job_id = None
        self._payload = payload
        self._source_id = source_id
        self._source_name = source_name
        # Copy instead of aliasing: add_task() appends to this list and must
        # not silently mutate the list object the caller handed in.
        self._tasks = list(tasks) if tasks else []

    def __str__(self) -> str:
        """
        Returns a human-readable summary of the job specification.

        The payload itself is never printed; only its length is shown (or
        ``Empty`` when there is no payload).
        """
        task_info = "\n".join(str(task) for task in self._tasks)
        payload_info = "<*** " + str(len(self._payload)) + " ***>" if self._payload else "Empty"
        return (
            f"source-id: {self._source_id}\n"
            f"source-name: {self._source_name}\n"
            f"document-type: {self._document_type}\n"
            f"task count: {len(self._tasks)}\n"
            f"payload: {payload_info}\n"
            f"extended-options: {self._extended_options}\n"
            f"{task_info}"
        )

    def to_dict(self) -> Dict:
        """
        Converts the job specification instance into a dictionary suitable for JSON serialization.

        Returns
        -------
        Dict
            A dictionary representation of the job specification. Each field of
            ``job_payload`` is wrapped in a single-element list. ``job_id`` is
            the ``str()`` of the stored id, which is the literal string
            ``"None"`` while no id has been assigned.
        """
        return {
            "job_payload": {
                "source_name": [self._source_name],
                "source_id": [self._source_id],
                "content": [self._payload],
                "document_type": [self._document_type],
            },
            "job_id": str(self._job_id),
            "tasks": [task.to_dict() for task in self._tasks],
            "tracing_options": self._extended_options.get("tracing_options", {}),
        }

    @property
    def payload(self) -> Optional[str]:
        """The payload data of the job."""
        return self._payload

    @payload.setter
    def payload(self, payload: Optional[str]) -> None:
        self._payload = payload

    @property
    def job_id(self) -> Optional[UUID]:
        """The identifier assigned to the job, or None if not yet assigned."""
        return self._job_id

    @job_id.setter
    def job_id(self, job_id: UUID) -> None:
        self._job_id = job_id

    @property
    def source_id(self) -> Optional[str]:
        """The identifier of the job's source."""
        return self._source_id

    @source_id.setter
    def source_id(self, source_id: str) -> None:
        self._source_id = source_id

    @property
    def source_name(self) -> Optional[str]:
        """The name of the job's source."""
        return self._source_name

    @source_name.setter
    def source_name(self, source_name: str) -> None:
        self._source_name = source_name

    @property
    def document_type(self) -> str:
        """The document type of the payload (read-only)."""
        return self._document_type

    def add_task(self, task) -> None:
        """
        Adds a task to the job specification.

        When the task is an ``ExtractTask``, follow-up tasks are appended right
        after it, in this order: a ``TableExtractionTask`` if tables are
        requested, a ``ChartExtractionTask`` if charts are requested, an
        ``InfographicExtractionTask`` if infographics are requested, and an
        ``AudioExtractionTask`` if the extract method is ``"audio"``.

        Parameters
        ----------
        task
            The task to add to the job specification. Must be an instance of
            ``nv_ingest_client.primitives.tasks.Task``.

        Raises
        ------
        ValueError
            If the task is not an instance of ``Task``.
        """
        if not isinstance(task, Task):
            raise ValueError("Task must derive from nv_ingest_client.primitives.Task class")

        self._tasks.append(task)

        # Only extraction tasks trigger follow-up tasks.
        if not isinstance(task, ExtractTask):
            return

        # The flags are compared with `is True` on purpose: a truthy
        # non-boolean value must not enable the follow-up task.
        if task._extract_tables is True:
            self._tasks.append(TableExtractionTask())

        if task._extract_charts is True:
            self._tasks.append(ChartExtractionTask())

        if task._extract_infographics is True:
            self._tasks.append(InfographicExtractionTask())

        if task._extract_method == "audio":
            extract_audio_params = task._extract_audio_params or {}
            self._tasks.append(AudioExtractionTask(**extract_audio_params))
class BatchJobSpec:
    """
    A class used to represent a batch of job specifications (JobSpecs).

    This class allows for batch processing of multiple jobs, either from a list of `JobSpec` instances or from file
    paths. It provides methods for adding job specifications, associating tasks with those specifications, and
    serializing the batch to a dictionary format.

    Attributes
    ----------
    _file_type_to_job_spec : defaultdict
        A dictionary that maps document types to a list of `JobSpec` instances.
    """

    def __init__(
        self,
        job_specs_or_files: Optional[Union[JobSpec, str, List[JobSpec], List[str]]] = None,
    ) -> None:
        """
        Initializes the BatchJobSpec instance.

        Parameters
        ----------
        job_specs_or_files : Optional[Union[JobSpec, str, List[JobSpec], List[str]]], optional
            A list of either `JobSpec` instances or file paths; a single `JobSpec` or a single path is also
            accepted. The kind of input is decided from the first element only. If nothing (or an empty
            value) is provided, the batch starts out empty.

        Raises
        ------
        ValueError
            If the first element is neither a `JobSpec` nor a string.
        """
        self._file_type_to_job_spec = defaultdict(list)

        if not job_specs_or_files:
            return

        # A bare JobSpec is not subscriptable, so wrap single values in a list
        # before peeking at the first element.
        if isinstance(job_specs_or_files, (JobSpec, str)):
            job_specs_or_files = [job_specs_or_files]

        first_item = job_specs_or_files[0]
        if isinstance(first_item, JobSpec):
            self.from_job_specs(job_specs_or_files)
        elif isinstance(first_item, str):
            self.from_files(job_specs_or_files)
        else:
            raise ValueError("Invalid input type for job_specs. Must be a list of JobSpec or file paths.")

    def from_job_specs(self, job_specs: Union[JobSpec, List[JobSpec]]) -> None:
        """
        Adds one or more `JobSpec` instances to the batch.

        Parameters
        ----------
        job_specs : Union[JobSpec, List[JobSpec]]
            A single `JobSpec` or a list of `JobSpec` instances to add to the batch.
        """
        if isinstance(job_specs, JobSpec):
            job_specs = [job_specs]

        for job_spec in job_specs:
            self.add_job_spec(job_spec)

    def from_files(self, files: Union[str, List[str]]) -> None:
        """
        Adds job specifications generated from file paths to the batch.

        If no file matches, a warning is logged and the batch is left unchanged.

        Parameters
        ----------
        files : Union[str, List[str]]
            A single file path or a list of file paths to create job specifications from.
        """
        # Imported inside the method rather than at module level.
        # NOTE(review): presumably to avoid a circular import with
        # nv_ingest_client.util.util - confirm.
        from nv_ingest_client.util.util import create_job_specs_for_batch
        from nv_ingest_client.util.util import generate_matching_files

        if isinstance(files, str):
            files = [files]

        matching_files = list(generate_matching_files(files))
        if not matching_files:
            logger.warning(f"No files found matching {files}.")
            return

        for job_spec in create_job_specs_for_batch(matching_files):
            self.add_job_spec(job_spec)

    def _from_dataset(self, dataset: str, shuffle_dataset: bool = True) -> None:
        """
        Internal method to initialize the batch from a dataset.

        Parameters
        ----------
        dataset : str
            The path to the dataset file.
        shuffle_dataset : bool, optional
            Whether to shuffle the dataset files before adding them to the batch, by default True.
        """
        with open(dataset, "rb") as file:
            dataset_bytes = BytesIO(file.read())

        # Statistics are only computed when they would actually be logged.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(get_dataset_statistics(dataset_bytes))

        dataset_files = get_dataset_files(dataset_bytes, shuffle_dataset)
        self.from_files(dataset_files)

    @classmethod
    def from_dataset(cls, dataset: str, shuffle_dataset: bool = True) -> "BatchJobSpec":
        """
        Class method to create a `BatchJobSpec` instance from a dataset.

        Parameters
        ----------
        dataset : str
            The path to the dataset file.
        shuffle_dataset : bool, optional
            Whether to shuffle the dataset files before adding them to the batch, by default True.

        Returns
        -------
        BatchJobSpec
            A new instance of `BatchJobSpec` initialized with the dataset files.
        """
        batch_job_spec = cls()
        batch_job_spec._from_dataset(dataset, shuffle_dataset=shuffle_dataset)
        return batch_job_spec

    def add_job_spec(self, job_spec: JobSpec) -> None:
        """
        Adds a `JobSpec` to the batch, grouped under its `document_type`.

        Parameters
        ----------
        job_spec : JobSpec
            The job specification to add.
        """
        self._file_type_to_job_spec[job_spec.document_type].append(job_spec)

    def add_task(self, task, document_type=None):
        """
        Adds a task to the relevant job specifications in the batch.

        If a `document_type` is provided, the task will be added to all job specifications
        matching that document type. If no `document_type` is provided, it is read from the
        ``"document_type"`` entry of ``task.to_dict()``; if that is missing too, the task is
        added to all job specifications in the batch. A document type that has no job
        specifications in the batch is a no-op.

        Parameters
        ----------
        task : Task
            The task to add. Must derive from the `nv_ingest_client.primitives.Task` class.

        document_type : str, optional
            The document type used to filter job specifications. If not provided, the
            `document_type` is inferred from the task, or the task is applied to all job specifications.

        Raises
        ------
        ValueError
            If the task does not derive from the `Task` class.
        """
        if not isinstance(task, Task):
            raise ValueError("Task must derive from nv_ingest_client.primitives.Task class")

        document_type = document_type or task.to_dict().get("document_type")

        if document_type:
            # Use .get() rather than indexing: indexing the defaultdict with an
            # unknown document type would insert an empty entry that then
            # leaks into file_types, to_dict() and __str__().
            target_job_specs = self._file_type_to_job_spec.get(document_type, [])
        else:
            target_job_specs = [
                job_spec for job_specs in self._file_type_to_job_spec.values() for job_spec in job_specs
            ]

        for job_spec in target_job_specs:
            job_spec.add_task(task)

    def to_dict(self) -> Dict[str, List[Dict]]:
        """
        Serializes the batch of job specifications into a dictionary.

        Returns
        -------
        Dict[str, List[Dict]]
            A dictionary mapping each document type to the list of dictionaries
            produced by `JobSpec.to_dict()` for the job specifications of that type.
        """
        return {
            file_type: [job_spec.to_dict() for job_spec in job_specs]
            for file_type, job_specs in self._file_type_to_job_spec.items()
        }

    def __str__(self) -> str:
        """
        Returns a string representation of the batch.

        Returns
        -------
        str
            One line per document type, each followed by the string form of
            every job specification of that type.
        """
        parts = []
        for file_type, job_specs in self._file_type_to_job_spec.items():
            parts.append(f"{file_type}\n")
            parts.extend(str(job_spec) + "\n" for job_spec in job_specs)

        return "".join(parts)

    @property
    def job_specs(self) -> Dict[str, List[JobSpec]]:
        """
        A property that returns a dictionary of job specs categorized by document type.

        The returned object is the batch's internal `defaultdict`, not a copy:
        indexing it with a missing key creates an empty entry in the batch.

        Returns
        -------
        Dict[str, List[JobSpec]]
            A dictionary mapping document types to job specifications.
        """
        return self._file_type_to_job_spec

    @property
    def file_types(self) -> List[str]:
        """
        Returns the list of unique file types present in the batch.

        This property retrieves the document types currently stored in the batch's job specifications.

        Returns
        -------
        List[str]
            A list of document types for the jobs in the batch.
        """
        return list(self._file_type_to_job_spec.keys())

    @property
    def tasks(self) -> Dict[str, List[Task]]:
        """
        Returns the tasks associated with each document type in the batch.

        The tasks are taken from the first job specification of each document
        type. Document types with no job specifications, or whose first job
        specification has no tasks, are omitted.

        Returns
        -------
        Dict[str, List[Task]]
            A dictionary mapping document types to the task list of the first
            job specification of that type (the list object itself, not a copy).
        """
        all_tasks = {}
        for file_type, job_specs in self._file_type_to_job_spec.items():
            if not job_specs:
                continue

            # All job specs under the same file type should have the same tasks.
            tasks = job_specs[0]._tasks
            if not tasks:
                continue

            all_tasks[file_type] = tasks

        return all_tasks