# utils.distributed_utils

## Module Contents

### Functions
| Function | Description |
|---|---|
| `get_client` | Initializes or connects to a Dask cluster. The Dask cluster can be CPU-based or GPU-based (if GPUs are available). |
| `get_current_client` | Returns the context-local or latest initialized client. If no Client instances exist, returns None. |
| `get_gpu_memory_info` | Gets the total GPU memory for each Dask worker, as a dictionary mapping worker addresses ('IP:PORT') to GPU memory in bytes. Returns an empty dictionary if there is no active Dask client. |
| `get_network_interfaces` | Gets a list of all valid network interfaces on a machine. |
| `get_num_workers` | Returns the number of workers in the cluster. |
| `load_object_on_worker` | Checks if a Dask worker has a specified attribute for storing an object. If it does, fetches the object and returns it; otherwise loads the object, sets it as an attribute, and returns it. |
| `offload_object_on_worker` | Deletes an existing attribute from a Dask worker. |
| `performance_report_if` | Generates a performance report if a valid path is provided, or returns a no-op context manager if not. |
| `performance_report_if_with_ts_suffix` | Same as `performance_report_if`, except it suffixes the report name with a timestamp. |
| `process_all_batches` | Iterates over batches of data, loading a model and running inference per batch. |
| `process_batch` | Loads a model on a Dask worker and then runs inference on a batch of data. |
| `read_data` | Reads multiple data formats and returns a Dask-cuDF DataFrame. |
| `read_pandas_pickle` | Reads a pickle file with pandas. |
| `read_single_partition` | Reads a file with cuDF, sorts the columns of the DataFrame, and adds a filename column. |
| `seed_all` | Sets the seed for random number generators for reproducibility. |
| `single_partition_write_with_filename` | Processes a DataFrame and writes it to disk. |
| `start_dask_cpu_local_cluster` | Sets up a Dask cluster across all the CPUs present on the machine. |
| `start_dask_gpu_local_cluster` | Sets up a Dask cluster across all the GPUs present on the machine. |
| `write_to_disk` | Writes a Dask DataFrame to the specified file path. If `write_to_filename` is True, expects the DataFrame to have a filename column that specifies where to write each document. |
### Data

## API
- utils.distributed_utils.LocalCUDACluster#
‘gpu_only_import_from(…)’
- exception utils.distributed_utils.NoWorkerError#
Bases: Exception
Common base class for all non-exit exceptions.
- utils.distributed_utils.SUPPORTED_JSONL_COMPRESSIONS#
None
- utils.distributed_utils.check_dask_cwd(file_list: list[str]) -> None #
- utils.distributed_utils.get_client(
- cluster_type: Literal[cpu, gpu] = 'cpu',
- scheduler_address: str | None = None,
- scheduler_file: str | None = None,
- n_workers: int | None = os.cpu_count(),
- threads_per_worker: int = 1,
- nvlink_only: bool = False,
- protocol: Literal[tcp, ucx] = 'tcp',
- rmm_pool_size: str | int | None = '1024M',
- enable_spilling: bool = True,
- set_torch_to_use_rmm: bool = False,
- rmm_async: bool = True,
- rmm_maximum_pool_size: str | int | None = None,
- rmm_managed_memory: bool = False,
- rmm_release_threshold: str | int | None = None,
- **cluster_kwargs,
)

Initializes or connects to a Dask cluster. The Dask cluster can be CPU-based or GPU-based (if GPUs are available). The initialization ensures maximum memory efficiency for the GPU by:

1. Ensuring the PyTorch memory pool is the same as the RAPIDS memory pool (if set_torch_to_use_rmm is True).
2. Enabling spilling for cuDF (if enable_spilling is True).

Args:
- cluster_type: If scheduler_address and scheduler_file are None, sets up a local (single-node) cluster of the specified type. Either "cpu" or "gpu". Defaults to "cpu". Many options in get_client only apply to CPU-based or GPU-based clusters, so make sure you check the description of each parameter.
- scheduler_address: Address of an existing Dask cluster to connect to. This can be the address of a scheduler server, like the string '127.0.0.1:8786', or a cluster object, like LocalCluster(). If specified, all other arguments are ignored and the client is connected to the existing cluster. The other configuration options should be set when setting up the Dask cluster.
- scheduler_file: Path to a file with scheduler information, if available. If specified, all other arguments are ignored and the client is connected to the existing cluster. The other configuration options should be set when setting up the Dask cluster.
- n_workers: For CPU-based clusters only. The number of workers to start. Defaults to os.cpu_count(). For GPU-based clusters, the number of workers is locked to the number of GPUs in CUDA_VISIBLE_DEVICES.
- threads_per_worker: For CPU-based clusters only. The number of threads per worker. Defaults to 1. Before increasing, ensure that your functions frequently release the GIL.
- nvlink_only: For GPU-based clusters only. Whether to use NVLink or not.
- protocol: For GPU-based clusters only. Protocol to use for communication: "tcp" or "ucx".
- rmm_pool_size: For GPU-based clusters only. RMM pool size to initialize each worker with. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or None to disable RMM pools.
- enable_spilling: For GPU-based clusters only. Enables automatic spilling (and "unspilling") of buffers from device to host to enable out-of-memory computation, i.e., computing on objects that occupy more memory than is available on the GPU.
- set_torch_to_use_rmm: For GPU-based clusters only. Sets up the PyTorch memory pool to be the same as the RAPIDS memory pool. This helps avoid OOM errors when using both PyTorch and RAPIDS on the same GPU.
- rmm_async: For GPU-based clusters only. Initializes each worker with RAPIDS Memory Manager (RMM) (see the RMM documentation for more information: https://docs.rapids.ai/api/rmm/stable/) and sets it to use RMM's asynchronous allocator. Warning: The asynchronous allocator requires CUDA Toolkit 11.2 or newer. It is also incompatible with RMM pools and managed memory; trying to enable both will result in an exception.
- rmm_maximum_pool_size: For GPU-based clusters only. When rmm_pool_size is set, this argument indicates the maximum pool size. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or None. By default, the total available memory on the GPU is used. rmm_pool_size must be specified to use an RMM pool and to set the maximum pool size. Note: When paired with --enable-rmm-async, the maximum size cannot be guaranteed due to fragmentation. Note: This size is a per-worker configuration, not cluster-wide.
- rmm_managed_memory: For GPU-based clusters only. Initializes each worker with RMM and sets it to use managed memory. If disabled, RMM may still be used by specifying rmm_pool_size. Warning: Managed memory is currently incompatible with NVLink; trying to enable both will result in an exception.
- rmm_release_threshold: For GPU-based clusters only. When rmm_async is True and the pool size grows beyond this value, unused memory held by the pool will be released at the next synchronization point. Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M"), or None. By default, this feature is disabled. Note: This size is a per-worker configuration, not cluster-wide.
- cluster_kwargs: Additional keyword arguments for the LocalCluster or LocalCUDACluster configuration. See https://docs.dask.org/en/stable/deploying-python.html#distributed.deploy.local.LocalCluster for all LocalCluster parameters, or https://docs.rapids.ai/api/dask-cuda/nightly/api/ for all LocalCUDACluster parameters.

Returns: A Dask client object.
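For illustration, a minimal sketch of the common ways to call `get_client`; the import path follows this module's name, and the worker count and scheduler address are placeholder values:

```python
from utils.distributed_utils import get_client, get_num_workers

# Local CPU cluster with 8 single-threaded workers (the worker count is arbitrary here).
client = get_client(cluster_type="cpu", n_workers=8, threads_per_worker=1)
print(f"CPU cluster with {get_num_workers(client)} workers")
client.close()

# Local GPU cluster: one worker per GPU in CUDA_VISIBLE_DEVICES, with an RMM pool,
# cuDF spilling, and PyTorch sharing the RAPIDS memory pool.
gpu_client = get_client(
    cluster_type="gpu",
    rmm_pool_size="1024M",
    enable_spilling=True,
    set_torch_to_use_rmm=True,
)

# Or connect to an existing cluster; all other arguments are then ignored.
# existing = get_client(scheduler_address="127.0.0.1:8786")
```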
- utils.distributed_utils.get_current_client() -> dask.distributed.Client | None #
Returns the context-local or latest initialized client. If no Client instances exist, returns None.
- utils.distributed_utils.get_device_total_memory#
‘gpu_only_import_from(…)’
- utils.distributed_utils.get_filepath_without_extension(path: str) -> str #
- utils.distributed_utils.get_gpu_memory_info() -> dict[str, int] #
Get the total GPU memory for each Dask worker.

Returns: dict: A dictionary mapping Dask worker addresses ('IP:PORT') to their respective GPU memory (in bytes). Example: {'192.168.0.100:9000': 3.2e+10, '192.168.0.101:9000': 3.2e+10}

Note: If there is no active Dask client, an empty dictionary is returned.
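A short usage sketch, assuming a GPU client has already been created with `get_client`; the printed addresses and sizes will differ per cluster:

```python
from utils.distributed_utils import get_client, get_gpu_memory_info

client = get_client(cluster_type="gpu")

# Maps each worker address ('IP:PORT') to its total GPU memory in bytes;
# returns an empty dict if no Dask client is active.
for address, total_bytes in get_gpu_memory_info().items():
    print(f"{address}: {total_bytes / 1e9:.1f} GB")
```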
- utils.distributed_utils.get_network_interfaces() -> list[str] #
Gets a list of all valid network interfaces on a machine.
Returns: A list of all valid network interfaces on the machine.
- utils.distributed_utils.get_num_workers(client: dask.distributed.Client | None) -> int | None #
Returns the number of workers in the cluster.
- utils.distributed_utils.load_object_on_worker(
- attr: str,
- load_object_function: collections.abc.Callable,
- load_object_kwargs: dict,
)

This function checks if a Dask worker has a specified attribute for storing an object. If it does, the object is fetched and returned. If it does not, the object is loaded, set as an attribute, and returned.

Args:
- attr: A string representing an attribute of a Dask worker.
- load_object_function: A user-provided function for how to load the object.
- load_object_kwargs: A dictionary of arguments necessary for `load_object_function`.

Returns: The object of interest, according to the `attr`.
- utils.distributed_utils.offload_object_on_worker(attr: str) -> bool #
This function deletes an existing attribute from a Dask worker.
Args:
- attr: The name of the attribute to delete.

Returns: True.
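A sketch of the load-once-per-worker pattern these two helpers enable; `load_my_model`, the `"my_model"` attribute name, and the model path are hypothetical placeholders:

```python
from utils.distributed_utils import load_object_on_worker, offload_object_on_worker

def load_my_model(model_path: str):
    # Hypothetical loader; in practice this might build a classifier and move it to the GPU.
    return {"path": model_path}

def predict_partition(df):
    # Intended to run on a Dask worker (e.g. via df.map_partitions(predict_partition)):
    # the first call loads and caches the model on the worker, later calls reuse it.
    model = load_object_on_worker(
        attr="my_model",
        load_object_function=load_my_model,
        load_object_kwargs={"model_path": "/models/classifier"},
    )
    # ... run inference with `model` on `df` ...
    return df

# When the cached object is no longer needed, code running on the worker can drop it:
# offload_object_on_worker("my_model")
```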
- utils.distributed_utils.performance_report_if(
- path: str | None = None,
- report_name: str = 'dask-profile.html',
)

Generates a performance report if a valid path is provided, or returns a no-op context manager if not.

Args:
- path: The directory path where the performance report should be saved. If None, no report is generated.
- report_name: The name of the report file.
- utils.distributed_utils.performance_report_if_with_ts_suffix(
- path: str | None = None,
- report_name: str = 'dask-profile',
)

Same as performance_report_if, except it suffixes the report_name with the timestamp.
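A usage sketch for the report helpers; the profile directory and the toy computation are placeholders, and the exact timestamped filename is an assumption:

```python
import dask.array as da

from utils.distributed_utils import get_client, performance_report_if_with_ts_suffix

client = get_client(cluster_type="cpu")

# Writes a timestamped HTML profile under ./profiles (e.g. dask-profile-<timestamp>.html)
# when a path is given; with path=None both helpers act as no-op context managers.
with performance_report_if_with_ts_suffix(path="./profiles", report_name="dask-profile"):
    da.random.random((10_000, 10_000)).mean().compute()
```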
- utils.distributed_utils.process_all_batches(
- loader_valid,
- load_model_function,
- load_model_kwargs,
- run_inference_function,
- run_inference_kwargs,
)

This function iterates over batches of data, loading a model and running inference per batch.

Args:
- loader_valid: An iterable data object, such as a PyTorch DataLoader.
- load_model_function: A user-provided function for loading a classifier.
- load_model_kwargs: A dictionary of arguments necessary for `load_model_function`.
- run_inference_function: A user-provided function for running inference, which has "model" and "batch" arguments.
- run_inference_kwargs: A dictionary of arguments necessary for `run_inference_function`.

Returns: A tensor of predictions for all batches of the data.
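A sketch of how the inference helpers are wired together; the linear model, the random DataLoader, and the `device` keyword are hypothetical stand-ins for a real classifier setup:

```python
import torch
from torch.utils.data import DataLoader

from utils.distributed_utils import process_all_batches

def load_model(device: str):
    # Hypothetical classifier loader.
    return torch.nn.Linear(16, 2).to(device)

def run_inference(model, batch, device: str):
    # Must accept "model" and "batch"; returns predictions for one batch.
    with torch.no_grad():
        return model(batch.to(device)).argmax(dim=-1)

loader = DataLoader(torch.randn(256, 16), batch_size=64)

preds = process_all_batches(
    loader_valid=loader,
    load_model_function=load_model,
    load_model_kwargs={"device": "cuda"},
    run_inference_function=run_inference,
    run_inference_kwargs={"device": "cuda"},
)
```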
- utils.distributed_utils.process_batch(
- load_model_function,
- load_model_kwargs,
- run_inference_function,
- run_inference_kwargs,
)

This function loads a model on a Dask worker and then runs inference on a batch of data.

Args:
- load_model_function: A user-provided function for loading a classifier.
- load_model_kwargs: A dictionary of arguments necessary for `load_model_function`.
- run_inference_function: A user-provided function for running inference, which has a "model" argument.
- run_inference_kwargs: A dictionary of arguments necessary for `run_inference_function`.

Returns: Whatever `run_inference_function` returns, such as a list or tensor of predictions.
- utils.distributed_utils.read_data(
- input_files: str | list[str],
- file_type: str = 'pickle',
- backend: Literal[utils.distributed_utils.cudf, pandas] = 'cudf',
- blocksize: str | None = None,
- files_per_partition: int | None = 1,
- add_filename: bool | str = False,
- input_meta: str | dict | None = None,
- columns: list[str] | None = None,
- read_func_single_partition: collections.abc.Callable[[list[str], str, bool, str | dict, dict], dask.dataframe.DataFrame | pandas.DataFrame] | None = None,
- **kwargs,
)

This function can read multiple data formats and returns a Dask-cuDF DataFrame.

Args:
- input_files: The path of the input file(s).
- file_type: The type of the input file(s).
- backend: The backend to use for reading the data.
- blocksize: The desired size of each individual partition to be read from the files. Either blocksize or files_per_partition must be set.
- files_per_partition: The number of files to read per partition. Either blocksize or files_per_partition must be set.
- add_filename: Whether to add a "file_name" column to the DataFrame.
- input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file.
- columns: If not None, only these columns will be read from the file. There is a significant performance gain when specifying columns for Parquet files.
- read_func_single_partition: A function that reads a single partition of data. This can only be used in conjunction with files_per_partition. The function should take the following arguments:
  - files: A list of file paths that will be read in a single partition.
  - file_type: The type of the file to read (in case you want to handle different file types differently).
  - backend: The backend to use for reading the data (cudf or pandas).
  - add_filename: See above.
  - columns: See above.
  - input_meta: See above.

Returns: A Dask-cuDF or a Dask-pandas DataFrame.
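A reading sketch; the shard paths and column names are placeholders, and a GPU client is assumed to be available:

```python
from utils.distributed_utils import get_client, read_data

client = get_client(cluster_type="gpu")

# Read JSONL shards into ~256 MB partitions, keeping only the needed columns
# and recording each row's source file in a "file_name" column.
df = read_data(
    input_files=["/data/shard_00.jsonl", "/data/shard_01.jsonl"],
    file_type="jsonl",
    backend="cudf",
    blocksize="256MB",
    files_per_partition=None,  # set either blocksize or files_per_partition, not both
    add_filename=True,
    columns=["id", "text"],
)
print(df.head())
```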
- utils.distributed_utils.read_data_blocksize(
- input_files: list[str],
- backend: Literal[utils.distributed_utils.cudf, pandas],
- file_type: Literal[parquet, jsonl],
- blocksize: str,
- add_filename: bool | str = False,
- input_meta: str | dict | None = None,
- columns: list[str] | None = None,
- **kwargs,
)
- utils.distributed_utils.read_data_files_per_partition(
- input_files: list[str],
- file_type: Literal[parquet, json, jsonl],
- backend: Literal[utils.distributed_utils.cudf, pandas] = 'cudf',
- add_filename: bool | str = False,
- files_per_partition: int | None = None,
- input_meta: str | dict | None = None,
- columns: list[str] | None = None,
- read_func_single_partition: collections.abc.Callable[[list[str], str, bool, str | dict, dict], dask.dataframe.DataFrame | pandas.DataFrame] | None = None,
- **kwargs,
)
- utils.distributed_utils.read_pandas_pickle(
- file: str,
- add_filename: bool | str = False,
- columns: list[str] | None = None,
- **kwargs,
)

This function reads a pickle file with pandas.

Args:
- file: The path to the pickle file to read.
- add_filename: Whether to add a "file_name" column to the DataFrame.
- columns: If not None, only these columns will be read from the file.

Returns: A pandas DataFrame.
- utils.distributed_utils.read_single_partition(
- files: list[str],
- backend: Literal[utils.distributed_utils.cudf, pandas] = 'cudf',
- file_type: str = 'jsonl',
- add_filename: bool | str = False,
- input_meta: str | dict | None = None,
- io_columns: list[str] | None = None,
- **kwargs,
)

This function reads a file with cuDF, sorts the columns of the DataFrame, and adds a filename column.

Args:
- files: The paths to the JSONL files to read.
- backend: The backend to use for reading the data. Either "cudf" or "pandas".
- add_filename: Whether to add a filename column to the DataFrame. If True, a new column named `file_name` is added to the DataFrame. If a string, that string is used as the new column name. Default is False.
- file_type: The type of the file to read.
- input_meta: A dictionary or a string formatted as a dictionary, which outlines the field names and their respective data types within the JSONL input file.
- io_columns: If not None, only these columns will be read from the file. There is a significant performance gain when specifying columns for Parquet files.

Returns: A cuDF DataFrame or a pandas DataFrame.
- utils.distributed_utils.seed_all(seed: int = 42) -> None #
Function to set seed for random number generators for reproducibility.
Args:
- seed: The seed value to use for random number generators. Default is 42.
Returns: None
- utils.distributed_utils.select_columns(
- df: dask.dataframe.DataFrame | pandas.DataFrame | utils.distributed_utils.cudf,
- columns: list[str],
- file_type: Literal[jsonl, json, parquet],
- add_filename: bool | str,
)
- utils.distributed_utils.single_partition_write_with_filename(
- df: pandas.DataFrame | cudf.DataFrame,
- output_file_dir: str,
- keep_filename_column: bool = False,
- output_type: str = 'jsonl',
- filename_col: str = 'file_name',
- compression: str | None = None,
)

This function processes a DataFrame and writes it to disk.

Args:
- df: A DataFrame.
- output_file_dir: The output file path.
- keep_filename_column: Boolean representing whether to keep or drop the `filename_col`, if it exists.
- output_type: The type of output file to write. Can be "jsonl" or "parquet".
- filename_col: The name of the column that contains the filename. Default is "file_name".
- compression: The compression type to use. Only supported for JSONL files. Can be "gzip" or None.

Returns: If the DataFrame is non-empty, returns a Series containing a single element, True. If the DataFrame is empty, returns a Series containing a single element, False.
- utils.distributed_utils.start_dask_cpu_local_cluster(
- n_workers: int | None = os.cpu_count(),
- threads_per_worker: int = 1,
- **cluster_kwargs,
)

This function sets up a Dask cluster across all the CPUs present on the machine.

See the get_client function for parameters.
- utils.distributed_utils.start_dask_gpu_local_cluster(
- nvlink_only: bool = False,
- protocol: str = 'tcp',
- rmm_pool_size: int | str | None = '1024M',
- enable_spilling: bool = True,
- set_torch_to_use_rmm: bool = True,
- rmm_async: bool = True,
- rmm_maximum_pool_size: int | str | None = None,
- rmm_managed_memory: bool = False,
- rmm_release_threshold: int | str | None = None,
- **cluster_kwargs,
)

This function sets up a Dask cluster across all the GPUs present on the machine.

See the get_client function for parameters.
- utils.distributed_utils.write_to_disk(
- df: dask.dataframe.DataFrame,
- output_path: str,
- write_to_filename: bool | str = False,
- keep_filename_column: bool = False,
- output_type: str = 'jsonl',
- partition_on: str | None = None,
- compression: str | None = None,
)

This function writes a Dask DataFrame to the specified file path. If write_to_filename is True, it expects the DataFrame to have a `filename_col` that specifies where to write each document.

Args:
- df: A Dask DataFrame.
- output_path: The output file path.
- write_to_filename: Whether to write the filename using the filename column. If True, the `file_name` column is used to write out. If a string, that column name is used as the filename column to write to.
- keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists.
- output_type: The type of output file to write. Can be "jsonl" or "parquet".
- partition_on: The column name to partition the data on. If specified, the data will be partitioned based on the unique values in this column, and each partition will be written to a separate directory.
- compression: The compression type to use. Only supported for JSONL files. Can be "gzip" or None.
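A write-side sketch that pairs with `read_data`; the input and output paths are placeholders:

```python
from utils.distributed_utils import get_client, read_data, write_to_disk

client = get_client(cluster_type="gpu")

df = read_data(
    input_files="/data/input.jsonl",
    file_type="jsonl",
    backend="cudf",
    add_filename=True,  # adds the "file_name" column used below
)

# Write each document back out under /data/output, routed by the "file_name"
# column and with that column dropped from the written records.
write_to_disk(
    df,
    output_path="/data/output",
    write_to_filename=True,
    keep_filename_column=False,
    output_type="jsonl",
)
```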