Merge pull request #206 from atamborrino/typedML
Add TypedTransformer and TypedEstimator, towards a type-safe Spark ML API
OlivierBlanvillain authored Dec 7, 2017
2 parents 88b20c7 + 905deeb commit 1957664
Showing 22 changed files with 1,173 additions and 9 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -10,7 +10,7 @@ It consists of the following modules:

* `dataset` for more strongly typed `Dataset`s (supports Spark 2.2.x)
* `cats` for using Spark with [cats](https://github.com/typelevel/cats) (supports Cats 1.0.0-MF)
-* `ml` for a more strongly typed use of Spark ML based on `dataset`
* `ml` for a more strongly typed Spark ML API based on `dataset`

The Frameless project and contributors support the
[Typelevel](http://typelevel.org/) [Code of Conduct](http://typelevel.org/conduct.html) and want all its
@@ -24,7 +24,7 @@ associated channels (e.g. GitHub, Gitter) to be a safe and friendly environment
* [Injection: Creating Custom Encoders](http://typelevel.org/frameless/Injection.html)
* [Job\[A\]](http://typelevel.org/frameless/Job.html)
* [Using Cats with RDDs](http://typelevel.org/frameless/Cats.html)
-* [TypedDataset support for Spark ML](http://typelevel.org/frameless/TypedML.html)
* [Typed Spark ML](http://typelevel.org/frameless/TypedML.html)
* [Proof of Concept: TypedDataFrame](http://typelevel.org/frameless/TypedDataFrame.html)

## Why?
13 changes: 13 additions & 0 deletions dataset/src/test/scala/frameless/XN.scala
@@ -64,3 +64,16 @@ object X5 {
  implicit def ordering[A: Ordering, B: Ordering, C: Ordering, D: Ordering, E: Ordering]: Ordering[X5[A, B, C, D, E]] =
    Ordering.Tuple5[A, B, C, D, E].on(x => (x.a, x.b, x.c, x.d, x.e))
}

case class X6[A, B, C, D, E, F](a: A, b: B, c: C, d: D, e: E, f: F)

object X6 {
  implicit def arbitrary[A: Arbitrary, B: Arbitrary, C: Arbitrary, D: Arbitrary, E: Arbitrary, F: Arbitrary]: Arbitrary[X6[A, B, C, D, E, F]] =
    Arbitrary(Arbitrary.arbTuple6[A, B, C, D, E, F].arbitrary.map((X6.apply[A, B, C, D, E, F] _).tupled))

  implicit def cogen[A, B, C, D, E, F](implicit A: Cogen[A], B: Cogen[B], C: Cogen[C], D: Cogen[D], E: Cogen[E], F: Cogen[F]): Cogen[X6[A, B, C, D, E, F]] =
    Cogen.tuple6(A, B, C, D, E, F).contramap(x => (x.a, x.b, x.c, x.d, x.e, x.f))

  implicit def ordering[A: Ordering, B: Ordering, C: Ordering, D: Ordering, E: Ordering, F: Ordering]: Ordering[X6[A, B, C, D, E, F]] =
    Ordering.Tuple6[A, B, C, D, E, F].on(x => (x.a, x.b, x.c, x.d, x.e, x.f))
}
277 changes: 270 additions & 7 deletions docs/src/main/tut/TypedML.md
@@ -1,8 +1,22 @@
-# TypedDataset support for Spark ML
# Typed Spark ML

-The goal of the `frameless-ml` module is to be able to use Spark ML with `TypedDataset` and
-to eventually provide a more strongly typed ML API for Spark. Currently, this module is at its very beginning and only
-provides `TypedEncoder` instances for Spark ML's linear algebra data types.
The `frameless-ml` module provides a strongly typed Spark ML API leveraging `TypedDataset`s. It introduces `TypedTransformer`s
and `TypedEstimator`s, the type-safe equivalents of Spark ML's `Transformer` and `Estimator`.

A `TypedEstimator` fits models to data, i.e. trains an ML model based on an input `TypedDataset`.
A `TypedTransformer` transforms one `TypedDataset` into another, usually by appending column(s) to it.

Calling the `fit` method of a `TypedEstimator` trains an ML model on the `TypedDataset`
passed as input (the training data) and returns a `TypedTransformer` that represents the trained model.
This `TypedTransformer` can then be used to make predictions on an input `TypedDataset` (the test data)
using the `transform` method, which returns a new `TypedDataset` with appended prediction column(s).
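
The fit-then-transform flow can be illustrated without Spark. The following is a minimal plain-Scala sketch, not the frameless implementation: `SimpleEstimator`, `SimpleTransformer`, `House`, `HouseWithPrediction` and `PricePerSquareFoot` are hypothetical names introduced here, and the "model" is just an average price per square foot.

```scala
object EstimatorSketch {
  // Hypothetical stand-ins for the estimator/transformer roles (not frameless types).
  trait SimpleTransformer[In, Out] { def transform(rows: Seq[In]): Seq[Out] }
  trait SimpleEstimator[In, Out] { def fit(rows: Seq[In]): SimpleTransformer[In, Out] }

  case class House(squareFeet: Double, price: Double)
  case class HouseWithPrediction(squareFeet: Double, price: Double, predictedPrice: Double)

  // "Training" only learns the overall price per square foot of the training rows.
  object PricePerSquareFoot extends SimpleEstimator[House, HouseWithPrediction] {
    def fit(rows: Seq[House]): SimpleTransformer[House, HouseWithPrediction] = {
      val rate = rows.map(_.price).sum / rows.map(_.squareFeet).sum
      new SimpleTransformer[House, HouseWithPrediction] {
        // The trained model appends one prediction column to every input row.
        def transform(rows: Seq[House]): Seq[HouseWithPrediction] =
          rows.map(h => HouseWithPrediction(h.squareFeet, h.price, h.squareFeet * rate))
      }
    }
  }
}
```

As in the description above, `fit` consumes training rows and returns a transformer, and `transform` keeps the input fields while adding a predicted one.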

Both `TypedEstimator` and `TypedTransformer` check at compile time that the field names and types of their inputs are correct,
unlike the Spark ML API, which only deals with DataFrames (the data structure with the lowest level of type safety in Spark).

`frameless-ml` adds type safety to the Spark ML API but stays very close to it in terms of abstractions and API calls, so
please check the [Spark ML documentation](https://spark.apache.org/docs/2.2.0/ml-pipeline.html) for more details
on `Transformer`s and `Estimator`s.

```tut:invisible
import org.apache.spark.{SparkConf, SparkContext}
@@ -14,17 +28,266 @@ spark.sparkContext.setLogLevel("WARN")
import spark.implicits._
```

## Example 1: predict a continuous value using a `TypedRandomForestRegressor`

In this example, we want to predict the sale price of a house from its square footage and whether or not it
has a garden. We will use a `TypedRandomForestRegressor`.

### Training

As with the Spark ML API, we use a `TypedVectorAssembler` (the type-safe equivalent of `VectorAssembler`)
to compute feature vectors:

```tut:silent
import frameless._
import frameless.syntax._
import frameless.ml._
import frameless.ml.feature._
import frameless.ml.regression._
import org.apache.spark.ml.linalg.Vector
```

```tut:book
case class HouseData(squareFeet: Double, hasGarden: Boolean, price: Double)
val trainingData = TypedDataset.create(Seq(
  HouseData(20, false, 100000),
  HouseData(50, false, 200000),
  HouseData(50, true, 250000),
  HouseData(100, true, 500000)
))
case class Features(squareFeet: Double, hasGarden: Boolean)
val assembler = TypedVectorAssembler[Features]
case class HouseDataWithFeatures(squareFeet: Double, hasGarden: Boolean, price: Double, features: Vector)
val trainingDataWithFeatures = assembler.transform(trainingData).as[HouseDataWithFeatures]
```

In the above code snippet, `.as[HouseDataWithFeatures]` is a `TypedDataset`'s type-safe cast
(see [TypedDataset: Feature Overview](https://typelevel.org/frameless/FeatureOverview.html)):

```tut:silent
case class WrongHouseFeatures(
  squareFeet: Double,
  hasGarden: Int, // hasGarden has wrong type
  price: Double,
  features: Vector
)
```

```tut:book:fail
assembler.transform(trainingData).as[WrongHouseFeatures]
```

Moreover, `TypedVectorAssembler[Features]` will compile only if `Features` contains exclusively fields of type Numeric or Boolean:

```tut:silent
case class WrongFeatures(squareFeet: Double, hasGarden: Boolean, city: String)
```

```tut:book:fail
TypedVectorAssembler[WrongFeatures]
```
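
One way to picture this kind of compile-time restriction is a type class that only has instances for the accepted field types. The sketch below is plain Scala and is not how frameless implements `TypedVectorAssembler`; `FeatureField` and `assemble2` are hypothetical names, and encoding `true` as `1.0` mirrors what Spark's `VectorAssembler` does when it casts Boolean columns to Double.

```scala
object FeatureFieldSketch {
  // Hypothetical evidence that a field of type A may be placed in a feature vector.
  trait FeatureField[A] { def toDouble(a: A): Double }

  object FeatureField {
    implicit val doubleField: FeatureField[Double] =
      new FeatureField[Double] { def toDouble(a: Double): Double = a }
    implicit val intField: FeatureField[Int] =
      new FeatureField[Int] { def toDouble(a: Int): Double = a.toDouble }
    implicit val booleanField: FeatureField[Boolean] =
      new FeatureField[Boolean] { def toDouble(a: Boolean): Double = if (a) 1.0 else 0.0 }
  }

  // Compiles only when both field types have a FeatureField instance in scope.
  def assemble2[A, B](a: A, b: B)(implicit fa: FeatureField[A], fb: FeatureField[B]): Array[Double] =
    Array(fa.toDouble(a), fb.toDouble(b))

  val ok: Array[Double] = assemble2(20.0, true)
  // assemble2(20.0, "paris") // rejected by the compiler: no FeatureField[String] instance
}
```

Because there is no instance for `String`, the commented-out call is a compile error rather than a runtime failure, which is the same kind of guarantee described above.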

The subsequent call `assembler.transform(trainingData)` compiles only if `trainingData` contains all fields (names and types)
of `Features`:

```tut:book
case class WrongHouseData(squareFeet: Double, price: Double) // hasGarden is missing
val wrongTrainingData = TypedDataset.create(Seq(WrongHouseData(20, 100000)))
```

```tut:book:fail
assembler.transform(wrongTrainingData)
```

Then, we train the model. To train a Random Forest, one needs to feed it with features (what we predict from) and
with a label (what we predict). In our example, `price` is the label, `features` are the features:

```tut:book
case class RFInputs(price: Double, features: Vector)
val rf = TypedRandomForestRegressor[RFInputs]
val model = rf.fit(trainingDataWithFeatures).run()
```

`TypedRandomForestRegressor[RFInputs]` compiles only if `RFInputs`
contains exactly one field of type Double (the label) and one field of type Vector (the features):

```tut:silent
case class WrongRFInputs(labelOfWrongType: String, features: Vector)
```

```tut:book:fail
TypedRandomForestRegressor[WrongRFInputs]
```

The subsequent `rf.fit(trainingDataWithFeatures)` call compiles only if `trainingDataWithFeatures` contains all the fields
(names and types) of `RFInputs`:

```tut:book
val wrongTrainingDataWithFeatures = TypedDataset.create(Seq(HouseData(20, false, 100000))) // features are missing
```

```tut:book:fail
rf.fit(wrongTrainingDataWithFeatures)
```
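
In the snippets above, `fit` is followed by `.run()`, which suggests that training is described first and executed only when run. A minimal plain-Scala sketch of that deferred style follows; `Delayed`, `fitMean` and `fitCalls` are hypothetical names for illustration and are not part of frameless, whose own mechanism may differ.

```scala
object DelaySketch {
  // Hypothetical deferred computation: the body is evaluated only when run() is called.
  final class Delayed[A](thunk: () => A) { def run(): A = thunk() }

  object Delayed {
    def apply[A](body: => A): Delayed[A] = new Delayed(() => body)
  }

  // Counts how many times the "training" body has actually been executed.
  var fitCalls = 0

  // Describes a training step (here: computing a mean) without executing it yet.
  def fitMean(xs: Seq[Double]): Delayed[Double] = Delayed {
    fitCalls += 1
    xs.sum / xs.size
  }
}
```

With this shape, building the description is cheap and side-effect free, and the caller decides when the work actually happens.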

### Prediction

We now want to predict `price` for `testData` using the previously trained model. As with the Spark ML API,
`testData` carries a placeholder value for `price` (`0` in our case) that is ignored at prediction time. We reuse
our `assembler` to compute the feature vector of `testData`.

```tut:book
val testData = TypedDataset.create(Seq(HouseData(70, true, 0)))
val testDataWithFeatures = assembler.transform(testData).as[HouseDataWithFeatures]
case class HousePricePrediction(
  squareFeet: Double,
  hasGarden: Boolean,
  price: Double,
  features: Vector,
  predictedPrice: Double
)
val predictions = model.transform(testDataWithFeatures).as[HousePricePrediction]
predictions.select(predictions.col('predictedPrice)).collect.run()
```

`model.transform(testDataWithFeatures)` will only compile if `testDataWithFeatures` contains a field `price` of type Double
and a field `features` of type Vector:

```tut:book:fail
model.transform(testData)
```

## Example 2: predict a categorical value using a `TypedRandomForestClassifier`

In this example, we want to predict in which city a house is located depending on its price and its square footage. We use a
`TypedRandomForestClassifier`.

### Training

As with the Spark ML API, we use a `TypedVectorAssembler` to compute feature vectors and a `TypedStringIndexer`
to index `city` values in order to be able to pass them to a `TypedRandomForestClassifier`
(which only accepts Double values as label):

```tut:silent
import frameless.ml.classification._
```

```tut:book
case class HouseData(squareFeet: Double, city: String, price: Double)
val trainingData = TypedDataset.create(Seq(
  HouseData(100, "lyon", 100000),
  HouseData(200, "lyon", 200000),
  HouseData(100, "san francisco", 500000),
  HouseData(150, "san francisco", 900000)
))
case class Features(price: Double, squareFeet: Double)
val vectorAssembler = TypedVectorAssembler[Features]
case class HouseDataWithFeatures(squareFeet: Double, city: String, price: Double, features: Vector)
val dataWithFeatures = vectorAssembler.transform(trainingData).as[HouseDataWithFeatures]
case class StringIndexerInput(city: String)
val indexer = TypedStringIndexer[StringIndexerInput]
val indexerModel = indexer.fit(dataWithFeatures).run()
case class HouseDataWithFeaturesAndIndex(
  squareFeet: Double,
  city: String,
  price: Double,
  features: Vector,
  cityIndexed: Double
)
val indexedData = indexerModel.transform(dataWithFeatures).as[HouseDataWithFeaturesAndIndex]
```

Then, we train the model:

```tut:book
case class RFInputs(cityIndexed: Double, features: Vector)
val rf = TypedRandomForestClassifier[RFInputs]
val model = rf.fit(indexedData).run()
```

### Prediction

We now want to predict `city` for `testData` using the previously trained model. As with the Spark ML API,
`testData` carries a placeholder value for `city` (an empty string in our case) that is ignored at prediction time. We reuse
our `vectorAssembler` to compute the feature vector of `testData` and our `indexerModel` to index `city`.

```tut:book
val testData = TypedDataset.create(Seq(HouseData(120, "", 800000)))
val testDataWithFeatures = vectorAssembler.transform(testData).as[HouseDataWithFeatures]
val indexedTestData = indexerModel.transform(testDataWithFeatures).as[HouseDataWithFeaturesAndIndex]
case class HouseCityPredictionInputs(features: Vector, cityIndexed: Double)
val testInput = indexedTestData.project[HouseCityPredictionInputs]
case class HouseCityPredictionIndexed(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double
)
val indexedPredictions = model.transform(testInput).as[HouseCityPredictionIndexed]
```

Then, we use a `TypedIndexToString` to get back a String value from `predictedCityIndexed`. `TypedIndexToString` takes
as input the label array computed by our previous `indexerModel`:

```tut:book
case class IndexToStringInput(predictedCityIndexed: Double)
val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labels)
case class HouseCityPrediction(
  features: Vector,
  cityIndexed: Double,
  rawPrediction: Vector,
  probability: Vector,
  predictedCityIndexed: Double,
  predictedCity: String
)
val predictions = indexToString.transform(indexedPredictions).as[HouseCityPrediction]
predictions.select(predictions.col('predictedCity)).collect.run()
```
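
The round trip between a string label and its index can be sketched in plain Scala. This is an illustration only, not Spark's or frameless's code: `fitLabels`, `index` and `unindex` are hypothetical helpers. Spark's `StringIndexer` orders labels by descending frequency; breaking ties alphabetically is an assumption made here to keep the example deterministic.

```scala
object LabelIndexSketch {
  // Builds the label array: distinct labels, most frequent first.
  // Assumption: ties are broken alphabetically.
  def fitLabels(values: Seq[String]): Array[String] =
    values
      .groupBy(identity)
      .toSeq
      .map { case (label, occurrences) => (label, occurrences.size) }
      .sortBy { case (label, count) => (-count, label) }
      .map(_._1)
      .toArray

  // String label to Double index (yields -1.0 for a label that was never seen).
  def index(labels: Array[String], value: String): Double =
    labels.indexOf(value).toDouble

  // Double index back to the string label, using the same label array.
  def unindex(labels: Array[String], idx: Double): String =
    labels(idx.toInt)
}
```

The point is that the inverse mapping needs the exact label array produced when the indexer was fitted, which is why the label array of `indexerModel` is passed to `TypedIndexToString` above.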

## List of currently implemented `TypedEstimator`s

* `TypedRandomForestClassifier`
* `TypedRandomForestRegressor`
* ... [your contribution here](https://github.com/typelevel/frameless/issues/215) ... :)

## List of currently implemented `TypedTransformer`s

* `TypedIndexToString`
* `TypedStringIndexer`
* `TypedVectorAssembler`
* ... [your contribution here](https://github.com/typelevel/frameless/issues/215) ... :)

-## Using Vector and Matrix with TypedDataset
## Using Vector and Matrix with `TypedDataset`

`frameless-ml` provides `TypedEncoder` instances for `org.apache.spark.ml.linalg.Vector`
and `org.apache.spark.ml.linalg.Matrix`:

-```tut:book
```tut:silent
import frameless._
import frameless.ml._
-import frameless.TypedDataset
import org.apache.spark.ml.linalg._
```

```tut:book
val vector = Vectors.dense(1, 2, 3)
val vectorDs = TypedDataset.create(Seq("label" -> vector))
27 changes: 27 additions & 0 deletions ml/src/main/scala/frameless/ml/TypedEstimator.scala
@@ -0,0 +1,27 @@
package frameless
package ml

import frameless.ops.SmartProject
import org.apache.spark.ml.{Estimator, Model}

/**
  * A TypedEstimator fits models to data.
  */
trait TypedEstimator[Inputs, Outputs, M <: Model[M]] {
  val estimator: Estimator[M]

  def fit[T, F[_]](ds: TypedDataset[T])(
    implicit
    smartProject: SmartProject[T, Inputs],
    F: SparkDelay[F]
  ): F[AppendTransformer[Inputs, Outputs, M]] = {
    implicit val sparkSession = ds.dataset.sparkSession
    F.delay {
      val inputDs = smartProject.apply(ds)
      val model = estimator.fit(inputDs.dataset)
      new AppendTransformer[Inputs, Outputs, M] {
        val transformer: M = model
      }
    }
  }
}