Reading and writing datasets with NeMo Curator and Apache Spark#

Background#

NeMo Curator uses the DocumentDataset class to read and write JSONL and Parquet files. It is a wrapper around a Dask (or Dask-cuDF) DataFrame. Apache Spark can read and write JSONL and Parquet files generated by NeMo Curator, and similarly, NeMo Curator can work with the outputs generated by Spark.
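
Because DocumentDataset is a thin wrapper, the underlying Dask DataFrame stays directly accessible. Below is a minimal sketch, assuming a directory of JSONL files at the hypothetical path my_dataset/:

from nemo_curator.datasets import DocumentDataset

# Read a directory of JSONL files into a DocumentDataset
# ("my_dataset/" is a placeholder path used only for illustration)
dataset = DocumentDataset.read_json("my_dataset/", backend="pandas")

# The wrapped Dask DataFrame is exposed via the .df attribute
print(dataset.df.head())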

Usage#

To demonstrate how this interoperability works, consider the following example:

import dask.dataframe as dd
import pandas as pd
from nemo_curator.datasets import DocumentDataset

# Create sample data
data = {
    "id": [1, 2, 3],
    "text": [
        "This is a tiny story.",
        "Another tiny story appears here.",
        "Yet another tiny story for you."
    ]
}

# Convert to a pandas DataFrame first
df = pd.DataFrame(data)

# Convert pandas DataFrame to DocumentDataset
stories_ds = DocumentDataset(dd.from_pandas(df, npartitions=2))

# Write the dataset to JSONL files
stories_ds.to_json("tiny_stories/", write_to_filename=False)

This will create two JSONL files in the directory tiny_stories/:

tiny_stories/
    0.part
    1.part
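
Each part file is a standard JSONL file with one JSON document per line. A quick way to confirm the contents (the part-file names may vary with the Dask version) is:

# Print the records written to one of the generated part files
with open("tiny_stories/0.part") as f:
    for line in f:
        print(line.strip())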

Apache Spark can read these files using its standard APIs. Let’s first create a Spark session called NeMoCuratorExample, then we can read the files in the directory using:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NeMoCuratorExample").getOrCreate()

# Read the JSONL files written by NeMo Curator
stories_df = spark.read.json("tiny_stories")
stories_df.show()
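
The call to show() prints a small table similar to the following (Spark truncates long strings by default):

+---+--------------------+
| id|                text|
+---+--------------------+
|  1|This is a tiny st...|
|  2|Another tiny stor...|
|  3|Yet another tiny ...|
+---+--------------------+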

Let’s go ahead and add a couple of derived columns to the Spark DataFrame, then write the result out as Parquet:

from pyspark.sql.functions import size, split, length

# Calculate Word Count
stories_df = stories_df.withColumn("WordCount", size(split(stories_df["text"], r"\s+")))

# Calculate Character Count
stories_df = stories_df.withColumn("CharacterCount", length(stories_df["text"]))

# Write the transformed DataFrame out as Parquet
stories_df.write.mode("overwrite").parquet("tiny_stories_transformed")
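
After the write completes, the tiny_stories_transformed/ directory typically contains Spark part files alongside marker files such as _SUCCESS and .crc checksums (exact names vary by Spark version and filesystem), for example:

tiny_stories_transformed/
    _SUCCESS
    .part-00000-<uuid>.snappy.parquet.crc
    part-00000-<uuid>.snappy.parquet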

To interoperate between a NeMo Curator DocumentDataset and Spark DataFrames, we recommend using Parquet files for data exchange. The following code snippet demonstrates how to read the Spark output back into a NeMo Curator DocumentDataset:

from nemo_curator.utils.file_utils import get_all_files_paths_under

# Ignore checksum and marker files created by Spark
processed_files = [
    filename for filename in get_all_files_paths_under("tiny_stories_transformed")
    if not (filename.endswith(".crc") or filename.endswith("_SUCCESS"))
]

stories_dataset = DocumentDataset.read_parquet(processed_files, backend="pandas")

Note that Spark typically creates checksum and other marker files, whose names can vary by Spark distribution, so it is advisable to filter them out when reading the data into a NeMo Curator DocumentDataset.
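
From here, the data can be processed with NeMo Curator as usual. As a minimal sketch (assuming the standard DocumentDataset to_parquet helper and a hypothetical output path tiny_stories_curated/), we can inspect the new columns and write the dataset back out:

# Peek at the WordCount and CharacterCount columns via the underlying Dask DataFrame
print(stories_dataset.df.head())

# Write the dataset back to Parquet so it can be picked up again by Spark or NeMo Curator
stories_dataset.to_parquet("tiny_stories_curated/")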