RAPIDS Shuffle Manager#

The RAPIDS Shuffle Manager is an implementation of the ShuffleManager interface in Apache Spark that allows custom mechanisms to exchange shuffle data. We currently expose two modes of operation: Multi-Threaded and UCX.

In Spark, shuffle managers are configured via the spark.shuffle.manager configuration variable.

Example Configuration#

To configure the RAPIDS Shuffle Manager for Spark version 3.5.1, use the following:

    --conf spark.shuffle.manager=com.nvidia.spark.rapids.spark351.RapidsShuffleManager

Supported Spark Versions#

The following table shows the appropriate configuration to use for each Spark version supported in our plugin:

Spark Shim       spark.shuffle.manager value
---------------  -------------------------------------------------------
3.2.0            com.nvidia.spark.rapids.spark320.RapidsShuffleManager
3.2.1            com.nvidia.spark.rapids.spark321.RapidsShuffleManager
3.2.2            com.nvidia.spark.rapids.spark322.RapidsShuffleManager
3.2.3            com.nvidia.spark.rapids.spark323.RapidsShuffleManager
3.2.4            com.nvidia.spark.rapids.spark324.RapidsShuffleManager
3.3.0            com.nvidia.spark.rapids.spark330.RapidsShuffleManager
3.3.1            com.nvidia.spark.rapids.spark331.RapidsShuffleManager
3.3.2            com.nvidia.spark.rapids.spark332.RapidsShuffleManager
3.3.3            com.nvidia.spark.rapids.spark333.RapidsShuffleManager
3.3.4            com.nvidia.spark.rapids.spark334.RapidsShuffleManager
3.4.0            com.nvidia.spark.rapids.spark340.RapidsShuffleManager
3.4.1            com.nvidia.spark.rapids.spark341.RapidsShuffleManager
3.4.2            com.nvidia.spark.rapids.spark342.RapidsShuffleManager
3.4.3            com.nvidia.spark.rapids.spark343.RapidsShuffleManager
3.4.4            com.nvidia.spark.rapids.spark344.RapidsShuffleManager
3.5.0            com.nvidia.spark.rapids.spark350.RapidsShuffleManager
3.5.1            com.nvidia.spark.rapids.spark351.RapidsShuffleManager
3.5.2            com.nvidia.spark.rapids.spark352.RapidsShuffleManager
3.5.3            com.nvidia.spark.rapids.spark353.RapidsShuffleManager
3.5.4            com.nvidia.spark.rapids.spark354.RapidsShuffleManager
3.5.5            com.nvidia.spark.rapids.spark355.RapidsShuffleManager
4.0.0            com.nvidia.spark.rapids.spark400.RapidsShuffleManager
Databricks 11.3  com.nvidia.spark.rapids.spark330db.RapidsShuffleManager
Databricks 12.2  com.nvidia.spark.rapids.spark332db.RapidsShuffleManager
Databricks 13.3  com.nvidia.spark.rapids.spark341db.RapidsShuffleManager
3.2.1 CDH        com.nvidia.spark.rapids.spark321cdh.RapidsShuffleManager
3.3.0 CDH        com.nvidia.spark.rapids.spark330cdh.RapidsShuffleManager
3.3.2 CDH        com.nvidia.spark.rapids.spark332cdh.RapidsShuffleManager
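For the plain Apache Spark rows in the table above, the class name follows a visible pattern: the version with its dots removed, prefixed by `spark`. The following is a minimal sketch of that derivation, assuming a hypothetical helper named `rapids_shuffle_manager_class` (it is not part of the plugin). It does not check that the version is actually supported, and the Databricks and CDH shims cannot be derived this way from the runtime version, so those values should be taken from the table.

```shell
# Hypothetical helper (not part of the RAPIDS plugin): builds the
# spark.shuffle.manager value for a plain Apache Spark version such as 3.5.1.
rapids_shuffle_manager_class() {
  version="$1"
  # Reject empty input, characters other than digits and dots,
  # and leading, trailing, or doubled dots.
  case "$version" in
    "" | *[!0-9.]* | .* | *. | *..*)
      echo "unsupported version format: $version" >&2
      return 1
      ;;
  esac
  # Drop the dots to get the shim package name: 3.5.1 -> spark351
  shim="spark$(printf '%s' "$version" | tr -d '.')"
  printf 'com.nvidia.spark.rapids.%s.RapidsShuffleManager\n' "$shim"
}

# Example: print the flag for Spark 3.5.1
echo "--conf spark.shuffle.manager=$(rapids_shuffle_manager_class 3.5.1)"
```

The printed flag can be pasted into a `spark-submit` or `spark-shell` command line once the version has been confirmed against the table.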

Multi-Threaded Mode#

Multi-threaded mode (the default) is similar to the built-in Spark shuffle, but it attempts to use more CPU threads for compute-intensive tasks, such as compression and decompression.

The multi-threaded shuffle writer targets the “BypassMergeSortShuffle” shuffle algorithm in Spark, which is the default when spark.sql.shuffle.partitions does not exceed the threshold defined by the configuration variable spark.shuffle.sort.bypassMergeThreshold. The default value of this threshold is 200. The “BypassMergeSortShuffle” algorithm’s host memory requirement increases rapidly as the number of partitions increases, limiting the number of partitions with which the algorithm can be used.

The shuffle reader is a single implementation irrespective of the number of partitions. To turn off the multi-threaded reader entirely, set spark.rapids.shuffle.multiThreaded.reader.threads to zero.

Minimum configuration:

    --conf spark.shuffle.manager=com.nvidia.spark.rapids.[shim package].RapidsShuffleManager \
    --conf spark.driver.extraClassPath=${SPARK_RAPIDS_PLUGIN_JAR} \
    --conf spark.executor.extraClassPath=${SPARK_RAPIDS_PLUGIN_JAR}

By default, a thread pool of 20 threads is used for shuffle writes and reads. The pool size can be set independently for writers and readers using spark.rapids.shuffle.multiThreaded.[writer|reader].threads. An appropriate value for these pools is the number of cores in the system divided by the number of executors per machine.

On the reader side, when blocks are received from the network, they’re queued onto these threads for decompression and decoding. The number of bytes allowed in flight per Spark task is controlled by spark.rapids.shuffle.multiThreaded.maxBytesInFlight, which defaults to 128 MB per task. This memory comes from the Netty off-heap pool, which Netty sizes automatically at startup. The limit can be overridden by setting -Dio.netty.maxDirectMemory=[amount in Bytes] under spark.executor.extraJavaOptions.
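The sizing guidance above (cores in the system divided by executors per machine) can be sketched as a small shell helper. The function name `rapids_shuffle_threads` is hypothetical, and the core count, executor count, and the 2 GiB Netty limit are illustrative inputs rather than recommendations. Integer division rounds down, and the result is floored at 1 here because a reader thread count of zero turns the multi-threaded reader off.

```shell
# Hypothetical helper: threads per pool = cores on the machine / executors per machine.
rapids_shuffle_threads() {
  cores="$1"
  executors_per_machine="$2"
  if [ "$executors_per_machine" -le 0 ]; then
    echo "executors per machine must be positive" >&2
    return 1
  fi
  threads=$((cores / executors_per_machine))
  # Keep at least one thread so the multi-threaded reader is not disabled by accident.
  if [ "$threads" -lt 1 ]; then
    threads=1
  fi
  echo "$threads"
}

# Example: a 64-core machine running 4 executors (illustrative numbers).
pool_threads="$(rapids_shuffle_threads 64 4)"
echo "--conf spark.rapids.shuffle.multiThreaded.writer.threads=${pool_threads}"
echo "--conf spark.rapids.shuffle.multiThreaded.reader.threads=${pool_threads}"

# Example: cap the Netty off-heap pool at 2 GiB, expressed in bytes (illustrative value).
netty_bytes=$((2 * 1024 * 1024 * 1024))
echo "--conf spark.executor.extraJavaOptions=-Dio.netty.maxDirectMemory=${netty_bytes}"
```

Writers and readers are given the same value here only for brevity; the two pools can be sized separately.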