# Distributed Computing Reference
This reference documents NeMo Curator’s distributed computing functionality, which uses Dask to process large datasets across multiple machines.
## API Reference
### get_client()
```python
import os  # n_workers defaults to os.cpu_count()

from nemo_curator.utils.distributed_utils import get_client

# Full signature, shown with its default values
client = get_client(
    cluster_type="cpu",
    scheduler_address=None,
    scheduler_file=None,
    n_workers=os.cpu_count(),
    threads_per_worker=1,
    nvlink_only=False,
    protocol="tcp",
    rmm_pool_size="1024M",
    enable_spilling=True,
    set_torch_to_use_rmm=False,
    rmm_async=True,
    rmm_maximum_pool_size=None,
    rmm_managed_memory=False,
    rmm_release_threshold=None,
    **cluster_kwargs,
)
```
Initializes or connects to a Dask cluster.
Parameters:

- `cluster_type`: Either `"cpu"` or `"gpu"`. Determines the type of local cluster to start when `scheduler_address` and `scheduler_file` are both `None`.
- `scheduler_address`: Address of an existing Dask cluster to connect to (e.g., `"127.0.0.1:8786"`).
- `scheduler_file`: Path to a file containing scheduler information.
- `n_workers`: (CPU clusters only) Number of workers to start. Defaults to `os.cpu_count()`.
- `threads_per_worker`: (CPU clusters only) Number of threads per worker. Defaults to `1`.
- `nvlink_only`: (GPU clusters only) Whether to use NVLink for communication.
- `protocol`: (GPU clusters only) Communication protocol, `"tcp"` or `"ucx"`.
- `rmm_pool_size`: (GPU clusters only) RAPIDS Memory Manager (RMM) pool size per worker.
- `enable_spilling`: (GPU clusters only) Whether to enable automatic memory spilling.
- `set_torch_to_use_rmm`: (GPU clusters only) Whether to use RMM for PyTorch allocations.
- `rmm_async`: (GPU clusters only) Whether to use RMM's asynchronous allocator.
- `rmm_maximum_pool_size`: (GPU clusters only) Maximum pool size for RMM.
- `rmm_managed_memory`: (GPU clusters only) Whether to use CUDA managed memory.
- `rmm_release_threshold`: (GPU clusters only) Threshold above which memory is released back from the pool.
- `cluster_kwargs`: Additional keyword arguments passed to `LocalCluster` or `LocalCUDACluster`.
Returns:
A Dask client object connected to the specified cluster.
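Once the client is returned, a quick way to confirm the cluster is usable is to submit a trivial task. This is a smoke test using the standard `dask.distributed` client API, not part of `get_client` itself:

```python
# Smoke test: run a trivial task on a worker and fetch the result
future = client.submit(lambda x: x + 1, 41)
assert future.result() == 42
```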
## Client Setup Examples
### Local CPU Cluster
```python
# Start a local cluster with 8 CPU workers
client = get_client(
    cluster_type="cpu",
    n_workers=8,
    threads_per_worker=1
)
```
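To verify that the expected number of workers started, you can inspect the scheduler's view of the cluster. This uses the standard Dask client API rather than anything NeMo Curator specific:

```python
# Count the workers currently registered with the scheduler
n_workers = len(client.scheduler_info()["workers"])
print(f"Cluster is running with {n_workers} workers")
```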
### Local GPU Cluster
```python
# Start one worker per GPU with a 4 GB RMM pool each
client = get_client(
    cluster_type="gpu",
    rmm_pool_size="4GB",
    enable_spilling=True
)
```
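On multi-GPU nodes with NVLink, you can combine the `protocol` and `nvlink_only` parameters described above. A minimal sketch, assuming UCX support is installed in your environment:

```python
# GPU cluster using UCX over NVLink for inter-worker transfers
client = get_client(
    cluster_type="gpu",
    protocol="ucx",
    nvlink_only=True,
    rmm_pool_size="4GB",
    enable_spilling=True
)
```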
### Connect to Existing Cluster
```python
# Connect via a scheduler address
client = get_client(
    scheduler_address="tcp://scheduler-address:8786"
)

# Or via a scheduler file
client = get_client(
    scheduler_file="/path/to/scheduler.json"
)
```
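When you are done with a shared cluster, disconnect without tearing it down by closing only the client; `client.shutdown()` would also stop the scheduler and workers. Both are standard `dask.distributed` client methods:

```python
# Disconnect from a shared cluster, leaving it running for other users
client.close()

# For a cluster you own, stop the workers and scheduler as well
# client.shutdown()
```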
## Partition Control
Control how data is partitioned across workers:
```python
from nemo_curator.datasets import DocumentDataset

# Adjust partition sizing based on cluster resources.
# Use either blocksize or files_per_partition to control partitioning.
dataset = DocumentDataset.read_json(
    files,
    blocksize="1GB",            # Target bytes per partition
    # files_per_partition=100,  # Alternative: fixed file count per partition
)
```
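If a dataset is already loaded with too few or too many partitions, you can repartition its underlying Dask DataFrame directly. This sketch assumes `DocumentDataset` exposes that DataFrame as `.df` and accepts one in its constructor:

```python
# Repartition an existing dataset, e.g., to better match worker count
dataset = DocumentDataset(dataset.df.repartition(npartitions=64))
print(dataset.df.npartitions)
```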
## Resource Management
Monitor and manage cluster resources:
```python
# Open the Dask diagnostic dashboard
print(client.dashboard_link)

# Per-worker state from the scheduler, including memory usage
worker_info = client.scheduler_info()["workers"]

# Retrieve recent worker logs
worker_logs = client.get_worker_logs()

# Restart all workers if needed
client.restart()
```
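For a point-in-time look at memory use, you can also run a function on every worker with `client.run`. The helper below is a sketch using `psutil` (a dependency of `dask.distributed`), not a NeMo Curator utility:

```python
import psutil

def worker_rss_gb():
    # Resident memory of the current worker process, in GB
    return psutil.Process().memory_info().rss / 1e9

# Returns a {worker_address: rss_gb} mapping
print(client.run(worker_rss_gb))
```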