# Source: nv_ingest_api.internal.extract.pdf.engines.unstructured_io

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import logging
import uuid
import warnings
from typing import Dict, Any, Optional, List

import pandas as pd
import pypdfium2 as pdfium
from unstructured_client import UnstructuredClient
from unstructured_client.models import operations
from unstructured_client.models import shared
from unstructured_client.utils import BackoffStrategy
from unstructured_client.utils import RetryConfig

from nv_ingest_api.internal.enums.common import AccessLevelEnum, DocumentTypeEnum
from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.internal.enums.common import ContentDescriptionEnum
from nv_ingest_api.internal.enums.common import TableFormatEnum
from nv_ingest_api.internal.enums.common import TextTypeEnum
from nv_ingest_api.internal.schemas.meta.metadata_schema import validate_metadata
from nv_ingest_api.util.metadata.aggregators import extract_pdf_metadata, construct_text_metadata

logger = logging.getLogger(__name__)


def unstructured_io_extractor(
    pdf_stream: io.BytesIO,
    extract_text: bool,
    extract_images: bool,
    extract_infographics: bool,
    extract_charts: bool,
    extract_tables: bool,
    extractor_config: Dict[str, Any],
    execution_trace_log: Optional[List[Any]] = None,
) -> pd.DataFrame:
    """
    Extract content from a PDF bytestream using the unstructured.io REST API.

    The PDF is partitioned by the unstructured.io service and the response
    elements are converted into rows of ``[content_type, metadata, uuid]``.
    Although flags for image, table, and infographics extraction are provided,
    the underlying API may not support all of these features (infographics and
    charts are currently ignored).

    Parameters
    ----------
    pdf_stream : io.BytesIO
        A bytestream representing the PDF to be processed.
    extract_text : bool
        Specifies whether to extract text.
    extract_images : bool
        Specifies whether to extract images.
    extract_infographics : bool
        Specifies whether to extract infographics (not supported; logged and ignored).
    extract_charts : bool
        Specifies whether to extract charts (not supported; ignored).
    extract_tables : bool
        Specifies whether to extract tables.
    extractor_config : dict
        A dictionary containing additional extraction parameters:
        - unstructured_api_key : API key for unstructured.io.
        - unstructured_url : URL for the unstructured.io API endpoint.
        - unstructured_strategy : Strategy for extraction (default: "auto").
        - unstructured_concurrency_level : Concurrency level for PDF splitting.
        - row_data : Row data containing source information.
        - text_depth : Depth of text extraction (e.g., "page").
        - identify_nearby_objects : Flag for identifying nearby objects.
        - metadata_column : Column name for metadata extraction.
    execution_trace_log : list, optional
        Accepted for interface compatibility; currently unused.

    Returns
    -------
    list
        A list of ``[content_type, validated_metadata_dict, uuid_str]`` rows.
        NOTE(review): the ``pd.DataFrame`` annotation does not match the actual
        return value; the annotation is kept for interface compatibility.

    Raises
    ------
    ValueError
        If an invalid text_depth value is provided.
    SDKError
        If there is an error during the extraction process.
    """
    _ = execution_trace_log
    _ = extract_charts

    logger.debug("Extracting PDF with unstructured-io backend.")

    # Service connection settings.
    api_key = extractor_config.get("unstructured_api_key", None)
    unstructured_url = extractor_config.get("unstructured_url", "https://api.unstructured.io/general/v0/general")
    strategy = extractor_config.get("unstructured_strategy", "auto")
    if (strategy != "hi_res") and (extract_images or extract_tables):
        warnings.warn("'hi_res' strategy required when extracting images or tables")
    concurrency_level = extractor_config.get("unstructured_concurrency_level", 10)

    # Source identity derived from the incoming row.
    row_data = extractor_config.get("row_data", None)
    source_id = row_data.get("source_id", None) if row_data is not None else None
    file_name = row_data.get("id", "_.pdf") if row_data is not None else "_.pdf"

    # Validate text_depth against the enum before doing any work.
    text_depth_str = extractor_config.get("text_depth", "page")
    try:
        text_depth = TextTypeEnum[text_depth_str.upper()]
    except KeyError:
        valid_options = [e.name.lower() for e in TextTypeEnum]
        raise ValueError(f"Invalid text_depth value: {text_depth_str}. Expected one of: {valid_options}")

    identify_nearby_objects = extractor_config.get("identify_nearby_objects", True)

    # Base metadata: row_data may be a pandas Series (has .index) or a plain dict.
    metadata_col = extractor_config.get("metadata_column", "metadata")
    if row_data is not None and hasattr(row_data, "index") and metadata_col in row_data.index:
        base_unified_metadata = row_data[metadata_col]
    elif row_data is not None:
        base_unified_metadata = row_data.get(metadata_col, {})
    else:
        base_unified_metadata = {}

    if extract_infographics:
        logger.debug("Infographics extraction requested but not supported by unstructured-io extractor.")

    # Source-level metadata, seeded from whatever the caller already knows.
    base_source_metadata = base_unified_metadata.get("source_metadata", {})
    source_location = base_source_metadata.get("source_location", "")
    collection_id = base_source_metadata.get("collection_id", "")
    partition_id = base_source_metadata.get("partition_id", -1)
    access_level = base_source_metadata.get("access_level", AccessLevelEnum.UNKNOWN)

    source_metadata = {
        "source_name": file_name,
        "source_id": source_id,
        "source_location": source_location,
        "collection_id": collection_id,
        "summary": "",
        "partition_id": partition_id,
        "access_level": access_level,
    }

    # Pull document-level metadata (dates, type, page count) locally via pdfium.
    doc = pdfium.PdfDocument(pdf_stream)
    pdf_metadata = extract_pdf_metadata(doc, source_id)
    document_metadata = {
        "source_type": pdf_metadata.source_type,
        "date_created": pdf_metadata.date_created,
        "last_modified": pdf_metadata.last_modified,
    }
    source_metadata.update(document_metadata)

    client = UnstructuredClient(
        retry_config=RetryConfig("backoff", BackoffStrategy(1, 50, 1.1, 100), False),
        server_url=unstructured_url,
        api_key_auth=api_key,
    )

    req = operations.PartitionRequest(
        partition_parameters=shared.PartitionParameters(
            files=shared.Files(
                content=pdf_stream.getvalue(),
                file_name=file_name,
            ),
            strategy=strategy,
            languages=["eng"],
            coordinates=True,
            extract_image_block_types=["Image"] if extract_images else None,
            split_pdf_page=True,
            split_pdf_concurrency_level=concurrency_level,
        ),
    )

    res = client.general.partition(request=req)

    extracted_data = []
    accumulated_text = []
    curr_page = 1
    page_nearby_blocks = {
        "text": {"content": [], "bbox": []},
        "images": {"content": [], "bbox": []},
        "structured": {"content": [], "bbox": []},
    }

    # Walk the partition response; elements arrive in page order.
    for block_idx, item in enumerate(res.elements):
        # Extract text
        if extract_text and item["type"] not in ("Image", "Table"):
            # Page boundary: flush accumulated page text (PAGE depth) and reset
            # the per-page nearby-object tracker.
            if item["metadata"]["page_number"] != curr_page:
                if text_depth == TextTypeEnum.PAGE:
                    text_extraction = construct_text_metadata(
                        accumulated_text,
                        pdf_metadata.page_count,
                        curr_page - 1,
                        -1,
                        text_depth,
                        source_metadata,
                        base_unified_metadata,
                    )
                    if len(text_extraction) > 0:
                        extracted_data.append(text_extraction)
                    accumulated_text = []
                page_nearby_blocks = {
                    "text": {"content": [], "bbox": []},
                    "images": {"content": [], "bbox": []},
                    "structured": {"content": [], "bbox": []},
                }
                curr_page = item["metadata"]["page_number"]

            accumulated_text.append(item["text"])

            if text_depth == TextTypeEnum.BLOCK:
                points = item["metadata"]["coordinates"]["points"]
                text_extraction = construct_text_metadata(
                    accumulated_text,
                    pdf_metadata.page_count,
                    item["metadata"]["page_number"] - 1,
                    block_idx,
                    text_depth,
                    source_metadata,
                    base_unified_metadata,
                    bbox=(points[0][0], points[0][1], points[2][0], points[2][1]),
                )
                if len(text_extraction) > 0:
                    extracted_data.append(text_extraction)
                accumulated_text = []

            if (extract_images and identify_nearby_objects) and (len(item["text"]) > 0):
                points = item["metadata"]["coordinates"]["points"]
                # FIX: item["text"] is already a string; the previous
                # " ".join(item["text"]) interleaved a space between every
                # character of the text.
                page_nearby_blocks["text"]["content"].append(item["text"])
                page_nearby_blocks["text"]["bbox"].append((points[0][0], points[0][1], points[2][0], points[2][1]))

        # Extract images
        if extract_images and item["type"] == "Image":
            base64_img = item["metadata"]["image_base64"]
            points = item["metadata"]["coordinates"]["points"]
            image_extraction = _construct_image_metadata(
                base64_img,
                item["text"],
                pdf_metadata.page_count,
                item["metadata"]["page_number"] - 1,
                block_idx,
                source_metadata,
                base_unified_metadata,
                page_nearby_blocks,
                bbox=(points[0][0], points[0][1], points[2][0], points[2][1]),
            )
            extracted_data.append(image_extraction)

        # Extract tables
        if extract_tables and item["type"] == "Table":
            table = item["metadata"]["text_as_html"]
            points = item["metadata"]["coordinates"]["points"]
            table_extraction = _construct_table_metadata(
                table,
                pdf_metadata.page_count,
                item["metadata"]["page_number"] - 1,
                block_idx,
                source_metadata,
                base_unified_metadata,
                bbox=(points[0][0], points[0][1], points[2][0], points[2][1]),
            )
            extracted_data.append(table_extraction)

    # Flush the trailing page (PAGE depth) or the whole document (DOCUMENT depth).
    if extract_text and text_depth == TextTypeEnum.PAGE:
        text_extraction = construct_text_metadata(
            accumulated_text,
            pdf_metadata.page_count,
            curr_page - 1,
            -1,
            text_depth,
            source_metadata,
            base_unified_metadata,
        )
        if len(text_extraction) > 0:
            extracted_data.append(text_extraction)
    elif extract_text and text_depth == TextTypeEnum.DOCUMENT:
        text_extraction = construct_text_metadata(
            accumulated_text,
            pdf_metadata.page_count,
            -1,
            -1,
            text_depth,
            source_metadata,
            base_unified_metadata,
        )
        if len(text_extraction) > 0:
            extracted_data.append(text_extraction)

    return extracted_data
def _construct_image_metadata(
    image,
    image_text,
    page_count,
    page_idx,
    block_idx,
    source_metadata,
    base_unified_metadata,
    page_nearby_blocks,
    bbox,
):
    """
    Build a validated image result row.

    Combines the base metadata with image-specific content/positional metadata,
    validates the merged structure, and returns a ``[content_type, metadata,
    uuid]`` row suitable for the extractor's output list.
    """
    hierarchy = {
        "page_count": page_count,
        "page": page_idx,
        "block": block_idx,
        "line": -1,
        "span": -1,
        "nearby_objects": page_nearby_blocks,
    }
    content_metadata = {
        "type": ContentTypeEnum.IMAGE,
        "description": ContentDescriptionEnum.PDF_IMAGE,
        "page_number": page_idx,
        "hierarchy": hierarchy,
    }
    image_metadata = {
        "image_type": DocumentTypeEnum.JPEG,
        "structured_image_type": ContentTypeEnum.UNKNOWN,
        "caption": "",
        "text": image_text,
        "image_location": bbox,
    }

    # Start from a copy of the caller's base metadata so it is never mutated.
    unified_metadata = base_unified_metadata.copy()
    unified_metadata["content"] = image
    unified_metadata["source_metadata"] = source_metadata
    unified_metadata["content_metadata"] = content_metadata
    unified_metadata["image_metadata"] = image_metadata

    validated = validate_metadata(unified_metadata)
    return [ContentTypeEnum.IMAGE.value, validated.model_dump(), str(uuid.uuid4())]


def _construct_table_metadata(
    table,
    page_count,
    page_idx,
    block_idx,
    source_metadata,
    base_unified_metadata,
    bbox,
):
    """
    Build a validated table (structured content) result row.

    Merges the base metadata with table-specific content/positional metadata
    (HTML table format), validates it, and returns a ``[content_type,
    metadata, uuid]`` row.
    """
    hierarchy = {
        "page_count": page_count,
        "page": page_idx,
        "block": block_idx,
        "line": -1,
        "span": -1,
    }
    content_metadata = {
        "type": ContentTypeEnum.STRUCTURED,
        "description": ContentDescriptionEnum.PDF_TABLE,
        "page_number": page_idx,
        "hierarchy": hierarchy,
    }
    table_metadata = {
        "caption": "",
        "table_format": TableFormatEnum.HTML,
        "table_location": bbox,
    }

    # Start from a copy of the caller's base metadata so it is never mutated.
    unified_metadata = base_unified_metadata.copy()
    unified_metadata["content"] = table
    unified_metadata["source_metadata"] = source_metadata
    unified_metadata["content_metadata"] = content_metadata
    unified_metadata["table_metadata"] = table_metadata

    validated = validate_metadata(unified_metadata)
    return [ContentTypeEnum.STRUCTURED.value, validated.model_dump(), str(uuid.uuid4())]