TypedColumn#year and LocalDateTime generator #228

Merged
merged 10 commits into from
Jan 31, 2018

Avasil
Contributor

@Avasil Avasil commented Dec 20, 2017

Connects to #164
It implements the year method, and I also added a LocalDateTime generator to the tests. I think it might be useful for testing functions that need a correct date as a string.

@codecov-io

codecov-io commented Dec 20, 2017

Codecov Report

Merging #228 into master will increase coverage by <.01%.
The diff coverage is 100%.

@@            Coverage Diff             @@
##           master     #228      +/-   ##
==========================================
+ Coverage   96.95%   96.95%   +<.01%     
==========================================
  Files          52       52              
  Lines         853      854       +1     
  Branches        8        8              
==========================================
+ Hits          827      828       +1     
  Misses         26       26
Impacted Files Coverage Δ
...la/frameless/functions/NonAggregateFunctions.scala 100% <100%> (ø) ⬆️

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 2f61244...43b0073. Read the comment docs.

Contributor

@frosforever frosforever left a comment

It's possible for a String to not parse out a year, e.g.:

val ds = TypedDataset.create(Seq(X1("hello")))
ds.select(year(ds('a))).show().run()
/*
+----+
|  _1|
+----+
|null|
+----+
*/

*
* apache/spark
*/
def year[T](col: TypedColumn[T, String]): TypedColumn[T, Int] = {

This should probably be TypedColumn[T, Option[Int]]

I agree here. This should fail when there is no year to be extracted. If Spark returns null in this case, then this should be encoded as @frosforever suggests. Should be a trivial change.
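To make the suggestion concrete outside of Spark, here is a minimal plain-Scala sketch of why the result type wants to be Option[Int]. The object YearSketch and the helper yearOf are hypothetical names used only for illustration; they are not part of frameless or Spark, and the sketch assumes ISO-formatted dates parsed with java.time.LocalDate.

```scala
import java.time.LocalDate
import scala.util.Try

object YearSketch {
  // Hypothetical helper, not frameless or Spark code.
  // A string that does not parse as an ISO date yields None
  // instead of a null or an exception.
  def yearOf(s: String): Option[Int] =
    Try(LocalDate.parse(s)).toOption.map(_.getYear)
}

// YearSketch.yearOf("2015-02-12") // Some(2015)
// YearSketch.yearOf("hello")      // None
```

Spark's own string-to-date cast may accept or reject different inputs than LocalDate.parse does, so this only mirrors the shape of the return type, not Spark's parsing rules.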

.toList
def dateTimeStringFuncProp[A: Encoder](strFunc: TypedColumn[X1[String], String] => TypedColumn[X1[String], A],
sparkFunc: Column => Column): Prop =
forAll { values: List[JavaLocalDateTime] =>

what if it's not a nicely formatted date string?

Contributor

@imarios imarios left a comment

Typesafely cover the case of malformed date strings.

@imarios
Contributor

imarios commented Dec 22, 2017

@Avasil are you still having issues with the code? Can you paste some example error stack traces and the code to reproduce it here? I can try to help with that.

@Avasil
Contributor Author

Avasil commented Dec 22, 2017

@imarios
I've tried several approaches (just changing the type, extending UnaryExpression, a udf) and so far I haven't managed to make anything work. I'm pretty new to Spark and I'm learning how it is implemented by working on these issues. The simplest approach, inspired by this, is below. But it lacks the encoder and CatalystCast part and I don't know what makes it work under the hood. Any explanation/resources are welcome :D

  def year[T](col: TypedColumn[T, String]): TypedColumn[T, Option[Int]] = {
    new TypedColumn[T, Option[Int]](untyped.year(col.untyped))
  }
    val ds = TypedDataset.create(Seq(X1("hello")))
    val ds2 = TypedDataset.create(Seq(X1("2015-02-12")))

    ds.select(year(ds('a))).show().run() // 1
    ds.dataset.select(untyped.year($"a")).show() // 2

    ds2.dataset.select(untyped.year($"a")).show() // 3
    ds2.select(year(ds('a))).show().run() // 4

It fails at runtime on 4:

+----+
|  _1|
+----+
|null|
+----+

+-------+
|year(a)|
+-------+
|   null|
+-------+

+-------+
|year(a)|
+-------+
|   2015|
+-------+

[info] - year *** FAILED *** (58 milliseconds)
[info]   org.apache.spark.sql.AnalysisException: resolved attribute(s) a#34537 missing from a#34544 in operator !Project [year(cast(a#34537 as date)) AS year(a)#34589];;
[info] !Project [year(cast(a#34537 as date)) AS year(a)#34589]
[info] +- LocalRelation [a#34544]
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:347)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
[info]   at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
[info]   at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
[info]   at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
[info]   at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
[info]   at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
[info]   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2872)
[info]   at org.apache.spark.sql.Dataset.select(Dataset.scala:1153)
[info]   at frameless.TypedDataset$selectMany$.applyProduct(TypedDataset.scala:601)
[info]   at frameless.TypedDataset.select(TypedDataset.scala:409)
[info]   at frameless.functions.NonAggregateFunctionsTests$$anonfun$1.apply$mcV$sp(NonAggregateFunctionsTests.scala:626)
[info]   at frameless.functions.NonAggregateFunctionsTests$$anonfun$1.apply(NonAggregateFunctionsTests.scala:615)
[info]   at frameless.functions.NonAggregateFunctionsTests$$anonfun$1.apply(NonAggregateFunctionsTests.scala:615)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
[info]   at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
[info]   at org.scalatest.FunSuite.withFixture(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
[info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
[info]   at org.scalatest.FunSuite.runTest(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
[info]   at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
[info]   at scala.collection.immutable.List.foreach(List.scala:392)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
[info]   at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
[info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
[info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
[info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
[info]   at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
[info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
[info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
[info]   at frameless.TypedDatasetSuite.org$scalatest$BeforeAndAfterAll$$super$run(TypedDatasetSuite.scala:41)
[info]   at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
[info]   at frameless.TypedDatasetSuite.run(TypedDatasetSuite.scala:41)
[info]   at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
[info]   at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
[info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
[info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[info]   at java.lang.Thread.run(Thread.java:745)

@frosforever
Contributor

Hey @Avasil I'm on mobile at the moment and can't test this, but I think what you've got should work. Is it possible that 4 is blowing up because it's referencing ds and not ds2 in the select?

@Avasil
Contributor Author

Avasil commented Dec 22, 2017

@frosforever oh man, that's exactly it, thank you! Still, can anyone point me to the code responsible for this "conversion"? As I understand it, and as @imarios mentioned on gitter, it is encoded the same as null | Int in Spark but decoded as Option[Int] to work in a safer manner.
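As a rough illustration of the idea only (this is not the frameless encoder code, which lives in its encoders and is not reproduced here), a nullable boxed value can be lifted into an Option in plain Scala as follows. NullableSketch and decodeNullable are hypothetical names introduced for this sketch.

```scala
object NullableSketch {
  // Hypothetical helper: models a nullable Int column value as a boxed
  // java.lang.Integer and decodes it into Option[Int].
  // Option(null) is None, so no null ever reaches the caller.
  def decodeNullable(raw: java.lang.Integer): Option[Int] =
    Option(raw).map(_.intValue)
}

// NullableSketch.decodeNullable(null)                  // None
// NullableSketch.decodeNullable(Integer.valueOf(2015)) // Some(2015)
```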

@OlivierBlanvillain
Contributor

@Avasil This looks fine to me, anything missing? (sorry I polluted your diff with reformatting)

@Avasil
Contributor Author

Avasil commented Jan 20, 2018

@OlivierBlanvillain I think everything is there :)

@OlivierBlanvillain
Contributor

Alright, LGTM on my side, @imarios do you want to add something?

Contributor

@frosforever frosforever left a comment

LGTM too

@imarios
Contributor

imarios commented Jan 23, 2018

Hi @Avasil, LGTM, we just have to resolve the merge conflicts and we are good to go.

# Conflicts:
#	dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala
#	dataset/src/test/scala/frameless/package.scala
@Avasil
Contributor Author

Avasil commented Jan 23, 2018

Now it's failing on the AggregateFunctionsTests.avg test, but that doesn't seem to be related to this PR

@imarios
Contributor

imarios commented Jan 29, 2018

@Avasil is there still an issue with this PR?

@Avasil
Contributor Author

Avasil commented Jan 29, 2018

@imarios Not that I'm aware of

@imarios
Contributor

imarios commented Jan 30, 2018

@Avasil I wanted to merge your PR before merging the unification changes. I then forgot and merged the other one first ...

Do you think you can resolve the conflicts? It might also be good for you to see how the landscape has changed a bit after the unification changes.

Just to bring you up to speed: before, all these functions were valid for projected columns, but you couldn't use them during aggregation. Say, ds.agg(sum(ds('a)) * 2) was not allowed because * was only defined for TypedColumn, not for TypedAggregate. Now all methods are defined for both, but the syntax has changed a bit, hence the merge conflict here.

# Conflicts:
#	dataset/src/main/scala/frameless/functions/NonAggregateFunctions.scala
#	dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala
@Avasil
Contributor Author

Avasil commented Jan 30, 2018

@imarios
I've resolved conflicts, cleaned up a bit and reimplemented it in the new way. :)

But I have one suggestion about str.ThisType[A, B]. Have you considered naming it in a more "user-friendly" way (something mentioning that it's a column)? I'm afraid users might be confused by the return type

@OlivierBlanvillain
Contributor

@Avasil do you see this type in user code? Because if so, I think that would be a good reason to prefer having ThisType as a type parameter instead of a type member (@imarios)...

@Avasil
Contributor Author

Avasil commented Jan 30, 2018

@OlivierBlanvillain In user code it's ok:

val test: TypedColumn[X1[String], Option[Int]] = year(ds[String]('a))
// or
val test: UntypedExpression[X1[String]] = year(ds[String]('a))

If you choose TypedColumn in IntelliJ it is simplified to the version above.

Different story when checking out functions in the sources. I don't think that's too big of an issue though

@OlivierBlanvillain
Contributor

OlivierBlanvillain commented Jan 30, 2018

Ok good. ThisType is the standard name for F-bounded polymorphism, it's a bit like self in the body of class c { self => }, once you've seen it once you know what it means ;)
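For readers who have not met the pattern, here is a small self-contained sketch of a ThisType type member. Everything in it (ThisTypeSketch, Col, Projected, Aggregated, lift, yearLike) is a hypothetical stand-in written for illustration, not the frameless definitions. It only shows how one function can be written once and still return the same kind of column it was given.

```scala
object ThisTypeSketch {
  // Hypothetical stand-in for a column abstraction; not frameless code.
  trait Col[T, U] {
    // Each concrete kind of column says what "the same kind of column" is.
    type ThisType[A, B] <: Col[A, B]
    def name: String
    def lift[A, B](n: String): ThisType[A, B]
  }

  final case class Projected[T, U](name: String) extends Col[T, U] {
    type ThisType[A, B] = Projected[A, B]
    def lift[A, B](n: String): Projected[A, B] = Projected[A, B](n)
  }

  final case class Aggregated[T, U](name: String) extends Col[T, U] {
    type ThisType[A, B] = Aggregated[A, B]
    def lift[A, B](n: String): Aggregated[A, B] = Aggregated[A, B](n)
  }

  // Written once; the result type follows the kind of the argument.
  def yearLike[T](c: Col[T, String]): c.ThisType[T, Option[Int]] =
    c.lift[T, Option[Int]]("year(" + c.name + ")")
}
```

With a value such as val col = ThisTypeSketch.Projected[Unit, String]("a"), the call ThisTypeSketch.yearLike(col) is typed as Projected[Unit, Option[Int]], and the same call on an Aggregated value is typed as Aggregated[Unit, Option[Int]].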

@imarios imarios added this to the 0.5-release milestone Jan 30, 2018
@imarios
Contributor

imarios commented Jan 30, 2018

@Avasil, do you see any conflicts on your side?

@Avasil
Contributor Author

Avasil commented Jan 30, 2018

@imarios
What do you mean?
If you mean merge conflicts, it looks all green to me, and if you're asking about ThisType, then @OlivierBlanvillain's explanation sounds good to me

@imarios
Contributor

imarios commented Jan 30, 2018

Rebase gives me issues but squash and merge looks good. I will wait until master is done checking the other PR and I will merge this soon. Thanks!

@imarios imarios merged commit 864fd3f into typelevel:master Jan 31, 2018
5 participants