In this post, we’re going to look at how to build a streaming data pipeline with Cloud Pub/Sub and Cloud Dataflow.
Step 1: Preparing an Environment As the Data Source
In this step, we’ll set up a VM instance that will act as the source of streaming data.
Connecting to the Training VM Instance
In the Console, on the Navigation menu, click Compute Engine > VM instances.
Search for an already created VM instance called training-vm.
Connect to the training-vm instance by clicking on SSH under Connect on the far right. A new terminal window will be opened.
Setting Up Several Things on the VM
We’ll download an example repository that will be used throughout this exploration.
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
In addition, we also need to set up some environment variables. Fortunately, these variables are already prepared in a file called
To apply them to our instance, run the following:
This script sets the
DEVSHELL_PROJECT_ID (project ID) and
BUCKET (cloud storage bucket name) environment variables.
Step 2: Creating a BigQuery Dataset and Cloud Storage Bucket
The output of the Dataflow pipeline will be stored to a BigQuery table.
Creating a BigQuery Dataset
Do the following steps to create a BigQuery dataset.
Navigate to the BigQuery service page.
In the service page, create a new dataset called demos.
In the Resources section, click on your BigQuery project (qwiklabs-gcp-xxxx).
In the right side, click CREATE DATASET.
In the dataset creation dialog page, set the Dataset ID to demos and leave all other options at their default values.
Click Create button.
Creating a Cloud Storage Bucket
A storage bucket should already be exist and has the same name as the Project ID.
Step 3: Simulating Streaming Sensor Data into Cloud Pub/Sub
Let’s simulate that we have a machine that streams sensor data into Pub/Sub.
In the training-vm SSH terminal, run the following command to start the sensor simulator:
/training/sensor_magic.sh. The script reads sample data from a CSV file and publishes it to Pub/Sub.
Open a second SSH terminal and connect to the training-vm.
The new SSH terminal session won’t have the required environment variables. Same as the first SSH terminal session, run the following command to set the environment variables:
Step 4: Running Dataflow Pipeline
In the second training-vm SSH terminal, go to the following directory:
There, you can see the script called
run_oncloud.shthat is used to create and execute a Dataflow pipeline. The script is originally located here.
The script requires three arguments:
project id(your project ID),
bucket name(your cloud storage bucket name), and
classname(java file that runs aggregation).
There are 4 java files that you can choose from for
classnameargument. Each of them reads the sensor data from Pub/Sub and runs different aggregations.
Go into the source code directory:
Identify the source file
AverageSpeeds.javathat is located originally here.
Return to the second training-vm SSH terminal. Run the Dataflow pipeline to read from PubSub and store the results to BigQuery.
cd ~/training-data-analyst/courses/streaming/process/sandiego ./run_oncloud.sh $DEVSHELL_PROJECT_ID $BUCKET AverageSpeeds
- This script uses maven to build a Dataflow streaming pipeline in Java.
Example successful completion:
[INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 45.542 s [INFO] Finished at: 2018-06-08T16:51:30+00:00 [INFO] Final Memory: 56M/216M [INFO] ------------------------------------------------------------------------
Step 5: Exploring the Pipeline
This Dataflow pipeline (
AverageSpeeds) reads messages from a Pub/Sub topic, parses the JSON of the input message, produces one main output and writes to BigQuery.
Navigate to the Dataflow page and click on your job to monitor progress.
Compare the code in
AverageSpeeds.javaand the pipeline graph on the page for your Dataflow job.
Notice the GetMessages pipeline step in the graph. This is the pipeline step that reads from the Pub/Sub topic. It creates a collection of Strings - which corresponds to Pub/Sub messages that have been read.
Notice the Time Window pipeline step in the graph. This pipeline step creates a window of a duration specified in the pipeline parameters (sliding window in this case). This window will accumulate the traffic data from the previous step until end of window, and pass it to the next steps for further transforms.
Notice the BySensor and AvgBySensor pipeline steps in the graph. This step groups of all events in the window by sensor id, while AvgBySensor will then compute the mean speed for each grouping.
Step 6: Monitoring Pipeline Metrics
When monitoring our pipeline, several metrics that are considered are figuring out how many elements the pipeline processes per second, what the system lag is, and how many data elements have been processed so far.
Let’s take a look at how to figure these out.
Navigate to Dataflow page and click on your job.
Select the GetMessages node in the pipeline graph and look at the step metrics on the right panel. There are several metrics, such as:
Notice there is an information of System Lag. It represents the amount of time data elements are waiting to be processed since they “arrived” in the input of the transformation step.
Also, notice Elements Added information under Output Collections that tells us how many data elements exited this step (for the Read PubSub Msg step of the pipeline it also represents the number of Pub/Sub messages read from the topic by the Pub/Sub IO connector).
Select the Time Window node in the pipeline graph. Notice that the Elements Added metric under the Input Collections of the Time Window step matches the Elements Added metric under the Output Collections of the previous step GetMessages.
Step 7: Autoscaling
Observe how Dataflow scales the number of workers to process the backlog of incoming Pub/Sub messages.
Navigate to the Dataflow page and click on your job.
Examine the JOB METRICS panel on the right, and review the Autoscaling section. There, we can see the number of workers that are currently being used to process messages from Pub/Sub.
Click on More history. In the Worker pool, you can see how Dataflow changed the number of workers. In addition, there is a column called Status that explains the reason for the changes.
The streaming data from the simulator publishes hundreds of messages per second to the Pub/Sub topic. This will cause Dataflow to increase the number of workers to keep the system lag of the pipeline at optimal levels.