Reading and writing datasets with NeMo Curator and Apache Spark#

Background#

NeMo Curator uses the DocumentDataset class to read and write JSONL and Parquet files. It is a wrapper around a Dask (or Dask-cuDF) DataFrame. Apache Spark can read and write JSONL and Parquet files generated by NeMo Curator, and similarly, NeMo Curator can work with the outputs generated by Spark.
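As a quick, illustrative sketch of that API (the directory names below are placeholders, not outputs from this guide), reading and writing typically go through DocumentDataset.read_json/read_parquet and to_json/to_parquet, while the wrapped Dask DataFrame remains accessible through the dataset's df attribute:

from nemo_curator.datasets import DocumentDataset

# Read a directory of JSONL files into a DocumentDataset
dataset = DocumentDataset.read_json("my_jsonl_dataset/", backend="pandas")

# The underlying Dask DataFrame is exposed as .df
print(dataset.df.npartitions)

# Write the same records back out as Parquet
dataset.to_parquet("my_parquet_dataset/")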

Usage#

To demonstrate how this would work, consider the following example:

import dask.dataframe as dd
import pandas as pd
from nemo_curator.datasets import DocumentDataset

# Create sample data
data = {
    "id": [1, 2, 3],
    "text": [
        "This is a tiny story.",
        "Another tiny story appears here.",
        "Yet another tiny story for you."
    ]
}

# Convert to a pandas DataFrame first
df = pd.DataFrame(data)

# Convert pandas DataFrame to DocumentDataset
stories_ds = DocumentDataset(dd.from_pandas(df, npartitions=2))

# Write the dataset to JSONL files
stories_ds.to_json("tiny_stories/", write_to_filename=False)

This will create two JSONL files in the directory tiny_stories/:

tiny_stories/
    0.part
    1.part
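
Each line in these part files is a standalone JSON record. A quick way to confirm this (purely illustrative, assuming the layout above) is to read back the first line of one part file:

import json

# Peek at the first record of the first part file
with open("tiny_stories/0.part") as f:
    print(json.loads(f.readline()))
# Expected shape: {"id": 1, "text": "This is a tiny story."}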

Apache Spark can read these files using its standard APIs. First, create a Spark session named NeMoCuratorExample, then read the files in the directory:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NeMoCuratorExample").getOrCreate()

# Reading JSONL file
stories_df = spark.read.json("tiny_stories")
stories_df.show()

Let’s go ahead and add a couple of columns to the Spark DataFrame:

from pyspark.sql.functions import size, split, length

# Calculate Word Count
stories_df = stories_df.withColumn("WordCount", size(split(stories_df["text"], r"\s+")))

# Calculate Character Count
stories_df = stories_df.withColumn("CharacterCount", length(stories_df["text"]))

stories_df.write.mode("overwrite").parquet("tiny_stories_transformed")
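
Writing with Spark produces a directory rather than a single file. Depending on the Spark distribution and filesystem, it typically contains the Parquet part files along with a _SUCCESS marker and .crc checksum files; a quick, illustrative way to see what was written:

import os

# List everything Spark wrote (exact part-file names will vary)
for name in sorted(os.listdir("tiny_stories_transformed")):
    print(name)
# Typically includes: _SUCCESS, ._SUCCESS.crc, part-00000-<uuid>.snappy.parquet, ...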

To interoperate between NeMo Curator DocumentDataset and Spark DataFrames, we recommend using Parquet files for data exchange. The following code snippet demonstrates how to read output from a Spark DataFrame into a NeMo Curator DocumentDataset:

from nemo_curator.utils.file_utils import get_all_files_paths_under

# Ignores checksum and marker files created by Spark
processed_files = [
    filename for filename in get_all_files_paths_under("tiny_stories_transformed")
    if not (filename.endswith(".crc") or filename.endswith("_SUCCESS"))
]

stories_dataset = DocumentDataset.read_parquet(processed_files, backend="pandas")

Note that Spark typically creates checksum and other marker files, which can vary by Spark distribution, so it is advisable to ignore them when reading data into a NeMo Curator DocumentDataset.
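
To confirm the round trip, you can inspect the wrapped Dask DataFrame through the dataset's df attribute and check that the columns added in Spark are present; this is an illustrative check rather than a required step:

# The Spark-added columns should have survived the Parquet round trip
print(stories_dataset.df.columns)  # expect: id, text, WordCount, CharacterCount
print(stories_dataset.df.head())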