-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Window functions and column sorting #225
Changes from all commits
0317e57
286e7ed
5c13d7a
f2fcdaf
760db76
6a169ff
354084c
1f49949
5f66d61
c22af6b
81b75df
85a1b5c
62bea15
7fb80bf
4eded7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package frameless

import shapeless._

import scala.annotation.implicitNotFound

/** Types that can be used to sort a dataset by Catalyst.
  *
  * Evidence of `CatalystRowOrdered[A]` means a column of type `A` may appear
  * in a sort/orderBy clause. This is a weaker property than [[CatalystOrdered]],
  * which additionally allows comparison operators on the column.
  */
@implicitNotFound("Cannot order by columns of type ${A}.")
trait CatalystRowOrdered[A]

object CatalystRowOrdered extends CatalystRowOrdered0 {
  /*
    The following are sortable by Spark:
    see [[org.apache.spark.sql.catalyst.expressions.RowOrdering.isOrderable]]
      AtomicType
      StructType containing only orderable types
      ArrayType containing only orderable types
      UserDefinedType containing only orderable types

    MapType can't be used in order!
    TODO: UDF
  */

  // Everything comparable by Catalyst is also usable in a sort clause.
  implicit def orderedEvidence[A](implicit catalystOrdered: CatalystOrdered[A]): CatalystRowOrdered[A] = of[A]
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be dangerous if something gets added to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it works nicely with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this extra type class? Why wouldn’t Catalyst order cover this case as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure I follow. Are you asking why is scala> spark.createDataset(Seq((Seq(1, 2, 3), Seq(2,3,1))))
res3: org.apache.spark.sql.Dataset[(Seq[Int], Seq[Int])] = [_1: array<int>, _2: array<int>]
scala> res3.select(untyped.col("_1") > untyped.col("_2")).show
org.apache.spark.sql.AnalysisException: cannot resolve '(`_1` > `_2`)' due to data type mismatch: '(`_1` > `_2`)' requires (boolean or tinyint or smallint or int or bigint or float or double or decimal or timestamp or date or string or binary) type, not array<int>;;
scala> res3.sort(untyped.col("_1")).show()
+---------+---------+
| _1| _2|
+---------+---------+
|[1, 2, 3]|[2, 3, 1]|
+---------+---------+ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That looks more like a bug than a feature ... not yours ofc, on Spark side. Is there any particular reason why they made this design choice? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems that this is fixed in Spark 2.3: https://issues.apache.org/jira/browse/SPARK-21110. Can you also take a look and verify if this is the same bug. It might save us a lot of extra code if this magically goes away. We can then just focus on one type-class for CatalystOrdered it seems? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good find @imarios! I honestly have no idea if there's a reason for the design choice. I'll grab a more serious look at the fix sometime in the next few days. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following up on this. I was able to test the above code snippet ☝️ https://github.com/typelevel/frameless/pull/225/files#r158376192 using spark |
||
|
||
implicit def arrayEv[A](implicit catalystOrdered: CatalystRowOrdered[A]): CatalystRowOrdered[Array[A]] = of[Array[A]] | ||
|
||
implicit def collectionEv[C[X] <: Seq[X], A](implicit catalystOrdered: CatalystRowOrdered[A]): CatalystRowOrdered[C[A]] = of[C[A]] | ||
|
||
implicit def optionEv[A](implicit catalystOrdered: CatalystRowOrdered[A]): CatalystRowOrdered[Option[A]] = of[Option[A]] | ||
} | ||
|
||
trait CatalystRowOrdered0 { | ||
private val theInstance = new CatalystRowOrdered[Any] {} | ||
protected def of[A]: CatalystRowOrdered[A] = theInstance.asInstanceOf[CatalystRowOrdered[A]] | ||
|
||
implicit def recordEv[A, G <: HList](implicit i0: Generic.Aux[A, G], i1: HasRowOrdered[G]): CatalystRowOrdered[A] = of[A] | ||
|
||
trait HasRowOrdered[T <: HList] | ||
object HasRowOrdered { | ||
implicit def deriveOrderHNil[H](implicit catalystRowOrdered: CatalystRowOrdered[H]): HasRowOrdered[H :: HNil] = | ||
new HasRowOrdered[H :: HNil] {} | ||
|
||
implicit def deriveOrderHCons[H, T <: HList](implicit head: CatalystRowOrdered[H], tail: HasRowOrdered[T]): HasRowOrdered[H :: T] = | ||
new HasRowOrdered[H :: T] {} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -278,6 +278,60 @@ sealed class TypedColumn[T, U]( | |
*/ | ||
def /(u: U)(implicit n: CatalystNumeric[U]): TypedColumn[T, Double] = self.untyped.divide(u).typed | ||
|
||
/** Returns a descending ordering used in sorting | ||
* | ||
* apache/spark | ||
*/ | ||
def desc(implicit catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](withExpr { | ||
SortOrder(expr, Descending) | ||
}) | ||
|
||
/** Returns a descending ordering used in sorting where None values appear before non-None values | ||
* | ||
* apache/spark | ||
*/ | ||
def descNonesFirst(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](withExpr { | ||
SortOrder(expr, Descending, NullsFirst, Set.empty) | ||
}) | ||
|
||
/** Returns a descending ordering used in sorting where None values appear after non-None values | ||
* | ||
* apache/spark | ||
*/ | ||
def descNonesLast(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](withExpr { | ||
SortOrder(expr, Descending, NullsLast, Set.empty) | ||
}) | ||
|
||
/** Returns an ascending ordering used in sorting | ||
* | ||
* apache/spark | ||
*/ | ||
def asc(implicit catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](withExpr { | ||
SortOrder(expr, Ascending) | ||
}) | ||
|
||
/** Returns an ascending ordering used in sorting where None values appear before non-None values | ||
* | ||
* apache/spark | ||
*/ | ||
def ascNonesFirst(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](withExpr { | ||
SortOrder(expr, Ascending, NullsFirst, Set.empty) | ||
}) | ||
|
||
/** Returns an ascending ordering used in sorting where None values appear after non-None values | ||
* | ||
* apache/spark | ||
*/ | ||
def ascNonesLast(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](withExpr { | ||
SortOrder(expr, Ascending, NullsLast, Set.empty) | ||
}) | ||
|
||
/** | ||
* Bitwise AND this expression and another expression. | ||
* {{{ | ||
|
@@ -485,6 +539,28 @@ sealed class TypedAggregate[T, U](val expr: Expression)( | |
} | ||
} | ||
|
||
sealed class TypedSortedColumn[T, U](val expr: Expression)( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. trying to see if we really need a new type here. I assume there are methods that only work on sorted columns? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes exactly! Annoyingly there are also methods that specifically don't work on already sorted columns. I can post up a few examples when I get a moment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For example, selecting a import org.apache.spark.sql.{functions=>untyped}
ds.select(untyped.sum(untyped.col("a"))).show()
/*
+------+
|sum(a)|
+------+
| null|
+------+
*/
ds.select(untyped.sum(untyped.col("a").desc)).show()
/*
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_sum(cast(a#2 DESC NULLS LAST as double))], output=[sum#21])
+- LocalTableScan <empty>, [a#2]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
...
*/ |
||
implicit | ||
val uencoder: TypedEncoder[U] | ||
) extends UntypedExpression[T] { | ||
|
||
def this(column: Column)(implicit e: TypedEncoder[U]) { | ||
this(FramelessInternals.expr(column)) | ||
} | ||
|
||
def untyped: Column = new Column(expr) | ||
} | ||
|
||
object TypedSortedColumn { | ||
implicit def defaultAscending[T, U : CatalystRowOrdered](typedColumn: TypedColumn[T, U]): TypedSortedColumn[T, U] = | ||
new TypedSortedColumn[T, U](new Column(SortOrder(typedColumn.expr, Ascending)))(typedColumn.uencoder) | ||
|
||
object defaultAscendingPoly extends Poly1 { | ||
implicit def caseTypedColumn[T, U : CatalystRowOrdered] = at[TypedColumn[T, U]](c => defaultAscending(c)) | ||
implicit def caseTypeSortedColumn[T, U] = at[TypedSortedColumn[T, U]](identity) | ||
} | ||
} | ||
|
||
object TypedColumn { | ||
/** | ||
* Evidence that type `T` has column `K` with type `V`. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
package frameless

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{ UnspecifiedFrame, WindowFrame }
import org.apache.spark.sql.expressions.{ Window, WindowSpec }
import shapeless.ops.hlist.{ Mapper, ToTraversable }
import shapeless.{ HList, ProductArgs }

/** Phantom marker: the window has an orderBy clause. */
trait OrderedWindow
/** Phantom marker: the window has a partitionBy clause. */
trait PartitionedWindow

/** A typed wrapper over Spark's [[WindowSpec]]. The phantom parameter `A`
  * accumulates `OrderedWindow`/`PartitionedWindow` so that window functions
  * can require, at compile time, the clauses they need.
  */
class TypedWindow[T, A] private (
  partitionSpec: Seq[UntypedExpression[T]],
  orderSpec: Seq[UntypedExpression[T]],
  frame: WindowFrame //TODO. Really a rows or range between
) {

  /** Drops type information, producing the underlying Spark window spec. */
  def untyped: WindowSpec = Window
    .partitionBy(partitionSpec.map(e => new Column(e.expr)):_*)
    .orderBy(orderSpec.map(e => new Column(e.expr)):_*)
    //TODO: frame

  /* TODO: Do we want single column versions like we do for agg for better type inference?
  def partitionBy[U](column: TypedColumn[T, U]): TypedWindow[T, A with PartitionedWindow] =
    new TypedWindow[T, A with PartitionedWindow](
      partitionSpec = Seq(column),
      orderSpec = orderSpec,
      frame = frame
    )

  def orderBy[U](column: TypedSortedColumn[T, U]): TypedWindow[T, A with OrderedWindow] =
    new TypedWindow[T, A with OrderedWindow](
      partitionSpec = partitionSpec,
      orderSpec = Seq(column),
      frame = frame
    )
  */

  /** Replaces the partition clause with the given columns, recording
    * `PartitionedWindow` in the phantom type.
    */
  object partitionBy extends ProductArgs {
    def applyProduct[U <: HList](columns: U)
      (implicit
        i1: ToTraversable.Aux[U, List, TypedColumn[T, _]]
      ): TypedWindow[T, A with PartitionedWindow] = {
      new TypedWindow[T, A with PartitionedWindow](
        partitionSpec = columns.toList[TypedColumn[T, _]],
        orderSpec = orderSpec,
        frame = frame
      )
    }
  }

  /** Replaces the ordering clause. Plain `TypedColumn`s are lifted to
    * ascending sorted columns via `defaultAscendingPoly`; records
    * `OrderedWindow` in the phantom type.
    */
  object orderBy extends ProductArgs {
    def applyProduct[U <: HList, O <: HList](columns: U)
      (implicit
        i0: Mapper.Aux[TypedSortedColumn.defaultAscendingPoly.type, U, O],
        i1: ToTraversable.Aux[O, List, TypedSortedColumn[T, _]]
      ): TypedWindow[T, A with OrderedWindow] = {
      new TypedWindow[T, A with OrderedWindow](
        partitionSpec = partitionSpec,
        orderSpec = i0(columns).toList[TypedSortedColumn[T, _]],
        frame = frame
      )
    }
  }
}

object TypedWindow {

  //TODO: Multiple columns.
  def partitionBy[T](column: TypedColumn[T, _]): TypedWindow[T, PartitionedWindow] = {
    new TypedWindow[T, PartitionedWindow](
      partitionSpec = Seq(column),
      orderSpec = Seq.empty,
      frame = UnspecifiedFrame
    )
  }

  def orderBy[T](column: TypedSortedColumn[T, _]): TypedWindow[T, OrderedWindow] = {
    new TypedWindow[T, OrderedWindow](
      partitionSpec = Seq.empty,
      orderSpec = Seq(column),
      frame = UnspecifiedFrame
    )
  }
}
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package frameless.functions

import frameless.{ OrderedWindow, TypedColumn, TypedWindow }
import org.apache.spark.sql.{ functions => untyped }

trait WindowFunctions {

  //TODO: TypedAggregate version that can be used in `agg`
  // whose specs are all either aggs or in the groupBy. Not sure how to do the latter one
  /** Rank of the row within its window partition, without gaps (SQL DENSE_RANK).
    * Requires an ordered window, enforced by the `A <: OrderedWindow` bound.
    */
  def denseRank[T, A <: OrderedWindow](over: TypedWindow[T, A]): TypedColumn[T, Int] =
    new TypedColumn[T, Int](untyped.dense_rank().over(over.untyped))
}

//TODO: Move these to the other funcs?
object WindowFunctions extends WindowFunctions
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know. It won't be there in the final version.