Airflow Feature Improvement: Spark Driver Status Polling Support for YARN, Mesos & K8S

According to the code base, the driver status tracking feature is only implemented for standalone cluster manager. However, based on this reference, we could also poll the driver status for mesos and kubernetes (cluster deploy mode). Additionally, such a feature is also possible for YARN.

I decided to raise this future improvement on JIRA. Here’s how the probable improvement.

There are two primary modules that need to be considered.

The first one is originally coded like this. The probable modificiation would be like the following.

def _resolve_should_track_driver_status(self):
        Determines whether or not this hook should poll the spark driver status through
        subsequent spark-submit status requests after the initial spark-submit request
        :return: if the driver status should be tracked
        if self._connection['deploy_mode'] == 'cluster':
              cluster_manager_schemes = ('spark://', 'mesos://', 'k8s://https://', 'yarn')
              if self._connection['master'].startswith(cluster_manager_schemes):
                    return True

The second one is originally coded like this. The possible modification would be like the following.

def _build_track_driver_status_command(self):
        # The driver id so we can poll for its status
        if not self._driver_id:
            raise AirflowException(
                "Invalid status: attempted to poll driver " +
                "status but no driver id is known. Giving up.")

        cluster_manager_schemes = ("spark://", "mesos://", "k8s://https://")
        if self._connection['master'].startswith(cluster_manager_schemes): 
            # standalone, mesos, kubernetes
            connection_cmd = self._get_spark_binary_path()
            connection_cmd += ["--master", self._connection['master']]
            connection_cmd += ["--status", self._driver_id]
            # yarn
            connection_cmd = ["yarn application -status"]
            connection_cmd += [self._driver_id]

        self.log.debug("Poll driver status cmd: %s", connection_cmd)

        return connection_cmd

Have any thought or different opinion? Really appreciate it.

Thank you for reading.