Optimizing BigQuery Queries - Part 3 (End)

4 minute read

This is the last part of a series on how to optimize BigQuery queries.

D) Avoid Overwhelming a Worker

Certain operations, such as data sorting, have to be performed on a single worker. Sorting too much data can overwhelm a worker’s memory and result in a “resources exceeded” (out-of-memory) error. Although Google upgrades and maintains its machines, the definition of “too much” may change over time; currently, it is on the order of 1 GB.

Limiting Large Sorts

Suppose that we’d like to assign each rental a number 1, 2, 3, and so on, in the order that the rental ended.

We could perform such a task using the ROW_NUMBER() function.

SELECT
  rental_id,
  ROW_NUMBER() OVER(ORDER BY end_date) AS rental_number
FROM
  `bigquery-public-data.london_bicycles.cycle_hire`
ORDER BY
  rental_number ASC
LIMIT
  5

The above query took about 34.5 seconds to process 372 MB of data because it needs to sort the whole dataset on a single worker. In other words, the OVER(ORDER BY end_date) must be computed by a single worker.

Let’s try to distribute the sorting task across multiple workers. We can achieve this by partitioning the data by day and then sorting the trips within each day. This minimizes the risk of sorting a large amount of data on a single worker.

WITH
  rentals_on_day AS (
  SELECT
    rental_id,
    end_date,
    EXTRACT(DATE
    FROM
      end_date) AS rental_date
  FROM
    `bigquery-public-data.london_bicycles.cycle_hire` )
    
SELECT
  rental_id,
  rental_date,
  ROW_NUMBER() OVER(PARTITION BY rental_date ORDER BY end_date) AS rental_number_on_day
FROM
  rentals_on_day
ORDER BY
  rental_date ASC,
  rental_number_on_day ASC
LIMIT
  5

This takes 15.1 seconds (~2x speedup) because the sorting is done on each partition (a single day).

However, using PARTITION BY might yield many rows with the same row number, because the ROW_NUMBER function restarts the numbering within each partition. If we’d like a unique number for each row, one way to do this is to append the partition field to the row number, as follows.

FORMAT('%s-%i', rental_date, ROW_NUMBER() OVER(PARTITION BY rental_date ORDER BY end_date)) AS rental_number_on_day
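
As a minimal sketch, this expression can be dropped into the earlier partitioned query like so. Note that the explicit CAST of the date to a string is my addition here, so that the value matches the %s format specifier; this is an illustration, not part of the original series.

-- Sketch: same partitioned query as before, but producing a globally
-- unique identifier built from the partition field plus the row number.
WITH
  rentals_on_day AS (
  SELECT
    rental_id,
    end_date,
    EXTRACT(DATE FROM end_date) AS rental_date
  FROM
    `bigquery-public-data.london_bicycles.cycle_hire` )

SELECT
  rental_id,
  rental_date,
  -- Cast the date to a string so it satisfies the %s specifier.
  FORMAT('%s-%i',
    CAST(rental_date AS STRING),
    ROW_NUMBER() OVER(PARTITION BY rental_date ORDER BY end_date)) AS rental_number_on_day
FROM
  rentals_on_day
ORDER BY
  rental_date ASC,
  end_date ASC
LIMIT
  5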

Be Careful of Data Skew

Another way to overwhelm a worker is to GROUP BY a field and then aggregate with ARRAY_AGG when the data is skewed, i.e., when some values of the grouping field occur far more often than others.

Let’s demonstrate this with an example from the GitHub data in the BigQuery public datasets.

Since there are more than 3 million GitHub repos and the commits are well distributed among them, this query succeeds.

SELECT
  repo_name,
  ARRAY_AGG(
    STRUCT(
      author,
      committer,
      subject,
      message,
      trailer,
      difference,
      encoding)
    ORDER BY
      author.date.seconds)
FROM
  `bigquery-public-data.github_repos.commits`,
  UNNEST(repo_name) AS repo_name
GROUP BY
  repo_name

Let’s illustrate how data skew can hurt performance.

Since most of GitHub’s users reside in only a few time zones, grouping by time zone fails. In this case, we are asking a single worker to sort nearly 750 GB of data (ORDER BY author.date.seconds).

SELECT
  author.tz_offset,
  ARRAY_AGG(
    STRUCT(
      author,
      committer,
      subject,
      message,
      trailer,
      difference,
      encoding)
    ORDER BY
      author.date.seconds)
FROM
  `bigquery-public-data.github_repos.commits`
GROUP BY
  author.tz_offset

To overcome the data skew issue, it’s recommended to group by a more granular key, either a single field or a combination of fields. This distributes each group’s data over more workers.

Using the above example, instead of grouping only by the time zone, we can pair the repo name with the time zone, making the key more granular.

SELECT
  repo_name,
  author.tz_offset,
  ARRAY_AGG(
    STRUCT(
      author,
      committer,
      subject,
      message,
      trailer,
      difference,
      encoding)
    ORDER BY
      author.date.seconds)
FROM
  `bigquery-public-data.github_repos.commits`,
  UNNEST(repo_name) AS repo_name
GROUP BY
  repo_name,
  author.tz_offset

E) Approximate Aggregation Functions

Instead of using COUNT(DISTINCT ...) to calculate the number of unique values in a field, we can estimate it using APPROX_COUNT_DISTINCT. Note that this approach is recommended only when a small error in the result is tolerable.

Approximate Count

To illustrate, let’s find the number of unique GitHub repos using the following query.

SELECT
  COUNT(DISTINCT repo_name) AS num_repos
FROM
  `bigquery-public-data.london_bicycles.cycle_hire`,
  `bigquery-public-data.github_repos.commits`,
  UNNEST(repo_name) AS repo_name

The above query took 8.3 seconds to complete and returned an exact count of 3,347,770.

Now, let’s approximate it using the APPROX_COUNT_DISTINCT.

SELECT
  APPROX_COUNT_DISTINCT(repo_name) AS num_repos
FROM
  `bigquery-public-data.github_repos.commits`,
  UNNEST(repo_name) AS repo_name

Note that the above query took only 3.9 seconds (~2x speedup). The estimated result is 3,399,473, which overestimates the correct answer by about 1.5%.

The approximate algorithm is much more efficient than the exact algorithm only on large datasets, and it is recommended for use cases where an error (uncertainty) of approximately 1% is tolerable.

Other available approximate functions include APPROX_QUANTILES to compute percentiles, APPROX_TOP_COUNT to find the top elements, and APPROX_TOP_SUM to compute the top elements based on the sum of another field.
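
As an illustrative sketch (not from the original series), here is how those three functions might be combined over the cycle hire data used earlier, assuming the cycle_hire table’s duration and start_station_name columns.

SELECT
  -- Approximate quartile boundaries of trip duration (min, 25%, 50%, 75%, max).
  APPROX_QUANTILES(duration, 4) AS duration_quartiles,
  -- The 5 most frequent start stations with their approximate counts.
  APPROX_TOP_COUNT(start_station_name, 5) AS top_stations_by_trips,
  -- The 5 start stations with the largest approximate total duration.
  APPROX_TOP_SUM(start_station_name, duration, 5) AS top_stations_by_duration
FROM
  `bigquery-public-data.london_bicycles.cycle_hire`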