# The Investigation of Skewness & Kurtosis in Spark (Scala)

** Published:**

Applying central moment functions in Spark might be tricky, especially for skewness and kurtosis.

A few weeks ago I was experimenting with these functions. Fortunately, the IDE shows all the relevant functions which might fit my needs. For instance, the IDE suggests `var_pop`

and `var_samp`

for variance calculation. Such suggestions might make us aware of our initial goal of leveraging variance functions (at least for me). The same goes for standard deviation which has `stddev_pop`

and `stddev_samp`

.

However, seems that such kind of feature isn’t available for skewness & kurtosis.

The available function is just `skewness`

for skewness and `kurtosis`

for kurtosis. The problem is that I don’t know whether such functions are for population (parameters) or sample (statistics) calculation.

I decided, once again, to take a dive into the Spark’s code itself.

## Locating the Appropriate Modules

So I started from the most obvious location. Since all these central moment functions are included as Spark SQL functions, we basically summon them by the following.

```
import org.apache.spark.sql.functions
val mean = df.agg(avg(columnName)).head.get(0).asInstanceOf[Double]
val stddev_pop = df.agg(stddev_pop(columnName)).head.get(0).asInstanceOf[Double]
val stddev_samp = df.agg(stddev_samp(columnName)).head.get(0).asInstanceOf[Double]
val var_pop = df.agg(var_pop(columnName)).head.get(0).asInstanceOf[Double]
val var_samp = df.agg(var_samp(columnName)).head.get(0).asInstanceOf[Double]
```

Therefore, we can start from `functions.scala`

module which is located here.

However, there is no any implementation for each function.

Looking further at the imported modules, seems that there might be a chance that the implementation are included there. The followings might be the best possible ones.

```
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
```

Long story short, after searching for the correct location of `sql.catalyst.expressions`

, I managed to find it here.

There is no any implementation for the central moment functions there. However, recall that there is also an import for `sql.catalyst.expressions.aggregate._`

there. And yes, there’s indeed a package called `aggregate`

which stores all the statistical modules.

Since our need is specified for central moments, just click on the `CentralMomentAgg.scala`

module.

From there, the investigation began.

## The Investigation

As you might have seen, this module provides the central moment functions, such as standard deviation, variance, skewness, and kurtosis. As expected, both of the standard deviation and variance have two separated classes, one is for parameter while the other is for statistic.

Back to our original question, how about skewness and kurtosis?

Well, as you can see, the class names don’t inform us whether each of them acts as a parameter or statistic. Just take a deeper look at the applied formula.

Starting from skewness, recall that the population skewness formula is given by the following.

```
skewness_pop = E [((X - mu_pop) / stddev_pop) ^ 3]
X: the random variable
mu_pop: population mean
stddev_pop: population standard deviation
```

The implemented formula in `CentralMomentAgg.scala`

is given as the following.

```
sqrt(n) * m3 / sqrt(m2 * m2 * m2)
```

where if `m`

refers to `(X - mu)`

, then `m2`

refers to `(X - mu)^2`

and `m3`

refers to `(X - mu)^3`

.

If you take a closer look, you might find that after replacing `stddev_pop`

in `skewness_pop`

with its formula and simplifying the `skewness_pop`

equation, the implemented formula is the simplified version of `skewness_pop`

.

To support the argument, recall that the sample skewness formula is given by the following.

```
skewness_samp = SUM(i=1 to n) [(xi - mu_samp) ^ 3] / n
--------------------------------------
stddev_samp ^ 3
```

After replacing `stddev_samp`

with the appropriate value (recall it has `n-1`

factor) and simplifying the above equation, we don’t end up with the implemented formula.

Therefore, we may conclude that the implemented skewness in `CentralMomentAgg.scala`

refers to the population skewness.

I performed the same step for kurtosis.

Recall that the population excess kurtosis is given as the following.

```
kurtosis_pop = E [((X - mu_pop) / stddev_pop) ^ 4]
excess_kurtosis_pop = kurtosis_pop - 3.0
X: the random variable
mu_pop: population_mean
stddev_pop: population standard deviation
```

The implemented formula in `CentralMomentAgg.scala`

is given as the following.

```
n * m4 / (m2 * m2) - 3.0
```

where if `m`

refers to `(X - mu)`

, then `m2`

refers to `(X - mu)^2`

and `m4`

refers to `(X - mu)^4`

.

If you take a closer look, you might find that after replacing `stddev_pop`

in `kurtosis_pop`

with its formula and simplifying the `kurtosis_pop`

equation, the implemented formula is the simplified version of `excess_kurtosis_pop`

.

To support the argument, recall that the sample kurtosis formula is given by the following.

```
kurtosis_samp = SUM(i=1 to n) [(xi - mu_samp) ^ 4] / n
--------------------------------------
stddev_samp ^ 4
excess_kurtosis_samp = kurtosis_samp - 3.0
```

After replacing `stddev_samp`

with the appropriate value (recall it has `n-1`

factor) and simplifying the above equation, we don’t end up with the implemented formula.

Therefore, we may conclude that the implemented kurtosis in `CentralMomentAgg.scala`

refers to the population kurtosis, specifically the population excess kurtosis.

## The Implementation in Spark (Scala)

Knowing that both of skewness and kurtosis are for population, I decided to implement the sample version for both of them in Spark (Scala).

With this, I presume that `calculateMean`

and `calculateStdDevSamp`

are already implemented.

### Sample Skewness

```
def calculateSkewnessSamp(df: DataFrame, column: String): Double = {
val observedDf = df.filter(!F.isnull(F.col(column)) && !F.isnan(F.col(column)))
val observationsMean = calculateMean(observedDf, column)
val observationsStdDev = calculateStdDevSamp(observedDf, column)
val totalObservations = observedDf.count()
val thirdCentralSampleMoment =
observedDf
.agg(F.sum(F.pow(F.col(column) - observationsMean, 3) / totalObservations))
.head
.get(0)
.asInstanceOf[Double]
val thirdPowerOfSampleStdDev = scala.math.pow(observationsStdDev, 3)
thirdCentralSampleMoment / thirdPowerOfSampleStdDev
}
```

### Sample Excess Kurtosis

```
def calculateExcessKurtosisSamp(df: DataFrame, column: String): Double = {
val observedDf = df.filter(!F.isnull(F.col(column)) && !F.isnan(F.col(column)))
val observationsMean = calculateMean(observedDf, column)
val observationsStdDev = calculateStdDevSamp(observedDf, column)
val totalObservations = observedDf.count()
val fourthCentralSampleMoment =
observedDf
.agg(F.sum(F.pow(F.col(column) - observationsMean, 4) / totalObservations))
.head
.get(0)
.asInstanceOf[Double]
val fourthPowerOfSampleStdDev = scala.math.pow(observationsStdDev, 4)
(fourthCentralSampleMoment / fourthPowerOfSampleStdDev) - 3.0
}
```