Dataproc Serverless
Dataproc Serverless allows users to run Spark workloads without the need to provision and manage their own clusters.
- For instructions on how to leverage L4 and A100 GPUs to accelerate Spark workloads,
refer to the following guide.
Version | RAPIDS Accelerator Jar |
---|---|
1.1 LTS | rapids-4-spark_2.12-24.02.0.jar |
1.2 | rapids-4-spark_2.12-24.04.0.jar |
2.0 | rapids-4-spark_2.13-24.02.0.jar |
2.1 | rapids-4-spark_2.13-24.02.0.jar |
2.2 | rapids-4-spark_2.13-24.04.0.jar |
NOTE: Currently, the Dataproc Serverless JVM takes its default charset from the container configuration, which is US-ASCII rather than UTF-8. RAPIDS users must set the JVM option -Dfile.encoding=UTF-8 explicitly to use the full capabilities of the plugin. A future release of Dataproc Serverless 2.2 will change the default back to UTF-8.
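One way to pass the JVM option from the note above is through Spark's `spark.driver.extraJavaOptions` and `spark.executor.extraJavaOptions` properties. The following is a minimal sketch, assuming your runtime version accepts these properties on batch submission. The helper names `with_utf8_encoding` and `render_properties` are hypothetical and only illustrate how the flag could be merged into a property set before it is handed to `--properties`.

```python
UTF8_FLAG = "-Dfile.encoding=UTF-8"
JVM_OPTION_KEYS = (
    "spark.driver.extraJavaOptions",
    "spark.executor.extraJavaOptions",
)


def with_utf8_encoding(props):
    """Return a copy of props with the UTF-8 flag appended to the JVM options.

    Existing JVM options are preserved, and the flag is not added twice.
    """
    merged = dict(props)
    for key in JVM_OPTION_KEYS:
        existing = str(merged.get(key, ""))
        if UTF8_FLAG not in existing.split():
            merged[key] = (existing + " " + UTF8_FLAG).strip()
    return merged


def render_properties(props):
    """Render a dict as the comma-separated key=value string used by --properties.

    Assumption: no value contains a comma. Values with spaces must be quoted
    when the string is pasted into a shell.
    """
    return ",".join(f"{key}={value}" for key, value in props.items())


props = with_utf8_encoding({"spark.executor.instances": "2"})
print(render_properties(props))
```

The rendered string can be pasted after `--properties` in the submission commands shown later on this page.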
To get started with Dataproc Serverless, use the following example:
join-dataframes.py
from pyspark.sql import SparkSession
# Initialize a Spark session
spark = SparkSession.builder.appName("DataFrameJoin").getOrCreate()
# Create two DataFrames
df1 = spark.range(1, 10000001).toDF("a")
df2 = spark.range(1, 10000001).toDF("b")
# Perform the join and count
result = df1.join(df2, df1["a"] == df2["b"]).count()
# Display the result
print(result)
# Stop the Spark session
spark.stop()
Spark Batch Submission Command:
NUM_EXECUTORS=2
EXEC_CORES=8
DRIVER_CORES=4
DISK_SIZE=375G
BATCH_ID=join-df
BUCKET=your-bucket
gcloud dataproc batches submit \
pyspark \
--batch $BATCH_ID ./join-dataframes.py \
--deps-bucket $BUCKET \
--version 1.1 \
--subnet default \
--properties \
spark.executor.instances=$NUM_EXECUTORS,\
spark.driver.cores=$DRIVER_CORES,\
spark.executor.cores=$EXEC_CORES,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.dataproc.executor.resource.accelerator.type=l4,\
spark.dataproc.driver.disk.tier=premium,\
spark.dataproc.executor.disk.tier=premium,\
spark.dataproc.driver.disk.size=$DISK_SIZE
Refer to the Supported Spark properties to understand the resource allocation properties in Dataproc Serverless.
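When the same batch is submitted repeatedly with different resource settings, it can be convenient to assemble the command programmatically. The sketch below rebuilds the submission command above as an argument list. The function name `build_batch_command` is hypothetical, and the code only prints the command; it does not call gcloud.

```python
import shlex


def build_batch_command(batch_id, script, bucket, version, properties):
    """Assemble the gcloud argument list for a PySpark batch (nothing is executed)."""
    rendered = ",".join(f"{key}={value}" for key, value in properties.items())
    return [
        "gcloud", "dataproc", "batches", "submit", "pyspark",
        "--batch", batch_id, script,
        "--deps-bucket", bucket,
        "--version", version,
        "--subnet", "default",
        "--properties", rendered,
    ]


# Same values as the shell variables in the command above.
properties = {
    "spark.executor.instances": 2,
    "spark.driver.cores": 4,
    "spark.executor.cores": 8,
    "spark.dataproc.driver.compute.tier": "premium",
    "spark.dataproc.executor.compute.tier": "premium",
    "spark.dataproc.executor.resource.accelerator.type": "l4",
    "spark.dataproc.driver.disk.tier": "premium",
    "spark.dataproc.executor.disk.tier": "premium",
    "spark.dataproc.driver.disk.size": "375G",
}

cmd = build_batch_command(
    batch_id="join-df",
    script="./join-dataframes.py",
    bucket="your-bucket",
    version="1.1",
    properties=properties,
)
print(shlex.join(cmd))
```

To actually submit the batch, the list could be passed to `subprocess.run(cmd, check=True)` on a machine where gcloud is installed and authenticated.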
Event Logs: There are two options for configuring event logs on Serverless:
- Using Persistent History Server (PHS): Use the --history-server-cluster argument to attach the History Server, which will automatically configure spark.eventLog.dir.
- Explicitly Using spark.eventLog Properties: Configure event logs by setting spark.eventLog.enabled=true and spark.eventLog.dir=gs://<bucket-name>/eventLogs.
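The two event log options above can be sketched as a small helper that returns either the extra command-line flag or the extra Spark properties. The function name `event_log_args` is hypothetical, and requiring exactly one option is a design choice of this sketch rather than a documented restriction. The History Server value is left as a placeholder because its exact format is not given here.

```python
def event_log_args(history_server_cluster=None, event_log_dir=None):
    """Return (extra_flags, extra_properties) for one event log option.

    Exactly one argument must be given:
    - history_server_cluster: value for --history-server-cluster (PHS option)
    - event_log_dir: value for spark.eventLog.dir (explicit properties option)
    """
    if (history_server_cluster is None) == (event_log_dir is None):
        raise ValueError(
            "choose exactly one of history_server_cluster or event_log_dir"
        )
    if history_server_cluster is not None:
        # PHS configures spark.eventLog.dir automatically, so no properties.
        return ["--history-server-cluster", history_server_cluster], {}
    return [], {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": event_log_dir,
    }


flags, props = event_log_args(event_log_dir="gs://<bucket-name>/eventLogs")
print(flags, props)

flags, props = event_log_args(history_server_cluster="<history-server-cluster>")
print(flags, props)
```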
Driver Logs: You can access Driver Logs in the Output section of the UI.
Executor Logs:
Dataproc Serverless exposes Executor Logs through Logs Explorer. To view them, click VIEW LOGS and
enter the following query in the search bar, substituting the appropriate batch_id:
resource.type="cloud_dataproc_batch"
resource.labels.batch_id="join-df"
SEARCH("`executor.log`")
severity=ERROR
- You can also pick the severity levels from:
INFO
DEFAULT
ERROR
WARNING
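If you look up logs for many batches, the query above can be generated instead of edited by hand. The following is a minimal sketch: the function name `executor_log_filter` is hypothetical, and the allowed severities simply mirror the list above.

```python
ALLOWED_SEVERITIES = ("INFO", "DEFAULT", "ERROR", "WARNING")


def executor_log_filter(batch_id, severity="ERROR"):
    """Build the Logs Explorer query for the executor logs of one batch."""
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(f"severity must be one of {ALLOWED_SEVERITIES}")
    return "\n".join([
        'resource.type="cloud_dataproc_batch"',
        f'resource.labels.batch_id="{batch_id}"',
        'SEARCH("`executor.log`")',
        f"severity={severity}",
    ])


print(executor_log_filter("join-df"))
```

Calling `executor_log_filter("join-df", severity="WARNING")` produces the same query with the last line changed to `severity=WARNING`.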
Refer to Dataproc Serverless Pricing for details on calculating the cost of a GPU batch workload.
To use a custom RAPIDS JAR in Dataproc Serverless, build a custom container image. Below is an example Dockerfile for building such an image:
# Debian is recommended.
FROM debian:11-slim
# Suppress interactive prompts
ENV DEBIAN_FRONTEND=noninteractive
# (Required) Install utilities required by Spark scripts.
RUN apt update && apt install -y procps tini libjemalloc2
# Enable jemalloc2 as the default memory allocator
ENV LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.2
# Add RAPIDS Accelerator JAR.
ENV SPARK_EXTRA_JARS_DIR=/opt/spark/jars/
ENV SPARK_EXTRA_CLASSPATH='/opt/spark/jars/*'
RUN mkdir -p "${SPARK_EXTRA_JARS_DIR}"
COPY rapids-4-spark_2.12-24.08.1.jar "${SPARK_EXTRA_JARS_DIR}"
# (Required) Create the 'spark' group/user.
RUN groupadd -g 1099 spark
RUN useradd -u 1099 -g 1099 -d /home/spark -m spark
USER spark
Build Commands:
Run the following commands in the Dockerfile directory:
IMAGE=gcr.io/my-project/my-image:1.0.1
# Download RAPIDS Accelerator JAR
wget https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/24.08.1/rapids-4-spark_2.12-24.08.1.jar
# Build and push the image.
docker build -t "${IMAGE}" .
docker push "${IMAGE}"
Note: Check the Scala version of your runtime and use the matching RAPIDS Accelerator JAR. You can verify the Scala version of your runtime here. Additionally, follow the official documentation to download the right JAR. Before building the container, modify the following line of the Dockerfile to reference the correct JAR:
COPY rapids-4-spark_2.12-24.08.1.jar "${SPARK_EXTRA_JARS_DIR}"
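The JAR file name, its download URL, and the `COPY` line all follow from two values: the Scala version and the plugin version. The sketch below derives them together so they stay consistent. It follows the URL pattern of the `wget` command above; the function names are hypothetical, and the code does not check that a JAR exists for a given combination.

```python
MAVEN_BASE = "https://repo1.maven.org/maven2/com/nvidia"


def rapids_jar_name(scala_version, plugin_version):
    """File name of the RAPIDS Accelerator JAR, e.g. for Scala 2.12 or 2.13."""
    return f"rapids-4-spark_{scala_version}-{plugin_version}.jar"


def rapids_jar_url(scala_version, plugin_version):
    """Download URL, following the pattern of the wget command above."""
    artifact = f"rapids-4-spark_{scala_version}"
    jar = rapids_jar_name(scala_version, plugin_version)
    return f"{MAVEN_BASE}/{artifact}/{plugin_version}/{jar}"


def dockerfile_copy_line(scala_version, plugin_version):
    """The Dockerfile COPY instruction that must match the downloaded JAR."""
    jar = rapids_jar_name(scala_version, plugin_version)
    return f'COPY {jar} "${{SPARK_EXTRA_JARS_DIR}}"'


print(rapids_jar_url("2.12", "24.08.1"))
print(dockerfile_copy_line("2.12", "24.08.1"))
```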
Spark batch submission command with custom JAR:
NUM_EXECUTORS=2
EXEC_CORES=8
DRIVER_CORES=4
DISK_SIZE=375G
BATCH_ID=join-df
BUCKET=your-bucket
CONTAINER_IMAGE=gcr.io/my-project/my-image:1.0.1
gcloud dataproc batches submit \
pyspark \
--batch $BATCH_ID ./join-dataframes.py \
--container-image $CONTAINER_IMAGE \
--deps-bucket $BUCKET \
--version 1.1 \
--subnet default \
--properties \
dataproc.sparkRapids.useDefaultJars=false,\
spark.executor.instances=$NUM_EXECUTORS,\
spark.driver.cores=$DRIVER_CORES,\
spark.executor.cores=$EXEC_CORES,\
spark.dataproc.driver.compute.tier=premium,\
spark.dataproc.executor.compute.tier=premium,\
spark.dataproc.executor.resource.accelerator.type=l4,\
spark.dataproc.driver.disk.tier=premium,\
spark.dataproc.executor.disk.tier=premium,\
spark.dataproc.driver.disk.size=$DISK_SIZE