Apache Spark [PART 30]: Machine Learning Model Re-train Mechanism via YARN Cluster Mode

Deploying a machine learning (ML) model to a production system is not the end of the AI engineering process. The deployed model may become obsolete over time.

Re-training the model comes to the rescue. The question is: what is the proper mechanism for doing that?

Suppose that the application is developed with Spark as the big data framework and deployed via YARN cluster mode, and that the trained ML model is stored in HDFS.

In addition, let’s presume that we have two jobs that will be submitted to the cluster: main.py (the main module) and trainer.py (the module for re-training the model).
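
For context, here’s a bare-bones sketch of what the core of trainer.py might look like, assuming a Spark ML estimator whose re-trained model overwrites the previous one in HDFS. The paths, columns, and algorithm are hypothetical; the real training logic is not shown here.

# trainer.py -- illustrative skeleton only; the real training logic will differ
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName('trainer').getOrCreate()

# load the latest training data (hypothetical location)
train_df = spark.read.parquet('hdfs:///data/training/latest')

# re-train the model on the fresh data
model = LogisticRegression(featuresCol='features', labelCol='label').fit(train_df)

# overwrite the previous model in HDFS so the restarted main module picks it up
model.write().overwrite().save('hdfs:///models/current_model')

spark.stop()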

My team and I encountered such a task recently. Here’s a brief overview of how we managed to accomplish it.

For each job, its application ID is stored into a table in a database; let’s call the table app_id_repository. Use the following code to retrieve the application ID.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('job_name').getOrCreate()
app_id = spark.sparkContext.applicationId
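
The application ID can then be persisted to the app_id_repository table. Here’s a minimal sketch, assuming a PostgreSQL database accessed via psycopg2; the actual database, driver, connection details, and table schema are not specified in this article.

import psycopg2

# hypothetical connection details; adjust to the actual database
conn = psycopg2.connect(host='db-host', dbname='ml_ops',
                        user='spark', password='secret')
with conn, conn.cursor() as cur:
    # store this job's name together with its YARN application ID
    cur.execute(
        "INSERT INTO app_id_repository (job_name, app_id) VALUES (%s, %s)",
        ('main_module', app_id),
    )
conn.close()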

In the trainer job, the main module’s application ID is fetched from the database and written to a file, which is then uploaded to HDFS. This approach was used simply to make the main module’s ID available for killing the corresponding application running on YARN.

For the sake of clarity, here’s the code used to accomplish the above task.

import os

# fetch the main module's application ID from the app_id_repository table
main_module_id = <call_fetch_method_of_the_table>

# write the ID to a local temporary file
temp_file_name = 'main_module_id.txt'
with open(temp_file_name, 'w') as f:
    f.write(main_module_id)

# upload the file to the HDFS home directory so the re-train script can read it
destination_path = os.path.join(hdfs_util.get_home_directory(), temp_file_name)
hdfs_util.upload(temp_file_name, destination_path)

Note that hdfs_util is a utility module used to create a connection to HDFS, upload and download files, and handle other supporting tasks.
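
For illustration, here’s a minimal sketch of what such a module might look like, assuming the hdfs Python package (a WebHDFS client) and a hypothetical namenode address; the actual implementation may differ.

# hdfs_util.py -- illustrative sketch only
import getpass
from hdfs import InsecureClient

# hypothetical WebHDFS endpoint; replace with the real namenode address
_client = InsecureClient('http://namenode:9870', user=getpass.getuser())

def get_home_directory():
    """Return the current user's HDFS home directory (by convention /user/<name>)."""
    return '/user/{}'.format(getpass.getuser())

def upload(local_path, hdfs_path):
    """Upload a local file to HDFS, overwriting any existing file."""
    _client.upload(hdfs_path, local_path, overwrite=True)

def download(hdfs_path, local_path):
    """Download a file from HDFS to the local filesystem."""
    _client.download(hdfs_path, local_path, overwrite=True)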

Next, we just need to create the script file that executes the re-train mechanism, such as the following:

#!/usr/bin/env bash

# exit when any command fails
set -e

# keep track of the last executed command
trap 'last_command=$current_command; current_command=$BASH_COMMAND' DEBUG

# echo an error message before exiting
trap 'echo "\"${last_command}\" command failed with exit code $?."' EXIT

# run trainer
./run.sh --master yarn \
    --deploy-mode cluster \
    jobs/trainer.py

# kill current main job
MAIN_MODULE_ID_FILE_NAME="main_module_id.txt"
yarn application -kill $(hdfs dfs -cat $MAIN_MODULE_ID_FILE_NAME)
hdfs dfs -rm $MAIN_MODULE_ID_FILE_NAME

# rerun main module
./run.sh --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.submit.waitAppCompletion=false \
    jobs/main.py

Basically, the above script executes the trainer job to re-train the model. After the re-training process completes, the currently running main module is killed and the file containing the main module’s ID is removed from HDFS by the following commands:

# kill the currently running main module
yarn application -kill $(hdfs dfs -cat $MAIN_MODULE_ID_FILE_NAME)

# remove the file
hdfs dfs -rm $MAIN_MODULE_ID_FILE_NAME

After the currently running main module is killed, the main module is re-submitted to the cluster. Note that spark.yarn.submit.waitAppCompletion=false makes spark-submit return right after submission instead of waiting for the long-running main job to finish.

In addition, you might have noticed run.sh in the above script. run.sh is simply a wrapper script containing the spark-submit command along with its additional parameters. Please refer to my previous article (Apache Spark [PART 29]: Multiple Extra Java Options for Spark Submit Config Parameter) for an example of run.sh.