From 5bd6fe28132b986ca6eafd444047cda86fd39f0d Mon Sep 17 00:00:00 2001 From: frosforever Date: Thu, 16 Nov 2017 14:33:41 -0800 Subject: [PATCH 1/3] Wip --- .../main/scala/frameless/TypedDataset.scala | 29 ++++++++++++++++--- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala index 18406f926..5393fd8f9 100644 --- a/dataset/src/main/scala/frameless/TypedDataset.scala +++ b/dataset/src/main/scala/frameless/TypedDataset.scala @@ -2,13 +2,14 @@ package frameless import frameless.ops._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateStruct, EqualTo} -import org.apache.spark.sql.catalyst.plans.logical.{Join, Project} +import org.apache.spark.sql.catalyst.expressions.{ Alias, Attribute, AttributeReference, CreateStruct, EqualTo } +import org.apache.spark.sql.catalyst.plans.logical.{ Join, Project } import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter} +import org.apache.spark.sql.catalyst.plans.{ Inner, LeftOuter } import org.apache.spark.sql._ import shapeless._ -import shapeless.ops.hlist.{Prepend, ToTraversable, Tupler} +import shapeless.ops.hlist.{ Prepend, ToTraversable, Tupler } +import shapeless.ops.record.Remove /** [[TypedDataset]] is a safer interface for working with `Dataset`. 
* @@ -605,6 +606,26 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val } } + object dropMany extends ProductArgs { + def applyProduct[U <: HList, TRep <: HList, Removed <: HList, Out0 <: HList, Out](columns: U)( + implicit + genOfA: Generic.Aux[T, TRep], + dropped: Remove.Aux[T, U, Removed], +// ct: ColumnTypes.Aux[T, U, Out0], + + toTraversable: ToTraversable.Aux[U, List, UntypedExpression[T]], + tupler: Tupler.Aux[Removed, Out], + encoder: TypedEncoder[Out] + ): TypedDataset[Out] = { + val selected = dataset.toDF() + .drop(toTraversable. + .select(toTraversable(columns).map(c => new Column(c.expr)):_*) + .as[Out](TypedExpressionEncoder[Out]) + + TypedDataset.create[Out](selected) + } + } + /** Prepends a new column to the Dataset. * * {{{ From efc019a9cbb7580bd251c231efc4e51550ca6aeb Mon Sep 17 00:00:00 2001 From: frosforever Date: Thu, 16 Nov 2017 18:21:25 -0800 Subject: [PATCH 2/3] add drop single column --- .../main/scala/frameless/TypedDataset.scala | 47 ++++++++++--------- .../src/test/scala/frameless/DropTest.scala | 24 ++++++++++ 2 files changed, 49 insertions(+), 22 deletions(-) create mode 100644 dataset/src/test/scala/frameless/DropTest.scala diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala index 5393fd8f9..a6ddee745 100644 --- a/dataset/src/main/scala/frameless/TypedDataset.scala +++ b/dataset/src/main/scala/frameless/TypedDataset.scala @@ -2,14 +2,14 @@ package frameless import frameless.ops._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.{ Alias, Attribute, AttributeReference, CreateStruct, EqualTo } -import org.apache.spark.sql.catalyst.plans.logical.{ Join, Project } +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CreateStruct, EqualTo} +import org.apache.spark.sql.catalyst.plans.logical.{Join, Project} import org.apache.spark.sql.types.StructType -import 
org.apache.spark.sql.catalyst.plans.{ Inner, LeftOuter } +import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter} import org.apache.spark.sql._ import shapeless._ -import shapeless.ops.hlist.{ Prepend, ToTraversable, Tupler } -import shapeless.ops.record.Remove +import shapeless.ops.hlist.{Prepend, ToTraversable, Tupler} +import shapeless.ops.record.{Remover, Values} /** [[TypedDataset]] is a safer interface for working with `Dataset`. * @@ -606,24 +606,27 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val } } - object dropMany extends ProductArgs { - def applyProduct[U <: HList, TRep <: HList, Removed <: HList, Out0 <: HList, Out](columns: U)( - implicit - genOfA: Generic.Aux[T, TRep], - dropped: Remove.Aux[T, U, Removed], -// ct: ColumnTypes.Aux[T, U, Out0], - - toTraversable: ToTraversable.Aux[U, List, UntypedExpression[T]], - tupler: Tupler.Aux[Removed, Out], - encoder: TypedEncoder[Out] - ): TypedDataset[Out] = { - val selected = dataset.toDF() - .drop(toTraversable. - .select(toTraversable(columns).map(c => new Column(c.expr)):_*) - .as[Out](TypedExpressionEncoder[Out]) + def drop[ + Out, + TRep <: HList, + Removed <: HList, + ValuesFromRemoved <: HList, + V + ]( + column: Witness.Lt[Symbol] + )(implicit + genOfT: LabelledGeneric.Aux[T, TRep], + removed: Remover.Aux[TRep, column.T, (V, Removed)], + values: Values.Aux[Removed, ValuesFromRemoved], + tupler: Tupler.Aux[ValuesFromRemoved, Out], + encoder: TypedEncoder[Out] + ): TypedDataset[Out] = { + val dropped = dataset + .toDF() + .drop(column.value.name) + .as[Out](TypedExpressionEncoder[Out]) - TypedDataset.create[Out](selected) - } + TypedDataset.create[Out](dropped) } /** Prepends a new column to the Dataset. 
diff --git a/dataset/src/test/scala/frameless/DropTest.scala b/dataset/src/test/scala/frameless/DropTest.scala new file mode 100644 index 000000000..6e95dc649 --- /dev/null +++ b/dataset/src/test/scala/frameless/DropTest.scala @@ -0,0 +1,24 @@ +package frameless + +import org.scalacheck.Prop +import org.scalacheck.Prop._ + +class DropTest extends TypedDatasetSuite { + test("drop five columns") { + def prop[A: TypedEncoder](value: A): Prop = { + val d5 = TypedDataset.create(X5(value, value, value, value, value) :: Nil) + val d4 = d5.drop('a) + val d3 = d4.drop('_4) + val d2 = d3.drop('_3) + val d1 = d2.drop('_2) + + Tuple1(value) ?= d1.collect().run().head + } + + check(prop[Int] _) + check(prop[Long] _) + check(prop[String] _) + check(prop[SQLDate] _) + check(prop[Option[X1[Boolean]]] _) + } +} From 2dcd565a0bcb344e9d7b6a6909d8456863d9cb14 Mon Sep 17 00:00:00 2001 From: frosforever Date: Fri, 17 Nov 2017 14:46:43 -0800 Subject: [PATCH 3/3] add more explicit drop tests and scaladoc for drop --- .../main/scala/frameless/TypedDataset.scala | 24 ++++++++- .../src/test/scala/frameless/DropTest.scala | 51 +++++++++++++++++-- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala index a6ddee745..7dc92ee6d 100644 --- a/dataset/src/main/scala/frameless/TypedDataset.scala +++ b/dataset/src/main/scala/frameless/TypedDataset.scala @@ -606,6 +606,28 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val } } + /** + * Returns a new Dataset as a tuple with the specified + * column dropped. + * Does not allow for dropping from a single column TypedDataset + * + * {{{ + * val d: TypedDataset[Foo(a: String, b: Int...)] = ??? 
+ * val result: TypedDataset[(Int, ...)] = d.drop('a) + * }}} + * @param column column to drop specified as a Symbol + * @param genOfT LabelledGeneric derived for T + * @param remover Remover derived for TRep and column + * @param values values of T with column removed + * @param tupler tupler of values + * @param encoder evidence of encoder of the tupled values + * @tparam Out Tupled return type + * @tparam TRep shapeless' record representation of T + * @tparam Removed record of T with column removed + * @tparam ValuesFromRemoved values of T with column removed as an HList + * @tparam V value type of column in T + * @return a new [[TypedDataset]] of the tupled remaining columns + */ def drop[ Out, TRep <: HList, @@ -616,7 +638,7 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val column: Witness.Lt[Symbol] )(implicit genOfT: LabelledGeneric.Aux[T, TRep], - removed: Remover.Aux[TRep, column.T, (V, Removed)], + remover: Remover.Aux[TRep, column.T, (V, Removed)], values: Values.Aux[Removed, ValuesFromRemoved], tupler: Tupler.Aux[ValuesFromRemoved, Out], encoder: TypedEncoder[Out] diff --git a/dataset/src/test/scala/frameless/DropTest.scala b/dataset/src/test/scala/frameless/DropTest.scala index 6e95dc649..5c030b273 100644 --- a/dataset/src/test/scala/frameless/DropTest.scala +++ b/dataset/src/test/scala/frameless/DropTest.scala @@ -7,9 +7,9 @@ class DropTest extends TypedDatasetSuite { test("drop five columns") { def prop[A: TypedEncoder](value: A): Prop = { val d5 = TypedDataset.create(X5(value, value, value, value, value) :: Nil) - val d4 = d5.drop('a) - val d3 = d4.drop('_4) - val d2 = d3.drop('_3) + val d4 = d5.drop('a) //drops first column + val d3 = d4.drop('_4) //drops last column + val d2 = d3.drop('_2) //drops middle column val d1 = d2.drop('_2) Tuple1(value) ?= d1.collect().run().head @@ -21,4 +21,49 @@ class DropTest extends TypedDatasetSuite { check(prop[SQLDate] _) check(prop[Option[X1[Boolean]]] _) } + + test("drop first column") { + def prop[A: TypedEncoder](value: A): Prop
= { + val d3 = TypedDataset.create(X3(value, value, value) :: Nil) + val d2 = d3.drop('a) + + (value, value) ?= d2.collect().run().head + } + + check(prop[Int] _) + check(prop[Long] _) + check(prop[String] _) + check(prop[SQLDate] _) + check(prop[Option[X1[Boolean]]] _) + } + + test("drop middle column") { + def prop[A: TypedEncoder](value: A): Prop = { + val d3 = TypedDataset.create(X3(value, value, value) :: Nil) + val d2 = d3.drop('b) + + (value, value) ?= d2.collect().run().head + } + + check(prop[Int] _) + check(prop[Long] _) + check(prop[String] _) + check(prop[SQLDate] _) + check(prop[Option[X1[Boolean]]] _) + } + + test("drop last column") { + def prop[A: TypedEncoder](value: A): Prop = { + val d3 = TypedDataset.create(X3(value, value, value) :: Nil) + val d2 = d3.drop('c) + + (value, value) ?= d2.collect().run().head + } + + check(prop[Int] _) + check(prop[Long] _) + check(prop[String] _) + check(prop[SQLDate] _) + check(prop[Option[X1[Boolean]]] _) + } }