When building a pipeline, it is often the case that the top-level dataframe is complex and changing, but that the work focuses on transforming *columns* of this dataframe. Karps provides a way to express and compose complex functions on columns without having to run computations. Behind the scenes, Karps is able to take these functions and translate them into sequences of queries without having to deal with the details of collecting, joining and broadcasting data.

As an example, we are going to build functions that take a numerical dataset and produce a *centered* version (the mean is subtracted) and a *scaled* version (the variance is scaled to 1). Of course, such basic operations are already built into Spark, but it is instructive to see how one would implement similar functions in practice.

We will see that, thanks to laziness and determinism, Karps is able to reuse some computations while providing a high-level, lazy API.

In [1]:

```
:load KarpsDisplays KarpsDagDisplay
:extension DeriveGeneric
:extension FlexibleContexts
:extension OverloadedStrings
:extension GeneralizedNewtypeDeriving
:extension FlexibleInstances
:extension MultiParamTypeClasses
```

In [2]:

```
import Spark.Core.Dataset
import Spark.Core.Context
import Spark.Core.Column
import Spark.Core.ColumnFunctions
import Spark.Core.Functions
import Spark.Core.Row
import Spark.Core.Types
import Spark.Core.Try
import qualified Data.Vector as V
import qualified Data.Text as T
import Data.Text(Text)
import GHC.Generics
```

We will start with an extremely simple dataset:

In [3]:

```
let ds = dataset [-1, 1] :: Dataset Int
-- A column of data containing integers
let myData = asCol ds
```

Here is a first function that computes the mean of the data in a column. Giving some names to the elements is not necessary but helps when looking at the DAG of computations.

Note that we can use all the usual operators (`+`, `/`, etc.) even though the computation is lazy.

Also, note that all the operations are strongly typed: unlike in SQL, casting is almost always explicit, since it can otherwise lead to loss of precision (or worse).
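The same discipline exists in plain Haskell, and it is a useful mental model for `asDouble`: an `Int` has to be converted explicitly with `fromIntegral` before it can take part in a fractional division. This is a plain-Haskell sketch, not Karps code:

```haskell
-- Plain-Haskell analogue of the explicit casts used below (a sketch,
-- not Karps code): an Int count must be converted with fromIntegral
-- before a fractional division; no silent coercion takes place.
meanOf :: [Int] -> Double
meanOf xs = fromIntegral (sum xs) / fromIntegral (length xs)

-- meanOf [-1, 1] evaluates to 0.0
```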

In [4]:

```
myMean :: Column ref Int -> LocalData Double
myMean col =
  let
    cnt = asDouble (countCol col) @@ "mean_count"
    s = asDouble (sumCol col) @@ "mean_sum"
  in (s / cnt)
```

Now, if we apply it to our data, the result is rather anticlimactic: we just get a `LocalData`.


In [5]:

```
myMean myData
```

Let's build on this to make the centering function, which simply subtracts the mean, and the scaling function, which builds on the other two.

Note that, again, we need to cast the column explicitly; it is not going to be done for us.

Note: due to a Haskell limitation, the `-` operator is replaced by `.-`. This is because Haskell does not allow mixing different types in the same operation (here a column and an observable). This restriction is going to be lifted in the future.
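The underlying reason is visible in plain Haskell already: `(-)` has type `Num a => a -> a -> a`, so both operands must share a single type, which is why a dedicated operator is needed to combine a column with an observable. A plain-Haskell sketch (not Karps code):

```haskell
-- Why Karps needs `.-`: Haskell's (-) has type Num a => a -> a -> a,
-- so both operands must have the same type. The function below only
-- compiles because both sides are Double; with an Int on one side it
-- would be rejected by the type checker.
centerVal :: Double -> Double -> Double
centerVal m x = x - m

-- e.g. centerVal 0.5 1.5 evaluates to 1.0
```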

In [6]:

```
myCenter :: Column ref Int -> Column ref Double
myCenter col =
  let m = (myMean col) @@ "center_mean"
  in (asDoubleCol col) .- m

myScaler :: Column ref Int -> Column ref Double
myScaler col =
  let
    cnt = asDouble (countCol col) @@ "count"
    centered = myCenter col
    stdDev = sumCol (centered * centered) / cnt @@ "std_dev"
  in centered ./ stdDev
```

What does the transform look like if we apply it? Let's run `showGraph` on our simple dataset:

In [7]:

```
-- make a new scaled column:
let scaled = myScaler myData
-- pack it into a dataset to visualize it:
let out = pack1 scaled
showGraph out
```

This graph is pretty complicated, and you should click around to see what each node corresponds to. A couple of points are noteworthy:

- Karps handles the broadcasting and the reduction of variables automatically and seamlessly. In fact, Karps can broadcast pretty much anything that is understood by Spark dataframes.

- Karps tries to reuse computations as much as possible: even though we made no attempt at it, the count of the dataset is shared between the calculation of the mean and that of the variance. This is only possible because of laziness.

- Thanks to naming, even when the functions happen to be nested, we can still quickly relate each operator to the function that generated it.
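The reuse point can be illustrated with a toy sketch in plain Haskell (hypothetical types, not the actual Karps internals): because column expressions are just data, two computations that contain the same subtree, such as the count used by both the mean and the variance, can be deduplicated structurally before anything runs.

```haskell
import Data.List (nub)

-- A toy expression DAG in the spirit of Karps' computation graph
-- (hypothetical names; not the real Karps data types).
data Expr = Count | Sum | Bin String Expr Expr
  deriving (Eq, Show)

-- mean = sum / count
mean :: Expr
mean = Bin "/" Sum Count

-- a schematic variance that also divides by the count
variance :: Expr
variance = Bin "/" (Bin "*" centered centered) Count
  where centered = Bin "-" Sum mean

-- enumerate every subtree of an expression
subtrees :: Expr -> [Expr]
subtrees e@(Bin _ a b) = e : subtrees a ++ subtrees b
subtrees e             = [e]

-- The backend only has to run each distinct subtree once:
-- Count appears in both expressions, but it survives
-- deduplication as a single node.
sharedPlan :: [Expr]
sharedPlan = nub (subtrees mean ++ subtrees variance)
```

This is only possible because the expressions are inert descriptions: an eager API would have computed (and forgotten) the count before the planner had a chance to notice the overlap.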

Now, let's execute all of that:

In [8]:

```
conf = defaultConf {
    confEndPoint = "http://10.0.2.2",
    confRequestedSessionName = "col_ops6" }
createSparkSessionDef conf
```

In [9]:

```
exec1Def (collect scaled)
```

As a preview of the next chapter, here is a function that displays the RDDs generated by Spark when running this command.

Each element comes from the graph above. You can see which ones are missing (they have been optimized away by Spark). When you click on a box, you can see the sequence of RDDs that was generated in the process.

In [10]:

```
displayRDD "0"
```
