# Source code for nv_ingest_api.util.converters.dftools

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import io
import json

import fastparquet
import pandas as pd

import cudf


class MemoryFiles:
    """
    In-memory stand-in for a filesystem, usable as the ``open_with`` callback
    of ``fastparquet.write``.

    Each "file" is an ``io.BytesIO`` buffer stored in ``self.output`` under its
    file name.
    """

    def __init__(self):
        # Maps file name -> io.BytesIO buffer holding that file's bytes.
        self.output = {}

    def open(self, fn, mode="rb"):
        """
        Open an in-memory file.

        Parameters
        ----------
        fn : str
            File name, used as the key in ``self.output``.
        mode : str, optional
            ``"wb"`` creates (or replaces) the buffer for ``fn``. Any other mode
            returns the existing buffer rewound to position 0.

        Returns
        -------
        io.BytesIO
            The buffer for ``fn``.

        Raises
        ------
        FileNotFoundError
            If ``mode`` is not ``"wb"`` and no buffer exists for ``fn``.
        """
        if mode != "wb":
            try:
                buf = self.output[fn]
            except KeyError:
                # Report the missing name, and suppress the internal KeyError context.
                raise FileNotFoundError(fn) from None
            buf.seek(0)
            return buf

        buf = io.BytesIO()
        # Make close() a no-op so the bytes survive the writer closing the
        # "file" and can still be read back from self.output afterwards.
        buf.close = lambda: None
        self.output[fn] = buf
        return buf
def pandas_to_cudf(
    df: pd.DataFrame,
    deserialize_cols: list = None,
    default_cols: dict = None,
    default_type: type = str,
) -> cudf.DataFrame:
    """
    Helper function to convert from pandas to cudf until https://github.com/apache/arrow/pull/40412 is resolved.

    The data is round-tripped through an in-memory Parquet file: it is written by
    fastparquet (``object_encoding="json"``) and read back by cuDF.

    Parameters
    ----------
    df : pd.DataFrame
        A pandas dataframe. It is not modified by this function.
    deserialize_cols : list, optional
        Names of columns whose values are parsed with ``json.loads`` before the
        Parquet write. Defaults to no columns.
    default_cols : dict, optional
        Mapping of column name to type, used only when ``df`` is empty: every key
        becomes a column of the result, and each column of ``df`` is cast to the
        type given here. Defaults to ``{"document_type": str, "metadata": str}``.
    default_type : type, optional
        Type used for columns of an empty ``df`` that are not listed in
        ``default_cols``. Defaults to ``str``.

    Returns
    -------
    cudf.DataFrame
        A cuDF dataframe.
    """
    if deserialize_cols is None:
        deserialize_cols = []
    if default_cols is None:
        default_cols = {"document_type": str, "metadata": str}

    if not df.empty:
        files = MemoryFiles()
        if deserialize_cols:
            # Work on a copy so the caller's DataFrame is left untouched.
            df = df.copy()
            for col in deserialize_cols:
                df[col] = df[col].apply(json.loads)
        # NOTE(review): the first row is appended here and the last row is dropped
        # after the round-trip (iloc[:-1]). The reason is not visible in this file;
        # presumably it works around a writer/reader edge case. Confirm before removing.
        df = pd.concat([df, df.iloc[0:1]], axis=0)
        fastparquet.write("_", df, open_with=files.open, compression="UNCOMPRESSED", object_encoding="json")
        with files.output["_"] as bytes_buf:
            gdf = cudf.read_parquet(bytes_buf).iloc[:-1]
        gdf.index.name = None
        return gdf

    # Empty input: start from the default columns. Also add any columns of `df`
    # that are not defaults, so the cast below can find every column of `df`.
    columns = list(default_cols) + [col for col in df.columns if col not in default_cols]
    gdf = cudf.DataFrame({col: [] for col in columns})
    for col in df.columns:
        field_type = default_cols.get(col, default_type)
        gdf[col] = gdf[col].astype(field_type)
    return gdf
def cudf_to_pandas(gdf: cudf.DataFrame, deserialize_cols: list = None) -> pd.DataFrame:
    """
    Helper function to convert from cudf to pandas until https://github.com/apache/arrow/pull/40412 is resolved.

    The frame is written to an in-memory Parquet buffer by cuDF and read back by
    pandas using the fastparquet engine.

    Parameters
    ----------
    gdf : cudf.DataFrame
        A cuDF dataframe.
    deserialize_cols : list, optional
        Names of columns whose values are parsed with ``json.loads`` after the
        conversion. Names that are not columns of the result are ignored.
        Defaults to no columns.

    Returns
    -------
    pd.DataFrame
        A pandas dataframe.
    """
    with io.BytesIO() as bytes_buf:
        gdf.to_parquet(bytes_buf)
        # index=None is forwarded to the fastparquet engine; presumably it lets
        # fastparquet restore the index recorded in the file's metadata -- confirm.
        df = pd.read_parquet(bytes_buf, engine="fastparquet", index=None)

    for col in deserialize_cols or ():
        if col in df.columns:
            df[col] = df[col].apply(json.loads)
    return df
def cudf_to_json(gdf: cudf.DataFrame, deserialize_cols: list = None) -> list:
    """
    Helper function to convert from cudf to a list of row records until
    https://github.com/apache/arrow/pull/40412 is resolved.

    Parameters
    ----------
    gdf : cudf.DataFrame
        A cuDF dataframe.
    deserialize_cols : list, optional
        Names of columns whose values are parsed with ``json.loads`` in the
        returned records. Defaults to no columns.

    Returns
    -------
    list of dict
        One dict per row, mapping column name to value. Despite the function
        name, this is a list of records, not a serialized JSON string.
    """
    # Build the lookup set once instead of scanning the list for every key.
    json_cols = set(deserialize_cols or ())
    rows = cudf_to_pandas(gdf).to_dict(orient="records")
    return [
        {key: (json.loads(val) if key in json_cols else val) for key, val in row.items()}
        for row in rows
    ]