Running Spark Jobs on Google Cloud Dataproc

In this post, we’re going to look at how to migrate Spark jobs to Google Cloud Dataproc.

We’ll cover three cases.

  • Migrating existing Spark jobs to Cloud Dataproc
  • Improving on the above by using Google Cloud Storage instead of local HDFS
  • Submitting Spark jobs to a Cloud Dataproc cluster

Case 1: Migrating Spark Jobs to Cloud Dataproc

Here is what we’re going to do in this step.

  • Create a Cloud Dataproc cluster
  • Run an imported Jupyter notebook using the cluster’s default local HDFS to store source data
  • Process that data just as you would on any Hadoop cluster using Spark

Creating a Cloud Dataproc Cluster

Before creating the cluster, make sure that the default service account xxxxxxxx-compute@developer.gserviceaccount.com exists (Navigation Menu > IAM & Admin > IAM).

NB: The account prefix (xxxxxxxx) is the project number.

  1. Open the Dataproc service page under the Big Data section.

  2. Click Create Cluster.

  3. Enter sparktodp for Name.

  4. In the Component gateway section, select Enable access to the web interfaces of default and selected optional components on the cluster.

  5. Click Advanced options.

  6. In the Image section, click Change.

  7. Select 1.4 (Debian 10, Hadoop 2.9, Spark 2.4).

  8. Click Select.

  9. Under Optional components, click Select component.

  10. Select Anaconda and Jupyter Notebook.

  11. Click Select to close the Optional components dialog.

  12. Scroll down to the end of the page and click Create.

We can proceed to the next step without waiting for the cluster to fully start up.
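The console steps above can also be expressed as a single gcloud command. The sketch below only assembles that command as an argument list so it can be inspected before running it. The region us-central1 is taken from the commands used later in this post, the image version string 1.4-debian10 is an assumption meant to match the image selected above, and the helper name is hypothetical.

```python
import shlex


def build_create_cluster_command(name="sparktodp", region="us-central1",
                                 image_version="1.4-debian10",
                                 components=("ANACONDA", "JUPYTER")):
    """Assemble a gcloud command mirroring the console steps (hypothetical helper)."""
    return [
        "gcloud", "dataproc", "clusters", "create", name,
        "--region", region,
        # Assumed version string for the 1.4 (Debian 10) image.
        "--image-version", image_version,
        # Optional components selected in the console dialog.
        "--optional-components", ",".join(components),
        # Equivalent of enabling access to the web interfaces.
        "--enable-component-gateway",
    ]


command = build_create_cluster_command()
# Print a shell-safe version of the command for review.
print(" ".join(shlex.quote(arg) for arg in command))
```

The printed line can be pasted into Cloud Shell, or the list can be passed to subprocess.run if you prefer to launch it from Python.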


Downloading the Source Data Repository

For demonstration, we’ll use the sample data and notebooks that Google Cloud Platform provides in its training-data-analyst repository.

  • Clone the repository into the home directory of the Cloud Shell VM by running the following command in Cloud Shell.

git -C ~ clone https://github.com/GoogleCloudPlatform/training-data-analyst

  • Get the default Cloud Storage bucket used by Cloud Dataproc by running the following command in Cloud Shell.

export DP_STORAGE="gs://$(gcloud dataproc clusters describe sparktodp --region=us-central1 --format=json | jq -r '.config.configBucket')"

  • Copy the sample notebooks into the Jupyter working folder in the default Cloud Storage bucket.

gsutil -m cp ~/training-data-analyst/quests/sparktobq/*.ipynb $DP_STORAGE/notebooks/jupyter
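The DP_STORAGE command above reads the config.configBucket field from the JSON description of the cluster. As a minimal sketch of the same lookup in Python, the snippet below parses a hypothetical, heavily trimmed sample of that JSON; the bucket name in it is made up for illustration.

```python
import json


def config_bucket_uri(describe_json):
    """Return the gs:// URI of the config bucket from `clusters describe` JSON."""
    cluster = json.loads(describe_json)
    return "gs://" + cluster["config"]["configBucket"]


# Hypothetical, trimmed output of:
#   gcloud dataproc clusters describe sparktodp --region=us-central1 --format=json
sample_output = (
    '{"clusterName": "sparktodp", '
    '"config": {"configBucket": "example-dataproc-bucket"}}'
)

dp_storage = config_bucket_uri(sample_output)
# The notebooks are copied under this folder of the bucket.
print(dp_storage + "/notebooks/jupyter")
```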


Logging in to the Jupyter Notebook

Once the Dataproc cluster has fully started, perform the following steps.

  • On the Dataproc service page, click the name of the cluster to show the cluster details.

  • Click Web Interfaces.

  • Click the Jupyter link. This will open a new Jupyter tab in the browser.

  • In the newly opened tab, we can see the contents of the /notebooks/jupyter directory in the Cloud Storage bucket, which now includes the sample notebooks copied in the previous step.

  • Click the 01_spark.ipynb notebook to open it.

  • Click Cell and then Run All to run all of the cells in the notebook.

To understand what the notebook does, pay attention to the code and its output. Also note where the data is read from and written to (the data source and the data sink).

  • The first code cell retrieves the source data, which is an extract from the KDD Cup competition held at the Knowledge Discovery and Data Mining (KDD) conference in 1999.

!wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz

  • The second code cell copies the source data to the default (local) HDFS.

!hadoop fs -put kddcup* /

  • The third code cell lists the contents of the root directory in the cluster’s HDFS.

!hadoop fs -ls /


Reading in Data

In cell In [4], Spark SQL is initialized, and Spark reads the source data as text and returns the first 5 rows.

Notice that the source data is stored in HDFS (hdfs:///kddcup.data_10_percent.gz).

from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext

data_file = "hdfs:///kddcup.data_10_percent.gz"

raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

In cell In [5], each row is split using a comma (,) as the delimiter and then parsed into named fields according to a predefined schema.

csv_rdd = raw_rdd.map(lambda row: row.split(","))
parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]),
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)
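To see what the parsing step does to a single row without a Spark cluster, here is a minimal plain-Python sketch. It uses a namedtuple instead of a Spark Row, keeps only a subset of the fields, and runs on a hypothetical shortened line in the same comma-separated layout; none of these choices come from the notebook itself.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row, with only a few of the fields.
Connection = namedtuple(
    "Connection",
    ["duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes", "label"],
)


def parse_line(line):
    """Split one CSV line and convert the numeric fields to int."""
    r = line.split(",")
    return Connection(
        duration=int(r[0]),
        protocol_type=r[1],
        service=r[2],
        flag=r[3],
        src_bytes=int(r[4]),
        dst_bytes=int(r[5]),
        label=r[-1],  # The label is always the last field.
    )


# Hypothetical shortened row; real rows have many more columns.
sample_line = "0,tcp,http,SF,181,5450,0,0,0,normal."
record = parse_line(sample_line)
print(record)
```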

Performing Spark Analysis

The notebook also contains several analyses performed with Spark.

In cell In [6], a Spark DataFrame is created from the parsed input data (an RDD). Then an aggregation query is executed (df.groupBy('protocol_type').count().orderBy('count', ascending=False)).

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame(parsed_rdd)
connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False)
connections_by_protocol.show()
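The aggregation counts connections per protocol and sorts the result by count in descending order. A minimal plain-Python sketch of the same logic, using a small made-up list of protocol values rather than the real dataset, might look like this:

```python
from collections import Counter

# Made-up protocol values standing in for the protocol_type column.
protocols = ["tcp", "icmp", "tcp", "udp", "icmp", "icmp"]

# Counter groups and counts; most_common() sorts by count, largest first.
counts_by_protocol = Counter(protocols).most_common()

for protocol, count in counts_by_protocol:
    print(protocol, count)
```

Spark distributes this same group-count-sort work across the cluster, which is what makes it practical on the full dataset.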

Case 2: Using Cloud Storage instead of HDFS

In this second case, we’re going to decouple the storage requirements of the job from its compute requirements.

To do so, we only need to replace the HDFS calls with Google Cloud Storage calls. This means replacing hdfs:// references with gs:// references in the code and adjusting folder names as necessary.

  • In Cloud Shell, create a new Cloud Storage bucket for the source data.

export PROJECT_ID=$(gcloud info --format='value(config.project)')
gsutil mb gs://$PROJECT_ID

  • In Cloud Shell, copy the source data into the new bucket.

wget http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz
gsutil cp kddcup.data_10_percent.gz gs://$PROJECT_ID/

  • Go back to the 01_spark Jupyter Notebook tab in your browser.

  • Click File and then select Make a Copy.

  • Rename the copied notebook to De-couple-storage.

  • Go back to the tab for 01_spark. Select Save and Checkpoint to save the notebook, then Close and Halt to shut it down.

  • In the De-couple-storage tab, remove the code cells that download the source data and copy it to HDFS.

  • The first cell should now be the one that reads in the data.

  • Replace the contents of that cell with the following code, setting the gcs_bucket variable to the name of your bucket.

from pyspark.sql import SparkSession, SQLContext, Row

gcs_bucket='[Your-Bucket-Name]'
spark = SparkSession.builder.appName("kdd").getOrCreate()
sc = spark.sparkContext
data_file = "gs://"+gcs_bucket+"/kddcup.data_10_percent.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

If you run all the cells, you will see the same output as before, when the source data was loaded from HDFS. Moving the source data to Cloud Storage only requires repointing the storage reference from hdfs:// to gs:// and adjusting the path accordingly.
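Because only the URI changes between the two cases, the choice of storage can be isolated in one small function. The helper below is a hypothetical sketch, not part of the notebook: it returns an HDFS path when no bucket is given and a Cloud Storage path otherwise.

```python
def data_file_uri(filename, gcs_bucket=None):
    """Build the source URI for HDFS (default) or Cloud Storage."""
    if gcs_bucket:
        # Cloud Storage: the bucket name follows the scheme.
        return "gs://{}/{}".format(gcs_bucket, filename)
    # Cluster-local HDFS: the file sits in the root directory.
    return "hdfs:///{}".format(filename)


# "my-example-bucket" is a placeholder bucket name.
print(data_file_uri("kddcup.data_10_percent.gz"))
print(data_file_uri("kddcup.data_10_percent.gz", gcs_bucket="my-example-bucket"))
```

Either result can be passed to sc.textFile unchanged, which is why the rest of the notebook does not need to be modified.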


Case 3: Submitting Spark Jobs to Dataproc Cluster


Creating a Standalone PySpark Job File

In this last case, we’ll create a standalone Python file that can be deployed as a Cloud Dataproc job. This job performs the same functions as the sample notebook.

  • Create an empty text file named spark_analysis.py.

  • For plotting and argument parsing, paste the following code.

import matplotlib

# Explicitly sets the default plotting backend via matplotlib.use('agg') so that the plotting code runs outside of a Jupyter notebook.
matplotlib.use('agg')

import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--bucket", help="bucket for input and output")
args = parser.parse_args()

BUCKET = args.bucket

  • Next, append the code from all of the notebook cells to the spark_analysis.py file.

  • In addition, remove the %matplotlib inline magic command where the Pandas bar chart is plotted.
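To check how the --bucket argument is parsed before running the whole job, the parser can be exercised on its own. In this sketch the argument list is passed explicitly for demonstration, whereas spark_analysis.py calls parse_args() without arguments and reads the command line; the wrapper function and the bucket name are hypothetical.

```python
import argparse


def parse_bucket(argv):
    """Parse --bucket from an explicit argument list and return its value."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--bucket", help="bucket for input and output")
    return parser.parse_args(argv).bucket


# Same form the job receives: --bucket=<name>
print(parse_bucket(["--bucket=my-example-bucket"]))
# Without the flag the value is None, so the job would fail later on.
print(parse_bucket([]))
```

Since the flag is optional in the original code, passing required=True to add_argument is one way to fail early when it is missing.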

After completing the steps above, append the following code to the end of spark_analysis.py. It saves the chart as report.png, deletes any previous output under sparktodp/ in the bucket, uploads the chart, and writes the aggregation result as CSV.

ax[0].get_figure().savefig('report.png')

import google.cloud.storage as gcs

bucket = gcs.Client().get_bucket(BUCKET)
for blob in bucket.list_blobs(prefix='sparktodp/'):
    blob.delete()
bucket.blob('sparktodp/report.png').upload_from_filename('report.png')

connections_by_protocol.write.format("csv").mode("overwrite").save(
    "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

Submitting the Spark Job from Inside the Notebook

Let’s execute the PySpark code (spark_analysis.py) from inside the notebook.

  • Create a new notebook file and add a new cell containing the following code.

BUCKET_list = !gcloud info --format='value(config.project)'
BUCKET=BUCKET_list[0]
print('Writing to {}'.format(BUCKET))
!/opt/conda/anaconda/bin/python spark_analysis.py --bucket=$BUCKET

  • List the output files from the PySpark code in the Cloud Storage bucket.

!gsutil ls gs://$BUCKET/sparktodp/**

  • If you want to save a copy of the PySpark code to persistent storage, add a new cell and paste in the following:

!gsutil cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py

  • Click Cell and then Run All.
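The outputs listed by the gsutil ls command above follow a fixed layout under the sparktodp/ prefix. As a minimal sketch, the expected locations can be derived from the bucket name; the helper name and the example bucket name are hypothetical.

```python
def output_locations(bucket, prefix="sparktodp"):
    """Return where spark_analysis.py writes its outputs for a given bucket."""
    return {
        # Chart uploaded with the Cloud Storage client library.
        "report": "gs://{}/{}/report.png".format(bucket, prefix),
        # Spark writes a directory of part files here, not a single CSV file.
        "csv_dir": "gs://{}/{}/connections_by_protocol".format(bucket, prefix),
    }


locations = output_locations("my-example-bucket")
for name, uri in locations.items():
    print(name, uri)
```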

Submitting the Spark Job from Cloud Shell

The previous subsection explains how to submit and run the Spark job from inside the Dataproc cluster environment.

In this subsection, we’re going to look at how to submit the job from outside the Dataproc cluster environment. In this case, the job is submitted from our local VM (the one connected via Cloud Shell).

  • Switch to Cloud Shell and copy the PySpark code (spark_analysis.py) that was saved to Cloud Storage earlier.

gsutil cp gs://$PROJECT_ID/sparktodp/spark_analysis.py spark_analysis.py

  • Create a job submission script. Let’s call it submit_job.sh.

nano submit_job.sh

  • Paste the following into the shell script.

#!/bin/bash
gcloud dataproc jobs submit pyspark \
       --cluster sparktodp \
       --region us-central1 \
       spark_analysis.py \
       -- --bucket=$1

  • Save the file and exit nano by pressing CTRL+X, then Y, then Enter.

  • Make the script executable.

chmod +x submit_job.sh

  • Launch the job.

./submit_job.sh $PROJECT_ID

  • Navigate to the Dataproc > Clusters page.

  • Click Jobs.

  • Click the name of the job that is listed. You can monitor progress here as well as from Cloud Shell.

  • Navigate to your Cloud Storage bucket and note that the output report (/sparktodp/report.png) has an updated timestamp. This indicates that the job submitted from Cloud Shell has completed successfully.

  • Navigate back to the Dataproc > Clusters page.

  • Select the sparktodp cluster and click Delete.

  • Click CONFIRM.

  • Close both Jupyter tabs in your browser.
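For reference, the submission performed by the shell script in this subsection can also be sketched in Python. The snippet only builds the argument list and prints it; the helper name and the example bucket name are hypothetical, and running the command for real would require a live cluster.

```python
import shlex


def build_submit_command(bucket, script="spark_analysis.py",
                         cluster="sparktodp", region="us-central1"):
    """Assemble the gcloud command used to submit the PySpark job."""
    return [
        "gcloud", "dataproc", "jobs", "submit", "pyspark",
        "--cluster", cluster,
        "--region", region,
        script,
        # Everything after "--" is passed to the script, not to gcloud.
        "--", "--bucket={}".format(bucket),
    ]


submit_command = build_submit_command("my-example-bucket")
print(" ".join(shlex.quote(arg) for arg in submit_command))
```

The standalone "--" is what lets spark_analysis.py receive --bucket through argparse instead of gcloud trying to interpret it.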