# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Convert DOCX/PPTX files to PDF bytes via LibreOffice headless."""
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
import traceback
from typing import Any, Dict, List, Optional
import pandas as pd
from nemo_retriever.graph.abstract_operator import AbstractOperator
from nemo_retriever.graph.cpu_operator import CPUOperator
from nemo_retriever.graph.designer import designer_component
from nemo_retriever.graph.operator_archetype import ArchetypeOperator
SUPPORTED_EXTENSIONS = frozenset({".pdf", ".docx", ".pptx"})
def _error_record(
*,
source_path: Optional[str],
stage: str,
exc: BaseException,
) -> Dict[str, Any]:
return {
"bytes": b"",
"path": source_path,
"metadata": {
"source_path": source_path,
"error": {
"stage": str(stage),
"type": exc.__class__.__name__,
"message": str(exc),
"traceback": "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)),
},
},
}
[docs]
def convert_to_pdf_bytes(file_bytes: bytes, extension: str) -> bytes:
"""Convert file bytes to PDF bytes.
If *extension* is ``".pdf"``, return *file_bytes* unchanged.
For ``".docx"`` / ``".pptx"``, write to a temp dir, invoke
``libreoffice --headless --convert-to pdf``, and return the
resulting PDF bytes.
Raises
------
FileNotFoundError
If the ``libreoffice`` binary is not on ``$PATH``.
subprocess.CalledProcessError
If LibreOffice conversion fails.
RuntimeError
If the expected PDF output file is missing after conversion.
"""
ext = extension.lower()
if ext == ".pdf":
return file_bytes
if ext not in SUPPORTED_EXTENSIONS:
raise ValueError(f"Unsupported extension: {extension!r}")
if shutil.which("libreoffice") is None:
raise FileNotFoundError(
"LibreOffice is required to convert DOCX/PPTX files to PDF but was not found on $PATH. "
"Please install LibreOffice (e.g. `apt-get install libreoffice`) and try again."
)
with tempfile.TemporaryDirectory() as tmp_dir:
# Strip leading dot for the filename suffix (e.g. ".docx" -> "docx").
input_path = os.path.join(tmp_dir, f"input{ext}")
with open(input_path, "wb") as f:
f.write(file_bytes)
command = [
"libreoffice",
"--headless",
"--convert-to",
"pdf",
input_path,
"--outdir",
tmp_dir,
]
subprocess.run(command, check=True, capture_output=True, text=True)
pdf_path = os.path.join(tmp_dir, "input.pdf")
if not os.path.exists(pdf_path):
raise RuntimeError(f"LibreOffice conversion produced no output for {extension} file")
with open(pdf_path, "rb") as f:
return f.read()
[docs]
def convert_batch_to_pdf(batch_df: Any) -> pd.DataFrame:
"""Convert a batch of files to PDF, passing PDFs through unchanged.
Expects a :class:`pandas.DataFrame` with at least ``bytes`` and ``path``
columns (the same schema produced by ``ray.data.read_binary_files``).
Rows whose path ends with a supported non-PDF extension are converted;
rows that are already PDFs are returned as-is. On error, an error record
is emitted (matching the pattern in ``pdf/split.py``).
"""
if isinstance(batch_df, list):
# If we get a list of files instead of a DataFrame, convert it to a DataFrame.
batch_df = pd.DataFrame({"path": batch_df})
if not isinstance(batch_df, pd.DataFrame):
raise NotImplementedError("convert_batch_to_pdf currently only supports pandas.DataFrame input.")
_EXPLICIT_COLS = frozenset(("bytes", "path"))
out_rows: List[Dict[str, Any]] = []
for _, row in batch_df.iterrows():
file_path = row.get("path") or ""
file_bytes = row.get("bytes", b"")
extra = {k: v for k, v in row.to_dict().items() if k not in _EXPLICIT_COLS}
ext = os.path.splitext(file_path)[1].lower() if file_path else ".pdf"
if ext not in SUPPORTED_EXTENSIONS:
out_row: Dict[str, Any] = {"bytes": file_bytes, "path": file_path}
out_row.update(extra)
out_rows.append(out_row)
continue
if ext == ".pdf" and len(file_bytes) > 0:
out_row = {"bytes": file_bytes, "path": file_path}
out_row.update(extra)
out_rows.append(out_row)
continue
try:
if not isinstance(file_bytes, (bytes, bytearray, memoryview)):
raise ValueError(f"Unsupported bytes payload type: {type(file_bytes)!r}")
pdf_bytes = convert_to_pdf_bytes(bytes(file_bytes), ext)
out_row = {"bytes": pdf_bytes, "path": file_path}
out_row.update(extra)
out_rows.append(out_row)
except FileNotFoundError:
raise # LibreOffice not installed — fail fast, don't swallow.
except BaseException as e:
err = _error_record(
source_path=str(file_path) if file_path else None,
stage="convert_to_pdf",
exc=e,
)
err.update(extra)
out_rows.append(err)
return pd.DataFrame(out_rows)
[docs]
@designer_component(
name="Doc-to-PDF Converter",
category="Document Processing",
compute="cpu",
description="Converts document formats (DOCX, PPTX, etc.) to PDF",
category_color="#64b4ff",
)
class DocToPdfConversionCPUActor(AbstractOperator, CPUOperator):
"""Ray Data actor that converts DOCX/PPTX batches to PDF.
Used with ``ray.data.Dataset.map_batches`` in the same style as
``PDFSplitActor``.
"""
def __init__(self) -> None:
super().__init__()
[docs]
def preprocess(self, data: Any, **kwargs: Any) -> Any:
return data
[docs]
def process(self, data: Any, **kwargs: Any) -> Any:
return convert_batch_to_pdf(data)
[docs]
def postprocess(self, data: Any, **kwargs: Any) -> Any:
return data
def __call__(self, batch_df: Any) -> Any:
return self.run(batch_df)
[docs]
class DocToPdfConversionActor(ArchetypeOperator):
_cpu_variant_class = DocToPdfConversionCPUActor
def __init__(self) -> None:
super().__init__()