Working with DocumentDataset
Background
Text datasets store metadata alongside the core text of each document. jsonl files are common because they are easy to process and inspect; parquet files are also a common format.
In both cases, a single dataset is often represented with multiple underlying files (called shards).
For example, if you have a large dataset named “books”, it is likely you will store it in shards, with each shard named something like books_00.jsonl, books_01.jsonl, books_02.jsonl, etc.
How you store your dataset in memory is just as important as how you store it on disk.
If you have a large dataset that is too big to fit directly into memory, you will have to distribute it across multiple machines or nodes. Furthermore, if curating your dataset takes a long time, it is likely to be interrupted by some unforeseen failure or another. NeMo Curator’s DocumentDataset employs Dask’s distributed dataframes to manage large datasets across multiple nodes and to allow interrupted curation runs to be restarted easily.
DocumentDataset supports reading from and writing to sharded jsonl and parquet files, both on local disk and directly from remote sources such as S3.
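For instance, a parquet version of the same dataset, or a copy hosted on S3, could be read with something like the following. This is a minimal sketch: the parquet directory and S3 bucket paths are hypothetical, and reading from S3 assumes the appropriate credentials and filesystem support are available to Dask.
from nemo_curator.datasets import DocumentDataset
# Read sharded parquet files from local disk.
books_parquet = DocumentDataset.read_parquet("books_dataset_parquet/")
# Read sharded jsonl files directly from a remote source (hypothetical S3 bucket).
books_remote = DocumentDataset.read_json("s3://my-bucket/books_dataset/")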
Usage
Reading and Writing
DocumentDataset is the standard format for text datasets in NeMo Curator.
Imagine we have a “books” dataset stored in the following structure:
books_dataset/
books_00.jsonl
books_01.jsonl
books_02.jsonl
You could read, filter, and write the dataset using the following code:
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.filters import WordCountFilter
files = get_all_files_paths_under("books_dataset/")
books = DocumentDataset.read_json(files, add_filename=True)
filter_step = nc.ScoreFilter(
    WordCountFilter(min_words=80),
    text_field="text",
    score_field="word_count",
)
long_books = filter_step(books)
long_books.to_json("long_books/", write_to_filename=True)
Let’s walk through this code line by line.
files = get_all_files_paths_under("books_dataset/")
This retrieves a list of all files in the given directory. In our case, it is equivalent to writing:
files = ["books_dataset/books_00.jsonl", "books_dataset/books_01.jsonl", "books_dataset/books_02.jsonl"]
books = DocumentDataset.read_json(files, add_filename=True)
This reads the listed files into memory. The add_filename=True option preserves the name of each shard (books_00.jsonl, books_01.jsonl, etc.) in an additional filename field. When the dataset is written back to disk, this option (in conjunction with the write_to_filename option) ensures that documents stay in their original shards, which can be useful for manually inspecting the results of filtering shard by shard.
filter_step = ...
This constructs and applies a heuristic filter on the length of each document. More information is provided in the filtering page of the documentation.
long_books.to_json("long_books/", write_to_filename=True)
This writes the filtered dataset to a new directory. As mentioned above, write_to_filename=True preserves the sharding of the dataset. If the dataset was not read in with add_filename=True, setting write_to_filename=True will throw an error.
DocumentDataset is just a wrapper around a Dask dataframe. The underlying dataframe can be accessed with the DocumentDataset.df member variable.
It is important to understand how Dask handles computation.
To quote from their documentation:
Dask is lazily evaluated. The result from a computation isn’t computed until you ask for it. Instead, a Dask task graph for the computation is produced.
Because of this, the call to DocumentDataset.read_json will not execute immediately. Instead, tasks that read each shard of the dataset are placed on the task graph. The task graph is only executed when DocumentDataset.df.compute() is called, or when some operation that depends on DocumentDataset.df calls .compute(). This allows us to avoid reading massive datasets into memory. In our case, long_books.to_json() internally calls .compute(), so the task graph will be executed then.
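For example, the following operations on the underlying dataframe would each force (part of) the task graph to run. This is a small illustrative sketch using the long_books dataset from above.
# Counting the documents triggers the read and filter tasks.
num_long_books = len(long_books.df)
# Materializing the filename column (added by add_filename=True) also triggers computation.
shard_names = long_books.df["filename"].unique().compute()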
Resuming from Interruptions
It can be helpful to track which documents in a dataset have already been processed so that long curation jobs can be resumed if they are interrupted. NeMo Curator provides a utility for easily tracking which dataset shards have already been processed. Consider a modified version of the code above:
from nemo_curator.utils.file_utils import get_remaining_files
files = get_remaining_files("books_dataset/", "long_books/", "jsonl")
books = DocumentDataset.read_json(files, add_filename=True)
filter_step = nc.ScoreFilter(
    WordCountFilter(min_words=80),
    text_field="text",
    score_field="word_count",
)
long_books = filter_step(books)
long_books.to_json("long_books/", write_to_filename=True)
get_remaining_files compares the input directory ("books_dataset/") and the output directory ("long_books/") and returns a list of all the shards in the input directory that have not yet been written to the output directory.
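As a hypothetical illustration, suppose a previous run was interrupted after writing only long_books/books_00.jsonl. The call would then return just the unprocessed shards.
files = get_remaining_files("books_dataset/", "long_books/", "jsonl")
# files == ["books_dataset/books_01.jsonl", "books_dataset/books_02.jsonl"]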
While Dask provides an easy way to avoid reading too much data into memory, there are times when we may need to call persist() or a similar operation that forces the dataset into memory. In these cases, we recommend processing the input dataset in batches using a simple wrapper function around get_remaining_files, as shown below.
from nemo_curator.utils.file_utils import get_batched_files
for files in get_batched_files("books_dataset/", "long_books/", "jsonl", batch_size=64):
    books = DocumentDataset.read_json(files, add_filename=True)
    filter_step = nc.ScoreFilter(
        WordCountFilter(min_words=80),
        text_field="text",
        score_field="word_count",
    )
    long_books = filter_step(books)
    long_books.to_json("long_books/", write_to_filename=True)
This will read in 64 shards at a time, process them, and write them back to disk.
Like get_remaining_files, it only includes files that are present in the input directory but not in the output directory.
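If a batch has to be reused several times before it is written out, it can be worthwhile to force it into distributed memory first. The following is a hedged sketch, assuming DocumentDataset can be constructed directly from its underlying Dask dataframe; whether persisting actually helps depends on your workload and available memory.
# Inside the loop above, pin the current batch in memory before reusing it.
books = DocumentDataset(books.df.persist())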
Blending and Shuffling
Blending data from multiple sources can be a great way of improving downstream model performance. This blending can be done during model training itself (i.e., online blending) or it can be done before training (i.e., offline blending). Online blending is useful for rapidly iterating in the training process. Meanwhile, offline blending is useful if you want to distribute the dataset. Online blending is currently possible in NeMo via NVIDIA Megatron Core, and NeMo Curator offers a way to perform blending offline.
Let’s take a look at how datasets can be combined using nc.blend_datasets:
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
books = DocumentDataset.read_json("books_dataset/")
articles = DocumentDataset.read_json("articles_dataset/")
journals = DocumentDataset.read_json("journals_dataset/")
datasets = [books, articles, journals]
target_samples = 1000
weights = [5.0, 2.0, 1.0]
blended_dataset = nc.blend_datasets(target_samples, datasets, weights)
blended_dataset.to_json("blended_dataset/")
datasets = [books, articles, journals]
Here, we are choosing to blend three different datasets. These datasets do not have to be in the same file format or be similar in size. As long as they can be read in as a DocumentDataset, they will work. The samples from each dataset are always drawn “in order”; the precise order depends on the format. For sharded jsonl files, the entries at the beginning of the file whose name comes first in sorted order are chosen first.
target_samples = 1000
This is the desired number of samples in the resulting dataset. By sample, we mean a document, or more generally a single datapoint. The final dataset may contain more samples than this, depending on the weights.
weights = [5.0, 2.0, 1.0]
The relative number of samples that should be taken from each dataset. Given these weights, the blended dataset will have five times as many samples from books as from journals, and two times as many samples from articles as from journals. Weights can be any list of non-negative real numbers. nc.blend_datasets normalizes the weights and combines them with the target number of samples to determine how many samples should be taken from each dataset. In the case of the books dataset, the calculation is:
\[\lceil \text{target\_samples} \cdot w_i \rceil = \lceil 1000 \cdot \tfrac{5}{8} \rceil = 625\]
If any dataset has fewer samples than its calculated quota, it will be oversampled to meet the quota. For example, if the books dataset only had 500 documents in it, the first 125 would be repeated to achieve the 625 samples (the full arithmetic for all three datasets is shown after this walkthrough).
blended_dataset = nc.blend_datasets(target_samples, datasets, weights)
We now call the function itself. Afterwards, we are left with a blended dataset that we can operate on like any other dataset. We can apply filters, deduplicate, or classify the documents.
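To make the sample-count calculation concrete, here is the arithmetic for all three datasets. This is plain Python mirroring the formula above, not a call into NeMo Curator.
import math
target_samples = 1000
weights = [5.0, 2.0, 1.0]
total_weight = sum(weights)  # 8.0
# Normalize each weight and round up, as in the formula above.
samples_per_dataset = [math.ceil(target_samples * w / total_weight) for w in weights]
# books: 625, articles: 250, journals: 125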
Because blending datasets involves combining data from multiple sources, the sharding of the original datasets cannot be preserved. The options add_filename=True and write_to_filename=True for reading and writing datasets are therefore incompatible with nc.blend_datasets.
Shuffling can be another important aspect of dataset management.
NeMo Curator’s nc.Shuffle allows users to reorder all entries in the dataset. Here is a small example of how this can be done:
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
books = DocumentDataset.read_json("books_dataset/")
shuffle = nc.Shuffle(seed=42)
shuffled_books = shuffle(books)
shuffled_books.to_json("shuffled_books/")
shuffle = nc.Shuffle(seed=42)
This creates a shuffle operation that can be chained with the various other modules in NeMo Curator. In this example, we fix the seed to 42. Setting the seed guarantees determinism, but may be slightly slower (20-30% slower) depending on the dataset size.
shuffled_books = shuffle(books)
The dataset has now been shuffled, and we can save it to the filesystem.
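Because nc.Shuffle is a module like any other, it can also be chained with earlier steps. The sketch below assumes nc.Sequential is available for composing modules and reuses the word-count filter from the reading and writing example; the output directory name is hypothetical.
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import WordCountFilter
books = DocumentDataset.read_json("books_dataset/")
# Compose filtering and shuffling into a single pipeline.
pipeline = nc.Sequential([
    nc.ScoreFilter(WordCountFilter(min_words=80), text_field="text", score_field="word_count"),
    nc.Shuffle(seed=42),
])
shuffled_long_books = pipeline(books)
shuffled_long_books.to_json("shuffled_long_books/")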