Mastering Spark [PART 15]: Optimizing Join on Skewed Dataframes

12 minute read

Joining two dataframes might not be an easy task when one of them has skewed data. Skewed data simply means that a few elements appear far more often than the others.

Recently I was experimenting with such an issue. I joined two dataframes, one consisting of skewed data and the other consisting of nearly evenly distributed data. Here’s the code snippet.

import time

import pyspark.sql.functions as F

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName('join_skewed_df').getOrCreate()

def create_df(on_cols, amount_of_each_on_elements):
	# build (join key, other feature) rows; the i-th key in on_cols appears
	# amount_of_each_on_elements[i] times
	ret_list = []
	for index, on_elmt in enumerate(on_cols):
		amount = amount_of_each_on_elements[index]
		for i in range(amount):
			ret_list.append((on_elmt, 'OS'+str(i)))

	return ret_list

on_cols = ['A','B','C']
amount_of_each_on_elements = [200000, 200000, 200000]

ret_list_skewed = create_df(on_cols, amount_of_each_on_elements)

skewed_df = spark.createDataFrame(ret_list_skewed, ['ON', 'OTHER_SKEWED_FEATURES'])

on_cols = ['A','B','C']
amount_of_each_on_elements = [10, 5, 10]

ret_list_referenced = create_df(on_cols, amount_of_each_on_elements)

referenced_df = spark.createDataFrame(ret_list_referenced, ['ON', 'OTHER_REFERENCED_FEATURES'])

start = time.time()
joined_df = skewed_df.join(referenced_df, 'ON', how='left')
joined_df.collect()
print('TIME NEEDED: {}'.format(str(time.time() - start)))

joined_df.explain(True)
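
By the way, if you want to see the skew directly, a quick check like the one below counts the rows per join key (this isn’t part of the timed experiment, just a sanity check):

# count how many rows each ON value has; one dominant key confirms the skew
skewed_df.groupBy('ON').count().show()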

And here’s a brief report of what I got.

amount_of_each_on_elements skewed = [10000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 5.079 s

amount_of_each_on_elements skewed = [100000, 50, 90000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 31.162 s

amount_of_each_on_elements skewed = [300000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 44.352 s

amount_of_each_on_elements skewed = [200000, 2000, 2000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 17.954 s

As you can see, the time needed to complete the join operation correlates positively with the number of skewed elements in the dataframe. To understand the results better, let’s take a look at the physical plan.

== Physical Plan ==
*(4) Project [ON#0, OTHER_SKEWED_FEATURES#1, OTHER_REFERENCED_FEATURES#12]
+- SortMergeJoin [ON#0], [ON#11], LeftOuter
   :- *(1) Sort [ON#0 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ON#0, 200)
   :     +- Scan ExistingRDD[ON#0,OTHER_SKEWED_FEATURES#1]
   +- *(3) Sort [ON#11 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ON#11, 200)
         +- *(2) Filter isnotnull(ON#11)
            +- Scan ExistingRDD[ON#11,OTHER_REFERENCED_FEATURES#12]

According to the above physical plan, there are a few things to note. First, we can see that Spark selected SortMergeJoin as the join algorithm. Just FYI, there are several algorithms for the join operation, such as SortMergeJoin, ShuffleHashJoin, and BroadcastJoin. SortMergeJoin works similarly to ShuffleHashJoin. The main difference is that SortMergeJoin can spill data to disk, while ShuffleHashJoin requires each partition to fit in memory (which sometimes causes OOM when a partition is too large).

The second thing to note is that this SortMergeJoin also repartitions the data. We can see this from the Exchange hashpartitioning expression, which shows that each dataframe is shuffled into 200 partitions (the default value set by spark.sql.shuffle.partitions).
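
As a side note, if you just want a different global default (instead of repartitioning each dataframe explicitly, which is what I do next), the setting can be inspected and changed on the session along these lines:

# inspect the current shuffle-partition default (200 unless overridden)
print(spark.conf.get('spark.sql.shuffle.partitions'))

# optionally lower it so that subsequent shuffles create fewer partitions
spark.conf.set('spark.sql.shuffle.partitions', 3)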

Well, to optimize the join operation, one thing that came to my mind was to set the number of partitions based on the ON column. So, instead of getting 200 partitions after repartitioning, the number of partitions should now be equal to the number of distinct values in the ON column. Here’s what I did.

start = time.time()

# repartition first before joining the dataframe
skewed_df = skewed_df.repartition(3, 'ON')
referenced_df = referenced_df.repartition(3, 'ON')

joined_df = skewed_df.join(referenced_df, 'ON', how='left')
joined_df.collect()
print('TIME NEEDED: {}'.format(str(time.time() - start)))

And here are the results.

REPARTITIONING FIRST BEFORE JOINING THE DATAFRAMES
--------------------------------------------------

amount_of_each_on_elements skewed = [10000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 3.487 s (approx. 1.46 x faster)

amount_of_each_on_elements skewed = [100000, 50, 90000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 16.058 s (approx. 1.94 x faster)

amount_of_each_on_elements skewed = [300000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 24.405 s (approx. 1.82 x faster)

amount_of_each_on_elements skewed = [200000, 2000, 2000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 16.253 s (approx. 1.10 x faster)

Wow! The time needed drops quite a lot. However, as you can see, the speed-up factor shrinks from the 2nd config to the last (from 1.94x to 1.82x to 1.10x faster), and the drop between the third and the last config is quite large (1.82 vs 1.10). In my opinion, as the amount of skewed data increases, the time needed to complete the join might approach what we got in the previous experiment (without repartitioning first). Even if repartitioning first still finishes faster, the gap might become very small, since the problem of empty partitions (which I’ve mentioned several times in my previous articles) is no longer a significant performance concern at that scale.

Alright, let’s take a look at the generated physical plan.

== Physical Plan ==
*(4) Project [ON#0, OTHER_SKEWED_FEATURES#1, OTHER_REFERENCED_FEATURES#12]
+- SortMergeJoin [ON#0], [ON#11], LeftOuter
   :- *(1) Sort [ON#0 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ON#0, 3)
   :     +- Scan ExistingRDD[ON#0,OTHER_SKEWED_FEATURES#1]
   +- *(3) Sort [ON#11 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ON#11, 3)
         +- *(2) Filter isnotnull(ON#11)
            +- Scan ExistingRDD[ON#11,OTHER_REFERENCED_FEATURES#12]

The difference is obvious: the Exchange hashpartitioning expression now shows 3 as the number of partitions. The other expressions are the same as in the physical plan from the previous experiment.

Hmm…

Somehow I remembered that another join algorithm is BroadcastJoin. This algorithm is suitable when one of the dataframes is small enough to fit in the memory of each executor. You can control this constraint via the spark.sql.autoBroadcastJoinThreshold config, which denotes the maximum size (in bytes) of a dataframe that Spark will broadcast automatically. Therefore, you need to consider each executor’s memory size before setting a value for this config.
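
As a rough sketch, adjusting (or disabling) that threshold looks something like this; the 50 MB value is just an arbitrary example:

# raise the auto-broadcast threshold to roughly 50 MB (the value is in bytes)
spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 50 * 1024 * 1024)

# or disable automatic broadcast joins entirely by setting it to -1
# spark.conf.set('spark.sql.autoBroadcastJoinThreshold', -1)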

Just FYI, a broadcast join eliminates shuffling data over the network by sending a copy of the small dataframe to each executor. Therefore, instead of shuffling both dataframes over the network to perform the join, Spark can just refer to the local copy of the smaller dataframe on each executor.

Fortunately, my referenced dataframe was not that big, which means I should be able to perform the join using this algorithm. Here’s the modified code snippet.

start = time.time()

# broadcast the smaller dataframe
joined_df = skewed_df.join(F.broadcast(referenced_df), 'ON', how='left')
joined_df.collect()
print('TIME NEEDED: {}'.format(str(time.time() - start)))

One thing to note here is that in this experiment I no longer repartitioned first, since repartitioning itself causes data shuffling over the network.

Here’s what I got.

BROADCASTING THE SMALLER DATAFRAME
----------------------------------

amount_of_each_on_elements skewed = [10000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 2.202 s

amount_of_each_on_elements skewed = [100000, 50, 90000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 15.600 s

amount_of_each_on_elements skewed = [300000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 23.065 s

amount_of_each_on_elements skewed = [200000, 2000, 2000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 16.557 s

The result is pretty similar to the previous experiment (with repartition first). Hmm…

Take a look at the generated physical plan. As you can see, this algorithm doesn’t shuffle the big dataframe: there is no Exchange hashpartitioning step, only a BroadcastExchange of the smaller one.

== Physical Plan ==
*(2) Project [ON#0, OTHER_SKEWED_FEATURES#1, OTHER_REFERENCED_FEATURES#12]
+- *(2) BroadcastHashJoin [ON#0], [ON#11], LeftOuter, BuildRight
   :- Scan ExistingRDD[ON#0,OTHER_SKEWED_FEATURES#1]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]))
      +- *(1) Filter isnotnull(ON#11)
         +- Scan ExistingRDD[ON#11,OTHER_REFERENCED_FEATURES#12]

Alright, my last attempt. This one was based on the insight that the join takes a long time because the data is skewed: Spark pushes all rows with the same join-key value into the same partition, which obviously makes one partition much bigger than the others. What if we could somehow split that large partition into several smaller ones, so that we end up with more balanced partitions? Okay, perhaps that question sounds a little confusing. Let’s take a look at the modified code snippet below.

from pyspark.sql.types import IntegerType

def join_skewed_df(skewed_df, referenced_df, on, replication_factor):
	# replicate each referenced row replication_factor times
	duplicated_referenced_df = referenced_df.withColumn(
		'replication_id',
		F.explode(F.array(*[F.lit(i) for i in range(replication_factor)]))
	)
	duplicated_referenced_df = duplicated_referenced_df.withColumn(
		'new_join_key',
		F.concat(F.col(on), F.lit('-'), F.col('replication_id'))
	)

	# salt the skewed join key with a random id in [0, replication_factor - 1]
	skewed_df = skewed_df.withColumn(
		'new_join_key',
		F.concat(F.col(on), F.lit('-'), F.round(F.rand() * (replication_factor - 1)).cast(IntegerType()))
	)

	# repartition both dataframes on the salted key to minimize empty partitions
	num_of_distinct_skewed = skewed_df.select('new_join_key').distinct().count()
	skewed_df = skewed_df.repartition(num_of_distinct_skewed, 'new_join_key')

	num_of_distinct_referenced = duplicated_referenced_df.select('new_join_key').distinct().count()
	duplicated_referenced_df = duplicated_referenced_df.repartition(num_of_distinct_referenced, 'new_join_key')

	return skewed_df.join(duplicated_referenced_df, 'new_join_key', how='left')

# call the method
start = time.time()
joined_df = join_skewed_df(skewed_df, referenced_df, 'ON', 5)
joined_df.collect()
print('TIME NEEDED: {}'.format(str(time.time() - start)))

Basically, what the above code does can be described as follows:

  • Create a new column called replication_id in referenced_df. Simply put, each row in referenced_df is replicated replication_factor times. For instance, if we have 3 rows and a replication factor of 5, we'll end up with a new dataframe of 15 rows
  • Create a new column called new_join_key in both dataframes. On the referenced side it concatenates the original join key with replication_id, so every original row gets one copy per possible suffix; on the skewed side it concatenates the original join key with a random integer between 0 and replication_factor - 1. For instance, a referenced dataframe with 2 rows and a replication factor of 2 results in [(row1, new_join_key_1), (row1, new_join_key_2), (row2, new_join_key_1), (row2, new_join_key_2)] (see the sketch after this list)
  • Repartition both dataframes based on the new_join_key column. This step is applied to minimize the problem of empty partitions
  • Join the skewed and the replicated referenced dataframe
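
To make the second point more concrete, here’s a tiny illustration with made-up data and a replication factor of 2, showing the salted keys on the referenced side:

toy_referenced = spark.createDataFrame([('A', 'ref')], ['ON', 'OTHER_REFERENCED_FEATURES'])
toy_salted_ref = toy_referenced.withColumn(
	'replication_id', F.explode(F.array(F.lit(0), F.lit(1)))
).withColumn(
	'new_join_key', F.concat(F.col('ON'), F.lit('-'), F.col('replication_id'))
)
toy_salted_ref.show()  # two rows, with new_join_key 'A-0' and 'A-1'

On the skewed side, each row with ON equal to 'A' gets either 'A-0' or 'A-1' at random, so the rows of a dominant key end up spread across several partitions instead of just one.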

Let’s take a look first at the result I got.

BALANCING PARTITIONS
--------------------

amount_of_each_on_elements skewed = [10000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 10.953 s

amount_of_each_on_elements skewed = [100000, 50, 90000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 27.452 s

amount_of_each_on_elements skewed = [300000, 50, 1000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 35.324 s 

amount_of_each_on_elements skewed = [200000, 2000, 2000]
amount_of_each_on_elements referenced = [10, 5, 10]
time: 29.957 s

The results are not better. In my opinion, there are several causes. The first one is the extra transformations applied before joining the dataframes, one of which is withColumn (called three times). Since this approach involves those extra transformations, I think it’s fair to count them in the processing-time measurement.

However, this approach also introduces another parameter to tune, namely the replication factor. I haven’t tried different values for this parameter yet, though. So if you want to play with it, please let me know what you get :)
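
If you do want to experiment with it, a simple (untested) sweep over a few candidate values could look like this, reusing the join_skewed_df function above:

for factor in [2, 5, 10, 20]:
	start = time.time()
	joined_df = join_skewed_df(skewed_df, referenced_df, 'ON', factor)
	joined_df.collect()
	print('replication_factor={}: {:.3f} s'.format(factor, time.time() - start))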

There’s another thing to note here. I haven’t tried applying broadcasting to the referenced dataframe yet. My hypothesis is that broadcasting might speed up the join operation. However, I think the result might not differ much from the earlier broadcast-join experiment, since the overall process is quite similar and adding ‘randomness’ (the new column) to the dataframe doesn’t change how the broadcast behaves.
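
For completeness, a sketch of that hypothesis would be to replace the last line of join_skewed_df so that the replicated referenced dataframe is broadcast instead of shuffled (the repartition of the referenced side then becomes unnecessary). I haven’t measured this variant:

# inside join_skewed_df: broadcast the replicated referenced dataframe
return skewed_df.join(
	F.broadcast(duplicated_referenced_df), 'new_join_key', how='left'
)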

Oh ya, here’s the physical plan generated by the balancing approach (without broadcasting).

== Physical Plan ==
*(5) Project [new_join_key#18, ON#0, OTHER_SKEWED_FEATURES#1, ON#4, OTHER_REFERENCED_FEATURES#5, replication_id#9]
+- SortMergeJoin [new_join_key#18], [new_join_key#13], LeftOuter
   :- *(2) Sort [new_join_key#18 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(new_join_key#18, 15)
   :     +- *(1) Project [ON#0, OTHER_SKEWED_FEATURES#1, concat(ON#0, -, cast(cast(round((rand(5452837992027299162) * 9.0), 0) as int) as string)) AS new_join_key#18]
   :        +- Scan ExistingRDD[ON#0,OTHER_SKEWED_FEATURES#1]
   +- *(4) Sort [new_join_key#13 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(new_join_key#13, 15)
         +- *(3) Project [ON#4, OTHER_REFERENCED_FEATURES#5, replication_id#9, concat(ON#4, -, cast(replication_id#9 as string)) AS new_join_key#13]
            +- *(3) Filter isnotnull(concat(ON#4, -, cast(replication_id#9 as string)))
               +- Generate explode([0,1,2,3,4]), [ON#4, OTHER_REFERENCED_FEATURES#5], false, [replication_id#9]
                  +- Scan ExistingRDD[ON#4,OTHER_REFERENCED_FEATURES#5]

Final Thoughts

  1. SortMergeJoin is good yet requires data shuffling. The shuffle produces spark.sql.shuffle.partitions partitions (200 by default) afterwards, so it's good advice to repartition the dataframes before performing the join. Repartitioning first speeds up the join operation, yet as the amount of skewed data increases, the time difference between repartitioning first and not doing so might become pretty small
  2. BroadcastJoin is a good option when one dataframe is small enough to fit in each executor's memory. It speeds up the join since Spark can just refer to the local copy of the smaller dataframe (eliminating the shuffle). However, in my experiments the processing time was not much different from repartitioning first before joining
  3. Adding randomness to the skewed dataframe splits the single big partition into several smaller, more evenly distributed ones. This approach also makes the join "safer" in terms of memory. Think about it like this: without it, all rows from both dataframes with the same join-key value go to the same partition. When using ShuffleHashJoin, this big partition must fit in memory, which may obviously cause out-of-memory errors. Meanwhile, when using SortMergeJoin, this big partition may spill to disk, slowing down the join since Spark needs to read the data back from disk (it takes a lot of time, dude). With the randomness approach, all rows with the same value of the randomly generated new_join_key column go to the same partition, and this time a single partition shouldn't hold much more data than the others. However, you need to tune the replication factor to come up with the optimal processing time

A long post for today.

Thanks for reading.