nv_ingest.stages package
Subpackages
- nv_ingest.stages.embeddings package
- nv_ingest.stages.extractors package
- nv_ingest.stages.filters package
- nv_ingest.stages.nim package
- nv_ingest.stages.storages package
- nv_ingest.stages.transforms package
Submodules
nv_ingest.stages.docx_extractor_stage module
- nv_ingest.stages.docx_extractor_stage.decode_and_extract(base64_row, task_props, validated_config: Any, trace_info: Dict, default='python_docx')
Decodes base64 content from a row and extracts data from it using the specified extraction method.
- Parameters:
base64_row (pd.Series) – A Series containing the base64-encoded content and other relevant data. The key “content” should contain the base64 string, and the key “source_id” is optional.
task_props (dict or BaseModel) – A dictionary (or a BaseModel instance) containing instructions and parameters for extraction.
validated_config (Any) – Configuration object that contains docx_extraction_config.
trace_info (dict) – Dictionary containing trace information.
default (str, optional) – The default extraction method to use if the specified method is not available (default is “python_docx”).
- Returns:
The extracted data, or an exception tag if extraction fails.
- Return type:
Any
- Raises:
Exception – For any unhandled exception during extraction, an error is logged and a tagged error is returned.
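The decode-then-extract flow, with a tagged error returned on failure, can be sketched roughly as follows. This is a minimal illustration and not the nv_ingest implementation: `extract_text_stub`, the extractor registry, the `"method"` and `"params"` keys, the layout of the error tag, and the plain dict used in place of a pd.Series are all assumptions made for the example.

```python
import base64
import io


def extract_text_stub(stream, **params):
    """Hypothetical extractor: a stand-in for a real DOCX parser."""
    return stream.read().decode("utf-8")


def decode_and_extract_sketch(row, task_props, default="python_docx"):
    """Decode row["content"] from base64 and run an extractor on it.

    On any failure, return a tagged error dict instead of raising.
    """
    # Hypothetical registry mapping method names to extractor callables.
    extractors = {"python_docx": extract_text_stub}
    source_id = row.get("source_id")  # optional key
    try:
        raw = base64.b64decode(row["content"])
        method = task_props.get("method", default)
        # Fall back to the default when the requested method is unknown.
        extractor = extractors.get(method, extractors[default])
        return extractor(io.BytesIO(raw), **task_props.get("params", {}))
    except Exception as exc:
        # Assumed tag layout; the real exception tag may differ.
        return {"error": str(exc), "source_id": source_id}


row = {"content": base64.b64encode(b"hello docx").decode("ascii"), "source_id": "doc-1"}
result = decode_and_extract_sketch(row, {"method": "unknown_method"})
failed = decode_and_extract_sketch({"source_id": "doc-2"}, {})
```

Returning a tag rather than raising lets a batch continue when a single row is malformed; the caller can inspect the tag afterwards.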
- nv_ingest.stages.docx_extractor_stage.generate_docx_extractor_stage(c: Config, extractor_config: dict, task: str = 'docx-extract', task_desc: str = 'docx_content_extractor', pe_count: int = 1)
Helper function to generate a multiprocessing stage to perform document content extraction.
- Parameters:
c (Config) – Morpheus global configuration object.
extractor_config (dict) – Configuration parameters for document content extractor.
task (str) – The task name to match for the stage worker function.
task_desc (str) – A descriptor to be used in latency tracing.
pe_count (int) – The number of process engines to use for document content extraction.
- Returns:
A Morpheus stage with the applied worker function.
- Return type:
- Raises:
Exception – If an error occurs during stage generation.
nv_ingest.stages.multiprocessing_stage module
- class nv_ingest.stages.multiprocessing_stage.MultiProcessingBaseStage(c: Config, task: str, task_desc: str, pe_count: int, process_fn: Callable[[DataFrame, dict], DataFrame], document_type: List[str] | str = None, filter_properties: dict = None)
Bases: SinglePortStage
An IngestControlMessage-oriented base multiprocessing stage to increase parallelism of stages written in Python.
- Parameters:
c (Config) – Morpheus global configuration object.
task (str) – The task name to match for the stage worker function.
task_desc (str) – A descriptor to be used in latency tracing.
pe_count (int) – The number of process engines to use.
process_fn (Callable[[pd.DataFrame, dict], pd.DataFrame]) – The function that will be executed in each process engine. The function accepts a pandas DataFrame from an IngestControlMessage payload and a dictionary of task arguments.
- Returns:
A cuDF DataFrame containing the processed results.
- Return type:
cudf.DataFrame
Notes
The data flows through this class in the following way:
- Input Stream Termination: The input stream is terminated by storing off the IngestControlMessage to a ledger. This acts as a record for the incoming message.
- Work Queue: The core work content of the IngestControlMessage is pushed to a work queue. This queue forwards the task to a global multi-process worker pool where the heavy-lifting occurs.
- Global Worker Pool: The work is executed in parallel across multiple process engines via the worker pool. Each process engine applies the process_fn to the task data, which includes a pandas DataFrame and task-specific arguments.
- Response Queue: After the work is completed by the worker pool, the results are pushed into a response queue.
- Post-Processing and Emission: The results from the response queue are post-processed, reconstructed into their original format, and emitted from an observable source for further downstream processing or final output.
This design enhances parallelism and resource utilization across multiple processes, especially for tasks that involve heavy computations, such as large DataFrame operations.
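The five steps in the notes above can be sketched with standard-library pieces. This is only an illustration of the pattern: plain dicts stand in for IngestControlMessage objects, lists stand in for DataFrames, and a thread pool is swapped in for the global process pool so the example stays self-contained. `run_flow` and the message layout are hypothetical names, not part of nv_ingest.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def process_fn(payload, task_args):
    """Stand-in worker: upper-cases every item in the payload."""
    return [str(item).upper() for item in payload]


def run_flow(messages, pe_count=2):
    ledger = {}                      # step 1: record of incoming messages
    work_queue = queue.Queue()       # step 2: work content waiting for workers
    response_queue = queue.Queue()   # step 4: finished results

    for msg_id, msg in enumerate(messages):
        ledger[msg_id] = msg
        work_queue.put((msg_id, msg["payload"], msg["task_args"]))

    # Step 3: worker pool (threads here; the stage itself uses processes).
    with ThreadPoolExecutor(max_workers=pe_count) as pool:
        futures = {}
        while not work_queue.empty():
            msg_id, payload, task_args = work_queue.get()
            futures[pool.submit(process_fn, payload, task_args)] = msg_id
        for future, msg_id in futures.items():
            response_queue.put((msg_id, future.result()))

    # Step 5: reattach each result to its original message and emit it.
    emitted = []
    while not response_queue.empty():
        msg_id, result = response_queue.get()
        msg = ledger.pop(msg_id)
        msg["payload"] = result
        emitted.append(msg)
    return emitted


messages = [
    {"payload": ["a", "b"], "task_args": {}},
    {"payload": ["c"], "task_args": {}},
]
emitted = run_flow(messages)
```

Keeping the original message in the ledger means only the payload and task arguments have to cross the worker boundary, which matters once real processes and pickling are involved.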
- accepted_types() Tuple
Returns the accepted input types for this stage. Derived classes should override this method. An error is generated if the input types to the stage do not match one of the types returned from this method.
- Returns:
Accepted input types.
- Return type:
tuple
- compute_schema(schema: StageSchema)
Compute the schema for this stage based on the incoming schema from upstream stages.
Incoming schema and type information from upstream stages is available via the schema.input_schemas and schema.input_types properties.
Derived classes need to override this method and can set the output type(s) on schema by calling set_type for all output ports. For example, a simple pass-through stage might perform the following:
>>> for (port_idx, port_schema) in enumerate(schema.input_schemas):
...     schema.output_schemas[port_idx].set_type(port_schema.get_type())
If the port types in upstream_schema are incompatible, the stage should raise a RuntimeError.
- property document_type: str
- async join()
Stops all running threads and processes gracefully, waits for all threads to complete, and calls the parent class’s join method.
Notes
This method sets the cancellation token to True to signal all running threads to stop. It then joins all threads to ensure they have completed execution before calling the parent class’s join method.
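The shutdown sequence described in these notes (set a shared flag, then join the threads) can be sketched as below. The worker loop, the polling interval, and the `ticks` list are assumptions for illustration; only the use of a shared `multiprocessing.Value` as the cancellation token comes from this page.

```python
import multiprocessing as mp
import threading
import time


def worker_loop(cancellation_token, ticks):
    """Poll until the shared flag is set, recording each iteration."""
    while not cancellation_token.value:
        ticks.append(1)
        time.sleep(0.01)


# Shared flag; "b" is a signed char, used here as a boolean.
cancellation_token = mp.Value("b", False)
ticks = []
threads = [
    threading.Thread(target=worker_loop, args=(cancellation_token, ticks))
    for _ in range(2)
]
for t in threads:
    t.start()

time.sleep(0.05)

# Signal every loop to stop, then wait for the threads to finish.
cancellation_token.value = True
for t in threads:
    t.join(timeout=1.0)

all_stopped = all(not t.is_alive() for t in threads)
```

Because the loops check the flag between short sleeps, they exit soon after it is set, so the join calls return promptly rather than blocking indefinitely.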
- property name: str
The name of the stage. Used in logging. Each derived class should override this property with a unique name.
- Returns:
Name of a stage.
- Return type:
str
- observable_fn(obs: Observable, sub: Subscriber)
Sets up the observable pipeline to receive and process IngestControlMessage objects.
- Parameters:
obs (mrc.Observable) – The observable stream that emits IngestControlMessage objects.
sub (mrc.Subscriber) – The subscriber that receives processed results.
- Return type:
None
Notes
This function sets up the pipeline by creating a queue and a thread that runs the parent_receive function. The thread is responsible for managing child processes and collecting results.
- supported_execution_modes() tuple[ExecutionMode]
Returns a tuple of supported execution modes of this stage. By default this returns (ExecutionMode.GPU,). Subclasses can override this method to specify different execution modes.
For most stages the values will be static, and this can be accomplished by making use of either the CpuOnlyMixin or GpuAndCpuMixin mixins.
However, complex stages may choose to make this decision at runtime, in which case this method should be overridden directly within the stage class.
- supports_cpp_node() bool
Specifies whether this Stage is capable of creating C++ nodes. During the build phase, this value will be combined with CppConfig.get_should_use_cpp() to determine whether or not a C++ node is created. This is an instance method to allow runtime decisions and derived classes to override base implementations.
- property task_desc: str
- static work_package_input_handler(work_package_input_queue: Queue, work_package_response_queue: Queue, cancellation_token: Value, process_fn: Callable[[DataFrame, dict], DataFrame], process_pool: ProcessWorkerPoolSingleton)
Processes work packages received from work_package_input_queue, applies process_fn to each package, and sends the results to work_package_response_queue using a thread.
- Parameters:
work_package_input_queue (multiprocessing.Queue) – Queue from which work packages are received.
work_package_response_queue (multiprocessing.Queue) – Queue to which processed results are sent.
cancellation_token (multiprocessing.Value) – Shared flag to indicate when to stop processing.
process_pool (ProcessWorkerPoolSingleton) – Singleton process pool to handle the actual processing.
Notes
The method continuously retrieves work packages from work_package_input_queue, submits them to the process pool, and sends the results to work_package_response_queue. It stops processing when the cancellation_token is set.
- static work_package_response_handler(mp_context, max_queue_size, work_package_input_queue: Queue, sub: Subscriber, cancellation_token: Value, process_fn: Callable[[DataFrame, dict], DataFrame], process_pool: ProcessWorkerPoolSingleton)
Manages child threads and collects results, forwarding them to the subscriber.
- Parameters:
mp_context (multiprocessing.context.BaseContext) – Context for creating multiprocessing objects.
max_queue_size (int) – Maximum size of the queues.
work_package_input_queue (multiprocessing.Queue) – Queue to send tasks to the child process.
sub (mrc.Subscriber) – Subscriber to send results to.
cancellation_token (multiprocessing.Value) – Shared flag to indicate when to stop processing.
process_pool (ProcessWorkerPoolSingleton) – Singleton process pool to handle the actual processing.
Notes
The method creates a child thread to handle tasks, retrieves completed work from work_package_response_queue, and forwards the results to the subscriber. It stops processing when the cancellation_token is set or the subscriber is unsubscribed.
- nv_ingest.stages.multiprocessing_stage.process_control_message(ctrl_msg, task, task_desc, ctrl_msg_ledger, send_queue)
Processes the control message, extracting the DataFrame payload and task properties, and puts the work package into the send queue.
- Parameters:
ctrl_msg (IngestControlMessage) – The control message to process.
task (str) – The task name.
task_desc (str) – Description of the task for tracing purposes.
ctrl_msg_ledger (dict) – Ledger to keep track of control messages.
send_queue (Queue) – Queue to send the work package to the child process.
- nv_ingest.stages.multiprocessing_stage.put_in_queue(ctrl_msg, pass_thru_recv_queue)
Puts the control message into the pass-through receive queue.
- Parameters:
ctrl_msg (IngestControlMessage) – The control message to put in the queue.
pass_thru_recv_queue (queue.Queue) – The queue to put the control message into.
- nv_ingest.stages.multiprocessing_stage.trace_message(ctrl_msg, task_desc)
Adds tracing metadata to the control message.
- Parameters:
ctrl_msg (IngestControlMessage) – The control message to trace.
task_desc (str) – Description of the task for tracing purposes.
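As a rough picture of what adding tracing metadata can look like, the sketch below stamps an entry time onto a message under a key derived from task_desc. Everything here is hypothetical: `FakeControlMessage`, its `set_metadata` and `get_metadata` methods, and the `trace::entry::` key format are illustrative stand-ins and are not taken from the IngestControlMessage API.

```python
import time


class FakeControlMessage:
    """Hypothetical stand-in for a control message; it only holds metadata."""

    def __init__(self):
        self._metadata = {}

    def set_metadata(self, key, value):
        self._metadata[key] = value

    def get_metadata(self, key, default=None):
        return self._metadata.get(key, default)


def trace_message_sketch(ctrl_msg, task_desc):
    """Record when the message entered the stage named by task_desc."""
    # Assumed key format; the real tracing keys may differ.
    ctrl_msg.set_metadata(f"trace::entry::{task_desc}", time.time_ns())


msg = FakeControlMessage()
trace_message_sketch(msg, "pdf_content_extractor")
entry_ts = msg.get_metadata("trace::entry::pdf_content_extractor")
```

A matching exit timestamp recorded after processing would let a downstream consumer compute per-stage latency from the two values.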
nv_ingest.stages.pdf_extractor_stage module
- nv_ingest.stages.pdf_extractor_stage.decode_and_extract(base64_row: Dict[str, Any], task_props: Dict[str, Any], validated_config: Any, default: str = 'pdfium', trace_info: List | None = None)
Decodes base64 content from a row and extracts data from it using the specified extraction method.
- Parameters:
base64_row (dict) – A dictionary containing the base64-encoded content and other relevant data. The key “content” should contain the base64 string, and the key “source_id” is optional.
task_props (dict) – A dictionary containing task properties. It should have the keys:
- “method” (str): The extraction method to use (e.g., “pdfium”).
- “params” (dict): Parameters to pass to the extraction function.
validated_config (Any) – Configuration object that contains pdfium_config. Used if the pdfium method is selected.
default (str, optional) – The default extraction method to use if the specified method in task_props is not available.
trace_info (Optional[List], optional) – An optional list for trace information to pass to the extraction function.
- Returns:
The extracted data from the decoded content. The exact return type depends on the extraction method used.
- Return type:
Any
- Raises:
KeyError – If the “content” key is missing from base64_row.
Exception – For any other unhandled exceptions during extraction, an error is logged, and the exception is re-raised.
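Unlike the DOCX and PPTX variants, this function is documented as re-raising failures. A minimal sketch of that contract, together with the documented shape of task_props, might look like the following. `decode_pdf_row` and the `extract_text` parameter are hypothetical, and the fallback shown here only covers a missing "method" key, which is a simplification of the documented behavior.

```python
import base64
import logging

logger = logging.getLogger(__name__)


def decode_pdf_row(base64_row, task_props, default="pdfium"):
    """Return (method, params, raw_bytes) for one row, re-raising on failure."""
    try:
        # Raises KeyError when the "content" key is missing.
        raw = base64.b64decode(base64_row["content"])
    except Exception:
        # Log the failure, then let the exception propagate to the caller.
        logger.exception("Failed to decode row %s", base64_row.get("source_id"))
        raise
    method = task_props.get("method", default)
    params = task_props.get("params", {})
    return method, params, raw


# task_props in the documented shape; "extract_text" is a made-up parameter.
task_props = {"method": "pdfium", "params": {"extract_text": True}}
row = {"content": base64.b64encode(b"%PDF-1.7").decode("ascii"), "source_id": "doc-1"}
method, params, raw = decode_pdf_row(row, task_props)

try:
    decode_pdf_row({"source_id": "doc-2"}, task_props)
    missing_content_raised = False
except KeyError:
    missing_content_raised = True
```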
- nv_ingest.stages.pdf_extractor_stage.generate_pdf_extractor_stage(c: Config, extractor_config: Dict[str, Any], task: str = 'extract', task_desc: str = 'pdf_content_extractor', pe_count: int = 1)
Helper function to generate a multiprocessing stage to perform PDF content extraction.
- Parameters:
c (Config) – Morpheus global configuration object.
extractor_config (dict) – Configuration parameters for the PDF content extractor.
task (str) – The task name to match for the stage worker function.
task_desc (str) – A descriptor to be used in latency tracing.
pe_count (int) – The number of process engines to use for PDF content extraction.
- Returns:
A Morpheus stage with the applied worker function.
- Return type:
- nv_ingest.stages.pdf_extractor_stage.process_pdf_bytes(df, task_props, validated_config, trace_info=None)
Processes a pandas DataFrame containing PDF files in base64 encoding. Each PDF’s content is replaced by the extracted text.
- Parameters:
df – pandas DataFrame with columns ‘source_id’ and ‘content’ (base64 encoded PDFs).
task_props – Dictionary containing instructions for the PDF processing task.
validated_config – Configuration object for the extractor.
trace_info – Optional trace information to include in extraction.
- Returns:
A tuple containing a pandas DataFrame with the PDF content replaced by the extracted text, and a dictionary with trace information.
nv_ingest.stages.pptx_extractor_stage module
- nv_ingest.stages.pptx_extractor_stage.decode_and_extract(base64_row, task_props, validated_config: Any, trace_info: Dict, default='python_pptx')
Decodes base64 content from a row and extracts data from it using the specified extraction method.
- Parameters:
base64_row (pd.Series) – A Series containing the base64-encoded content and other relevant data. The key “content” should contain the base64 string, and the key “source_id” is optional.
task_props (dict or BaseModel) – A dictionary (or BaseModel instance) containing instructions and parameters for extraction.
validated_config (Any) – Configuration object that contains pptx_extraction_config.
trace_info (dict) – Dictionary containing trace information.
default (str, optional) – The default extraction method to use if the specified method is not available (default is “python_pptx”).
- Returns:
The extracted data, or an exception tag if extraction fails.
- Return type:
Any
- nv_ingest.stages.pptx_extractor_stage.generate_pptx_extractor_stage(c: Config, extractor_config: dict, task: str = 'pptx-extract', task_desc: str = 'pptx_content_extractor', pe_count: int = 1)
Helper function to generate a multiprocessing stage to perform PPTX content extraction.
- Parameters:
c (Config) – Morpheus global configuration object.
extractor_config (dict) – Configuration parameters for PPTX content extractor.
task (str) – The task name to match for the stage worker function.
task_desc (str) – A descriptor to be used in latency tracing.
pe_count (int) – The number of process engines to use for PPTX content extraction.
- Returns:
A Morpheus stage with the applied worker function.
- Return type:
- Raises:
Exception – If an error occurs during stage generation.