Mastering Spark [PART 11]: Too Lazy to Process the Whole Dataframe

2 minute read


One of the characteristics of Spark that makes me want to explore the framework further is its lazy evaluation approach. Simply put, Spark won’t execute a transformation until an action is called. This makes sense: if we only specify the transformation plan and never ask Spark to execute it, why should it force itself to do the computation on the data? In addition, lazy evaluation lets Spark optimize the logical plan, which can significantly reduce the manual work of making queries more efficient. Cool, right?
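Before diving into Spark itself, here's a plain-Python sketch of the same idea using a generator pipeline (this is an analogy, not Spark code): `map()` builds a lazy pipeline, and no computation happens until we consume it.

```python
# Plain-Python analogy for lazy evaluation: map() is the "transformation",
# list() is the "action" that forces the work to happen.
calls = []

def expensive(x):
    calls.append(x)          # record every actual computation
    return x * x

pipeline = map(expensive, range(5))   # transformation: nothing computed yet
assert calls == []                    # no work has been done so far

result = list(pipeline)               # action: forces the computation
assert calls == [0, 1, 2, 3, 4]       # now every element was processed
assert result == [0, 1, 4, 9, 16]
```

Spark's transformations (`withColumn`, `filter`, ...) behave like the `map()` line, and actions (`show`, `collect`, ...) behave like the `list()` call.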

However, one simple thing about this lazy behavior surprised me. Consider this case. You define a transformation plan on a dataframe, then call an action using the show() method. By default, Spark shows only 20 rows. To retrieve more rows, you pass an integer N to show(), where N > 20. You then do the same thing again, this time with a larger integer M > N. The question is: would the execution time be roughly the same, or even identical, in all three cases? An intriguing question. Let’s write some code.

import time

from pyspark import SparkContext, SparkConf, SQLContext

conf = SparkConf().setAppName("test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

columns = ['F0', 'F1', 'F2', 'F3', 'F4', 'F5', 'F6', 'F7', 'F8', 'F9', 'F10']

# L is the list of tuples representing rows in the dataframe,
# e.g. L = [tuple(range(i, i + 11)) for i in range(1000)]
rdd = sc.parallelize(L)
df = rdd.toDF(columns)

start = time.time()

df = df.withColumn('NEW_COL', df['F0'] * df['F1'] * df['F2'])

# shows the first N rows
N = 20
df.show(N)
print('TIME: ' + str(time.time() - start))

You can play with the value of N to answer the previous question. I set N to several values: 20, 100, 500, 2500, 12500, 62500, and 312500. Here’s what I got.

Number of rows: time needed to execute the plan (in secs)

20 rows: 0.888638973236084 secs
100 rows: 0.8791308403015137 secs
500 rows: 0.9497220516204834 secs
2500 rows: 1.0902130603790283 secs
12500 rows: 1.3850088119506836 secs
62500 rows: 2.189028739929199 secs
312500 rows: 8.4181969165802 secs

As the above results show, the execution time increased as I increased the number of rows. This suggests that Spark only applies the transformation to the N rows we ask for. Even though the full dataframe has M rows (where M > N), Spark processes only N of them; the remaining (M - N) rows aren’t touched. I think that’s efficient, since we don’t have to wait for Spark to process the whole dataframe (M rows) just to retrieve N rows (where N <= M).
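The "only process what you ask for" behavior can again be sketched in plain Python (an analogy, not Spark code): consuming only the first N elements of a lazy pipeline with itertools.islice leaves the remaining M - N elements untouched.

```python
from itertools import islice

processed = []

def transform(x):
    processed.append(x)   # record which rows were actually processed
    return x * 2

M = 1000                             # total "rows" in the lazy pipeline
pipeline = map(transform, range(M))  # transformation: nothing computed yet

N = 20                               # like df.show(N)
first_n = list(islice(pipeline, N))  # action: pull only the first N results

assert len(first_n) == N
assert processed == list(range(N))   # only N rows processed, not all M
```

This mirrors the timing table above: pulling more rows means more work, and work scales with N rather than with the full size M.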