Creating Workflows in Google Cloud Composer


A workflow can simply be defined as a sequence of tasks performed to accomplish a goal.

Google Cloud Composer is a hosted version of Apache Airflow (an open source workflow management tool).

In this post, we’re going to look at how to use Cloud Composer to build a simple workflow that:

  • Creates a Cloud Dataproc cluster
  • Runs a Hadoop wordcount job on the Cloud Dataproc cluster
  • Removes the Cloud Dataproc cluster

Creating a Cloud Composer Environment

Before building a workflow, we first need to create a Cloud Composer environment.

  • Go to Navigation menu > Composer

  • Click CREATE ENVIRONMENT and fill in the fields with the following information:

Name: highcpu
Location: us-central1
Zone: us-central1-a
Machine type: n1-highcpu-4

Leave all other settings as default.

  • Click Create. (An equivalent gcloud command is sketched below.)
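
If you prefer the command line, roughly the same environment can be created from Cloud Shell with gcloud. This is only a sketch for a Composer 1 environment; the flags available depend on your Composer version:

gcloud composer environments create highcpu \
    --location us-central1 \
    --zone us-central1-a \
    --machine-type n1-highcpu-4

Note that environment creation takes a while, so wait for it to finish before moving on.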

Creating a Cloud Storage Bucket

For the purpose of our example, we’ll create a Cloud Storage bucket for storing the output from the Hadoop job.

  • Go to Navigation menu > Storage > Browser
  • Click Create bucket
  • Give your bucket a universally unique name, then click Create (or create it from Cloud Shell, as sketched below)
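
Alternatively, the bucket can be created from Cloud Shell. A minimal sketch, assuming the same us-central1 location and with <my-bucket> standing in for your own universally unique bucket name:

gsutil mb -l us-central1 gs://<my-bucket>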

Defining the Workflow

In Cloud Composer, we define a workflow as a Directed Acyclic Graph (DAG). A DAG is simply a Python file that declares the tasks to be executed and the dependencies between them. The DAG file is stored in Airflow’s DAG_FOLDER.

The following is the example workflow code (DAG) from hadoop_tutorial.py:

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.
This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'
wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop_tutorial]

To manage the three workflow tasks, the DAG imports the following operators from the airflow.contrib.operators.dataproc_operator module:

  • DataprocClusterCreateOperator: Creates a Cloud Dataproc cluster
  • DataProcHadoopOperator: Submits a Hadoop wordcount job and writes the results to a Cloud Storage bucket
  • DataprocClusterDeleteOperator: Deletes the cluster to avoid incurring ongoing Compute Engine charges

The tasks run sequentially and the execution order (dependencies) is defined in the last lines:

# [START composer_hadoop_steps]
# Define DAG dependencies.
create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
# [END composer_hadoop_steps]

According to the dependencies above, Airflow will first create the Dataproc cluster, then run the Hadoop job, and finally delete the Dataproc cluster.

Looking more closely at the DAG definition, we can also see that it runs once per day:

with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

Because the start_date passed into default_dag_args is set to yesterday, Cloud Composer schedules the workflow to start immediately after the DAG is uploaded.


Using the Airflow UI

Here are the steps to access the Airflow web interface using the GCP Console:

  • Go back to the Environments page.
  • In the Airflow webserver column for the environment, click Airflow.
  • Select your lab credentials when prompted.

The Airflow web interface opens in a new browser window.
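
If you prefer the command line, the Airflow web UI URL can also be read from the environment’s configuration with gcloud (a sketch, assuming the environment name and location used earlier):

gcloud composer environments describe highcpu \
    --location us-central1 \
    --format="get(config.airflowUri)"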


Setting Airflow Variables

As you can see from the DAG’s example, there are several variables that are used, such as gcs_bucket, gcp_project, and gce_zone.

To create these variables in the Airflow UI, do the following (a gcloud alternative is sketched after the list):

  • Select Admin > Variables from the Airflow menu bar, then click Create.
  • Create the following Airflow variables:

gcp_project: <your project-id>
This is the Google Cloud Platform project ID you’re using.

gcs_bucket: gs://<my-bucket>
Replace <my-bucket> with the name of the Cloud Storage bucket you made earlier (for storing the output from the Hadoop job).

gce_zone: us-central1-a
This is the Compute Engine zone where your Cloud Dataproc cluster will be created.
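
The same variables can also be set from Cloud Shell by running the Airflow CLI through gcloud. A sketch, assuming an Airflow 1.x environment (the Airflow 2 CLI uses variables set instead) and the environment name and location used earlier:

gcloud composer environments run highcpu --location us-central1 \
    variables -- --set gcp_project <your-project-id>
gcloud composer environments run highcpu --location us-central1 \
    variables -- --set gcs_bucket gs://<my-bucket>
gcloud composer environments run highcpu --location us-central1 \
    variables -- --set gce_zone us-central1-a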


Uploading the DAG to Cloud Storage

Before Cloud Composer can run the DAG, we first need to upload it to the Cloud Storage bucket associated with our Cloud Composer environment.

  • In Cloud Shell, clone the following repository:
git clone https://github.com/GoogleCloudPlatform/python-docs-samples
  • Go to the python-docs-samples/composer/workflows/ directory.

  • Upload the hadoop_tutorial.py file to the Cloud Storage bucket that was automatically created when you created the Cloud Composer environment.

gsutil cp hadoop_tutorial.py <DAGs_folder_path>
  • Replace <DAGs_folder_path> with the path to the DAGs folder.

  • You can find the DAGs folder path in the Composer console: click the environment you created earlier, open the Environment Configuration tab, and look for the DAGs folder entry. Alternatively, retrieve it from Cloud Shell, as sketched below.
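
The DAGs folder path can also be retrieved with gcloud (a sketch, assuming the environment name and location used earlier); the value it prints is what goes into the gsutil cp command above:

gcloud composer environments describe highcpu \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"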


Exploring the Uploaded DAG

If there are no errors in the DAG file, the workflow’s name will appear in the DAG listing.

Let’s explore the DAG we just uploaded.

  • In Airflow, click composer_hadoop_tutorial to open the DAG details page.

  • In the toolbar, click Graph View. The border around each task indicates the status.

  • Once the status of the create_dataproc_cluster task has changed to success, go to Navigation menu > Dataproc, then click on the created cluster. The cluster is ephemeral, which means it only exists for the duration of the workflow and is deleted as part of the last workflow task.

  • Once the status of the run_dataproc_hadoop task has changed to success, go to Navigation menu > Storage > Browser and click on the name of your bucket to see the results of the Hadoop job.

  • Once the status of the delete_dataproc_cluster task has changed to success, you can verify that the Dataproc cluster has been removed. (These checks can also be done from Cloud Shell, as sketched below.)
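
A sketch of the command-line checks, assuming the us-central1 region and the <my-bucket> bucket created earlier (both names come from the setup above):

# List Dataproc clusters while the workflow is running; the region is an
# assumption -- older contrib operators created clusters in the "global"
# region, so try --region global if nothing shows up.
gcloud dataproc clusters list --region us-central1

# List the wordcount output written by the Hadoop job.
gsutil ls -r gs://<my-bucket>/wordcount/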