[WIP] Window functions and column sorting #225

Closed
wants to merge 15 commits
45 changes: 45 additions & 0 deletions core/src/main/scala/frameless/CatalystRowOrdered.scala
@@ -0,0 +1,45 @@
package frameless

import shapeless._

import scala.annotation.implicitNotFound

/** Types that can be used to sort a dataset by Catalyst. */
@implicitNotFound("Cannot order by columns of type ${A}.")
trait CatalystRowOrdered[A]

object CatalystRowOrdered {
  private[this] val theInstance = new CatalystRowOrdered[Any] {}
  private[this] def of[A]: CatalystRowOrdered[A] = theInstance.asInstanceOf[CatalystRowOrdered[A]]

  /*
    The following are sortable by Spark
    (see [[org.apache.spark.sql.catalyst.expressions.RowOrdering.isOrderable]]):
      AtomicType
      StructType containing only orderable types
      ArrayType containing only orderable types
      UserDefinedType containing only orderable types

    MapType can't be used in ordering!
    TODO: UDF
  */

  implicit def orderedEvidence[A](implicit catalystOrdered: CatalystOrdered[A]): CatalystRowOrdered[A] = of[A]
Contributor Author:
This might be dangerous if something gets added to CatalystOrdered that's not supported in sorting. Does anyone know if there even are any types like that? Would it be safer to explicitly state all the allowable types here, even if there is some duplication?
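
For reference, the explicit alternative would enumerate instances by hand, mirroring CatalystOrdered's atomic types (a sketch only; names are illustrative and the list would have to be kept in sync manually):

  implicit val intRowOrdered: CatalystRowOrdered[Int] = of[Int]
  implicit val longRowOrdered: CatalystRowOrdered[Long] = of[Long]
  implicit val stringRowOrdered: CatalystRowOrdered[String] = of[String]
  // ...one instance per atomic type, instead of deriving everything from CatalystOrdered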

Contributor:

If it works nicely with CatalystOrdered, I would keep it this way for now. Just add a note in object CatalystOrdered saying that if someone adds a new case there, they should also update the window function tests.

Contributor:

Do we need this extra type class? Why wouldn't CatalystOrdered cover this case as well?

Contributor Author:

I'm not sure I follow. Are you asking why CatalystRowOrdered is needed, and whether CatalystOrdered shouldn't be enough? The reason is that you can order rows by things that aren't comparable within a row, for example arrays:

scala> spark.createDataset(Seq((Seq(1, 2, 3), Seq(2,3,1))))
res3: org.apache.spark.sql.Dataset[(Seq[Int], Seq[Int])] = [_1: array<int>, _2: array<int>]

scala> res3.select(untyped.col("_1") > untyped.col("_2")).show
org.apache.spark.sql.AnalysisException: cannot resolve '(`_1` > `_2`)' due to data type mismatch: '(`_1` > `_2`)' requires (boolean or tinyint or smallint or int or bigint or float or double or decimal or timestamp or date or string or binary) type, not array<int>;;

scala> res3.sort(untyped.col("_1")).show()
+---------+---------+
|       _1|       _2|
+---------+---------+
|[1, 2, 3]|[2, 3, 1]|
+---------+---------+
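
At the type-class level, that is exactly the distinction the two traits encode (a sketch):

  implicitly[CatalystOrdered[Seq[Int]]]     // does not compile: arrays can't be compared within a row
  implicitly[CatalystRowOrdered[Seq[Int]]]  // compiles: rows can still be sorted by an array column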

Contributor:

That looks more like a bug than a feature... not yours, of course, but on Spark's side. Is there any particular reason why they made this design choice?

Contributor (@imarios), Dec 22, 2017:

It seems this is fixed in Spark 2.3: https://issues.apache.org/jira/browse/SPARK-21110. Can you take a look and verify that this is the same bug? It might save us a lot of extra code if this magically goes away. We could then just focus on a single type class, CatalystOrdered.
Here is the PR:
apache/spark#18818

Contributor Author:

Good find @imarios! I honestly have no idea if there's a reason for the design choice. I'll take a more serious look at the fix sometime in the next few days.
If this does indeed fix everything, are we targeting 2.3 for the next frameless version? Does it make sense to keep this in for earlier version support?

Contributor Author:

Following up on this. I was able to test the above code snippet ☝️ https://github.com/typelevel/frameless/pull/225/files#r158376192 using Spark 2.3.0-SNAPSHOT, and it does indeed work!
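
Roughly what the check looked like against 2.3.0-SNAPSHOT (paraphrased, not a verbatim transcript):

scala> res3.select(untyped.col("_1") > untyped.col("_2")).show
+---------+
|(_1 > _2)|
+---------+
|    false|
+---------+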


  implicit def arrayEv[A](implicit catalystOrdered: CatalystRowOrdered[A]): CatalystRowOrdered[Array[A]] = of[Array[A]]

  implicit def collectionEv[C[X] <: Seq[X], A](implicit catalystOrdered: CatalystRowOrdered[A]): CatalystRowOrdered[C[A]] = of[C[A]]

  implicit def optionEv[A](implicit catalystOrdered: CatalystRowOrdered[A]): CatalystRowOrdered[Option[A]] = of[Option[A]]

  implicit def recordEv[A, G <: HList](implicit i0: Generic.Aux[A, G], i1: HasRowOrdered[G]): CatalystRowOrdered[A] = of[A]

  trait HasRowOrdered[T <: HList]
  object HasRowOrdered {
    implicit def deriveOrderHNil[H](implicit catalystRowOrdered: CatalystRowOrdered[H]): HasRowOrdered[H :: HNil] =
      new HasRowOrdered[H :: HNil] {}

    implicit def deriveOrderHCons[H, T <: HList](implicit head: CatalystRowOrdered[H], tail: HasRowOrdered[T]): HasRowOrdered[H :: T] =
      new HasRowOrdered[H :: T] {}
  }
}
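
A quick check of what the derivation admits and rejects (a sketch, assuming the instances above are in scope; illTyped is shapeless.test.illTyped):

  implicitly[CatalystRowOrdered[Option[String]]]      // compiles via optionEv
  implicitly[CatalystRowOrdered[(Int, Seq[String])]]  // compiles via recordEv over the generic HList
  illTyped("""implicitly[CatalystRowOrdered[Map[String, Int]]]""")  // MapType is not orderable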
76 changes: 76 additions & 0 deletions dataset/src/main/scala/frameless/TypedColumn.scala
@@ -278,6 +278,60 @@ sealed class TypedColumn[T, U](
    */
  def /(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, Double] = self.untyped.divide(u).typed

  /** Returns a descending ordering used in sorting
    *
    * apache/spark
    */
  def desc(implicit catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
    new TypedSortedColumn[T, U](withExpr {
      SortOrder(expr, Descending)
    })

  /** Returns a descending ordering used in sorting where None values appear before non-None values
    *
    * apache/spark
    */
  def descNullsFirst(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
Contributor Author:

Should we keep the naming similar to Spark's, with Nulls, or rename this to a more Scala-friendly descNonesFirst?

Contributor:

Given that there are Options involved, I think I like descNonesFirst more.
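
Whichever name wins, usage on an optional column would look like this (a sketch; 'b is an Option[Int] as in the tests below):

  ds.sort(ds('b).descNullsFirst)  // None rows first
  ds.sort(ds('b).descNullsLast)   // None rows last
  // ds.sort(ds('a).descNullsFirst) does not compile for a non-Option column 'a: the U <:< Option[_] evidence fails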

    new TypedSortedColumn[T, U](withExpr {
      SortOrder(expr, Descending, NullsFirst, Set.empty)
    })

  /** Returns a descending ordering used in sorting where None values appear after non-None values
    *
    * apache/spark
    */
  def descNullsLast(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
    new TypedSortedColumn[T, U](withExpr {
      SortOrder(expr, Descending, NullsLast, Set.empty)
    })

  /** Returns an ascending ordering used in sorting
    *
    * apache/spark
    */
  def asc(implicit catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
    new TypedSortedColumn[T, U](withExpr {
      SortOrder(expr, Ascending)
    })

  /** Returns an ascending ordering used in sorting where None values appear before non-None values
    *
    * apache/spark
    */
  def ascNullsFirst(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
    new TypedSortedColumn[T, U](withExpr {
      SortOrder(expr, Ascending, NullsFirst, Set.empty)
    })

  /** Returns an ascending ordering used in sorting where None values appear after non-None values
    *
    * apache/spark
    */
  def ascNullsLast(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
    new TypedSortedColumn[T, U](withExpr {
      SortOrder(expr, Ascending, NullsLast, Set.empty)
    })

  /**
    * Bitwise AND this expression and another expression.
    * {{{
@@ -485,6 +539,28 @@ sealed class TypedAggregate[T, U](val expr: Expression)(
}
}

sealed class TypedSortedColumn[T, U](val expr: Expression)(
Contributor:

Trying to see if we really need a new type here. I assume there are methods that only work on sorted columns?

Contributor Author:

Yes, exactly! Annoyingly, there are also methods that specifically don't work on already-sorted columns. I can post a few examples when I get a moment.

Contributor Author:

For example, selecting a sum on a sorted column blows up:

import org.apache.spark.sql.{functions=>untyped}

ds.select(untyped.sum(untyped.col("a"))).show()
/*
+------+
|sum(a)|
+------+
|  null|
+------+
*/
ds.select(untyped.sum(untyped.col("a").desc)).show()
/*
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_sum(cast(a#2 DESC NULLS LAST as double))], output=[sum#21])
   +- LocalTableScan <empty>, [a#2]

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
...
*/

  implicit
  val uencoder: TypedEncoder[U]
) extends UntypedExpression[T] {

  def this(column: Column)(implicit e: TypedEncoder[U]) {
    this(FramelessInternals.expr(column))
  }

  def untyped: Column = new Column(expr)
}

object TypedSortedColumn {
  implicit def defaultAscending[T, U : CatalystRowOrdered](typedColumn: TypedColumn[T, U]): TypedSortedColumn[T, U] =
    new TypedSortedColumn[T, U](new Column(SortOrder(typedColumn.expr, Ascending)))(typedColumn.uencoder)

  object defaultAscendingPoly extends Poly1 {
    implicit def caseTypedColumn[T, U : CatalystRowOrdered] = at[TypedColumn[T, U]](c => defaultAscending(c))
    implicit def caseTypeSortedColumn[T, U] = at[TypedSortedColumn[T, U]](identity)
  }
}
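
The defaultAscending conversion above is what lets a bare column stand in for an ascending ordering in sort (a sketch; ds and its columns as in the tests below):

  ds.sort(ds('a))              // treated as ds('a).asc via defaultAscending
  ds.sort(ds('a), ds('b).desc) // bare and explicit orderings mixed, mapped by defaultAscendingPoly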

object TypedColumn {
  /**
    * Evidence that type `T` has column `K` with type `V`.
40 changes: 39 additions & 1 deletion dataset/src/main/scala/frameless/TypedDataset.scala
@@ -9,7 +9,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, LeftOuter}
import org.apache.spark.sql._
import shapeless._
import shapeless.labelled.FieldType
-import shapeless.ops.hlist.{Diff, IsHCons, Prepend, ToTraversable, Tupler}
+import shapeless.ops.hlist.{Diff, IsHCons, Mapper, Prepend, ToTraversable, Tupler}
import shapeless.ops.record.{Keys, Remover, Values}

/** [[TypedDataset]] is a safer interface for working with `Dataset`.
@@ -605,6 +605,44 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val
}
}

  /** Sort each partition in the dataset by the given column expressions.
    * {{{
    *   d.sortWithinPartitions(d('a).asc, d('b).desc)
    * }}}
    */
  object sortWithinPartitions extends ProductArgs {
    def applyProduct[U <: HList, O <: HList](columns: U)
      (implicit
        i0: Mapper.Aux[TypedSortedColumn.defaultAscendingPoly.type, U, O],
        i1: ToTraversable.Aux[O, List, TypedSortedColumn[T, _]]
      ): TypedDataset[T] = {
      val sorted = dataset.toDF()
        .sortWithinPartitions(i0(columns).toList[TypedSortedColumn[T, _]].map(c => new Column(c.expr)):_*)
        .as[T](TypedExpressionEncoder[T])

      TypedDataset.create[T](sorted)
    }
  }

  /** Sort the dataset by the given column expressions.
    * {{{
    *   d.sort(d('a).asc, d('b).desc)
    * }}}
    */
  object sort extends ProductArgs {
    def applyProduct[U <: HList, O <: HList](columns: U)
      (implicit
        i0: Mapper.Aux[TypedSortedColumn.defaultAscendingPoly.type, U, O],
        i1: ToTraversable.Aux[O, List, TypedSortedColumn[T, _]]
      ): TypedDataset[T] = {
      val sorted = dataset.toDF()
        .sort(i0(columns).toList[TypedSortedColumn[T, _]].map(c => new Column(c.expr)):_*)
        .as[T](TypedExpressionEncoder[T])

      TypedDataset.create[T](sorted)
    }
  }

  /** Returns a new Dataset as a tuple with the specified
    * column dropped.
    * Does not allow for dropping from a single column TypedDataset.
87 changes: 87 additions & 0 deletions dataset/src/main/scala/frameless/TypedWindow.scala
@@ -0,0 +1,87 @@
package frameless

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{ UnspecifiedFrame, WindowFrame }
import org.apache.spark.sql.expressions.{ Window, WindowSpec }
import shapeless.ops.hlist.{ Mapper, ToTraversable }
import shapeless.{ HList, ProductArgs }

trait OrderedWindow
trait PartitionedWindow

class TypedWindow[T, A] private (
  partitionSpec: Seq[UntypedExpression[T]],
  orderSpec: Seq[UntypedExpression[T]],
  frame: WindowFrame // TODO: really a rows-or-range between
) {

  def untyped: WindowSpec = Window
    .partitionBy(partitionSpec.map(e => new Column(e.expr)):_*)
    .orderBy(orderSpec.map(e => new Column(e.expr)):_*)
    // TODO: frame

  /* TODO: Do we want single-column versions, like we do for agg, for better type inference?
  def partitionBy[U](column: TypedColumn[T, U]): TypedWindow[T, A with PartitionedWindow] =
    new TypedWindow[T, A with PartitionedWindow](
      partitionSpec = Seq(column),
      orderSpec = orderSpec,
      frame = frame
    )

  def orderBy[U](column: TypedSortedColumn[T, U]): TypedWindow[T, A with OrderedWindow] =
    new TypedWindow[T, A with OrderedWindow](
      partitionSpec = partitionSpec,
      orderSpec = Seq(column),
      frame = frame
    )
  */

  object partitionBy extends ProductArgs {
    def applyProduct[U <: HList](columns: U)
      (implicit
        i1: ToTraversable.Aux[U, List, TypedColumn[T, _]]
      ): TypedWindow[T, A with PartitionedWindow] = {
      new TypedWindow[T, A with PartitionedWindow](
        partitionSpec = columns.toList[TypedColumn[T, _]],
        orderSpec = orderSpec,
        frame = frame
      )
    }
  }

  object orderBy extends ProductArgs {
    def applyProduct[U <: HList, O <: HList](columns: U)
      (implicit
        i0: Mapper.Aux[TypedSortedColumn.defaultAscendingPoly.type, U, O],
        i1: ToTraversable.Aux[O, List, TypedSortedColumn[T, _]]
      ): TypedWindow[T, A with OrderedWindow] = {
      new TypedWindow[T, A with OrderedWindow](
        partitionSpec = partitionSpec,
        orderSpec = i0(columns).toList[TypedSortedColumn[T, _]],
        frame = frame
      )
    }
  }
}

object TypedWindow {

  // TODO: multiple columns.
  def partitionBy[T](column: TypedColumn[T, _]): TypedWindow[T, PartitionedWindow] = {
    new TypedWindow[T, PartitionedWindow](
      partitionSpec = Seq(column),
      orderSpec = Seq.empty,
      frame = UnspecifiedFrame
    )
  }

  def orderBy[T](column: TypedSortedColumn[T, _]): TypedWindow[T, OrderedWindow] = {
    new TypedWindow[T, OrderedWindow](
      partitionSpec = Seq.empty,
      orderSpec = Seq(column),
      frame = UnspecifiedFrame
    )
  }
}
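
Putting the window pieces together, usage looks roughly like this (a sketch, assuming a TypedDataset ds with columns 'a and 'b, and the dense_rank defined in the next file):

  import frameless.functions.WindowFunctions._

  val window = TypedWindow.partitionBy(ds('a)).orderBy(ds('b).desc)
  ds.select(dense_rank().over(window))  // TypedColumn[T, Int]; only compiles because the window is ordered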

26 changes: 26 additions & 0 deletions dataset/src/main/scala/frameless/functions/WindowFunctions.scala
@@ -0,0 +1,26 @@
package frameless.functions

import frameless.{ OrderedWindow, TypedAggregate, TypedColumn, TypedWindow }
import org.apache.spark.sql.{ functions => untyped }

trait WindowFunctions {
  import WindowFunctionsHelpers.dense_rankObj

  def dense_rank() = dense_rankObj
Contributor Author:

Do we want to keep the same syntax as Spark, where you call dense_rank().over(windowSpec), or just take the window spec as an argument to dense_rank? There's no valid dense_rank without a window spec.

Contributor:

I don't have a strong opinion on this. Staying close to vanilla Spark is a plus, but here it really makes little sense 😄. To be consistent, I would camelCase it either way.
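
For comparison, the take-the-window-as-an-argument variant being discussed would look something like this (a sketch, with the camelCased name):

  def denseRank[T, A <: OrderedWindow](window: TypedWindow[T, A]): TypedColumn[T, Int] =
    new TypedColumn[T, Int](untyped.dense_rank().over(window.untyped))

  // call site: ds.select(denseRank(window)) instead of dense_rank().over(window)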


}

//TODO: Move these to the other funcs?
object WindowFunctions extends WindowFunctions

object WindowFunctionsHelpers {
  // TODO: hide this object so that it doesn't show up for users
  private[functions] object dense_rankObj {
    // TODO: a TypedAggregate version that can be used in `agg`, whose specs are all
    // either aggregates or in the groupBy. Not sure how to do the latter.
    def over[T, A <: OrderedWindow](window: TypedWindow[T, A]): TypedColumn[T, Int] = {
      new TypedColumn[T, Int](untyped.dense_rank().over(window.untyped))
    }
  }
}
40 changes: 40 additions & 0 deletions dataset/src/test/scala/frameless/SortTests.scala
@@ -0,0 +1,40 @@
package frameless

import org.apache.spark.sql.{ functions => sfunc }
import shapeless.test.illTyped

object SortTests {
  case class Wack(w: Int)
  case class Foo(a: String, b: Option[Int], c: Array[String], d: Map[String, Int], wack: Wack)
}

// TODO:
class SortTests extends TypedDatasetSuite {
  import SortTests._

  test("sorting") {
    val seq = Seq(
      Foo("a", Some(2), Array("a", "b"), Map("world" -> 2), Wack(1)),
      Foo("b", Some(1), Array("b", "a"), Map("world" -> 2), Wack(2))
    )

    val ds = TypedDataset.create(seq)

    assert(ds.sort(ds('a).asc).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("a").asc).collect().map(_.a))
    assert(ds.sort(ds('a).desc).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("a").desc).collect().map(_.a))

    assert(ds.sort(ds('b).asc).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("b").asc).collect().map(_.a))
    assert(ds.sort(ds('b).desc).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("b").desc).collect().map(_.a))

    assert(ds.sort(ds('b).ascNullsFirst).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("b").asc_nulls_first).collect().map(_.a))
    assert(ds.sort(ds('a), ds('b).desc).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("a"), sfunc.col("b").desc).collect().map(_.a))

    illTyped {
      // Maps aren't allowed
      """ds.sort(ds('d).desc)"""
    }

    assert(ds.sort(ds('wack).desc).collect().run().map(_.a) === ds.dataset.sort(sfunc.col("wack").desc).collect().map(_.a))
  }
}