[WIP] Window functions and column sorting #225
Conversation
TODO: UDF
*/

implicit def orderedEvidence[A](implicit catalystOrdered: CatalystOrdered[A]): CatalystRowOrdered[A] = of[A]
This might be dangerous if something gets added to CatalystOrdered that's not supported in sorting. Does anyone know if there even are any types like that? Would it be safer to explicitly state all the allowable types here, even if there is some duplication?
If it works nicely with CatalystOrdered, I would keep it this way for now. Just add a note in object CatalystOrdered saying that if someone adds a new case there, they should also update the window function tests.
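A minimal sketch of that note and the surrounding pattern, assuming the of/theInstance encoding visible elsewhere in this PR (the specific instance names below are illustrative):

trait CatalystOrdered[A]

object CatalystOrdered {
  // NOTE: if you add a new instance here, also update the window function /
  // sorting tests, since CatalystRowOrdered derives an instance from every
  // CatalystOrdered via orderedEvidence.
  private val theInstance = new CatalystOrdered[Any] {}
  private def of[A]: CatalystOrdered[A] = theInstance.asInstanceOf[CatalystOrdered[A]]

  implicit val framelessIntOrdered: CatalystOrdered[Int] = of[Int]
  implicit val framelessStringOrdered: CatalystOrdered[String] = of[String]
  // ... remaining instances
}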
Do we need this extra type class? Why wouldn't CatalystOrdered cover this case as well?
I'm not sure I follow. Are you asking why CatalystRowOrdered is needed, and shouldn't CatalystOrdered be enough? The reason is that you can order rows by things that aren't comparable within a row, for example Arrays:
scala> spark.createDataset(Seq((Seq(1, 2, 3), Seq(2,3,1))))
res3: org.apache.spark.sql.Dataset[(Seq[Int], Seq[Int])] = [_1: array<int>, _2: array<int>]
scala> res3.select(untyped.col("_1") > untyped.col("_2")).show
org.apache.spark.sql.AnalysisException: cannot resolve '(`_1` > `_2`)' due to data type mismatch: '(`_1` > `_2`)' requires (boolean or tinyint or smallint or int or bigint or float or double or decimal or timestamp or date or string or binary) type, not array<int>;;
scala> res3.sort(untyped.col("_1")).show()
+---------+---------+
| _1| _2|
+---------+---------+
|[1, 2, 3]|[2, 3, 1]|
+---------+---------+
That looks more like a bug than a feature... not yours ofc, on the Spark side. Is there any particular reason why they made this design choice?
It seems this is fixed in Spark 2.3: https://issues.apache.org/jira/browse/SPARK-21110. Can you also take a look and verify that it's the same bug? It might save us a lot of extra code if this magically goes away. It seems we could then just focus on a single type class, CatalystOrdered.
Here is the PR:
apache/spark#18818
Good find @imarios! I honestly have no idea if there's a reason for the design choice. I'll take a more serious look at the fix sometime in the next few days.
If this does indeed fix everything, are we targeting 2.3 for the next frameless version? Does it make sense to keep this in for earlier version support?
Following up on this. I was able to test the above code snippet ☝️ https://github.com/typelevel/frameless/pull/225/files#r158376192 using Spark 2.3.0-SNAPSHOT and it does indeed work!
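For reference, a hedged sketch of the re-run under 2.3 (the behavior is inferred from SPARK-21110, which adds comparison support for arrays; the outcome is described rather than copied from a run):

scala> res3.select(untyped.col("_1") > untyped.col("_2")).show
// under Spark 2.3 this resolves instead of throwing the AnalysisException above;
// Seq(1, 2, 3) compares lexicographically less than Seq(2, 3, 1), so the result is false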
*
* apache/spark
*/
def descNullsFirst(implicit isOption: U <:< Option[_], catalystRowOrdering: CatalystRowOrdered[U]): TypedSortedColumn[T, U] =
Should we keep the naming similar to Spark's with Nulls, or rename this to a more Scala-friendly descNonesFirst?
Given that there are Options involved, I think I like descNonesFirst more.
trait WindowFunctions {
  import WindowFunctionsHelpers.dense_rankObj

  def dense_rank() = dense_rankObj
Do we want to keep the same syntax as Spark, where you call dense_rank().over(windowSpec), or just take the window spec as an argument to dense_rank? There's no valid dense_rank without a window spec.
I don't have a strong opinion on this. Staying close to vanilla is a plus, but here it really makes little sense 😄. To be consistent I would camelCase it either way.
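For comparison, a hedged sketch of the two call shapes under discussion (TypedWindow, windowSpec, and the camelCased names are placeholders, not the final API):

val windowSpec: TypedWindow[Foo] = ???

// Option 1: mirror vanilla Spark's shape
ds.select(denseRank().over(windowSpec))

// Option 2: take the spec directly, since there is no valid denseRank without one
ds.select(denseRank(windowSpec))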
Codecov Report
@@            Coverage Diff             @@
##           master     #225      +/-   ##
==========================================
- Coverage   96.15%   95.04%   -1.11%
==========================================
  Files          52       55       +3
  Lines         936      989      +53
  Branches       15       14       -1
==========================================
+ Hits          900      940      +40
- Misses         36       49      +13

Continue to review full report at Codecov.
@@ -99,7 +99,7 @@ lazy val commonScalacOptions = Seq(
   "-encoding", "UTF-8",
   "-feature",
   "-unchecked",
-  "-Xfatal-warnings",
+  // "-Xfatal-warnings",
I know, it won't be there in the final version.
}

class SortTests extends TypedDatasetSuite {
  test("bad udt") {
I could use some help with this. It's unclear how to support UDTs via Record types etc. while preventing those whose underlying sqlType is not sortable. The generic Record support is super helpful for StructTypes.
While this is an admittedly contrived example, it's certainly possible to hit this.
Any ideas?
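To make the concern concrete: a UDT whose underlying sqlType is, say, a MapType would hit the same wall as sorting a plain map column, which vanilla Spark rejects (illustrative spark-shell sketch; the exception message is paraphrased from memory):

scala> spark.createDataset(Seq((Map(1 -> 2), 1))).sort(untyped.col("_1")).show()
// org.apache.spark.sql.AnalysisException: cannot resolve '`_1` ASC NULLS FIRST'
// due to data type mismatch: cannot sort data type map<int,int>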
Will be giving this a look in the next few days.
@@ -485,6 +539,28 @@ sealed class TypedAggregate[T, U](val expr: Expression)(
  }
}

sealed class TypedSortedColumn[T, U](val expr: Expression)(
Trying to see if we really need a new type here. I assume there are methods that only work on sorted columns?
Yes, exactly! Annoyingly, there are also methods that specifically don't work on already-sorted columns. I can post a few examples when I get a moment.
For example, selecting a sum on a sorted column blows up:
import org.apache.spark.sql.{functions=>untyped}
ds.select(untyped.sum(untyped.col("a"))).show()
/*
+------+
|sum(a)|
+------+
| null|
+------+
*/
ds.select(untyped.sum(untyped.col("a").desc)).show()
/*
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_sum(cast(a#2 DESC NULLS LAST as double))], output=[sum#21])
+- LocalTableScan <empty>, [a#2]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:115)
...
*/
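This is exactly the kind of misuse a dedicated type can reject at compile time. A minimal self-contained sketch of the idea (the type names follow this PR, but the signatures are illustrative, not the actual API):

class TypedColumn[T, U]
class TypedSortedColumn[T, U] // deliberately not a subtype of TypedColumn

// Aggregations accept only plain columns, so passing an already-sorted column
// fails to compile instead of blowing up at runtime like the example above:
def sum[T, U](column: TypedColumn[T, U]): TypedColumn[T, U] = column

// Conversely, sorting operations can demand the sorted type:
def orderBy[T](columns: TypedSortedColumn[T, _]*): Unit = ()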
@frosforever thanks for the PR! Is it still work in progress? Just ping us when you need a review 😄
Hey @OlivierBlanvillain, yes this is still a WIP and some questions are still open. Perhaps the newer version of frameless should only target Spark 2.3?
No, so far master has only targeted the latest Spark version. If someone is interested, it shouldn't be too hard to cross-compile, but it's not something that I personally want to spend time on.
@frosforever reviewing this PR again makes me wonder why I had to reinvent column ordering in another PR O.o. Oh well, let me read this again to see if there is any difference. We can probably take the best of the two and merge them. I think ordering on columns is a big enough addition to be in a separate PR, leaving this one to focus on solving window functions. What do you think?
@frosforever Working on #231 made me get this PR more now :). I like the idea of using Mapper and the Poly function to make sorting work for regular columns even if they are not explicitly SortedTypedColumns. Let's do this: can you take just the column sorting part of this PR, try to follow the orderByMany pattern from PR #231, copy the unit tests from there, and create a separate PR? Let's not confuse the two.
Regarding sorted collections, I got this one working:

implicit def derivedOrderingForCollections[C[_], A]
  (implicit
    a: CatalystOrdered[A],
    b: CatalystCollection[C]
  ): CatalystOrdered[C[A]] = theInstance.asInstanceOf[CatalystOrdered[C[A]]]

This will cover all catalyst collections, given that they all encode to Catalyst's ArrayType.
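With that instance in scope, any collection that has a CatalystCollection instance picks up ordering for free; a quick usage sketch (assuming the type class names from this thread):

implicitly[CatalystOrdered[Vector[Int]]]   // derived: CatalystOrdered[Int] + CatalystCollection[Vector]
implicitly[CatalystOrdered[List[String]]]  // same derivation, different collection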
Closing in favor of #248.
Connects to #164 and #136.
Still a WIP, but wanted to put this out there for early feedback before going whole hog.
Currently the only window function supported is dense_rank, and window frames aren't supported. There's also no support yet for sorting on user-defined types.
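For context, the vanilla Spark usage this PR wraps looks roughly like this (untyped API, shown only for illustration):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => untyped}

// rank rows within each partition of "a", ordered by "b" descending
val w = Window.partitionBy(untyped.col("a")).orderBy(untyped.col("b").desc)
df.select(untyped.dense_rank().over(w)).show()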