Mastering Spark [PART 20]: Resolving Reference Column Ambiguity After Self-Joining by Deep Copying the Dataframes

I encountered an intriguing result when joining a dataframe with itself (a self-join). As you may already know, one of the problems that occurs in a self-join is duplicated column names. Because of this duplication, there is ambiguity whenever we perform an operation that requires us to reference a column by name.

In this post I'm going to share what I found during my investigation. I used Spark in local mode.

Let’s start with creating a simple dataframe called df.

df = spark.createDataFrame([('a0', 'b0'), ('a1', 'b1'), ('a1', 'b2'), ('a1', 'b3'), ('a2', 'b2'), ('a2', 'b3'), ('a3', 'b3')], ['A','B'])

Several experiments were conducted after this step. Here we go.

Experiment 01

Let’s create two aliases (df0 and df1) for the previous dataframe.

df0 = df.alias('df0')
df1 = df.alias('df1')

I did a quick check on both dataframes with the following code.

df0 == df1

Which resulted in False.
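Why False? PySpark's DataFrame doesn't define an equality operator, so Python falls back to object identity, and alias() returns a new DataFrame object. The same behaviour is easy to reproduce in plain Python (the Frame class below is purely illustrative, not a Spark class):

```python
# A class without __eq__ falls back to identity comparison,
# which is what happens when comparing PySpark DataFrames with ==.
class Frame:                      # illustrative stand-in for a dataframe
    def __init__(self, rows):
        self.rows = rows

df_like = Frame([('a0', 'b0')])
alias0 = Frame(df_like.rows)      # same content, different object
alias1 = Frame(df_like.rows)

print(alias0 == alias1)  # False: distinct objects, identity comparison
print(alias0 == alias0)  # True: same object
```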

Next, I joined both dataframes.

joined_df = df0.join(df1, 'A', how='left')

Here’s the result.

+---+---+---+
|  A|  B|  B|
+---+---+---+
| a3| b3| b3|
| a2| b2| b2|
| a2| b2| b3|
| a2| b3| b2|
| a2| b3| b3|
| a1| b1| b1|
| a1| b1| b2|
| a1| b1| b3|
| a1| b2| b1|
| a1| b2| b2|
| a1| b2| b3|
| a1| b3| b1|
| a1| b3| b2|
| a1| b3| b3|
| a0| b0| b0|
+---+---+---+

Now suppose I want to select one of the columns named B.

joined_df.select('B')

And, as expected, got the following error message.

pyspark.sql.utils.AnalysisException: "Reference 'B' is ambiguous, could be: df0.B, df1.B.;"

Now let's try to follow the error message's advice by referencing the column through the dataframe that holds it.

col_B_df0 = joined_df.select(df0.B)
col_B_df1 = joined_df.select(df1.B)

Both col_B_df0 and col_B_df1 produced the following dataframe.

+---+
|  B|
+---+
| b3|
| b2|
| b2|
| b3|
| b3|
| b1|
| b1|
| b1|
| b2|
| b2|
| b2|
| b3|
| b3|
| b3|
| b0|
+---+

The resulting dataframe is the same whether we use df0.B or df1.B. However, when we use df1.B we would expect to get the right-hand B column of the join instead, i.e. the following dataframe.

+---+
|  B|
+---+
| b3|
| b2|
| b3|
| b2|
| b3|
| b1|
| b2|
| b3|
| b1|
| b2|
| b3|
| b1|
| b2|
| b3|
| b0|
+---+

Experiment 02

The code was pretty similar, yet in this experiment we'll use the plain assignment operator (=) when binding the dataframe to the variables.

df0 = df
df1 = df

Did a quick check.

df0 == df1

And it resulted in True.
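This is consistent with the previous observation: plain assignment just binds a second name to the same object, so the identity comparison succeeds. In plain Python (Frame is again an illustrative stand-in):

```python
class Frame:          # illustrative stand-in for a dataframe
    pass

df0 = Frame()
df1 = df0             # a second name bound to the same object

print(df0 == df1)     # True: same object, so identity comparison succeeds
print(df0 is df1)     # True
```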

Joined the two dataframes.

joined_df = df0.join(df1, 'A', how='left')

The resulting joined dataframe was the same as the 1st experiment. When selecting column B, I got the same result as well.

Experiment 03

In this experiment, the only difference was the approach used to create two identical dataframes. Specifically, I used a deep copy.

Here’s the code.

import copy

# Deep-copy the schema so the new dataframes do not share schema objects with df
copied_schema = copy.deepcopy(df.schema)

# Round-trip through the RDD to break the lineage back to df;
# zipWithIndex + map is just a cheap way to force a new RDD
df0 = df.rdd.zipWithIndex().map(lambda r: r[0]).toDF(copied_schema)
df1 = df.rdd.zipWithIndex().map(lambda r: r[0]).toDF(copied_schema)

And performed a quick check.

df0 == df1

I got False for the above command.
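The False again indicates two independent objects. What copy.deepcopy guarantees is that nothing in the copy is shared with the original, which is easy to see in plain Python (the nested dict below merely stands in for a Spark schema):

```python
import copy

schema = {'fields': [{'name': 'A'}, {'name': 'B'}]}  # stand-in for a schema
copied = copy.deepcopy(schema)

print(copied == schema)              # True: equal content
print(copied is schema)              # False: independent objects
copied['fields'][0]['name'] = 'X'    # mutating the copy...
print(schema['fields'][0]['name'])   # ...leaves the original untouched
```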

Next, I joined df0 and df1 and got the same results as the previous experiments. However, when I selected column B, the results were different. Take a look!

joined_df.select(df0.B).show()
------------------------------
+---+
|  B|
+---+
| b3|
| b2|
| b2|
| b3|
| b3|
| b1|
| b1|
| b1|
| b2|
| b2|
| b2|
| b3|
| b3|
| b3|
| b0|
+---+


joined_df.select(df1.B).show()
------------------------------
+---+
|  B|
+---+
| b3|
| b2|
| b3|
| b2|
| b3|
| b1|
| b2|
| b3|
| b1|
| b2|
| b3|
| b1|
| b2|
| b3|
| b0|
+---+

As I was curious and wanted to verify the result further, I modified the dataframes by adding a new column called C.

from pyspark.sql import functions as F

# add a new column for df0
df0 = df0.withColumn('C', F.lit('c'))

# add a new column for df1
df1 = df1.withColumn('C', F.lit('CC'))

After being joined, the result looked like the following.

+---+---+---+---+---+
|  A|  B|  C|  B|  C|
+---+---+---+---+---+
| a3| b3|  c| b3| CC|
| a2| b2|  c| b2| CC|
| a2| b2|  c| b3| CC|
| a2| b3|  c| b2| CC|
| a2| b3|  c| b3| CC|
| a1| b1|  c| b1| CC|
| a1| b1|  c| b2| CC|
| a1| b1|  c| b3| CC|
| a1| b2|  c| b1| CC|
| a1| b2|  c| b2| CC|
| a1| b2|  c| b3| CC|
| a1| b3|  c| b1| CC|
| a1| b3|  c| b2| CC|
| a1| b3|  c| b3| CC|
| a0| b0|  c| b0| CC|
+---+---+---+---+---+

When I selected column C without specifying the dataframe holding it, I got the same "Reference is ambiguous" error as before.

However, when I specified the dataframe’s name, here’s what I got.

joined_df.select(df0.C).show()
------------------------------
+---+
|  C|
+---+
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
|  c|
+---+

joined_df.select(df1.C).show()
------------------------------
+---+
|  C|
+---+
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
| CC|
+---+

Well, it seems that we need to make the column references of the two dataframes independent: even though the schemas have identical content, each dataframe must carry its own copy. My reasoning was that two dataframes built from scratch (via spark.createDataFrame()) would not suffer from this ambiguity, and a deep copy achieves the same effect, since the resulting dataframe no longer refers back to the dataframe it was copied from.

Thank you for reading.