Reading and writing datasets with NeMo Curator and Apache Spark#
Background#
NeMo Curator uses the DocumentDataset class to read and write JSONL and Parquet files. It is a wrapper around a Dask (or Dask-cuDF) DataFrame. Apache Spark can read and write the JSONL and Parquet files generated by NeMo Curator, and, similarly, NeMo Curator can work with the outputs generated by Spark.
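For example, an existing JSONL or Parquet dataset can be loaded and written back with DocumentDataset directly. The paths below are placeholders; this is only a minimal sketch of the reader and writer APIs:
from nemo_curator.datasets import DocumentDataset
# Read a directory of JSONL files into a Dask-backed DocumentDataset
jsonl_ds = DocumentDataset.read_json("my_dataset_jsonl/", backend="pandas")
# Parquet files are read the same way
parquet_ds = DocumentDataset.read_parquet("my_dataset_parquet/", backend="pandas")
# Write a dataset back out as Parquet
jsonl_ds.to_parquet("my_dataset_as_parquet/")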
Usage#
To demonstrate how this would work, consider the following example:
import dask.dataframe as dd
import pandas as pd
from nemo_curator.datasets import DocumentDataset
# Create sample data
data = {
"id": [1, 2, 3],
"text": [
"This is a tiny story.",
"Another tiny story appears here.",
"Yet another tiny story for you."
]
}
# Convert to a pandas DataFrame first
df = pd.DataFrame(data)
# Convert pandas DataFrame to DocumentDataset
stories_ds = DocumentDataset(dd.from_pandas(df, npartitions=2))
# Write the dataset to JSONL files
stories_ds.to_json("tiny_stories/", write_to_filename=False)
This will create two JSONL files in the directory tiny_stories/:
tiny_stories/
0.part
1.part
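Each .part file is a plain JSON Lines file, so it can be spot-checked with any JSONL-aware tool. For example, a quick look at the first partition with pandas (paths as in the example above):
import pandas as pd
# Each line of the file is one JSON record with "id" and "text" fields
print(pd.read_json("tiny_stories/0.part", lines=True))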
Apache Spark can read these files using standard APIs. Let’s first create a Spark session called NeMoCuratorExample, then we can read the files in the directory using:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NeMoCuratorExample").getOrCreate()
# Reading JSONL file
stories_df = spark.read.json("tiny_stories")
stories_df.show()
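Because the schema of the sample data is known, Spark’s schema inference can also be skipped by supplying the schema explicitly. This is an optional sketch using standard PySpark types, matching the id and text columns created earlier:
from pyspark.sql.types import LongType, StringType, StructField, StructType
# Declare the schema up front instead of letting Spark infer it
schema = StructType([
    StructField("id", LongType(), True),
    StructField("text", StringType(), True),
])
stories_df = spark.read.schema(schema).json("tiny_stories")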
Let’s go ahead and add a couple of columns to the Spark DataFrame:
from pyspark.sql.functions import size, split, length
# Calculate Word Count
stories_df = stories_df.withColumn("WordCount", size(split(stories_df["text"], r"\s+")))
# Calculate Character Count
stories_df = stories_df.withColumn("CharacterCount", length(stories_df["text"]))
stories_df.write.mode("overwrite").parquet("tiny_stories_transformed")
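The Parquet output can be verified from Spark itself before handing it off to NeMo Curator, for example:
# Read the Parquet files back into Spark and inspect the result
spark.read.parquet("tiny_stories_transformed").show()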
To interoperate between the NeMo Curator DocumentDataset and Spark DataFrames, we recommend using Parquet files for data exchange. The following code snippet demonstrates how to read output from a Spark DataFrame into a NeMo Curator DocumentDataset:
from nemo_curator.utils.file_utils import get_all_files_paths_under
# Ignores checksum and marker files created by Spark
processed_files = [
filename for filename in get_all_files_paths_under("tiny_stories_transformed")
if not (filename.endswith(".crc") or filename.endswith("_SUCCESS"))
]
stories_dataset = DocumentDataset.read_parquet(processed_files, backend="pandas")
Note that Spark typically creates checksum and other marker files, which can vary by Spark distribution, so it is advisable to filter them out when reading the data into a NeMo Curator DocumentDataset.
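Once loaded, stories_dataset behaves like any other DocumentDataset. For example, the underlying Dask DataFrame can be inspected, or the dataset can be written back out for downstream NeMo Curator steps (the output path below is just an illustration):
# Peek at the first few rows of the underlying Dask DataFrame
print(stories_dataset.df.head())
# Write the dataset back out as JSONL for further curation steps
stories_dataset.to_json("tiny_stories_curated/", write_to_filename=False)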