Source code for nv_ingest.extraction_workflows.pdf.adobe_helper

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import json
import logging
import random
import time
import uuid
import zipfile

import pandas as pd
import pypdfium2 as pdfium

from nv_ingest.schemas.metadata_schema import AccessLevelEnum
from nv_ingest.schemas.metadata_schema import ContentTypeEnum
from nv_ingest.schemas.metadata_schema import ImageTypeEnum
from nv_ingest.schemas.metadata_schema import StdContentDescEnum
from nv_ingest.schemas.metadata_schema import TableFormatEnum
from nv_ingest.schemas.metadata_schema import TextTypeEnum
from nv_ingest.schemas.metadata_schema import validate_metadata
from nv_ingest.util.converters import bytetools
from nv_ingest.util.pdf.metadata_aggregators import construct_text_metadata
from nv_ingest.util.pdf.metadata_aggregators import extract_pdf_metadata

ADOBE_INSTALLED = True
try:
    from adobe.pdfservices.operation.auth.service_principal_credentials import ServicePrincipalCredentials
    from adobe.pdfservices.operation.exception.exceptions import SdkException
    from adobe.pdfservices.operation.exception.exceptions import ServiceApiException
    from adobe.pdfservices.operation.exception.exceptions import ServiceUsageException
    from adobe.pdfservices.operation.io.cloud_asset import CloudAsset
    from adobe.pdfservices.operation.io.stream_asset import StreamAsset
    from adobe.pdfservices.operation.pdf_services import PDFServices
    from adobe.pdfservices.operation.pdf_services_media_type import PDFServicesMediaType
    from adobe.pdfservices.operation.pdfjobs.jobs.extract_pdf_job import ExtractPDFJob
    from adobe.pdfservices.operation.pdfjobs.params.extract_pdf import extract_renditions_element_type
    from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_element_type import ExtractElementType
    from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_pdf_params import ExtractPDFParams
    from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.table_structure_type import TableStructureType
    from adobe.pdfservices.operation.pdfjobs.result.extract_pdf_result import ExtractPDFResult

    ExtractRenditionsElementType = (
        extract_renditions_element_type.ExtractRenditionsElementType
    )  # black / isort conflict
except ImportError:
    ADOBE_INSTALLED = False
logger = logging.getLogger(__name__)


[docs] def adobe( pdf_stream: io.BytesIO, extract_text: bool, extract_images: bool, extract_tables: bool, **kwargs, ): """ Helper function to use unstructured-io REST API to extract text from a bytestream PDF. Parameters ---------- pdf_stream : io.BytesIO A bytestream PDF. extract_text : bool Specifies whether or not to extract text. extract_images : bool Specifies whether or not to extract images. extract_tables : bool Specifies whether or not to extract tables. **kwargs The keyword arguments are used for additional extraction parameters. Returns ------- str A string of extracted text. Raises ------ SDKError If there is an error with the extraction. """ logger.debug("Extracting PDF with Adobe backend.") if not ADOBE_INSTALLED: err_msg = ( "Adobe SDK not installed -- cannot extract PDF.\r\nTo install the adobe SDK please review the " "license agreement at https://github.com/adobe/pdfservices-python-sdk?tab=License-1-ov-file and" "re-launch the nv-ingest microservice with -e INSTALL_ADOBE_SDK=True." ) logger.error(err_msg) raise RuntimeError(err_msg) # get adobe api key client_id = kwargs.get("adobe_client_id", None) client_secret = kwargs.get("adobe_client_secret", None) # get row_data row_data = kwargs.get("row_data", None) # get source_id source_id = row_data.get("source_id", None) file_name = row_data.get("id", "_.pdf") # get text_depth text_depth = kwargs.get("text_depth", "page") text_depth = TextTypeEnum[text_depth.upper()] # TODO: Not configurable anywhere at the moment; likely don't need to but may be a small perf gain. identify_nearby_objects = kwargs.get("identify_nearby_objects", True) # get base metadata metadata_col = kwargs.get("metadata_column", "metadata") base_unified_metadata = row_data[metadata_col] if metadata_col in row_data.index else {} # get base source_metadata base_source_metadata = base_unified_metadata.get("source_metadata", {}) # get source_location source_location = base_source_metadata.get("source_location", "") # get collection_id (assuming coming in from source_metadata...) collection_id = base_source_metadata.get("collection_id", "") # get partition_id (assuming coming in from source_metadata...) partition_id = base_source_metadata.get("partition_id", -1) # get access_level (assuming coming in from source_metadata...) access_level = base_source_metadata.get("access_level", AccessLevelEnum.LEVEL_1) source_metadata = { "source_name": file_name, "source_id": source_id, "source_location": source_location, "collection_id": collection_id, "summary": "", "partition_id": partition_id, "access_level": access_level, } doc = pdfium.PdfDocument(pdf_stream) pdf_metadata = extract_pdf_metadata(doc, source_id) document_metadata = { "source_type": pdf_metadata.source_type, "date_created": pdf_metadata.date_created, "last_modified": pdf_metadata.last_modified, } source_metadata.update(document_metadata) retry_delay = 1 max_delay = 50 while True: try: # Initial setup, create credentials instance credentials = ServicePrincipalCredentials( client_id=client_id, client_secret=client_secret, ) # Creates a PDF Services instance pdf_services = PDFServices(credentials=credentials) # Creates an asset(s) from source file(s) and upload input_asset = pdf_services.upload(input_stream=pdf_stream, mime_type=PDFServicesMediaType.PDF) # Create parameters for the job elements_to_extract = [] if extract_text: elements_to_extract.append(ExtractElementType.TEXT) if extract_tables: elements_to_extract.append(ExtractElementType.TABLES) extract_pdf_params = ExtractPDFParams( table_structure_type=TableStructureType.CSV, elements_to_extract=elements_to_extract, elements_to_extract_renditions=[ExtractRenditionsElementType.FIGURES] if extract_images else [], ) # Creates a new job instance extract_pdf_job = ExtractPDFJob(input_asset=input_asset, extract_pdf_params=extract_pdf_params) # Submit the job and gets the job result location = pdf_services.submit(extract_pdf_job) pdf_services_response = pdf_services.get_job_result(location, ExtractPDFResult) # Get content from the resulting asset(s) result_asset: CloudAsset = pdf_services_response.get_result().get_resource() stream_asset: StreamAsset = pdf_services.get_content(result_asset) archive = zipfile.ZipFile(io.BytesIO(stream_asset.get_input_stream())) jsonentry = archive.open("structuredData.json") jsondata = jsonentry.read() data = json.loads(jsondata) # Request successful break except (ServiceApiException, ServiceUsageException, SdkException) as e: if isinstance(e, ServiceUsageException) and (retry_delay * 1.1) < max_delay: time.sleep(retry_delay) retry_delay *= 1.1 retry_delay += random.uniform(0, 1) logging.error(f"Exception encountered while executing operation: {e}, retrying in {int(retry_delay)}s.") else: logging.exception(f"Exception encountered while executing operation: {e}") return [] extracted_data = [] accumulated_text = [] page_idx = 0 page_nearby_blocks = { "text": {"content": [], "bbox": []}, "images": {"content": [], "bbox": []}, "structured": {"content": [], "bbox": []}, } for block_idx, item in enumerate(data["elements"]): # Extract text if extract_text and "Text" in item and "Table" not in item["Path"] and "Figure" not in item["Path"]: if item["Page"] != page_idx: if text_depth == TextTypeEnum.PAGE: text_extraction = construct_text_metadata( accumulated_text, pdf_metadata.page_count, page_idx, block_idx, text_depth, source_metadata, base_unified_metadata, bbox=(0, 0, data["pages"][page_idx]["width"], data["pages"][page_idx]["height"]), ) if len(text_extraction) > 0: extracted_data.append(text_extraction) accumulated_text = [] page_nearby_blocks = { "text": {"content": [], "bbox": []}, "images": {"content": [], "bbox": []}, "structured": {"content": [], "bbox": []}, } page_idx = item["Page"] accumulated_text.append(item["Text"].strip()) if text_depth == TextTypeEnum.BLOCK: bounds = item["Bounds"] text_extraction = construct_text_metadata( accumulated_text, pdf_metadata.page_count, item["Page"], block_idx, text_depth, source_metadata, base_unified_metadata, bbox=(bounds[0], bounds[1], bounds[2], bounds[3]), ) if len(text_extraction) > 0: extracted_data.append(text_extraction) accumulated_text = [] if (extract_images and identify_nearby_objects) and (len(item["Text"]) > 0): bounds = item["Bounds"] page_nearby_blocks["text"]["content"].append(" ".join(item["Text"].strip())) page_nearby_blocks["text"]["bbox"].append((bounds[0], bounds[1], bounds[2], bounds[3])) # Extract images if extract_images and item["Path"].endswith("/Figure"): bounds = item["Bounds"] try: figure = archive.open(item["filePaths"][0]) base64_img = bytetools.base64frombytes(figure.read()) except KeyError: base64_img = "" image_extraction = _construct_image_metadata( base64_img, item.get("Text", ""), pdf_metadata.page_count, item["Page"], block_idx, source_metadata, base_unified_metadata, page_nearby_blocks, bbox=(bounds[0], bounds[1], bounds[2], bounds[3]), ) extracted_data.append(image_extraction) # Extract tables if extract_tables and item["Path"].endswith("/Table"): bounds = item["Bounds"] try: df = pd.read_csv(archive.open(item["filePaths"][0]), delimiter=",") except KeyError: df = pd.DataFrame() table_extraction = _construct_table_metadata( df.to_markdown(), pdf_metadata.page_count, item["Page"], block_idx, source_metadata, base_unified_metadata, bbox=(bounds[0], bounds[1], bounds[2], bounds[3]), ) extracted_data.append(table_extraction) if text_depth == TextTypeEnum.PAGE: text_extraction = construct_text_metadata( accumulated_text, pdf_metadata.page_count, page_idx, block_idx, text_depth, source_metadata, base_unified_metadata, # bbox=(0, 0, data["pages"][page_idx]["width"], data["pages"][page_idx]["height"]), ) if len(text_extraction) > 0: extracted_data.append(text_extraction) if extract_text and text_depth == TextTypeEnum.DOCUMENT: text_extraction = construct_text_metadata( accumulated_text, pdf_metadata.page_count, -1, -1, text_depth, source_metadata, base_unified_metadata, ) if len(text_extraction) > 0: extracted_data.append(text_extraction) return extracted_data
def _construct_image_metadata( image, image_text, page_count, page_idx, block_idx, source_metadata, base_unified_metadata, page_nearby_blocks, bbox, ): content_metadata = { "type": ContentTypeEnum.IMAGE, "description": StdContentDescEnum.PDF_IMAGE, "page_number": page_idx, "hierarchy": { "page_count": page_count, "page": page_idx, "block": block_idx, "line": -1, "span": -1, "nearby_objects": page_nearby_blocks, }, } image_metadata = { "image_type": ImageTypeEnum.PNG, "structured_image_type": ImageTypeEnum.image_type_1, "caption": "", "text": image_text, "image_location": bbox, "width": bbox[2] - bbox[0], "height": bbox[3] - bbox[1], } unified_metadata = base_unified_metadata.copy() unified_metadata.update( { "content": image, "source_metadata": source_metadata, "content_metadata": content_metadata, "image_metadata": image_metadata, } ) validated_unified_metadata = validate_metadata(unified_metadata) return [ContentTypeEnum.IMAGE.value, validated_unified_metadata.model_dump(), str(uuid.uuid4())] def _construct_table_metadata( table, page_count, page_idx, block_idx, source_metadata, base_unified_metadata, bbox, ): content_metadata = { "type": ContentTypeEnum.STRUCTURED, "description": StdContentDescEnum.PDF_TABLE, "page_number": page_idx, "hierarchy": { "page_count": page_count, "page": page_idx, "block": block_idx, "line": -1, "span": -1, }, } table_metadata = { "caption": "", "table_format": TableFormatEnum.MARKDOWN, "table_location": bbox, } unified_metadata = base_unified_metadata.copy() unified_metadata.update( { "content": table, "source_metadata": source_metadata, "content_metadata": content_metadata, "table_metadata": table_metadata, } ) validated_unified_metadata = validate_metadata(unified_metadata) return [ContentTypeEnum.STRUCTURED.value, validated_unified_metadata.model_dump(), str(uuid.uuid4())]