RAPIDS Accelerator for Apache Spark ML Library Integration

The RAPIDS Accelerator for Apache Spark can be used to accelerate the ETL portions (e.g., loading training data from Parquet files) of applications that use ML libraries with Spark DataFrame APIs. Examples of such libraries include the original Apache Spark MLlib, XGBoost, RAPIDS Accelerator ML, and the DL inference UDF introduced in Spark 3.4. The latter three can also use GPUs to accelerate the core ML algorithms (in the case of the DL inference UDF, indirectly via the underlying DL framework). Combined with the RAPIDS Accelerator for Apache Spark for ETL, they can therefore further improve the cost-benefit of GPU accelerated Spark clusters.

For Spark API compatible ML libraries that implement their core ML computations inside pandas UDFs, such as XGBoost’s PySpark API, the RAPIDS Accelerator ML PySpark API, and the DL inference UDF, it is recommended to enable the RAPIDS Accelerator for Apache Spark’s support for GPU accelerated pandas UDFs.
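As a minimal sketch of what enabling this support might look like, the session below sets the plugin class and the `spark.rapids.sql.python.gpu.enabled` key at startup. The application name is a placeholder, and this is an assumption about a typical setup rather than a complete recipe: the accelerator's pandas UDF documentation lists further settings (such as the Python daemon module and Python-side GPU memory limits) that a real deployment may also need. The keys are ordinary Spark configuration entries, so the same pairs could equally be passed with `--conf` or set from PySpark.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: these keys must be set before the SparkContext starts.
val spark = SparkSession.builder()
  .appName("rapids-ml-etl") // placeholder name
  // Load the RAPIDS Accelerator plugin.
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // Allow Python workers running pandas UDFs to use the GPU.
  .config("spark.rapids.sql.python.gpu.enabled", "true")
  .getOrCreate()
```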

RMM

One consideration when using the RAPIDS Accelerator for Apache Spark with a GPU accelerated ML library is how GPU memory is shared between the two, as the ML library typically has a GPU memory manager distinct from the RAPIDS Accelerator’s RMM instance. Accordingly, you may need to disable RMM pooling in the RAPIDS Accelerator via the config spark.rapids.memory.gpu.pool when exporting data to an ML library, since that library will likely not have access to any of the memory that the RAPIDS Accelerator’s RMM instance is holding. Similarly, aggressive GPU memory reservation by the ML library may also need to be disabled; TensorFlow, for example, documents steps for limiting GPU memory growth.
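The two adjustments described above could be expressed as session configuration along the following lines. This is a sketch under stated assumptions: `NONE` is taken to be the value of `spark.rapids.memory.gpu.pool` that turns pooling off, and the TensorFlow side relies on the `TF_FORCE_GPU_ALLOW_GROWTH` environment variable being passed to executors through Spark's `spark.executorEnv.*` mechanism. Other ML libraries will have their own controls.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: these keys must be set before the SparkContext starts.
val spark = SparkSession.builder()
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  // Assumption: NONE disables RMM pooling, so the accelerator does not
  // hold a large pool of GPU memory that the ML library cannot use.
  .config("spark.rapids.memory.gpu.pool", "NONE")
  // Assumption: the ML library is TensorFlow; ask it to grow its GPU
  // allocation on demand instead of reserving most of the device up front.
  .config("spark.executorEnv.TF_FORCE_GPU_ALLOW_GROWTH", "true")
  .getOrCreate()
```

Disabling pooling trades some allocation speed in the accelerator for memory that the ML library can actually claim, so it is worth measuring both settings on a representative job.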

ColumnarRdd

When developing a GPU accelerated ML library for Spark, there are cases where you may want to get access to the raw data on the GPU, preferably without copying it. One use case for this is exporting the data to the ML library after doing feature extraction. To enable this for Scala development, the RAPIDS Accelerator for Apache Spark provides a simple utility com.nvidia.spark.rapids.ColumnarRdd that can be used to convert a DataFrame to an RDD[ai.rapids.cudf.Table]. Each Table will have the same schema as the DataFrame passed in.

Note that a Table is not a typical RDD element, so special care needs to be taken when working with it. By default, it is not serializable, so repartitioning the RDD or applying any other operator that involves a shuffle will not work. This is because it is relatively expensive to serialize and deserialize GPU data using a conventional Spark shuffle. In addition, most of the memory associated with the Table is on the GPU itself, so each Table must be closed when it is no longer needed to avoid running out of GPU memory. By convention, whoever consumes the data is responsible for closing it once it is no longer needed.

import ai.rapids.cudf.Table
import com.nvidia.spark.rapids.ColumnarRdd
import org.apache.spark.rdd.RDD

val df = spark.sql("""select my_column from my_table""")
val rdd: RDD[Table] = ColumnarRdd(df)
// Compute the max of the first column
val maxValue = rdd.map(table => {
  val max = table.getColumn(0).max().getLong
  // Close the table to avoid leaks
  table.close()
  max
}).max()
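Because the consumer is responsible for closing each Table, it can help to make the close unconditional, so that an exception thrown while processing does not leak GPU memory. The following is a minimal sketch of a loan-pattern helper for that purpose. The `withResource` helper and the `FakeTable` class are hypothetical names introduced here for illustration; `FakeTable` merely stands in for a GPU-backed table so that the snippet runs without a GPU or any RAPIDS dependency.

```scala
// Loan pattern: run `body` against a resource and always close it,
// even if `body` throws. It accepts any AutoCloseable.
def withResource[R <: AutoCloseable, T](resource: R)(body: R => T): T =
  try body(resource) finally resource.close()

// Hypothetical stand-in for a GPU-backed table, so the sketch runs without a GPU.
final class FakeTable(values: Array[Long]) extends AutoCloseable {
  var closed: Boolean = false
  def maxOfFirstColumn: Long = values.max
  override def close(): Unit = { closed = true }
}

val table = new FakeTable(Array(3L, 9L, 4L))
// The value is computed first, then the resource is closed on the way out.
val result = withResource(table)(_.maxOfFirstColumn)
```

With the real classes, the same helper could presumably wrap both the Table and the Scalar returned by `max()`, since both are closeable in the cuDF Java API.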

Examples of Spark ML Implementations leveraging ColumnarRdd

Both the Scala Spark PCA implementation in RAPIDS Accelerator ML and XGBoost’s GPU accelerated Scala Spark API leverage ColumnarRdd (search for ColumnarRdd in these files) to accelerate data transfer between the RAPIDS Accelerator for Apache Spark and the respective core ML algorithm computations. XGBoost in particular enables this when it detects that the RAPIDS Accelerator for Apache Spark is present and enabled.

© Copyright 2024, NVIDIA. Last updated on Apr 23, 2024.