# Source code for nv_ingest.framework.orchestration.morpheus.stages.extractors.docx_extractor_stage

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


import functools
import logging

from morpheus.config import Config

from nv_ingest.framework.orchestration.morpheus.stages.meta.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest_api.internal.extract.docx.docx_extractor import extract_primitives_from_docx_internal
from nv_ingest_api.internal.schemas.extract.extract_docx_schema import DocxExtractorSchema

logger = logging.getLogger(__name__)


def generate_docx_extractor_stage(
    c: Config,
    extraction_config: dict,
    task: str = "docx-extract",
    task_desc: str = "docx_content_extractor",
    pe_count: int = 8,
) -> MultiProcessingBaseStage:
    """
    Build a multiprocessing stage that performs DOCX content extraction.

    The raw ``extraction_config`` mapping is validated through
    ``DocxExtractorSchema`` and the validated object is bound to
    ``extract_primitives_from_docx_internal`` as its ``extraction_config``
    keyword argument. The resulting callable is handed to
    ``MultiProcessingBaseStage`` as the stage's ``process_fn``.

    Parameters
    ----------
    c : Config
        Morpheus global configuration object.
    extraction_config : dict
        Configuration parameters for the document content extractor; expanded
        as keyword arguments into ``DocxExtractorSchema``.
    task : str
        The task name to match for the stage worker function.
    task_desc : str
        A descriptor to be used in latency tracing.
    pe_count : int
        The number of process engines to use for document content extraction.

    Returns
    -------
    MultiProcessingBaseStage
        A Morpheus stage with the applied worker function.

    Raises
    ------
    Exception
        If an error occurs during stage generation. The raised exception has
        the same type as the original error. When that type can be constructed
        from a single message, a new instance carrying a contextual message is
        raised and chained to the original; otherwise the original exception
        is re-raised unchanged.
    """
    try:
        validated_config = DocxExtractorSchema(**extraction_config)
        _wrapped_process_fn = functools.partial(
            extract_primitives_from_docx_internal, extraction_config=validated_config
        )

        return MultiProcessingBaseStage(
            c=c,
            pe_count=pe_count,
            task=task,
            task_desc=task_desc,
            process_fn=_wrapped_process_fn,
            document_type="docx",
        )
    except Exception as e:
        err_msg = f"generate_docx_extractor_stage: Error generating document extractor stage. Original error: {e}"
        logger.error(err_msg, exc_info=True)

        # Not every exception class accepts a single message argument (some
        # require several positional arguments or expose no public
        # constructor). Build the wrapper defensively so that a failure here
        # cannot mask the real error.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            # Could not rebuild the exception with the contextual message;
            # propagate the original, which has already been logged above.
            raise

        raise wrapped from e