utils.distributed_utils#

Module Contents#

Functions#

check_dask_cwd

get_client

Initializes or connects to a Dask cluster. The Dask cluster can be CPU-based or GPU-based (if GPUs are available). The initialization ensures maximum memory efficiency for the GPU by (1) ensuring the PyTorch memory pool is the same as the RAPIDS memory pool (if set_torch_to_use_rmm is True) and (2) enabling spilling for cuDF (if enable_spilling is True).

get_current_client

Returns the context-local or most recently initialized client. If no Client instances exist, returns None.

get_filepath_without_extension

get_gpu_memory_info

Get the total GPU memory for each Dask worker. Returns a dictionary mapping Dask worker addresses ('IP:PORT') to their respective GPU memory in bytes, for example {'192.168.0.100:9000': 3.2e+10, '192.168.0.101:9000': 3.2e+10}. If there is no active Dask client, an empty dictionary is returned.

get_network_interfaces

Gets a list of all valid network interfaces on a machine

get_num_workers

Returns the number of workers in the cluster

load_object_on_worker

This function checks whether a Dask worker has a specified attribute for storing an object. If it does, the object is fetched and returned. If it does not, the object is loaded, set as an attribute, and returned.

offload_object_on_worker

This function deletes an existing attribute from a Dask worker.

performance_report_if

Generates a performance report if a valid path is provided, or returns a no-op context manager if not.

performance_report_if_with_ts_suffix

Same as performance_report_if, except that it suffixes report_name with a timestamp.

process_all_batches

This function iterates over batches of data, loading a model and running inference per batch.

process_batch

This function loads a model on a Dask worker and then runs inference on a batch of data.

read_data

This function reads multiple data formats and returns a Dask DataFrame (cuDF- or pandas-backed, depending on the backend).

read_data_blocksize

read_data_files_per_partition

read_pandas_pickle

This function reads a pickle file with Pandas.

read_single_partition

This function reads a file with cuDF or pandas (depending on the backend), sorts the columns of the DataFrame, and adds a filename column.

seed_all

Function to set seed for random number generators for reproducibility.

select_columns

single_partition_write_with_filename

This function processes a DataFrame and writes it to disk

start_dask_cpu_local_cluster

This function sets up a Dask cluster across all the CPUs present on the machine.

start_dask_gpu_local_cluster

This function sets up a Dask cluster across all the GPUs present on the machine.

write_to_disk

This function writes a Dask DataFrame to the specified file path. If write_to_filename is True, it expects the DataFrame to have a filename column that specifies where to write each document.

Data#

API#

utils.distributed_utils.LocalCUDACluster#

‘gpu_only_import_from(…)’

exception utils.distributed_utils.NoWorkerError#

Bases: Exception

Common base class for all non-exit exceptions.

Initialization

Initialize self. See help(type(self)) for accurate signature.

utils.distributed_utils.SUPPORTED_JSONL_COMPRESSIONS#

None

utils.distributed_utils.check_dask_cwd(file_list: list[str]) None#
utils.distributed_utils.get_client(
cluster_type: Literal[cpu, gpu] = 'cpu',
scheduler_address: str | None = None,
scheduler_file: str | None = None,
n_workers: int | None = os.cpu_count(),
threads_per_worker: int = 1,
nvlink_only: bool = False,
protocol: Literal[tcp, ucx] = 'tcp',
rmm_pool_size: str | int | None = '1024M',
enable_spilling: bool = True,
set_torch_to_use_rmm: bool = False,
rmm_async: bool = True,
rmm_maximum_pool_size: str | int | None = None,
rmm_managed_memory: bool = False,
rmm_release_threshold: str | int | None = None,
**cluster_kwargs,
) dask.distributed.Client#

Initializes or connects to a Dask cluster. The Dask cluster can be CPU-based or GPU-based (if GPUs are available).

The initialization ensures maximum memory efficiency for the GPU by:

1. Ensuring the PyTorch memory pool is the same as the RAPIDS memory pool (if set_torch_to_use_rmm is True).
2. Enabling spilling for cuDF (if enable_spilling is True).

Args:
- cluster_type: If scheduler_address and scheduler_file are None, sets up a local (single-node) cluster of the specified type, either "cpu" or "gpu". Defaults to "cpu". Many options in get_client only apply to CPU-based or GPU-based clusters, so check each parameter's description.
- scheduler_address: Address of an existing Dask cluster to connect to. This can be the address of a scheduler server, such as the string '127.0.0.1:8786', or a cluster object such as LocalCluster(). If specified, all other arguments are ignored and the client is connected to the existing cluster; any other configuration should be done when setting up the Dask cluster.
- scheduler_file: Path to a file with scheduler information, if available. If specified, all other arguments are ignored and the client is connected to the existing cluster; any other configuration should be done when setting up the Dask cluster.
- n_workers: For CPU-based clusters only. The number of workers to start. Defaults to os.cpu_count(). For GPU-based clusters, the number of workers is locked to the number of GPUs in CUDA_VISIBLE_DEVICES.
- threads_per_worker: For CPU-based clusters only. The number of threads per worker. Defaults to 1. Before increasing it, ensure that your functions frequently release the GIL.
- nvlink_only: For GPU-based clusters only. Whether to use NVLink or not.
- protocol: For GPU-based clusters only. Protocol to use for communication, either "tcp" or "ucx".
- rmm_pool_size: For GPU-based clusters only. RMM pool size to initialize each worker with. Can be an integer (bytes), a float (fraction of total device memory), a string (such as "5GB" or "5000M"), or None to disable RMM pools.
- enable_spilling: For GPU-based clusters only. Enables automatic spilling (and "unspilling") of buffers from device to host to enable out-of-memory computation, i.e., computing on objects that occupy more memory than is available on the GPU.
- set_torch_to_use_rmm: For GPU-based clusters only. Sets up the PyTorch memory pool to be the same as the RAPIDS memory pool. This helps avoid OOM errors when using both PyTorch and RAPIDS on the same GPU.
- rmm_async: For GPU-based clusters only. Initializes each worker with the RAPIDS Memory Manager (RMM) (see the RMM documentation for more information: https://docs.rapids.ai/api/rmm/stable/) and sets it to use RMM's asynchronous allocator. Warning: the asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory; trying to enable both will result in an exception.
- rmm_maximum_pool_size: For GPU-based clusters only. When rmm_pool_size is set, this argument indicates the maximum pool size. Can be an integer (bytes), a float (fraction of total device memory), a string (such as "5GB" or "5000M"), or None. By default, the total available memory on the GPU is used. rmm_pool_size must be specified to use an RMM pool and to set the maximum pool size. Note: when paired with --enable-rmm-async, the maximum size cannot be guaranteed due to fragmentation. Note: this size is a per-worker configuration, not cluster-wide.
- rmm_managed_memory: For GPU-based clusters only. Initializes each worker with RMM and sets it to use managed memory. If disabled, RMM may still be used by specifying rmm_pool_size. Warning: managed memory is currently incompatible with NVLink; trying to enable both will result in an exception.
- rmm_release_threshold: For GPU-based clusters only. When rmm_async is True and the pool size grows beyond this value, unused memory held by the pool will be released at the next synchronization point. Can be an integer (bytes), a float (fraction of total device memory), a string (such as "5GB" or "5000M"), or None. By default, this feature is disabled. Note: this size is a per-worker configuration, not cluster-wide.
- cluster_kwargs: Additional keyword arguments for the LocalCluster or LocalCUDACluster configuration. See https://docs.dask.org/en/stable/deploying-python.html#distributed.deploy.local.LocalCluster for all LocalCluster parameters, or https://docs.rapids.ai/api/dask-cuda/nightly/api/ for all LocalCUDACluster parameters.

Returns: A Dask client object.
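A minimal usage sketch, assuming the module is importable as utils.distributed_utils; the worker counts and scheduler address below are illustrative, not defaults:

```python
from utils.distributed_utils import get_client, get_num_workers

# Local CPU cluster: 8 workers, one thread each.
client = get_client(cluster_type="cpu", n_workers=8, threads_per_worker=1)
print(get_num_workers(client))

# Local GPU cluster (one worker per GPU in CUDA_VISIBLE_DEVICES), with cuDF
# spilling enabled and PyTorch allocations routed through the RMM pool:
# client = get_client(
#     cluster_type="gpu",
#     rmm_pool_size="1024M",
#     enable_spilling=True,
#     set_torch_to_use_rmm=True,
# )

# Connecting to an existing cluster; all other arguments are ignored.
# client = get_client(scheduler_address="127.0.0.1:8786")

client.close()
```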

utils.distributed_utils.get_current_client() dask.distributed.Client | None#

Returns the context-local or most recently initialized client. If no Client instances exist, returns None.

utils.distributed_utils.get_device_total_memory#

‘gpu_only_import_from(…)’

utils.distributed_utils.get_filepath_without_extension(path: str) str#
utils.distributed_utils.get_gpu_memory_info() dict[str, int]#

Get the total GPU memory for each Dask worker.

Returns: A dictionary mapping Dask worker addresses ('IP:PORT') to their respective GPU memory (in bytes). Example: {'192.168.0.100:9000': 3.2e+10, '192.168.0.101:9000': 3.2e+10}.

Note: If there is no active Dask client, an empty dictionary is returned.
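A short sketch of how this might be used, assuming a machine with GPUs available:

```python
from utils.distributed_utils import get_client, get_gpu_memory_info

client = get_client(cluster_type="gpu")
memory_by_worker = get_gpu_memory_info()  # {} if there is no active Dask client
for address, total_bytes in memory_by_worker.items():
    print(f"{address}: {total_bytes / 1e9:.1f} GB")
client.close()
```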

utils.distributed_utils.get_network_interfaces() list[str]#

Gets a list of all valid network interfaces on a machine

Returns: A list of all valid network interfaces on a machine

utils.distributed_utils.get_num_workers(client: dask.distributed.Client | None) int | None#

Returns the number of workers in the cluster

utils.distributed_utils.load_object_on_worker(
attr: str,
load_object_function: collections.abc.Callable,
load_object_kwargs: dict,
) Any#

This function checks whether a Dask worker has a specified attribute for storing an object. If it does, the object is fetched and returned. If it does not, the object is loaded, set as an attribute, and returned.

Args:
- attr: A string representing an attribute of a Dask worker.
- load_object_function: A user-provided function for how to load the object.
- load_object_kwargs: A dictionary of arguments necessary for load_object_function.

Returns: The object of interest, according to the attr.
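A sketch of the intended pattern. Here load_my_model and predict_partition are hypothetical helpers, and predict_partition is assumed to run on a Dask worker (for example, as a map_partitions callback):

```python
from utils.distributed_utils import load_object_on_worker, offload_object_on_worker

def load_my_model(model_path: str) -> dict:
    # Hypothetical loader: any expensive, one-time setup (e.g. torch.load).
    return {"path": model_path}

def predict_partition(df, model_path: str):
    # Runs on a Dask worker: the object is loaded once per worker, cached under
    # the "my_model" attribute, and reused for subsequent partitions.
    model = load_object_on_worker(
        attr="my_model",
        load_object_function=load_my_model,
        load_object_kwargs={"model_path": model_path},
    )
    ...  # run inference with `model` over `df`
    return df

# The cached object can later be removed from the worker:
# offload_object_on_worker("my_model")
```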

utils.distributed_utils.offload_object_on_worker(attr: str) bool#

This function deletes an existing attribute from a Dask worker.

Args:
- attr: The name of the attribute to delete.

Returns: True.

utils.distributed_utils.performance_report_if(
path: str | None = None,
report_name: str = 'dask-profile.html',
) contextlib.AbstractContextManager[Any]#

Generates a performance report if a valid path is provided, or returns a no-op context manager if not.

Args:
- path: The directory path where the performance report should be saved. If None, no report is generated.
- report_name: The name of the report file.
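A brief sketch; the paths are illustrative, and with path=None the same code runs without generating a report:

```python
import os

from utils.distributed_utils import get_client, performance_report_if

client = get_client(cluster_type="cpu", n_workers=2)
os.makedirs("./profiles", exist_ok=True)  # make sure the report directory exists

with performance_report_if(path="./profiles", report_name="my-job.html"):
    client.submit(sum, range(1_000)).result()

client.close()
```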

utils.distributed_utils.performance_report_if_with_ts_suffix(
path: str | None = None,
report_name: str = 'dask-profile',
) contextlib.AbstractContextManager[Any]#

Same as performance_report_if, except that it suffixes report_name with a timestamp.

utils.distributed_utils.process_all_batches(
loader_valid,
load_model_function,
load_model_kwargs,
run_inference_function,
run_inference_kwargs,
)#

This function iterates over batches of data, loading a model and running inference per batch.

Args:
- loader_valid: An iterable data object, such as a PyTorch DataLoader.
- load_model_function: A user-provided function for loading a classifier.
- load_model_kwargs: A dictionary of arguments necessary for load_model_function.
- run_inference_function: A user-provided function for running inference, which has "model" and "batch" arguments.
- run_inference_kwargs: A dictionary of arguments necessary for run_inference_function.

Returns: A tensor of predictions for all batches of the data.
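A sketch of the expected callable signatures. load_my_model and run_my_inference are hypothetical; since the model is loaded on a Dask worker (see process_batch), the call itself is assumed to happen from code already executing on a worker:

```python
import torch
from torch.utils.data import DataLoader

def load_my_model(device: str = "cpu") -> torch.nn.Module:
    # Hypothetical classifier loader; called once per worker.
    return torch.nn.Linear(16, 2).to(device).eval()

def run_my_inference(model: torch.nn.Module, batch: torch.Tensor) -> torch.Tensor:
    # Must accept "model" and "batch" arguments.
    with torch.no_grad():
        return model(batch)

loader = DataLoader(torch.randn(64, 16), batch_size=8)

# Called from code already executing on a Dask worker:
# preds = process_all_batches(
#     loader_valid=loader,
#     load_model_function=load_my_model,
#     load_model_kwargs={"device": "cpu"},
#     run_inference_function=run_my_inference,
#     run_inference_kwargs={},
# )
```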

utils.distributed_utils.process_batch(
load_model_function,
load_model_kwargs,
run_inference_function,
run_inference_kwargs,
)#

This function loads a model on a Dask worker and then runs inference on a batch of data.

Args:
- load_model_function: A user-provided function for loading a classifier.
- load_model_kwargs: A dictionary of arguments necessary for load_model_function.
- run_inference_function: A user-provided function for running inference, which has a "model" argument.
- run_inference_kwargs: A dictionary of arguments necessary for run_inference_function.

Returns: Whatever run_inference_function returns, such as a list or tensor of predictions.

utils.distributed_utils.read_data(
input_files: str | list[str],
file_type: str = 'pickle',
backend: Literal[utils.distributed_utils.cudf, pandas] = 'cudf',
blocksize: str | None = None,
files_per_partition: int | None = 1,
add_filename: bool | str = False,
input_meta: str | dict | None = None,
columns: list[str] | None = None,
read_func_single_partition: collections.abc.Callable[[list[str], str, bool, str | dict, dict], dask.dataframe.DataFrame | pandas.DataFrame] | None = None,
**kwargs,
) dask.dataframe.DataFrame#

This function reads multiple data formats and returns a Dask DataFrame (cuDF- or pandas-backed, depending on the backend).

Args:
- input_files: The path of the input file(s).
- file_type: The type of the input file(s).
- backend: The backend to use for reading the data.
- blocksize: The desired size of each individual partition to be read from the files. Either blocksize or files_per_partition must be set.
- files_per_partition: The number of files to read per partition. Either blocksize or files_per_partition must be set.
- add_filename: Whether to add a "file_name" column to the DataFrame.
- input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file.
- columns: If not None, only these columns will be read from the file. There is a significant performance gain when specifying columns for Parquet files.
- read_func_single_partition: A function that reads a single partition of data. This can only be used in conjunction with files_per_partition. The function should take the following arguments:
  - files: A list of file paths that will be read in a single partition.
  - file_type: The type of the file to read (in case you want to handle different file types differently).
  - backend: The backend to use for reading the data ("cudf" or "pandas").
  - add_filename: See above.
  - columns: See above.
  - input_meta: See above.

Returns: A Dask-cuDF or a Dask-pandas DataFrame.
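A minimal sketch; the file paths and column names are hypothetical:

```python
from utils.distributed_utils import get_client, read_data

client = get_client(cluster_type="gpu")  # or "cpu" together with backend="pandas"
df = read_data(
    input_files=["data/part-00.jsonl", "data/part-01.jsonl"],
    file_type="jsonl",
    backend="cudf",
    files_per_partition=1,  # alternatively, set blocksize (e.g. "256MB") instead
    add_filename=True,      # adds a "file_name" column
    columns=["id", "text"],
)
print(df.head())
```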

utils.distributed_utils.read_data_blocksize(
input_files: list[str],
backend: Literal[utils.distributed_utils.cudf, pandas],
file_type: Literal[parquet, jsonl],
blocksize: str,
add_filename: bool | str = False,
input_meta: str | dict | None = None,
columns: list[str] | None = None,
**kwargs,
) dask.dataframe.DataFrame#
utils.distributed_utils.read_data_files_per_partition(
input_files: list[str],
file_type: Literal[parquet, json, jsonl],
backend: Literal[utils.distributed_utils.cudf, pandas] = 'cudf',
add_filename: bool | str = False,
files_per_partition: int | None = None,
input_meta: str | dict | None = None,
columns: list[str] | None = None,
read_func_single_partition: collections.abc.Callable[[list[str], str, bool, str | dict, dict], dask.dataframe.DataFrame | pandas.DataFrame] | None = None,
**kwargs,
) dask.dataframe.DataFrame#
utils.distributed_utils.read_pandas_pickle(
file: str,
add_filename: bool | str = False,
columns: list[str] | None = None,
**kwargs,
) pandas.DataFrame#

This function reads a pickle file with Pandas.

Args:
- file: The path to the pickle file to read.
- add_filename: Whether to add a "file_name" column to the DataFrame.
- columns: If not None, only these columns will be read from the file.

Returns: A Pandas DataFrame.

utils.distributed_utils.read_single_partition(
files: list[str],
backend: Literal[utils.distributed_utils.cudf, pandas] = 'cudf',
file_type: str = 'jsonl',
add_filename: bool | str = False,
input_meta: str | dict | None = None,
io_columns: list[str] | None = None,
**kwargs,
) utils.distributed_utils.cudf | pandas.DataFrame#

This function reads a file with cuDF or pandas (depending on the backend), sorts the columns of the DataFrame, and adds a filename column.

Args:
- files: The paths to the JSONL files to read.
- backend: The backend to use for reading the data, either "cudf" or "pandas".
- add_filename: Whether to add a filename column to the DataFrame. If True, a new column called file_name is added to the DataFrame. If a string, it sets the name of the new column. Default is False.
- file_type: The type of the file to read.
- input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file.
- io_columns: If not None, only these columns will be read from the file. There is a significant performance gain when specifying columns for Parquet files.

Returns: A cudf DataFrame or a pandas DataFrame.

utils.distributed_utils.seed_all(seed: int = 42) None#

Function to set seed for random number generators for reproducibility.

Args: seed: The seed value to use for random number generators. Default is 42.

Returns: None

utils.distributed_utils.select_columns(
df: dask.dataframe.DataFrame | pandas.DataFrame | utils.distributed_utils.cudf,
columns: list[str],
file_type: Literal[jsonl, json, parquet],
add_filename: bool | str,
) dask.dataframe.DataFrame | pandas.DataFrame | utils.distributed_utils.cudf#
utils.distributed_utils.single_partition_write_with_filename(
df: pandas.DataFrame | cudf.DataFrame,
output_file_dir: str,
keep_filename_column: bool = False,
output_type: str = 'jsonl',
filename_col: str = 'file_name',
compression: str | None = None,
) cudf.Series | pandas.Series#

This function processes a DataFrame and writes it to disk

Args:
- df: A DataFrame.
- output_file_dir: The output file path.
- keep_filename_column: Boolean representing whether to keep or drop the filename_col, if it exists.
- output_type: The type of output file to write. Can be "jsonl" or "parquet".
- filename_col: The name of the column that contains the filename. Default is "file_name".
- compression: The compression type to use. Only supported for JSONL files. Can be "gzip" or None.

Returns: If the DataFrame is non-empty, a Series containing a single element, True. If the DataFrame is empty, a Series containing a single element, False.

utils.distributed_utils.start_dask_cpu_local_cluster(
n_workers: int | None = os.cpu_count(),
threads_per_worker: int = 1,
**cluster_kwargs,
) dask.distributed.Client#

This function sets up a Dask cluster across all the CPUs present on the machine.

See the get_client function for parameters.

utils.distributed_utils.start_dask_gpu_local_cluster(
nvlink_only: bool = False,
protocol: str = 'tcp',
rmm_pool_size: int | str | None = '1024M',
enable_spilling: bool = True,
set_torch_to_use_rmm: bool = True,
rmm_async: bool = True,
rmm_maximum_pool_size: int | str | None = None,
rmm_managed_memory: bool = False,
rmm_release_threshold: int | str | None = None,
**cluster_kwargs,
) dask.distributed.Client#

This function sets up a Dask cluster across all the GPUs present on the machine.

See the get_client function for parameters.

utils.distributed_utils.write_to_disk(
df: dask.dataframe.DataFrame,
output_path: str,
write_to_filename: bool | str = False,
keep_filename_column: bool = False,
output_type: str = 'jsonl',
partition_on: str | None = None,
compression: str | None = None,
) None#

This function writes a Dask DataFrame to the specified file path. If write_to_filename is True, it expects the DataFrame to have a filename column that specifies where to write each document.

Args:
- df: A Dask DataFrame.
- output_path: The output file path.
- write_to_filename: Whether to write the filename using the filename column. If True, the file_name column is used to write out. If a string, it is used as the filename column to write to.
- keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists.
- output_type: The type of output file to write. Can be "jsonl" or "parquet".
- partition_on: The column name to partition the data on. If specified, the data will be partitioned based on the unique values in this column, and each partition will be written to a separate directory.
- compression: The compression type to use. Only supported for JSONL files. Can be "gzip" or None.
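A short sketch pairing write_to_disk with read_data(add_filename=True); the input and output paths are hypothetical:

```python
from utils.distributed_utils import get_client, read_data, write_to_disk

client = get_client(cluster_type="cpu")
df = read_data(
    input_files=["data/part-00.jsonl"],
    file_type="jsonl",
    backend="pandas",
    files_per_partition=1,
    add_filename=True,
)
write_to_disk(
    df,
    output_path="output/",
    write_to_filename=True,   # uses the "file_name" column added by read_data
    output_type="jsonl",
    compression=None,         # "gzip" is also supported for JSONL output
)
client.close()
```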