Add TypedTransformer and TypedEstimator, towards a type-safe Spark ML API #206

Merged: 12 commits, Dec 7, 2017
4 changes: 2 additions & 2 deletions README.md
@@ -10,7 +10,7 @@ It consists of the following modules:

* `dataset` for more strongly typed `Dataset`s (supports Spark 2.2.x)
* `cats` for using Spark with [cats](https://github.com/typelevel/cats) (supports Cats 1.0.0-MF)
* `ml` for a more strongly typed use of Spark ML based on `dataset`
* `ml` for a more strongly typed Spark ML API based on `dataset`

The Frameless project and contributors support the
[Typelevel](http://typelevel.org/) [Code of Conduct](http://typelevel.org/conduct.html) and want all its
@@ -24,7 +24,7 @@ associated channels (e.g. GitHub, Gitter) to be a safe and friendly environment
* [Injection: Creating Custom Encoders](http://typelevel.org/frameless/Injection.html)
* [Job\[A\]](http://typelevel.org/frameless/Job.html)
* [Using Cats with RDDs](http://typelevel.org/frameless/Cats.html)
* [TypedDataset support for Spark ML](http://typelevel.org/frameless/TypedML.html)
* [Typed Spark ML](http://typelevel.org/frameless/TypedML.html)
* [Proof of Concept: TypedDataFrame](http://typelevel.org/frameless/TypedDataFrame.html)

## Why?
13 changes: 13 additions & 0 deletions dataset/src/test/scala/frameless/XN.scala
@@ -64,3 +64,16 @@ object X5 {
  implicit def ordering[A: Ordering, B: Ordering, C: Ordering, D: Ordering, E: Ordering]: Ordering[X5[A, B, C, D, E]] =
    Ordering.Tuple5[A, B, C, D, E].on(x => (x.a, x.b, x.c, x.d, x.e))
}

case class X6[A, B, C, D, E, F](a: A, b: B, c: C, d: D, e: E, f: F)

object X6 {
  implicit def arbitrary[A: Arbitrary, B: Arbitrary, C: Arbitrary, D: Arbitrary, E: Arbitrary, F: Arbitrary]: Arbitrary[X6[A, B, C, D, E, F]] =
    Arbitrary(Arbitrary.arbTuple6[A, B, C, D, E, F].arbitrary.map((X6.apply[A, B, C, D, E, F] _).tupled))

  implicit def cogen[A, B, C, D, E, F](implicit A: Cogen[A], B: Cogen[B], C: Cogen[C], D: Cogen[D], E: Cogen[E], F: Cogen[F]): Cogen[X6[A, B, C, D, E, F]] =
    Cogen.tuple6(A, B, C, D, E, F).contramap(x => (x.a, x.b, x.c, x.d, x.e, x.f))

  implicit def ordering[A: Ordering, B: Ordering, C: Ordering, D: Ordering, E: Ordering, F: Ordering]: Ordering[X6[A, B, C, D, E, F]] =
    Ordering.Tuple6[A, B, C, D, E, F].on(x => (x.a, x.b, x.c, x.d, x.e, x.f))
}
279 changes: 271 additions & 8 deletions docs/src/main/tut/TypedML.md
@@ -1,8 +1,22 @@
# TypedDataset support for Spark ML
# Typed Spark ML

The goal of the `frameless-ml` module is to be able to use Spark ML with `TypedDataset` and
to eventually provide a more strongly typed ML API for Spark. Currently, this module is at its very beginning and only
provides `TypedEncoder` instances for Spark ML's linear algebra data types.
The `frameless-ml` module provides a strongly typed Spark ML API leveraging `TypedDataset`s. It introduces `TypedTransformer`s
and `TypedEstimator`s, the type-safe equivalents of Spark ML's `Transformer` and `Estimator`.

A `TypedEstimator` fits models to data, i.e. it trains an ML model based on an input `TypedDataset`.
A `TypedTransformer` transforms one `TypedDataset` into another, usually by appending column(s) to it.

Calling the `fit` method of a `TypedEstimator` trains an ML model on the `TypedDataset` passed as input
(the training data) and returns a `TypedTransformer` that represents the trained model.
This `TypedTransformer` can then be used to make predictions on an input `TypedDataset` (the test data):
its `transform` method returns a new `TypedDataset` with the prediction column(s) appended.

Both `TypedEstimator` and `TypedTransformer` check at compile time that the field names and types of their inputs are correct,
unlike the Spark ML API, which only deals with DataFrames (the data structure with the lowest level of type-safety in Spark).

`frameless-ml` adds type-safety to the Spark ML API but stays very close to it in terms of abstractions and API calls, so
please check the [Spark ML documentation](https://spark.apache.org/docs/2.2.0/ml-pipeline.html) for more details
on `Transformer`s and `Estimator`s.
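
To make the estimator/transformer relationship concrete, here is a minimal sketch in plain Scala, without Spark. The names `ToyEstimator`, `ToyTransformer`, `MeanRegressor` and `Labeled` are hypothetical and exist only for this illustration; they are not part of `frameless-ml` or Spark ML, and the "model" is deliberately trivial.

```scala
// Toy illustration only: none of these types belong to frameless or Spark.
object EstimatorSketch {
  // A transformer appends an output value (e.g. a prediction) to each input row.
  trait ToyTransformer[In, Out] {
    def transform(rows: Seq[In]): Seq[(In, Out)]
  }

  // An estimator learns from training rows and returns a fitted transformer.
  trait ToyEstimator[In, Out] {
    def fit(rows: Seq[In]): ToyTransformer[In, Out]
  }

  final case class Labeled(x: Double, y: Double)

  // "Training" computes the mean label; the fitted model always predicts that mean.
  object MeanRegressor extends ToyEstimator[Labeled, Double] {
    def fit(rows: Seq[Labeled]): ToyTransformer[Labeled, Double] = {
      val mean = if (rows.isEmpty) 0.0 else rows.map(_.y).sum / rows.size
      new ToyTransformer[Labeled, Double] {
        def transform(in: Seq[Labeled]): Seq[(Labeled, Double)] = in.map(r => (r, mean))
      }
    }
  }

  // fit on training data, then transform test data with the fitted model
  val model = MeanRegressor.fit(Seq(Labeled(1, 10), Labeled(2, 20)))
  val predictions = model.transform(Seq(Labeled(3, 0)))
}
```

The shape is the same as in the examples that follow: `fit` consumes training data and yields a model, and the model's `transform` appends predictions to new data.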

```tut:invisible
import org.apache.spark.{SparkConf, SparkContext}
@@ -15,17 +29,266 @@ spark.sparkContext.setLogLevel("WARN")

import spark.implicits._
```

## Example 1: predict a continuous value using a `TypedRandomForestRegressor`

In this example, we want to predict the sale price of a house from its square footage and whether or not it
has a garden. We will use a `TypedRandomForestRegressor`.

### Training

As with the Spark ML API, we use a `TypedVectorAssembler` (the type-safe equivalent of `VectorAssembler`)
to compute feature vectors:

```tut:silent
import frameless._
import frameless.syntax._
import frameless.ml._
import frameless.ml.feature._
import frameless.ml.regression._
import org.apache.spark.ml.linalg.Vector
```

```tut:book
case class HouseData(squareFeet: Double, hasGarden: Boolean, price: Double)

val trainingData = TypedDataset.create(Seq(
  HouseData(20, false, 100000),
  HouseData(50, false, 200000),
  HouseData(50, true, 250000),
  HouseData(100, true, 500000)
))

case class Features(squareFeet: Double, hasGarden: Boolean)
val assembler = TypedVectorAssembler[Features]

case class HouseDataWithFeatures(squareFeet: Double, hasGarden: Boolean, price: Double, features: Vector)
val trainingDataWithFeatures = assembler.transform(trainingData).as[HouseDataWithFeatures]
```

In the above code snippet, `.as[HouseDataWithFeatures]` is a `TypedDataset` type-safe cast
(see [TypedDataset: Feature Overview](https://typelevel.org/frameless/FeatureOverview.html)). Casting to a case class whose
field types do not match fails to compile:

```tut:silent
case class WrongHouseFeatures(
  squareFeet: Double,
  hasGarden: Int, // hasGarden has wrong type
  price: Double,
  features: Vector
)
```

```tut:book:fail
assembler.transform(trainingData).as[WrongHouseFeatures]
```

Moreover, `TypedVectorAssembler[Features]` will compile only if every field of `Features` has a numeric type or is of type Boolean:

```tut:silent
case class WrongFeatures(squareFeet: Double, hasGarden: Boolean, city: String)
```

```tut:book:fail
TypedVectorAssembler[WrongFeatures]
```

The subsequent call `assembler.transform(trainingData)` compiles only if `trainingData` contains all fields (names and types)
of `Features`:

```tut:book
case class WrongHouseData(squareFeet: Double, price: Double) // hasGarden is missing
val wrongTrainingData = TypedDataset.create(Seq(WrongHouseData(20, 100000)))
```

```tut:book:fail
assembler.transform(wrongTrainingData)
```
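
How `TypedVectorAssembler` enforces the "numeric or Boolean fields only" rule is not shown here. The following is only a sketch of one general technique for this kind of check, using implicit evidence over a hand-written type-level list. `HList`, `HCons`, `Allowed`, `AllAllowed` and `check` are all hypothetical names introduced for this illustration, and the field list is written out by hand rather than derived from a case class.

```scala
// Sketch under stated assumptions: not the frameless implementation.
object FieldCheckSketch {
  // Minimal type-level list standing in for the list of field types of a case class.
  sealed trait HList
  sealed trait HNil extends HList
  sealed trait HCons[H, T <: HList] extends HList

  // Evidence that a single field type may be part of a feature vector.
  trait Allowed[A]
  object Allowed {
    implicit def numeric[A](implicit ev: Numeric[A]): Allowed[A] = new Allowed[A] {}
    implicit val boolean: Allowed[Boolean] = new Allowed[Boolean] {}
  }

  // Evidence that every field type in the list is allowed, built by induction.
  trait AllAllowed[L <: HList]
  object AllAllowed {
    implicit val hnil: AllAllowed[HNil] = new AllAllowed[HNil] {}
    implicit def hcons[H, T <: HList](
      implicit h: Allowed[H], t: AllAllowed[T]
    ): AllAllowed[HCons[H, T]] = new AllAllowed[HCons[H, T]] {}
  }

  // Compiles only when evidence exists for every element of L.
  def check[L <: HList](implicit ev: AllAllowed[L]): Boolean = true

  type GoodFields = HCons[Double, HCons[Boolean, HNil]]                // like Features
  type BadFields  = HCons[Double, HCons[Boolean, HCons[String, HNil]]] // like WrongFeatures

  val ok: Boolean = check[GoodFields]
  // check[BadFields] // does not compile: there is no Allowed[String] instance
}
```

Because the evidence is resolved by the compiler, an unsupported field type is rejected before any Spark job is started.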

Then, we train the model. To train a Random Forest, one needs to feed it with features (what we predict from) and
with a label (what we predict). In our example, `price` is the label, `features` are the features:

```tut:book
case class RFInputs(price: Double, features: Vector)
val rf = TypedRandomForestRegressor[RFInputs]

val model = rf.fit(trainingDataWithFeatures).run()
```

`TypedRandomForestRegressor[RFInputs]` compiles only if `RFInputs`
contains exactly one field of type Double (the label) and one field of type Vector (the features):

```tut:silent
case class WrongRFInputs(labelOfWrongType: String, features: Vector)
```

```tut:book:fail
TypedRandomForestRegressor[WrongRFInputs]
```

The subsequent `rf.fit(trainingDataWithFeatures)` call compiles only if `trainingDataWithFeatures` contains all fields
(names and types) of `RFInputs`:

```tut:book
val wrongTrainingDataWithFeatures = TypedDataset.create(Seq(HouseData(20, false, 100000))) // features are missing
```

```tut:book:fail
rf.fit(wrongTrainingDataWithFeatures)
```
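
The `.run()` call after `rf.fit(...)` in this section indicates that `fit` returns a deferred computation instead of training immediately. The sketch below illustrates that idea in plain Scala. `Deferred` is a hypothetical stand-in written for this example; it is not the frameless `Job` type, and the "training" is a simple average.

```scala
// Sketch under stated assumptions: Deferred is a toy type, not frameless's Job.
object DelaySketch {
  // Wraps a computation that only executes when run() is called.
  final class Deferred[A](thunk: () => A) {
    def run(): A = thunk()
    def map[B](f: A => B): Deferred[B] = new Deferred(() => f(run()))
  }
  object Deferred {
    def apply[A](a: => A): Deferred[A] = new Deferred(() => a)
  }

  // Counts how many times the (toy) training step has actually executed.
  var trainingRuns = 0

  // Describes training without performing it.
  def fit(data: Seq[Double]): Deferred[Double] = Deferred {
    trainingRuns += 1
    data.sum / data.size
  }

  val job = fit(Seq(1.0, 2.0, 3.0))
  val runsBefore = trainingRuns // nothing has executed yet
  val model = job.run()         // training happens here
  val runsAfter = trainingRuns
}
```

Deferring the work this way lets the caller decide when the potentially expensive training step runs.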

### Prediction

We now want to predict `price` for `testData` using the previously trained model. As with the Spark ML API,
`testData` carries a placeholder value for `price` (`0` in our case) that is ignored at prediction time. We reuse
our `assembler` to compute the feature vector of `testData`.

```tut:book
val testData = TypedDataset.create(Seq(HouseData(70, true, 0)))
val testDataWithFeatures = assembler.transform(testData).as[HouseDataWithFeatures]

case class HousePricePrediction(
  squareFeet: Double,
  hasGarden: Boolean,
  price: Double,
  features: Vector,
  predictedPrice: Double
)
val predictions = model.transform(testDataWithFeatures).as[HousePricePrediction]

predictions.select(predictions.col('predictedPrice)).collect.run()
```

`model.transform(testDataWithFeatures)` will only compile if `testDataWithFeatures` contains a field `price` of type Double
and a field `features` of type Vector:

```tut:book:fail
model.transform(testData)
```

## Example 2: predict a categorical value using a `TypedRandomForestClassifier`

In this example, we want to predict in which city a house is located depending on its price and its square footage. We use a
`TypedRandomForestClassifier`.

### Training

As with the Spark ML API, we use a `TypedVectorAssembler` to compute feature vectors and a `TypedStringIndexer`
to index `city` values in order to be able to pass them to a `TypedRandomForestClassifier`
(which only accepts Double values as label):

```tut:silent
import frameless.ml.classification._
```

```tut:book
case class HouseData(squareFeet: Double, city: String, price: Double)

val trainingData = TypedDataset.create(Seq(
  HouseData(100, "lyon", 100000),
  HouseData(200, "lyon", 200000),
  HouseData(100, "san francisco", 500000),
  HouseData(150, "san francisco", 900000)
))

case class Features(price: Double, squareFeet: Double)
val vectorAssembler = TypedVectorAssembler[Features]

case class HouseDataWithFeatures(squareFeet: Double, city: String, price: Double, features: Vector)
val dataWithFeatures = vectorAssembler.transform(trainingData).as[HouseDataWithFeatures]

case class StringIndexerInput(city: String)
val indexer = TypedStringIndexer[StringIndexerInput]
val indexerModel = indexer.fit(dataWithFeatures).run()

case class HouseDataWithFeaturesAndIndex(
  squareFeet: Double,
  city: String,
  price: Double,
  features: Vector,
  cityIndexed: Double
)
val indexedData = indexerModel.transform(dataWithFeatures).as[HouseDataWithFeaturesAndIndex]
```

Then, we train the model:

```tut:book
case class RFInputs(cityIndexed: Double, features: Vector)
val rf = TypedRandomForestClassifier[RFInputs]

val model = rf.fit(indexedData).run()
```

### Prediction

We now want to predict `city` for `testData` using the previously trained model. As with the Spark ML API,
`testData` carries a placeholder value for `city` (an empty string in our case) that is ignored at prediction time. We reuse
our `vectorAssembler` to compute the feature vector of `testData` and our `indexerModel` to index `city`.

```tut:book
val testData = TypedDataset.create(Seq(HouseData(120, "", 800000)))

val testDataWithFeatures = vectorAssembler.transform(testData).as[HouseDataWithFeatures]
val indexedTestData = indexerModel.transform(testDataWithFeatures).as[HouseDataWithFeaturesAndIndex]

case class HouseCityPredictionInputs(features: Vector, cityIndexed: Double)
val testInput = indexedTestData.project[HouseCityPredictionInputs]

case class HouseCityPredictionIndexed(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double
)
val indexedPredictions = model.transform(testInput).as[HouseCityPredictionIndexed]
```

Then, we use a `TypedIndexToString` to get back a String value from `predictedCityIndexed`. `TypedIndexToString` takes
as input the label array computed by our previous `indexerModel`:

```tut:book
case class IndexToStringInput(predictedCityIndexed: Double)
val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labels)

case class HouseCityPrediction(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double,
  predictedCity: String
)
val predictions = indexToString.transform(indexedPredictions).as[HouseCityPrediction]

predictions.select(predictions.col('predictedCity)).collect.run()
```
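
The round trip between string labels and Double indices can be sketched in plain Scala as follows. This is an illustration, not the Spark or frameless code: `fitLabels`, `indexOf` and `labelOf` are hypothetical helpers. The ordering (most frequent label first) mirrors Spark's `StringIndexer` default as far as we know; the alphabetical tie-break is an assumption added here only to keep the sketch deterministic.

```scala
// Sketch under stated assumptions: plain-Scala analogue of indexing and un-indexing labels.
object IndexRoundTripSketch {
  // Builds the label array: most frequent label gets index 0.
  // The alphabetical tie-break is an assumption made for determinism.
  def fitLabels(values: Seq[String]): Array[String] =
    values.groupBy(identity).toSeq
      .sortBy { case (label, occurrences) => (-occurrences.size, label) }
      .map(_._1)
      .toArray

  // String -> index (what an indexer model does).
  def indexOf(labels: Array[String])(value: String): Double =
    labels.indexOf(value).toDouble

  // Index -> string (what an index-to-string transformer does with the same label array).
  def labelOf(labels: Array[String])(index: Double): String =
    labels(index.toInt)

  val labels = fitLabels(Seq("lyon", "lyon", "san francisco", "lyon"))
  val idx = indexOf(labels)("san francisco")
  val back = labelOf(labels)(idx)
}
```

This is why `TypedIndexToString` needs the label array from `indexerModel`: the predicted index is only meaningful relative to the array that produced it.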

## List of currently implemented `TypedEstimator`s

* `TypedRandomForestClassifier`
* `TypedRandomForestRegressor`
* ... [your contribution here](https://github.com/typelevel/frameless/issues/215) ... :)

## List of currently implemented `TypedTransformer`s

* `TypedIndexToString`
* `TypedStringIndexer`
* `TypedVectorAssembler`
* ... [your contribution here](https://github.com/typelevel/frameless/issues/215) ... :)

## Using Vector and Matrix with TypedDataset
## Using Vector and Matrix with `TypedDataset`

`frameless-ml` provides `TypedEncoder` instances for `org.apache.spark.ml.linalg.Vector`
and `org.apache.spark.ml.linalg.Matrix`:

```tut:book
```tut:silent
Contributor

I think it's good to include here the imports a user needs to add. What do you think?

Contributor Author @atamborrino, Dec 4, 2017

tut:silent will indeed show the imports in the documentation (unlike tut:invisible), it's just that it will not add the interpreted lines:

// import frameless.ml._ 
// import org.apache.spark.ml.linalg._

Contributor

sorry, yes you are right

import frameless._
import frameless.ml._
import frameless.TypedDataset
import org.apache.spark.ml.linalg._
```

```tut:book
val vector = Vectors.dense(1, 2, 3)
val vectorDs = TypedDataset.create(Seq("label" -> vector))

@@ -39,4 +302,4 @@ from `org.apache.spark.sql.types.UserDefinedType[A]` to `TypedEncoder[A]` define

```tut:invisible
spark.stop()
```
```
27 changes: 27 additions & 0 deletions ml/src/main/scala/frameless/ml/TypedEstimator.scala
@@ -0,0 +1,27 @@
package frameless
package ml

import frameless.ops.SmartProject
import org.apache.spark.ml.{Estimator, Model}

/**
  * A TypedEstimator fits models to data.
  */
trait TypedEstimator[Inputs, Outputs, M <: Model[M]] {
  val estimator: Estimator[M]

  def fit[T, F[_]](ds: TypedDataset[T])(
    implicit
    smartProject: SmartProject[T, Inputs],
    F: SparkDelay[F]
  ): F[AppendTransformer[Inputs, Outputs, M]] = {
    implicit val sparkSession = ds.dataset.sparkSession
    F.delay {
      val inputDs = smartProject.apply(ds)
      val model = estimator.fit(inputDs.dataset)
      new AppendTransformer[Inputs, Outputs, M] {
        val transformer: M = model
      }
    }
  }
}