Mastering Spark [PART 09]: An Optimized Approach for Operations on Multiple DataFrame Columns


I came across an interesting problem when playing with ensemble learning. For those who don’t know about ensemble learning, it’s simply a machine learning approach that combines several weak classifiers to derive the final result. One of the simplest examples is the random forest algorithm. In a random forest, each tree learns from a different part (features and data points) of the dataset. When predicting a new data point, each tree votes for its class of choice, and the final class is the one voted for by the majority of trees.
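To make the voting part concrete, here’s a tiny standalone illustration (not part of the actual pipeline) of a majority vote over binary class predictions:

# votes from three trees for a single data point (binary classes)
votes = [1, 1, 0]

# the final class is the one chosen by most trees
final_class = max(set(votes), key=votes.count)
print(final_class)  # 1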

For the sake of clarity, let’s take a look at this code.

# Let's assume that I've created a custom class named MyRandomForestModel which consists of several methods, such as init, train, and predict
# The init method receives a single parameter called feature_cols which is a list of features used for training the model

rf1 = MyRandomForestModel(['F1', 'F2', 'F3', 'F4', 'F5', 'F6'])
rf2 = MyRandomForestModel(['F1', 'F2', 'F3', 'F4', 'F5', 'F6'])
rf3 = MyRandomForestModel(['F1', 'F2', 'F3', 'F4', 'F5', 'F6'])

params_rf1 = {'n_estimators': 100, 'max_depth': 5, 'random_state': 3, 'bootstrap': False, 'max_features': 'sqrt', 'class_weight': {0: 0.7, 1: 0.8}}
params_rf2 = {'n_estimators': 75, 'max_depth': 7, 'random_state': 3, 'bootstrap': False, 'max_features': 1.0, 'class_weight': {0: 0.8, 1: 1.0}}
params_rf3 = {'n_estimators': 50, 'max_depth': 4, 'random_state': 3, 'bootstrap': False, 'max_features': 'sqrt', 'class_weight': {0: 0.5, 1: 1.0}}

classifiers = [rf1, rf2, rf3]
params = [params_rf1, params_rf2, params_rf3]

# train the models
for clf, param in zip(classifiers, params):
    clf.train(df, param)
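For context, MyRandomForestModel itself isn’t shown in this post. Below is a minimal sketch of what such a wrapper might look like, assuming it wraps scikit-learn’s RandomForestClassifier and runs prediction through a scalar pandas UDF; that’s an assumption on my part, but it is consistent with the ArrowEvalPython nodes that show up in the query plans later.

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from sklearn.ensemble import RandomForestClassifier

class MyRandomForestModel:
    def __init__(self, feature_cols):
        self.feature_cols = feature_cols
        self.model = None

    def train(self, df, params):
        # collect the training data to the driver and fit a scikit-learn forest
        # (fine for a sketch; a real implementation might train differently)
        pdf = df.select(self.feature_cols + ['LABEL']).toPandas()
        self.model = RandomForestClassifier(**params)
        self.model.fit(pdf[self.feature_cols], pdf['LABEL'])

    def predict(self, df):
        model = self.model

        def _predict(*cols):
            # cols arrives as pandas Series, one per feature column
            features = pd.concat(cols, axis=1)
            # probability of the positive class (index 1)
            return pd.Series(model.predict_proba(features)[:, 1])

        _my_predict_udf = F.pandas_udf(_predict, DoubleType())
        # attach the probability as a new column named POSITIVE_PROB
        return df.withColumn('POSITIVE_PROB', _my_predict_udf(*self.feature_cols))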

After training, we have three different classifiers. We predicted new, unseen data with each classifier independently, and suppose the performance metrics showed that all three classifiers gave poor results. One of the easiest ways to improve the learning performance is to use an ensemble approach.

Here’s the code for the ensemble learning.

# ENSEMBLE LEARNING CODE

for index, model in enumerate(classifiers):
    col_name = 'POSITIVE_PROB_' + str(index)

    # the model's predict method adds a new column (POSITIVE_PROB) holding the
    # probability that the data point belongs to the positive class
    df = model.predict(df)

    # rename the column so each model's prediction is kept separately
    df = df.withColumn(col_name, df['POSITIVE_PROB']).drop('POSITIVE_PROB')

As you can see, the above code creates a new column after each prediction. Assuming we have three models with prediction times pt_0, pt_1, and pt_2 respectively, the prediction process in the ensemble code takes approximately pt_0 + pt_1 + pt_2. The rationale is obvious: the predictions are executed sequentially. The second new column is created only after the first one, the third waits for the second, and so forth.

The problem is that the new columns are independent of each other: Spark doesn’t need the value of the previous new column (POSITIVE_PROB_x) to create the next one (POSITIVE_PROB_x+1). In fact, none of the existing columns apart from the feature columns are needed to compute any of the new ones. So each prediction, and hence each new column, could in principle be computed in parallel.

So the question now is: how can we make the above process run in parallel? In other words, how do we turn this multi-column computation from a serial one into a parallel one?

The analysis is actually simple. The above code (the ENSEMBLE LEARNING CODE) instructs Spark to execute the transformations (the withColumn calls) sequentially, so we need to express them in a way that can be executed in parallel. One way to achieve that is to put the to-be-parallelized work inside a single select clause. Here’s the code snippet.

def predict_and_create_new_col(df, index, model):
    col_name = 'POSITIVE_PROB_' + str(index)

    # the model's predict method now returns a single Column object
    return model.predict(df).alias(col_name)

df = df.select(*(predict_and_create_new_col(df, index, model) for index, model in enumerate(classifiers)))

Simple, huh? Basically, what we do here is select all the new columns created by the prediction process. One thing to note: to implement this approach, I had to modify the predict method in MyRandomForestModel so that it returns a Column object instead of a dataframe. I haven’t inspected it further yet, but Spark threw an error about missing attributes when the predict method returned a dataframe.
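Continuing the hypothetical sketch of MyRandomForestModel from earlier (again, an assumption about its internals, not the actual implementation), the modified predict would build the same UDF call but hand the Column back to the caller, who aliases it and puts it in the select:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

class MyRandomForestModel:
    # __init__ and train stay as in the earlier sketch

    def predict(self, df):
        model = self.model

        def _predict(*cols):
            features = pd.concat(cols, axis=1)
            return pd.Series(model.predict_proba(features)[:, 1])

        _my_predict_udf = F.pandas_udf(_predict, DoubleType())
        # return the Column itself instead of attaching it with withColumn
        return _my_predict_udf(*self.feature_cols)

Note that df isn’t actually used in this version: the returned Column only references the feature column names, and they get resolved by the select on df.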

That wasn’t the end of the story, though. I figured I should compare the performance of the two approaches, so I attached a timer to measure how long each one takes to complete. Surprisingly, both methods gave similar numbers (a very small difference) when I ran each approach repeatedly. WHY?
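For reference, the timing was along these lines. This is only a sketch: run_first_approach and run_second_approach are hypothetical helpers wrapping the two snippets above, and count() is just one convenient action to force the lazy transformations to actually execute.

import time

start = time.time()
run_first_approach(df).count()     # withColumn loop, forced to execute
print('first approach :', time.time() - start)

start = time.time()
run_second_approach(df).count()    # single select, forced to execute
print('second approach:', time.time() - start)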

Well, I admit that I forgot to inspect the plan Spark uses to execute the process. It didn’t occur to me to check, since I had explicitly instructed Spark to perform the operation serially.

To check the plan, we can simply call a method named explain, and Spark will print its parsed logical plan, analyzed logical plan, optimized logical plan, and physical plan.
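The call itself is a one-liner (passing True asks for the extended output, i.e. all the plans instead of only the physical one):

df.explain(True)

I checked the first approach first; here’s the result (only the parsed and optimized logical plans are shown):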

# SPARK'S PLAN FOR THE FIRST APPROACH (SIMPLIFIED)

== Parsed Logical Plan ==
'Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, SCORE_0#71, SCORE_1#84, (_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_2#98]
+- Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, SCORE_0#71, (_my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L)) AS POSITIVE_PROB_1#84]
   +- Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, (_my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L)) AS POSITIVE_PROB_0#71]

== Optimized Logical Plan ==
Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, (pythonUDF0#112) AS POSITIVE_PROB_0#71, (pythonUDF1#113) AS POSITIVE_PROB_1#84, (pythonUDF2#114) AS POSITIVE_PROB_2#98]
+- !ArrowEvalPython [_my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L), _my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L), _my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L)], [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, pythonUDF0#112, pythonUDF1#113, pythonUDF2#114]

I did the same for the second approach and here’s the result (again, only the parsed and optimized logical plans).

# SPARK'S PLAN FOR THE SECOND APPROACH (SIMPLIFIED)

== Parsed Logical Plan ==
'Project [(_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_0#71, (_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_1#73, (_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_2#75]

== Optimized Logical Plan ==
Project [(pythonUDF0#89) AS POSITIVE_PROB_0#71, (pythonUDF1#90) AS POSITIVE_PROB_1#73, (pythonUDF2#91) AS POSITIVE_PROB_2#75]
+- !ArrowEvalPython [_my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L), _my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L), _my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L)], [pythonUDF0#89, pythonUDF1#90, pythonUDF2#91]

As you can see, the optimized plans for the first and second approaches boil down to the same thing: a single Project on top of a single ArrowEvalPython node that evaluates all three UDFs together (the first approach simply keeps the original columns as well). In other words, even though the first approach looks like it executes serially, Spark uses its Catalyst optimizer to find the plan it considers most efficient. It also turns out that what I built in the second approach was essentially the already-optimized version of the first one. LoL.

Well, even though all I really found out was how Spark builds the optimized plan, I still consider it a nice result, since my hypothesis that the second approach would be executed in parallel was correct. Yeay! If you look at the optimized plan above, Spark performs the whole transformation in a single pass, denoted by a single Project node. Please take a look at the note below.

The first approach gives the following logical plan:

'Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, SCORE_0#71, SCORE_1#84, (_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_2#98]
+- Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, SCORE_0#71, (_my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L)) AS POSITIVE_PROB_1#84]
   +- Project [F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L, LABEL#6, (_my_predict_udf(F1#0L, F2#1L, F3#2L, F4#3L, F5#4L, F6#5L)) AS POSITIVE_PROB_0#71]

It has 3 Project nodes, meaning the transformation is expressed as 3 chained projections. That's expected, since the for-loop calls withColumn 3 times, once per prediction and new column.

Meanwhile, the second approach gives the following logical plan:

'Project [(_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_0#71, (_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_1#73, (_my_predict_udf('F1, 'F2, 'F3, 'F4, 'F5, 'F6)) AS POSITIVE_PROB_2#75]

It has only 1 Project node, which means the transformation is expressed as a single projection, so the three predictions can be computed together in one pass.

In addition, the optimized plan of the 1st approach has essentially the same structure as the (not-yet-optimized) parsed logical plan of the 2nd approach. In other words, the 2nd approach is one of the forms Spark itself rewrites the operation into to make it more efficient. YEAY!!

The conclusion? Well, I’ll have to keep searching for other ways to make this kind of operation more efficient.

Let me know if you have suggestions.