Dataproc Serverless#

Dataproc Serverless allows users to run Spark workloads without the need to provision and manage their own clusters.

Accelerating Batch Workloads with GPUs on Dataproc Serverless:#

For instructions on how to leverage L4 and A100 GPUs to accelerate Spark workloads, refer to the following guide.

Dataproc Serverless Runtime RAPIDS Version:#

Version    RAPIDS Accelerator Jar
1.1 LTS    rapids-4-spark_2.12-24.02.0.jar
1.2        rapids-4-spark_2.12-24.04.0.jar
2.0        rapids-4-spark_2.13-24.02.0.jar
2.1        rapids-4-spark_2.13-24.02.0.jar
2.2        rapids-4-spark_2.13-24.04.0.jar

Creating a Serverless Batch Workload with RAPIDS Accelerator:#

NOTE: Currently, the Dataproc Serverless JVM default charset is derived from the container configuration and is US-ASCII rather than UTF-8. RAPIDS users must set the JVM option -Dfile.encoding=UTF-8 explicitly to leverage the full capabilities of the plugin. A future release of Dataproc Serverless 2.2 will change the default back to UTF-8.
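One way to set this, assuming you pass the flag through the standard spark.driver.extraJavaOptions and spark.executor.extraJavaOptions Spark properties, is to add these entries to the --properties list of the batch submission command shown below (a sketch, not a complete command):

# Added to the --properties list of gcloud dataproc batches submit so both the
# driver and executor JVMs start with UTF-8 as the default charset.
spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8,\
spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8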

To get started with Dataproc Serverless, use the following example:

join-dataframes.py#

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("DataFrameJoin").getOrCreate()

# Create two DataFrames
df1 = spark.range(1, 10000001).toDF("a")
df2 = spark.range(1, 10000001).toDF("b")

# Perform the join and count
result = df1.join(df2, df1["a"] == df2["b"]).count()

# Display the result
print(result)

# Stop the Spark session
spark.stop()

Spark Batch Submission Command:#

NUM_EXECUTORS=2
EXEC_CORES=8
DRIVER_CORES=4
DISK_SIZE=375G
BATCH_ID=join-df
BUCKET=your-bucket

gcloud dataproc batches submit \
pyspark \
--batch $BATCH_ID ./join-dataframes.py \
--deps-bucket $BUCKET \
--version 1.1 \
--subnet default \
--properties \
spark.executor.instances=$NUM_EXECUTORS,\
spark.driver.cores=$DRIVER_CORES,\
spark.executor.cores=$EXEC_CORES,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.dataproc.executor.resource.accelerator.type=l4,\
spark.dataproc.driver.disk.tier=premium,\
spark.dataproc.executor.disk.tier=premium,\
spark.dataproc.driver.disk.size=$DISK_SIZE

Refer to the Supported Spark properties to understand the resource allocation properties in Dataproc Serverless.
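Once the batch is submitted, you can monitor it from the command line. A minimal sketch, assuming the batch ID used above; the --region value is a placeholder and can be omitted if your gcloud config already sets a default Dataproc region:

# Stream the batch output and block until it finishes.
gcloud dataproc batches wait join-df --region=us-central1

# Show the batch configuration, state, and runtime details.
gcloud dataproc batches describe join-df --region=us-central1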

Logging Configuration for Serverless Workloads:#

Event Logs: There are two options for configuring event logs on Serverless:

  1. Using Persistent History Server (PHS): Use the --history-server-cluster argument to attach the History Server, which automatically configures spark.eventLog.dir.

  2. Explicitly Using spark.eventLog Properties: Configure event logs by setting spark.eventLog.enabled=true and spark.eventLog.dir=gs://<bucket-name>/eventLogs (see the sketch after this list).
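A minimal sketch of the second option, assuming a GCS bucket the batch's service account can write to; these entries are appended to the --properties list of the batch submission command:

# Added to the end of the --properties list of gcloud dataproc batches submit;
# gs://your-bucket is a placeholder for a bucket you own.
spark.eventLog.enabled=true,\
spark.eventLog.dir=gs://your-bucket/eventLogs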

Driver Logs: You can access Driver Logs in the Output section of the UI.

Executor Logs: Serverless uses Log Explorer to provide access to Executor Logs. To view them, click VIEW LOGS and run the following query in the search bar with the appropriate batch_id (a CLI alternative is sketched after the severity list below):

resource.type="cloud_dataproc_batch"
resource.labels.batch_id="join-df"
SEARCH("`executor.log`")
severity=ERROR

You can also choose from the following severity levels:
  1. INFO

  2. DEFAULT

  3. ERROR

  4. WARNING
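The same resource labels can be queried from the command line with gcloud logging read. A minimal sketch, assuming the batch ID used above; the limit is arbitrary, and the SEARCH() clause is omitted because it is a Logs Explorer feature:

# Read ERROR-level log entries for the batch; adjust severity and --limit as needed.
gcloud logging read \
  'resource.type="cloud_dataproc_batch" AND resource.labels.batch_id="join-df" AND severity=ERROR' \
  --limit=50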

Cost:#

Refer to Dataproc Serverless Pricing for details on calculating the cost of a GPU batch workload.

Using Custom RAPIDS Accelerator JAR for Serverless Workloads#

To use a custom RAPIDS Accelerator JAR in Dataproc Serverless, you need to build a custom container image. Below is an example Dockerfile for building such an image:

# Debian is recommended.
FROM debian:11-slim

# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive

# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini libjemalloc2

# Enable jemalloc2 as the default memory allocator
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2

# Add RAPIDS Accelerator JAR.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY rapids-4-spark_2.12-24.10.0.jar "${SPARK_EXTRA_JARS_DIR}"

# (Required) Create the 'spark' group/user.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark

Build Commands:#

Run the following commands in the Dockerfile directory:

IMAGE=gcr.io/my-project/my-image:1.0.1

# Download RAPIDS Accelerator JAR
wget https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.10.0/rapids-4-spark_2.12-24.10.0.jar

# Build and push the image.
docker build -t "${IMAGE}" .
docker push "${IMAGE}"

Note: Check the Scala version of your runtime and use the corresponding RAPIDS Accelerator JAR. You can verify the Scala version of your runtime here. Additionally, follow the official documentation to download the right JAR, and update the COPY line in the Dockerfile accordingly before building the container:

COPY rapids-4-spark_2.12-24.10.0.jar "${SPARK_EXTRA_JARS_DIR}"
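For example, if your runtime is on Scala 2.13 (runtime 2.0 and later, per the table above), download the Scala 2.13 artifact instead and update the COPY line to match. A sketch; the 24.10.0 version here is an assumption, so substitute the release that matches your runtime:

# Hypothetical Scala 2.13 example; pick the release that matches your runtime.
wget https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.13/24.10.0/rapids-4-spark_2.13-24.10.0.jar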

Spark batch submission command with custom JAR:#

NUM_EXECUTORS=2
EXEC_CORES=8
DRIVER_CORES=4
DISK_SIZE=375G
BATCH_ID=join-df
BUCKET=your-bucket
CONTAINER_IMAGE=gcr.io/my-project/my-image:1.0.1

gcloud dataproc batches submit \
pyspark \
--batch $BATCH_ID ./join-dataframes.py \
--container-image $CONTAINER_IMAGE \
--deps-bucket $BUCKET \
--version 1.1 \
--subnet default \
--properties \
dataproc.sparkRapids.useDefaultJars=false,\
spark.executor.instances=$NUM_EXECUTORS,\
spark.driver.cores=$DRIVER_CORES,\
spark.executor.cores=$EXEC_CORES,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.dataproc.executor.resource.accelerator.type=l4,\
spark.dataproc.driver.disk.tier=premium,\
spark.dataproc.executor.disk.tier=premium,\
spark.dataproc.driver.disk.size=$DISK_SIZE