Delta Lake Support#
The RAPIDS Accelerator for Apache Spark provides limited support for Delta Lake tables. This document details the Delta Lake features that are supported.
The following Delta versions are supported by the RAPIDS Accelerator. The most comprehensively benchmarked combination is Delta 3.3.0/Spark 3.5.3.
- Delta Lake version 4.0.0 on Apache Spark 4.0.0
- Delta Lake versions [3.3.0,3.3.2] on Apache Spark [3.5.3,4.0.0)
- Delta Lake versions 2.1.1, 2.2.0, and 2.3.0 on Apache Spark 3.3.x
- Delta Lake version 2.4.0 on Apache Spark 3.4.x
- Delta Lake on Databricks 12.2 LTS
Information about compatible Spark versions is derived from the Delta Lake release notes for each version, for instance the 4.0.0 and 3.3.0 release notes.
Reading Delta Lake Tables#
Delta Lake reads are accelerated by treating the underlying Parquet files as standard Parquet scans. Tables with liquid clustering or deletion vectors (DVs) are also read on the GPU.
Table snapshot calculation from the transaction log runs on the CPU. Larger tables see the greatest benefit from GPU acceleration, as scan work outweighs the cost of snapshot calculation.
Deletion Vector Handling in Query Plans#
For tables with deletion vectors, the DV predicate is pushed into the Parquet scan and applied during the read. The filter node that Spark normally inserts to apply the DV is consequently removed from the query plan.
Writing Delta Lake Tables#
Delta Lake write acceleration is enabled by default. To disable acceleration of Delta Lake writes, set spark.rapids.sql.format.delta.write.enabled=false.
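For example, the setting can be changed for the current session from Spark SQL (a sketch; the same key can also be passed via `--conf` at launch):

```sql
-- Disable GPU acceleration of Delta Lake writes for this session;
-- Delta Lake reads remain accelerated.
SET spark.rapids.sql.format.delta.write.enabled=false;
```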
Write Operations Supported#
Most write operations to Delta Lake tables are supported, including tables with liquid clustering.
DML operations (DELETE, UPDATE, MERGE) on tables with DVs enabled may trigger a CPU fallback because the necessary DV creation is not currently supported.
Automatic Optimization of Writes#
Delta Lake on Databricks has automatic optimization features for optimized writes and automatic compaction.
Optimized writes are supported for Delta 3.3.0, Delta 4.0, and Databricks. The plugin uses an algorithm similar to, but slightly different from, the Databricks version. The following table describes configuration settings that control the operation of the optimized write within the plugin.
| Configuration | Default | Description |
|---|---|---|
| spark.databricks.delta.optimizeWrite.binSize | 512 | Target uncompressed partition size in megabytes. See the Delta Lake documentation for details. |
| spark.databricks.delta.optimizeWrite.smallPartitionFactor | 0.5 | Merge partitions smaller than this factor multiplied by the target partition size. |
| spark.databricks.delta.optimizeWrite.mergedPartitionFactor | 1.2 | Avoid combining partitions larger than this factor multiplied by the target partition size. |
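To illustrate how these factors interact, the following Python sketch computes the partition-size thresholds implied by the defaults above. This is an illustration only, not the plugin's actual bin-packing implementation:

```python
# Sketch of the optimized-write partition thresholds implied by the
# configuration defaults above. Not the plugin's actual algorithm.

def optimize_write_thresholds(bin_size_mb=512,
                              small_partition_factor=0.5,
                              merged_partition_factor=1.2):
    """Return (merge_below_mb, avoid_above_mb) for the optimized write."""
    # Partitions smaller than this are candidates for merging.
    merge_below = bin_size_mb * small_partition_factor
    # Combining is avoided once a merged partition would exceed this.
    avoid_above = bin_size_mb * merged_partition_factor
    return merge_below, avoid_above

merge_below, avoid_above = optimize_write_thresholds()
print(merge_below)  # 256.0 -> partitions below 256 MB are merged
print(avoid_above)  # 614.4 -> merging avoided beyond ~614 MB
```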
Automatic Compaction is also supported on Delta 3.3.0, Delta 4.0, and Databricks. The plugin uses different algorithms for OSS Delta and Databricks Delta. The following table describes configuration settings supported by the plugin for OSS Delta.
| Configuration | Default | Description |
|---|---|---|
| spark.databricks.delta.autoCompact.enabled | false | Enable/disable auto compaction for writes to Delta directories. |
| spark.databricks.delta.properties.defaults.autoOptimize.autoCompact | false | Whether to enable auto compaction by default, if spark.databricks.delta.autoCompact.enabled isn't set. |
| spark.databricks.delta.autoCompact.minNumFiles | 50 | Minimum number of files that must exist in the Delta directory before Auto Optimize begins compaction. See the Delta Lake documentation for details. |
| spark.databricks.delta.autoCompact.minFileSize | Half of spark.databricks.delta.autoCompact.maxFileSize | Files smaller than this threshold (in bytes) will be grouped together and rewritten as larger files by auto compaction. |
| spark.databricks.delta.autoCompact.maxFileSize | 128MB | Target file size produced by auto compaction. |
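The way these settings gate compaction can be sketched as follows. This is a simplified Python illustration of the configured thresholds, not the plugin's actual compaction logic:

```python
# Simplified sketch of how the OSS Delta auto-compaction settings above
# interact. Illustration only, not the plugin's implementation.

def eligible_for_compaction(file_sizes_bytes,
                            min_num_files=50,
                            max_file_size=128 * 1024 * 1024,
                            min_file_size=None):
    """Return the file sizes eligible for compaction, or an empty list
    if compaction would not trigger."""
    if min_file_size is None:
        # Default: half of maxFileSize, per the table above.
        min_file_size = max_file_size // 2
    if len(file_sizes_bytes) < min_num_files:
        # Not enough files in the Delta directory to trigger compaction.
        return []
    # Files smaller than minFileSize are grouped and rewritten larger.
    return [s for s in file_sizes_bytes if s < min_file_size]

# 60 files of 1 MiB each: compaction triggers and all files are eligible.
print(len(eligible_for_compaction([1024 * 1024] * 60)))  # 60

# Only 10 files: below minNumFiles, so compaction does not trigger.
print(len(eligible_for_compaction([1024 * 1024] * 10)))  # 0
```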
The following table describes configuration settings supported by the plugin for Databricks Delta.
| Configuration | Default | Description |
|---|---|---|
| spark.databricks.delta.autoCompact.enabled | false | Enable/disable auto compaction for writes to Delta directories. |
| spark.databricks.delta.properties.defaults.autoOptimize.autoCompact | false | Whether to enable auto compaction by default, if spark.databricks.delta.autoCompact.enabled isn't set. |
| spark.databricks.delta.autoCompact.minNumFiles | 50 | Minimum number of files that must exist in the Delta directory before Auto Optimize begins compaction. See the Databricks Delta Lake documentation for details. |
| spark.databricks.delta.autoCompact.maxFileSize | 128MB | Target file size produced by auto compaction. See the Databricks Delta Lake documentation for details. |
| spark.databricks.delta.autoCompact.target | partition | Target files for auto compaction. Available options are "table", "commit", and "partition". If "table", all files in the table are eligible for auto compaction. If "commit", files added or updated by the commit are eligible. If "partition", all files in partitions containing any files added or updated by the commit are eligible. |
| spark.databricks.delta.autoCompact.maxCompactBytes | 20GB | Maximum amount of data to compact together. |
Optimized write support requires round-robin partitioning of the data, and round-robin partitioning requires sorting across all columns for deterministic operation. If the GPU can't sort a particular column type to support the round-robin partitioning, the Delta Lake write falls back to the CPU.
RapidsDeltaWrite Node in Query Plans#
A side effect of performing a GPU-accelerated Delta Lake write is that a new node, RapidsDeltaWrite, appears in the query plan. Normally the writing of Delta Lake files isn't represented by a dedicated node in query plans; it's implicitly covered by higher-level operations such as SaveIntoDataSourceCommand that wrap the entire query along with the subsequent write operation. The RAPIDS Accelerator places a node in the plan to mark the point at which the write occurs and adds statistics showing the time spent performing the low-level write operation.
Merging Into Delta Lake Tables#
Delta Lake merge acceleration is enabled by default. To disable acceleration of Delta Lake merge operations, set spark.rapids.sql.command.MergeIntoCommand=false; on Databricks platforms, also set spark.rapids.sql.command.MergeIntoCommandEdge=false.
Merging into Delta Lake tables via the SQL MERGE INTO statement is supported on all platforms, including tables with liquid clustering. Merging via the DeltaTable merge API is supported on Databricks. On non-Databricks platforms, the DeltaTable merge API is GPU-accelerated only with Delta Lake 3.3 or later; on Delta Lake 2.x use SQL MERGE INTO instead.
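A standard MERGE INTO statement such as the following runs on the GPU when the conditions above are met (the table and column names here are hypothetical):

```sql
-- Hypothetical tables "target" and "source" with columns "id" and "value".
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.value = s.value
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value)
```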
Note
Merging with a WHEN NOT MATCHED BY SOURCE clause is not supported and will fall back to the CPU.
RapidsProcessDeltaMergeJoin Node in Query Plans#
A side effect of performing GPU-accelerated Delta Lake merge operations is that a new node, RapidsProcessDeltaMergeJoin, appears in the query plan. Normally a Delta Lake merge is performed via a join followed by post-processing of the join results in a MapPartitions node. On the GPU, that post-processing is instead performed by the RapidsProcessDeltaMergeJoin node.
Delete Operations on Delta Lake Tables#
Delta Lake delete acceleration is enabled by default. To disable acceleration of Delta Lake delete operations, set spark.rapids.sql.command.DeleteCommand=false; on Databricks platforms, also set spark.rapids.sql.command.DeleteCommandEdge=false.
Deleting data from Delta Lake tables is supported via the SQL DELETE FROM statement or the DeltaTable delete API, including tables with liquid clustering.
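For example, a predicate-based delete such as the following is accelerated (the table and column names are hypothetical):

```sql
-- Hypothetical table "events" with a "event_date" column.
DELETE FROM events WHERE event_date < '2023-01-01'
```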
num_affected_rows Difference with Databricks#
The Delta Lake delete command returns a single-row result with a num_affected_rows column. When entire partition files in the table are deleted, the open source Delta Lake and RAPIDS Accelerator implementations of delete can return -1 for num_affected_rows, since it could be expensive to open the files and produce an accurate row count. Databricks changed the behavior of delete operations that remove entire partition files to return the actual row count. This is only a difference in the statistics of the operation; the table contents are still accurately deleted by the RAPIDS Accelerator.
Update Operations on Delta Lake Tables#
Delta Lake update acceleration is enabled by default. To disable acceleration of Delta Lake update operations, set spark.rapids.sql.command.UpdateCommand=false; on Databricks platforms, also set spark.rapids.sql.command.UpdateCommandEdge=false.
Updating data in Delta Lake tables is supported via the SQL UPDATE statement or the DeltaTable update API, including tables with liquid clustering.
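For example, a predicate-based update such as the following is accelerated (the table and column names are hypothetical):

```sql
-- Hypothetical table "events" with "status" and "event_date" columns.
UPDATE events SET status = 'archived' WHERE event_date < '2023-01-01'
```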