NeMo Data Curator

The NeMo Data Curator is a Python library that consists of a collection of scalable data-mining modules for curating natural language processing (NLP) data for training large language models (LLMs). The modules within the NeMo Data Curator enable NLP researchers to mine high-quality text at scale from massive uncurated web corpora.

Currently, within the NeMo Data Curator, we support the following data-curation modules:

  • Configurable data download and text extraction:

    • Default implementations of download and extraction of Common Crawl, Wikipedia, and ArXiv data

    • Users can easily customize the download and extraction steps and extend them to other datasets (see 0_download.rst within the docs directory for more information on download and extraction)

  • Text reformatting and cleaning via ftfy

  • Quality filtering:

    • Multilingual heuristic-based filtering

    • Classifier-based filtering via fastText

  • Document-level deduplication:

    • Exact deduplication

    • Fuzzy deduplication. Our implementation of fuzzy deduplication builds on the following existing libraries (see the conceptual sketch after this list):

      • For computing MinHash signatures we use a modified version of the MinHasher class provided in pyLSH

      • For the locality sensitive hashing, we extended the Redis-based implementation found in datasketch beyond a single Redis server to a Redis Cluster. This enables the module to efficiently deduplicate large datasets that do not fit in the memory of a single node (e.g., several TB of text)

  • Multilingual downstream-task decontamination
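
To make the fuzzy-deduplication bullets above concrete, the following is a minimal single-node sketch of MinHash signatures combined with locality-sensitive hashing, written against the datasketch API referenced above. It is not the NeMo Data Curator's distributed Redis Cluster implementation, and the shingle size, number of permutations, and similarity threshold are illustrative assumptions.

# Conceptual single-node sketch of MinHash + LSH near-duplicate detection.
# The NeMo Data Curator's actual implementation uses a modified pyLSH MinHasher
# and a Redis Cluster backed LSH index; this example only illustrates the idea.
from datasketch import MinHash, MinHashLSH


def minhash_signature(text, num_perm=128, shingle_size=5):
    """Compute a MinHash signature over character shingles (sizes are illustrative)."""
    m = MinHash(num_perm=num_perm)
    for i in range(max(len(text) - shingle_size + 1, 1)):
        m.update(text[i:i + shingle_size].encode("utf-8"))
    return m


documents = {
    "doc_0": "the quick brown fox jumps over the lazy dog",
    "doc_1": "the quick brown fox jumped over the lazy dog",
    "doc_2": "an entirely different piece of text",
}

# A Jaccard-similarity threshold of 0.8 is an illustrative choice.
lsh = MinHashLSH(threshold=0.8, num_perm=128)
signatures = {doc_id: minhash_signature(text) for doc_id, text in documents.items()}
for doc_id, sig in signatures.items():
    lsh.insert(doc_id, sig)

# Query each document for near-duplicate candidates that share an LSH bucket.
for doc_id, sig in signatures.items():
    candidates = [c for c in lsh.query(sig) if c != doc_id]
    print(doc_id, "->", candidates)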

The modules are implemented in a scalable manner using the Message Passing Interface (MPI) for Python (mpi4py), and we use Dask to create balanced input .jsonl files. With the scalable modules within the NeMo Data Curator, we have been able to fully process a Common Crawl snapshot (consisting of 60 TB of compressed WARC files) in approximately two days using 30 CPU nodes (with hardware similar to the c5.24xlarge Amazon AWS C5 instance). Please note that the core functions used within the NeMo Data Curator (e.g., HTML extraction, text cleaning, heuristic filtering, etc.) have not been fully optimized. The main goal of the NeMo Data Curator is to provide users the capability to apply these functions to their large datasets using many compute nodes.

The modules within the NeMo Data Curator were in large part designed to curate high-quality documents from Common Crawl snapshots and to be able to do so in a scalable manner. In order to assess the quality of the Common Crawl documents curated by the modules in the NeMo Data Curator, we performed a series of ablation experiments in which we trained a 357M-parameter GPT-style model on the datasets resulting from the different stages of our data curation pipeline implemented in the NeMo Data Curator. The figure below demonstrates that the different data curation modules implemented within the NeMo Data Curator lead to improved model zero-shot downstream task performance.

Figure (zeroshot_ablations.png): zero-shot downstream task performance of the 357M-parameter model improves with each successive data curation stage.

In terms of scalability and compute performance, the following table shows the time required and resulting data size reduction of each step of processing the Common Crawl snapshot from November/December of 2020 using 30 nodes (with hardware similar to the c5.24xlarge Amazon AWS C5 instance):

Dataset                 Download and text extraction    Text cleaning             Quality filtering         Fuzzy deduplication
                        Time      Output size           Time      Output size     Time      Output size     Time      Output size
Common Crawl 2020-50    36 hrs    2.8 TB                1 hr      2.8 TB          0.2 hr    0.52 TB         5 hrs     0.32 TB

Additionally, the heuristic filtering and deduplication modules can be used for processing TBs of text originating from a variety of different corpora (news, books, dialogue, etc.). In the future, we plan to extend the download and extraction modules to prepare text from sources other than Common Crawl and to add more rows to this table.

As mentioned above, the modules within the NeMo Data Curator enable users to scale data-mining and NLP processing tasks to many nodes within a compute cluster. This is accomplished using both mpi4py and the standard Python multiprocessing library. In general, the scripts within the NeMo Data Curator assume that one MPI rank is assigned to a single compute node within a cluster, and that within that compute node, the MPI rank will fork one process per CPU core available on that node in order to process the data assigned to that MPI rank. Assuming a SLURM-based cluster, the following code snippet demonstrates how the majority of the scripts within the NeMo Data Curator are expected to be launched:


salloc --nodes=<number-of-nodes>  # Allocates nodes (will need other cluster-specific commands)
srun -l \
    --mpi=<type of process management interface, pmi2, pmix, etc> \
    --ntasks-per-node=1 \
    --cpus-per-task=<number-of-cpus-per-node> \
    example_script \
      --input-data-dir=<path-to-input-data-directory> \
      --output-data-dir=<path-to-output-data-directory> \
      --cpus-per-node=<desired-number-of-processes-to-be-forked> \
      --log-dir=<path-to-output-log-directory>

In the above code snippet, the salloc command allocates <number-of-nodes> compute nodes (this can be equivalently achieved via an #SBATCH directive), and the subsequent srun call assigns one MPI rank to each compute node (controlled by the --ntasks-per-node argument). The additional --cpus-per-task argument determines the number of CPU cores made available to each MPI rank. In general, we recommend providing the maximum number of CPU cores available on each node. If users desire to limit the total number of processes forked per node, they can control this via the --cpus-per-node argument. If this argument is not specified, then the majority of the scripts within the NeMo Data Curator will fork as many processes as there are CPU cores available.
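
The following is a minimal sketch of the one-MPI-rank-per-node, one-process-per-core pattern described above, using mpi4py and the standard multiprocessing library; it is not the NeMo Data Curator's actual code, and process_file and the input path are hypothetical placeholders.

# Illustrative sketch of the one-MPI-rank-per-node, one-process-per-core pattern.
# This is not the NeMo Data Curator's actual code; process_file is hypothetical.
import os
from glob import glob
from multiprocessing import Pool

from mpi4py import MPI


def process_file(path):
    # Hypothetical per-file work (read, filter, write) done by a forked process.
    return path


if __name__ == "__main__":
    comm = MPI.COMM_WORLD
    rank, num_ranks = comm.Get_rank(), comm.Get_size()

    # Each rank (one per node) takes a disjoint slice of the input files.
    all_files = sorted(glob(os.path.join("<path-to-input-data-directory>", "*.jsonl")))
    my_files = all_files[rank::num_ranks]

    # Fork one worker per CPU core (or per --cpus-per-node, if given) on this node.
    with Pool(processes=os.cpu_count()) as pool:
        pool.map(process_file, my_files)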

Once the different MPI ranks have been allocated to their respective nodes, each expects an input directory of files in loose JSON format (often with the extension .jsonl, though this exact extension is not required), with one JSON object containing a text sample per line. These files contain the documents that make up your NLP corpus. In order to process the documents in parallel, the files are distributed across the MPI ranks, each of which then further distributes its files across the processes local to its node. Therefore, to achieve large amounts of parallelism, it is recommended to have datasets spread across many balanced .jsonl files. In the scenario in which users' datasets consist of a few large files, we provide the utility make_data_shards to write out many balanced .jsonl files from a single large input file.
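
As an illustration of the expected input format, the following sketch writes a tiny loose-JSON corpus (one JSON object containing a text sample per line) and then splits it into balanced shards. It is a simplified stand-in for the make_data_shards utility, and the "text" field name and shard count are assumptions made for the example.

# Illustrative example of the loose-JSON (".jsonl") input format and of splitting
# one large file into balanced shards. This is a simplified stand-in for the
# make_data_shards utility, and the "text" field name is an assumption.
import json

# Write a tiny example corpus: one JSON object containing a text sample per line.
samples = ["First document text.", "Second document text.", "Third document text."]
with open("corpus.jsonl", "w") as f:
    for text in samples:
        f.write(json.dumps({"text": text}) + "\n")

# Split the single large file into num_shards roughly equal .jsonl files so the
# documents can be distributed evenly across MPI ranks and local processes.
num_shards = 2
shards = [open(f"shard_{i:03d}.jsonl", "w") for i in range(num_shards)]
with open("corpus.jsonl") as f:
    for line_num, line in enumerate(f):
        shards[line_num % num_shards].write(line)
for shard in shards:
    shard.close()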

Once each local process has been assigned its files, it will then proceed to read in the documents from each file, perform its processing/filtering task, and write the documents out to the directory specified by the --output-data-dir argument. For most tasks performed within the NeMo Data Curator, each file in the input directory yields a corresponding file (with the same name) in the output directory, thus keeping the number of files the same between input and output directories.
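
A minimal picture of this per-file contract, assuming a hypothetical minimum-word-count filter (not one of the library's actual heuristics):

# Minimal illustration of the per-file contract described above: each input file
# produces an output file with the same name, so the file counts stay matched.
# The minimum-word-count filter is purely illustrative.
import json
import os


def filter_file(input_path, output_data_dir, min_words=50):
    output_path = os.path.join(output_data_dir, os.path.basename(input_path))
    with open(input_path) as fin, open(output_path, "w") as fout:
        for line in fin:
            document = json.loads(line)
            if len(document["text"].split()) >= min_words:
                fout.write(line)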

In addition to the --input-data-dir and --output-data-dir arguments, the majority of the scripts within the NeMo Data Curator have the --input-local-data-dir argument. Specifying the --input-data-dir argument assumes that each MPI rank has access to an input data path located on a distributed file system; however, depending on the system a user is working on, a distributed file system may not be available. The --input-local-data-dir argument therefore enables users to apply the scripts within the NeMo Data Curator to files that are local to specific compute nodes.

As a final point on the generic structure of the run command, we discuss the --log-dir argument. All MPI-enabled scripts within the NeMo Data Curator have the argument --log-dir, to which each MPI rank and the processes local to each node write out logs during the execution of the script. For the majority of the scripts in the NeMo Data Curator, the log directory has the following structure:


./example_log_dir/
├── rank_000
│   ├── local_000.log
│   ├── local_001.log
│   ├── ...
│   ├── local_{ncpus_per_node-1}.log
│   └── rank_000.log
├── rank_001
│   ├── local_000.log
│   ├── local_001.log
│   ├── ...
│   ├── local_{ncpus_per_node-1}.log
│   └── rank_001.log
├── ...
└── rank_{number_of_nodes-1}
    ├── local_000.log
    ├── local_001.log
    ├── ...
    ├── local_{ncpus_per_node-1}.log
    └── rank_{number_of_nodes-1}.log

The tree output above shows that, within the log directory, each MPI rank creates its own subdirectory and writes its logs to the file rank_{rank_num}.log, where rank_num is the index assigned to that MPI rank. Additionally, each local process creates its own log file within the subdirectory of the MPI rank that forked it. This directory structure allows the progress of each process created during the execution of the script to be tracked, and aids debugging should troubleshooting become necessary. For each script, a default log directory name is provided and will be created automatically upon running the script. A user can also provide either an existing or a not-yet-existing path to the --log-dir argument.
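
As a rough sketch of how log files could be laid out to match the structure above (this is illustrative, not the library's logging code, and make_local_logger is a hypothetical helper):

# Illustrative sketch of the per-rank / per-process log layout shown above.
# This is not the library's logging code; names follow the tree listing.
import logging
import os


def make_local_logger(log_dir, rank, local_id):
    rank_dir = os.path.join(log_dir, f"rank_{rank:03d}")
    os.makedirs(rank_dir, exist_ok=True)
    logger = logging.getLogger(f"rank{rank}.local{local_id}")
    logger.addHandler(logging.FileHandler(os.path.join(rank_dir, f"local_{local_id:03d}.log")))
    logger.setLevel(logging.INFO)
    return logger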
