Mastering Spark [PART 18]: Ensuring Dataframe Partitions After Equi-joining (Inner)

The problem is simple. After an inner equi-join of two dataframes, we apply some operation to each partition of the result. Concretely, the operation looks like this:

joined_df = df_0.join(df_1, 'ON', how='inner')
resulting_partitions = joined_df.rdd.mapPartitionsWithIndex(func)

Sometimes, after an inner join, we expect all rows in a given partition to share the same value of the join column (here 'ON'), because func only produces correct results when that holds. A common way to get there is to repartition on the join column before calling mapPartitionsWithIndex:

joined_df = joined_df.repartition(num_of_distinct_elements, 'ON')

However, such a repartitioning can cause a lot of unnecessary shuffling and serialization. So the question is: does each partition after the inner join already contain what we want?
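
To make the concern concrete, here is a minimal sketch of what hash partitioning does and does not promise. repartition(n, 'ON') hash-partitions the dataframe on the given column, so rows with the same key always land in the same partition, but two different keys may still share one. The names toy_partition_id and assign_partitions are hypothetical, and zlib.crc32 is only a stand-in for illustration; it is not the hash function Spark uses.

```python
import zlib
from collections import defaultdict


def toy_partition_id(key, num_partitions):
    """Stand-in for a hash partitioner (assumption: NOT Spark's real hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def assign_partitions(keys, num_partitions):
    """Group keys by the partition a hash partitioner would send them to."""
    partitions = defaultdict(set)
    for key in keys:
        partitions[toy_partition_id(key, num_partitions)].add(key)
    return dict(partitions)


keys = ["A", "B", "C"] * 5  # 5 rows per key
layout = assign_partitions(keys, num_partitions=3)
for partition_id in sorted(layout):
    # A partition may list one key, or several if their hashes collide
    print(partition_id, sorted(layout[partition_id]))
```

Whatever the layout turns out to be, each key appears in exactly one partition. Whether each partition holds exactly one key depends on how the hashes happen to fall.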

I did a quick and simple experiment on this intriguing problem. The code I used is shown below.

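from pyspark.sql import SparkSession

# Reuses the active session if one exists (e.g. in the pyspark shell)
spark = SparkSession.builder.getOrCreate()

def create_df(on_cols, amount_of_each_on_elements):
	# Build the rows for a dataframe: for each join key, emit the requested
	# number of (key, 'OS<i>') tuples. Returns a plain list of tuples.
	ret_list = []
	for index, on_elmt in enumerate(on_cols):
		amount = amount_of_each_on_elements[index]
		for i in range(amount):
			ret_list.append((on_elmt, 'OS'+str(i)))

	return ret_list

def func(index, iterator):
	# Tag every row with the index of the partition it lives in
	l = []
	for row in iterator:
		l.append((index, row))
	return l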


on_cols = ['A','B','C']
amount_of_each_on_elements = [5, 5, 5]

ret_list = create_df(on_cols, amount_of_each_on_elements)
df_0 = spark.createDataFrame(ret_list, ['ON', 'OTHER_SKEWED_FEATURES'])

on_cols = ['A', 'B', 'C', 'D', 'E', 'F']
amount_of_each_on_elements = [5, 5, 5, 5, 5, 5]

ret_list = create_df(on_cols, amount_of_each_on_elements)
df_1 = spark.createDataFrame(ret_list, ['ON', 'OTHER_REFERENCED_FEATURES'])

df_0 = df_0.repartition(3, 'ON')
df_1 = df_1.repartition(6, 'ON')

joined_df = df_0.join(df_1, 'ON', how='inner')

print('Num of partitions: {}'.format(joined_df.rdd.getNumPartitions()))
for index, partition in enumerate(joined_df.rdd.glom().collect()):
	print('Partition {}'.format(index))
	print(partition)

resulting_partitions = joined_df.rdd.mapPartitionsWithIndex(func).glom().collect()

for partition in resulting_partitions:
	print(partition)
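
Reading the printed rows by eye gets tedious once the partitions grow, so a small per-partition summary can help. The sketch below is my own addition: distinct_keys_in_partition is a hypothetical helper with the same (index, iterator) signature as func, and it only assumes that each row can be indexed by column name (which holds for pyspark Row objects and, for the plain-Python demo here, for dicts).

```python
def distinct_keys_in_partition(index, iterator, key="ON"):
    """Yield (partition index, sorted distinct join keys) for one partition."""
    keys = {row[key] for row in iterator}
    yield index, sorted(keys)


# Plain-Python demo with dicts standing in for Row objects
fake_partitions = [
    [],
    [{"ON": "C", "OTHER": "OS0"}, {"ON": "C", "OTHER": "OS1"}],
    [{"ON": "A", "OTHER": "OS0"}],
]
summary = [
    next(distinct_keys_in_partition(i, iter(rows)))
    for i, rows in enumerate(fake_partitions)
]
print(summary)  # [(0, []), (1, ['C']), (2, ['A'])]
```

With Spark, the same helper could be passed directly as joined_df.rdd.mapPartitionsWithIndex(distinct_keys_in_partition).collect(), which returns one (index, keys) pair per partition.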

And here’s what I got (for brevity, the results of some experiments are not shown).

df_0 has 3 elements A, B, and C, with 5, 5, and 5 rows respectively (repartitioned to 3 partitions)
df_1 has 3 elements A, B, and C, with 5, 5, and 5 rows respectively (repartitioned to 3 partitions)
After mapPartitionsWithIndex, the resulting_partitions are the same as joined_df's partitions.
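
The "same partitions" comparison can also be done in code rather than by inspection. This is a minimal sketch under my own assumptions: matches_source_partitions is a hypothetical helper that takes the two collected lists (the glom().collect() output before and after func) and checks that every tagged row sits in the partition its tag claims, in the same order as the source rows.

```python
def matches_source_partitions(source_partitions, mapped_partitions):
    """Check that (index, row) pairs mirror the source partitions exactly."""
    if len(source_partitions) != len(mapped_partitions):
        return False
    pairs = zip(source_partitions, mapped_partitions)
    for position, (rows, tagged) in enumerate(pairs):
        # Rows must match once the partition tag is stripped
        if [row for _, row in tagged] != list(rows):
            return False
        # Every tag must equal the position of the partition it was found in
        if any(index != position for index, _ in tagged):
            return False
    return True


source = [[], ["row_c0", "row_c1"], ["row_a0"]]
mapped = [[], [(1, "row_c0"), (1, "row_c1")], [(2, "row_a0")]]
print(matches_source_partitions(source, mapped))  # True
```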

======

df_0 has 3 elements A, B, and C, with 100, 5, and 5 rows respectively (repartitioned to 3 partitions)
df_1 has 3 elements A, B, and C, with 5, 5, and 5 rows respectively (repartitioned to 3 partitions)
After mapPartitionsWithIndex, the resulting_partitions are the same as joined_df's partitions.

======

df_0 has 3 elements A, B, and C, with 5, 5, and 5 rows respectively (repartitioned to 3 partitions)
df_1 has 6 elements A, B, C, D, E, and F, with 5 rows each (repartitioned to 6 partitions)

The detailed result is shown below.

AFTER JOIN PARTITIONS
+++++++++++++++++++++
Partition 0
empty
Partition 1
Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS4')
Partition 2
Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS4')
Partition 3
Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS4'), Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS0'), Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS1'), Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS2'), Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS3'), Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS4')
Partition 4
empty
Partition 5
empty

MAP PARTITIONS
++++++++++++++
Partition 0
empty
Partition 1
(1, Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS0')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS1')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS2')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS3')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS4')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS0')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS1')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS2')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS3')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS4')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS0')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS1')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS2')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS3')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS4')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS0')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS1')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS2')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS3')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS4')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS0')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS1')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS2')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS3')), (1, Row(ON='C', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS4'))
Partition 2
(2, Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS0')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS1')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS2')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS3')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS4')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS0')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS1')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS2')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS3')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS4')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS0')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS1')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS2')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS3')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS4')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS0')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS1')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS2')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS3')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS4')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS0')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS1')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS2')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS3')), (2, Row(ON='A', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS4'))
Partition 3
(3, Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS0')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS1')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS2')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS3')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS0', OTHER_REFERENCED_FEATURES='OS4')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS0')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS1')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS2')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS3')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS1', OTHER_REFERENCED_FEATURES='OS4')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS0')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS1')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS2')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS3')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS2', OTHER_REFERENCED_FEATURES='OS4')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS0')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS1')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS2')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS3')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS3', OTHER_REFERENCED_FEATURES='OS4')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS0')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS1')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS2')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS3')), (3, Row(ON='B', OTHER_SKEWED_FEATURES='OS4', OTHER_REFERENCED_FEATURES='OS4'))
Partition 4
empty
Partition 5
empty

======

df_0 has 3 elements A, B, and C, with 5, 5, and 5 rows respectively (repartitioned to 3 partitions)
df_1 has 3 elements X, Y, and Z, with 5, 5, and 5 rows respectively (repartitioned to 3 partitions)

AFTER JOIN PARTITIONS
+++++++++++++++++++++
Partition 0
empty
Partition 1
empty
Partition 2
empty

MAP PARTITIONS
++++++++++++++
Partition 0
empty
Partition 1
empty
Partition 2
empty

Based on the results above, after the inner join each non-empty partition contains exactly one distinct value of ON, and the empty partitions contain none. Keep in mind that this comes from a small experiment with a handful of keys, so it is an observation rather than a guarantee. I also think other types of join might not give the same result (a partition might hold more than one distinct value) unless we create a custom partitioner.
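
As a rough idea of what such a custom partitioner could look like, here is a minimal sketch that gives every distinct key its own partition index. build_key_partitioner is a hypothetical helper of mine, not something from the experiment above, and it assumes the distinct keys are known up front (for example, collected from the dataframe beforehand, which costs an extra job).

```python
def build_key_partitioner(distinct_keys):
    """Return (partition_func, num_partitions) with one partition per key."""
    # set() guards against duplicates so indices stay dense: 0..n-1
    key_to_partition = {
        key: i for i, key in enumerate(sorted(set(distinct_keys)))
    }

    def partition_func(key):
        return key_to_partition[key]

    return partition_func, len(key_to_partition)


partition_func, num_partitions = build_key_partitioner(["A", "B", "C"])
print([partition_func(k) for k in ["A", "B", "C"]])  # [0, 1, 2]

# Possible use with the RDD API (not executed here):
# keyed = joined_df.rdd.keyBy(lambda row: row["ON"])
# partitioned = keyed.partitionBy(num_partitions, partition_func)
```

The commented lines rely on RDD.keyBy and RDD.partitionBy, which work on key-value RDDs. As far as I know the dataframe API does not accept a custom partition function, so this approach means dropping to the RDD level, with the serialization cost that implies.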

Thanks for reading.