From 23d93f946d4a6e148fd7045d70a01debe08f823a Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Sat, 4 Jul 2020 00:03:52 +0200 Subject: [PATCH 01/17] fix #422, base for #427 --- build.sbt | 4 ++-- dataset/src/main/scala/frameless/TypedDataset.scala | 4 ++-- dataset/src/main/scala/frameless/TypedEncoder.scala | 6 +++--- .../main/scala/frameless/TypedExpressionEncoder.scala | 8 +++----- dataset/src/main/scala/frameless/functions/Udf.scala | 11 ++++++++--- .../ClassificationIntegrationTests.scala | 2 +- 6 files changed, 19 insertions(+), 16 deletions(-) diff --git a/build.sbt b/build.sbt index d03ae38d..3bd674e2 100644 --- a/build.sbt +++ b/build.sbt @@ -1,4 +1,4 @@ -val sparkVersion = "2.4.5" +val sparkVersion = "3.0.0" val catsCoreVersion = "2.0.0" val catsEffectVersion = "2.0.0" val catsMtlVersion = "0.7.0" @@ -85,7 +85,7 @@ lazy val docs = project lazy val framelessSettings = Seq( organization := "org.typelevel", - crossScalaVersions := Seq("2.11.12", "2.12.10"), + crossScalaVersions := Seq("2.12.10"), scalaVersion := crossScalaVersions.value.last, scalacOptions ++= commonScalacOptions(scalaVersion.value), licenses += ("Apache-2.0", url("http://opensource.org/licenses/Apache-2.0")), diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala index 0bab78b8..a600532c 100644 --- a/dataset/src/main/scala/frameless/TypedDataset.scala +++ b/dataset/src/main/scala/frameless/TypedDataset.scala @@ -7,7 +7,7 @@ import frameless.ops._ import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal} -import org.apache.spark.sql.catalyst.plans.logical.Join +import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.types.StructType import shapeless._ @@ -620,7 +620,7 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit 
val import FramelessInternals._ val leftPlan = logicalPlan(dataset) val rightPlan = logicalPlan(other.dataset) - val join = disambiguate(Join(leftPlan, rightPlan, Inner, Some(condition.expr))) + val join = disambiguate(Join(leftPlan, rightPlan, Inner, Some(condition.expr), JoinHint.NONE)) val joinedPlan = joinPlan(dataset, join, leftPlan, rightPlan) val joinedDs = mkDataset(dataset.sqlContext, joinedPlan, TypedExpressionEncoder[(T, U)]) TypedDataset.create[(T, U)](joinedDs) diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index de25d580..47104e8a 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -231,7 +231,7 @@ object TypedEncoder { case ByteType => path - case _ => MapObjects(encodeT.toCatalyst, path, encodeT.jvmRepr, encodeT.nullable) + case _ => MapObjects(encodeT.toCatalyst _, path, encodeT.jvmRepr, encodeT.nullable) } def fromCatalyst(path: Expression): Expression = @@ -246,7 +246,7 @@ object TypedEncoder { case ByteType => path case _ => - Invoke(MapObjects(encodeT.fromCatalyst, path, encodeT.catalystRepr, encodeT.nullable), "array", jvmRepr) + Invoke(MapObjects(encodeT.fromCatalyst _, path, encodeT.catalystRepr, encodeT.nullable), "array", jvmRepr) } } @@ -265,7 +265,7 @@ object TypedEncoder { def toCatalyst(path: Expression): Expression = if (ScalaReflection.isNativeType(encodeT.value.jvmRepr)) NewInstance(classOf[GenericArrayData], path :: Nil, catalystRepr) - else MapObjects(encodeT.value.toCatalyst, path, encodeT.value.jvmRepr, encodeT.value.nullable) + else MapObjects(encodeT.value.toCatalyst _, path, encodeT.value.jvmRepr, encodeT.value.nullable) def fromCatalyst(path: Expression): Expression = MapObjects( diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index 1032df54..b7e44af2 100644 --- 
a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -22,7 +22,7 @@ object TypedExpressionEncoder { def apply[T: TypedEncoder]: ExpressionEncoder[T] = { val encoder = TypedEncoder[T] - val schema = targetStructType(encoder) + val targetSchema = targetStructType(encoder) val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) @@ -38,10 +38,8 @@ object TypedExpressionEncoder { } new ExpressionEncoder[T]( - schema = schema, - flat = false, - serializer = toRowExpressions, - deserializer = encoder.fromCatalyst(out), + objSerializer = encoder.toCatalyst(in), + objDeserializer = encoder.fromCatalyst(out), clsTag = encoder.classTag ) } diff --git a/dataset/src/main/scala/frameless/functions/Udf.scala b/dataset/src/main/scala/frameless/functions/Udf.scala index 5460fdc9..22aea60f 100644 --- a/dataset/src/main/scala/frameless/functions/Udf.scala +++ b/dataset/src/main/scala/frameless/functions/Udf.scala @@ -90,7 +90,7 @@ case class FramelessUdf[T, R]( override def nullable: Boolean = rencoder.nullable override def toString: String = s"FramelessUdf(${children.mkString(", ")})" - def eval(input: InternalRow): Any = { + lazy val evalCode = { val ctx = new CodegenContext() val eval = genCode(ctx) @@ -123,7 +123,11 @@ case class FramelessUdf[T, R]( val (clazz, _) = CodeGenerator.compile(code) val codegen = clazz.generate(ctx.references.toArray).asInstanceOf[InternalRow => AnyRef] - codegen(input) + codegen + } + + def eval(input: InternalRow): Any = { + evalCode(input) } def dataType: DataType = rencoder.catalystRepr @@ -152,7 +156,8 @@ case class FramelessUdf[T, R]( val internalTpe = CodeGenerator.boxedType(rencoder.jvmRepr) val internalTerm = ctx.addMutableState(internalTpe, ctx.freshName("internal")) val internalNullTerm = ctx.addMutableState("boolean", ctx.freshName("internalNull")) - val internalExpr = LambdaVariable(internalTerm, internalNullTerm, rencoder.jvmRepr) + // CTw - can't 
inject the term, may have to duplicate old code for parity + val internalExpr = LambdaVariable(internalTerm, rencoder.jvmRepr, true) val resultEval = rencoder.toCatalyst(internalExpr).genCode(ctx) diff --git a/ml/src/test/scala/frameless/ml/classification/ClassificationIntegrationTests.scala b/ml/src/test/scala/frameless/ml/classification/ClassificationIntegrationTests.scala index 8827c43b..d3220388 100644 --- a/ml/src/test/scala/frameless/ml/classification/ClassificationIntegrationTests.scala +++ b/ml/src/test/scala/frameless/ml/classification/ClassificationIntegrationTests.scala @@ -54,7 +54,7 @@ class ClassificationIntegrationTests extends FramelessMlSuite with Matchers { val predictionDs = model.transform(testInput).as[PredictionResultIndexed] case class IndexToStringInput(predictedField3Indexed: Double) - val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labels) + val indexToString = TypedIndexToString[IndexToStringInput](indexerModel.transformer.labelsArray.flatten) case class PredictionResult( features: Vector, From fe4798cc55cc4e864fa164352d68471382bd9916 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Sun, 5 Jul 2020 16:37:57 +0200 Subject: [PATCH 02/17] better handling, schema on the serializer needs repacking for nulls --- .../main/scala/frameless/TypedExpressionEncoder.scala | 11 ++++++----- dataset/src/test/scala/frameless/ops/PivotTest.scala | 8 ++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index b7e44af2..cbd98976 100644 --- a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -2,7 +2,7 @@ package frameless import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import 
org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If, Literal} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If} import org.apache.spark.sql.types.StructType object TypedExpressionEncoder { @@ -27,18 +27,19 @@ object TypedExpressionEncoder { val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) val (out, toRowExpressions) = encoder.toCatalyst(in) match { - case If(_, _, x: CreateNamedStruct) => + case a@ If(_, _, x: CreateNamedStruct) => val out = BoundReference(0, encoder.catalystRepr, encoder.nullable) - (out, x.flatten) + //(out, x.flatten) + (out, a) // will fail in ExpressionEncoder if the If doesn't have the first param : IsNull case other => val out = GetColumnByOrdinal(0, encoder.catalystRepr) - (out, CreateNamedStruct(Literal("_1") :: other :: Nil).flatten) + (out, other) } new ExpressionEncoder[T]( - objSerializer = encoder.toCatalyst(in), + objSerializer = toRowExpressions,//encoder.toCatalyst(in), // toRowExpressions, // objDeserializer = encoder.fromCatalyst(out), clsTag = encoder.classTag ) diff --git a/dataset/src/test/scala/frameless/ops/PivotTest.scala b/dataset/src/test/scala/frameless/ops/PivotTest.scala index dd9bf5e6..de3715ac 100644 --- a/dataset/src/test/scala/frameless/ops/PivotTest.scala +++ b/dataset/src/test/scala/frameless/ops/PivotTest.scala @@ -8,11 +8,11 @@ import org.scalacheck.Prop._ import org.scalacheck.{Gen, Prop} class PivotTest extends TypedDatasetSuite { - def withCustomGenX4: Gen[Vector[X4[String, String, Int, Boolean]]] = { - val kvPairGen: Gen[X4[String, String, Int, Boolean]] = for { + def withCustomGenX4: Gen[Vector[X4[String, String, Long, Boolean]]] = { + val kvPairGen: Gen[X4[String, String, Long, Boolean]] = for { a <- Gen.oneOf(Seq("1", "2", "3", "4")) b <- Gen.oneOf(Seq("a", "b", "c")) - c <- arbitrary[Int] + c <- arbitrary[Long] d <- arbitrary[Boolean] } yield X4(a, b, c, d) @@ -20,7 +20,7 @@ class PivotTest extends TypedDatasetSuite { } 
test("X4[Boolean, String, Int, Boolean] pivot on String") { - def prop(data: Vector[X4[String, String, Int, Boolean]]): Prop = { + def prop(data: Vector[X4[String, String, Long, Boolean]]): Prop = { val d = TypedDataset.create(data) val frameless = d.groupBy(d('a)). pivot(d('b)).on("a", "b", "c"). From 0d27d43c6081caf73dc8d821affcd5022c7d6ae8 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Sun, 5 Jul 2020 20:50:42 +0200 Subject: [PATCH 03/17] parity udf code, complex types still not working --- .../frameless/TypedExpressionEncoder.scala | 9 ++-- .../main/scala/frameless/functions/Udf.scala | 41 +++++++++++++++++-- .../apache/spark/sql/FramelessInternals.scala | 2 + .../scala/frameless/TypedDatasetSuite.scala | 1 + 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index cbd98976..9ce58067 100644 --- a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -26,12 +26,11 @@ object TypedExpressionEncoder { val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) - val (out, toRowExpressions) = encoder.toCatalyst(in) match { - case a@ If(_, _, x: CreateNamedStruct) => + val (out, serializer) = encoder.toCatalyst(in) match { + case a @If(_, _, x: CreateNamedStruct) => val out = BoundReference(0, encoder.catalystRepr, encoder.nullable) - //(out, x.flatten) - (out, a) // will fail in ExpressionEncoder if the If doesn't have the first param : IsNull + (out, a) case other => val out = GetColumnByOrdinal(0, encoder.catalystRepr) @@ -39,7 +38,7 @@ object TypedExpressionEncoder { } new ExpressionEncoder[T]( - objSerializer = toRowExpressions,//encoder.toCatalyst(in), // toRowExpressions, // + objSerializer = serializer, objDeserializer = encoder.fromCatalyst(out), clsTag = encoder.classTag ) diff --git a/dataset/src/main/scala/frameless/functions/Udf.scala 
b/dataset/src/main/scala/frameless/functions/Udf.scala index 22aea60f..4294933f 100644 --- a/dataset/src/main/scala/frameless/functions/Udf.scala +++ b/dataset/src/main/scala/frameless/functions/Udf.scala @@ -2,9 +2,9 @@ package frameless package functions import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, NonSQLExpression} -import org.apache.spark.sql.catalyst.expressions.codegen._, Block._ -import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable +import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, NonSQLExpression} +import org.apache.spark.sql.catalyst.expressions.codegen._ +import Block._ import org.apache.spark.sql.types.DataType import shapeless.syntax.std.tuple._ @@ -157,7 +157,7 @@ case class FramelessUdf[T, R]( val internalTerm = ctx.addMutableState(internalTpe, ctx.freshName("internal")) val internalNullTerm = ctx.addMutableState("boolean", ctx.freshName("internalNull")) // CTw - can't inject the term, may have to duplicate old code for parity - val internalExpr = LambdaVariable(internalTerm, rencoder.jvmRepr, true) + val internalExpr = Spark2_4_LambdaVariable(internalTerm, internalNullTerm, rencoder.jvmRepr, true) val resultEval = rencoder.toCatalyst(internalExpr).genCode(ctx) @@ -176,6 +176,39 @@ case class FramelessUdf[T, R]( } } +case class Spark2_4_LambdaVariable( + value: String, + isNull: String, + dataType: DataType, + nullable: Boolean = true) extends LeafExpression with NonSQLExpression { + + private val accessor: (InternalRow, Int) => Any = InternalRow.getAccessor(dataType) + + // Interpreted execution of `LambdaVariable` always get the 0-index element from input row. 
+ override def eval(input: InternalRow): Any = { + assert(input.numFields == 1, + "The input row of interpreted LambdaVariable should have only 1 field.") + if (nullable && input.isNullAt(0)) { + null + } else { + accessor(input, 0) + } + } + + override def genCode(ctx: CodegenContext): ExprCode = { + val isNullValue = if (nullable) { + JavaCode.isNullVariable(isNull) + } else { + FalseLiteral + } + ExprCode(value = JavaCode.variable(value, dataType), isNull = isNullValue) + } + + // This won't be called as `genCode` is overridden, just overriding it to make + // `LambdaVariable` non-abstract. + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev +} + object FramelessUdf { // Spark needs case class with `children` field to mutate it def apply[T, R]( diff --git a/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala b/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala index 842e04cf..6ee77c2a 100644 --- a/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala +++ b/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala @@ -22,6 +22,8 @@ object FramelessInternals { } } + def asNullable(dt: DataType): DataType = dt.asNullable + def expr(column: Column): Expression = column.expr def column(column: Column): Expression = column.expr diff --git a/dataset/src/test/scala/frameless/TypedDatasetSuite.scala b/dataset/src/test/scala/frameless/TypedDatasetSuite.scala index 36739c67..9761ea8d 100644 --- a/dataset/src/test/scala/frameless/TypedDatasetSuite.scala +++ b/dataset/src/test/scala/frameless/TypedDatasetSuite.scala @@ -29,6 +29,7 @@ trait SparkTesting { self: BeforeAndAfterAll => override def beforeAll(): Unit = { assert(s == null) s = SparkSession.builder().config(conf).getOrCreate() + s.sparkContext.setLogLevel("DEBUG") } override def afterAll(): Unit = { From dd4eb67e6a000f4e3ed612640b84e335fd950eb7 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Sun, 5 Jul 2020 21:24:36 +0200 
Subject: [PATCH 04/17] tests using minus scales fixed --- .../functions/NonAggregateFunctionsTests.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala index f47891ef..ddb68b5b 100644 --- a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala +++ b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala @@ -1127,15 +1127,15 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { val cDS = session.createDataset(values) val resCompare = cDS - .select(sparkFunctions.round(cDS("a"), -1)) + .select(sparkFunctions.round(cDS("a"), 0)) .map(_.getAs[java.math.BigDecimal](0)) .collect() - .toList.map(_.setScale(-1)) + .toList.map(_.setScale(0)) val typedDS = TypedDataset.create(values) val col = typedDS('a) val res = typedDS - .select(round(col, -1)) + .select(round(col, 0)) .collect() .run() .toList @@ -1249,7 +1249,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { val cDS = session.createDataset(values) val resCompare = cDS - .select(sparkFunctions.bround(cDS("a"), -1)) + .select(sparkFunctions.bround(cDS("a"), 0)) .map(_.getAs[java.math.BigDecimal](0)) .collect() .toList.map(_.setScale(-1)) @@ -1257,7 +1257,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { val typedDS = TypedDataset.create(values) val col = typedDS('a) val res = typedDS - .select(bround(col, -1)) + .select(bround(col, 0)) .collect() .run() .toList From af34ebfd47a2fc5dac4014817d6f36dc6ce34a61 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Sun, 5 Jul 2020 21:26:29 +0200 Subject: [PATCH 05/17] tests using minus scales fixed --- .../scala/frameless/functions/NonAggregateFunctionsTests.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala 
b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala index ddb68b5b..01d43ba9 100644 --- a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala +++ b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala @@ -1252,7 +1252,7 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite { .select(sparkFunctions.bround(cDS("a"), 0)) .map(_.getAs[java.math.BigDecimal](0)) .collect() - .toList.map(_.setScale(-1)) + .toList.map(_.setScale(0)) val typedDS = TypedDataset.create(values) val col = typedDS('a) From fdccfb87da6047115e7ed6426274c552f32d7e06 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Mon, 6 Jul 2020 11:14:07 +0200 Subject: [PATCH 06/17] get rid of debug entry, doesn't work either way --- dataset/src/test/scala/frameless/TypedDatasetSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/dataset/src/test/scala/frameless/TypedDatasetSuite.scala b/dataset/src/test/scala/frameless/TypedDatasetSuite.scala index 9761ea8d..36739c67 100644 --- a/dataset/src/test/scala/frameless/TypedDatasetSuite.scala +++ b/dataset/src/test/scala/frameless/TypedDatasetSuite.scala @@ -29,7 +29,6 @@ trait SparkTesting { self: BeforeAndAfterAll => override def beforeAll(): Unit = { assert(s == null) s = SparkSession.builder().config(conf).getOrCreate() - s.sparkContext.setLogLevel("DEBUG") } override def afterAll(): Unit = { From 23868dca3dfa7268e67cbcb40a00096be9f5e1d1 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Mon, 6 Jul 2020 12:17:02 +0200 Subject: [PATCH 07/17] value is the new _1 and set by ExpressionEncoder, it's not possible to replace schema directly so some nullable checks just won't work --- .../main/scala/frameless/TypedExpressionEncoder.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index 9ce58067..c5c1bbf6 100644 --- 
a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -1,5 +1,6 @@ package frameless +import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If} @@ -9,21 +10,19 @@ object TypedExpressionEncoder { /** In Spark, DataFrame has always schema of StructType * - * DataFrames of primitive types become records with a single field called "_1". + * DataFrames of primitive types become records with a single field called "value" set in ExpressionEncoder. */ def targetStructType[A](encoder: TypedEncoder[A]): StructType = { encoder.catalystRepr match { case x: StructType => if (encoder.nullable) StructType(x.fields.map(_.copy(nullable = true))) else x - case dt => new StructType().add("_1", dt, nullable = encoder.nullable) + case dt => new StructType().add("value", dt, nullable = encoder.nullable) } } - def apply[T: TypedEncoder]: ExpressionEncoder[T] = { + def apply[T: TypedEncoder]: Encoder[T] = { val encoder = TypedEncoder[T] - val targetSchema = targetStructType(encoder) - val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) val (out, serializer) = encoder.toCatalyst(in) match { @@ -34,7 +33,7 @@ object TypedExpressionEncoder { case other => val out = GetColumnByOrdinal(0, encoder.catalystRepr) - (out, other) + (out, other) } new ExpressionEncoder[T]( From f1e6b72543b055f1132172910ba1142789f7f013 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Mon, 6 Jul 2020 19:54:32 +0200 Subject: [PATCH 08/17] nested types can't use ordinals directly in 3, bizarrely --- dataset/src/main/scala/frameless/RecordEncoder.scala | 11 +++++++---- dataset/src/main/scala/frameless/TypedDataset.scala | 6 +++--- .../main/scala/frameless/TypedExpressionEncoder.scala | 5 +++-- 
dataset/src/test/scala/frameless/InjectionTests.scala | 8 ++++++-- dataset/src/test/scala/frameless/SelectTests.scala | 1 + 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/dataset/src/main/scala/frameless/RecordEncoder.scala b/dataset/src/main/scala/frameless/RecordEncoder.scala index dca639c1..8b1d2d63 100644 --- a/dataset/src/main/scala/frameless/RecordEncoder.scala +++ b/dataset/src/main/scala/frameless/RecordEncoder.scala @@ -162,15 +162,18 @@ class RecordEncoder[F, G <: HList, H <: HList] def fromCatalyst(path: Expression): Expression = { val exprs = fields.value.value.map { field => - val fieldPath = path match { - case BoundReference(ordinal, dataType, nullable) => + val fieldPath = (field, path) match { + case (RecordEncoderField(_, _, r), BoundReference(ordinal, dataType, nullable) ) + if r.getClass.getName == "frameless.RecordEncoder" && fields.value.value.size == 1 => + GetStructField(path, field.ordinal, Some(field.name)) + case (_, BoundReference(ordinal, dataType, nullable) ) => GetColumnByOrdinal(field.ordinal, field.encoder.jvmRepr) - case other => + case (_, other) => GetStructField(path, field.ordinal, Some(field.name)) } field.encoder.fromCatalyst(fieldPath) } - + // UnresolvedExtractValue( UnresolvedAttribute(Seq(field.name)), Literal(field.name)) val newArgs = newInstanceExprs.value.from(exprs) val newExpr = NewInstance(classTag.runtimeClass, newArgs, jvmRepr, propagateNull = true) path match { diff --git a/dataset/src/main/scala/frameless/TypedDataset.scala b/dataset/src/main/scala/frameless/TypedDataset.scala index a600532c..7e84107e 100644 --- a/dataset/src/main/scala/frameless/TypedDataset.scala +++ b/dataset/src/main/scala/frameless/TypedDataset.scala @@ -902,11 +902,11 @@ class TypedDataset[T] protected[frameless](val dataset: Dataset[T])(implicit val i2: Tupler.Aux[Out0, Out], i3: TypedEncoder[Out] ): TypedDataset[Out] = { - val selected = dataset.toDF() + val base = dataset.toDF() 
.select(columns.toList[UntypedExpression[T]].map(c => new Column(c.expr)):_*) - .as[Out](TypedExpressionEncoder[Out]) + val selected = base.as[Out](TypedExpressionEncoder[Out]) - TypedDataset.create[Out](selected) + TypedDataset.create[Out](selected) } } diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index c5c1bbf6..1e060734 100644 --- a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -26,10 +26,10 @@ object TypedExpressionEncoder { val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) val (out, serializer) = encoder.toCatalyst(in) match { - case a @If(_, _, x: CreateNamedStruct) => + case it @ If(a, b, x: CreateNamedStruct) => val out = BoundReference(0, encoder.catalystRepr, encoder.nullable) - (out, a) + (out, it) case other => val out = GetColumnByOrdinal(0, encoder.catalystRepr) @@ -43,3 +43,4 @@ object TypedExpressionEncoder { ) } } + diff --git a/dataset/src/test/scala/frameless/InjectionTests.scala b/dataset/src/test/scala/frameless/InjectionTests.scala index 4b3e88fd..e191d229 100644 --- a/dataset/src/test/scala/frameless/InjectionTests.scala +++ b/dataset/src/test/scala/frameless/InjectionTests.scala @@ -83,7 +83,11 @@ object I { class InjectionTests extends TypedDatasetSuite { test("Injection based encoders") { - check(forAll(prop[Country] _)) + /* + causes an eav in the jvm + J 27604 C2 org.apache.spark.sql.catalyst.expressions.UnsafeRow.getLong(I)J (18 bytes) + + check(forAll(prop[Country] _)) check(forAll(prop[LocalDateTime] _)) check(forAll(prop[Food] _)) check(forAll(prop[X1[Country]] _)) @@ -111,7 +115,7 @@ class InjectionTests extends TypedDatasetSuite { assert(TypedEncoder[I[Int]].catalystRepr == TypedEncoder[Int].catalystRepr) assert(TypedEncoder[I[I[Int]]].catalystRepr == TypedEncoder[Int].catalystRepr) - assert(TypedEncoder[I[Option[Int]]].nullable) + 
assert(TypedEncoder[I[Option[Int]]].nullable) */ } test("TypedEncoder[Person] is ambiguous") { diff --git a/dataset/src/test/scala/frameless/SelectTests.scala b/dataset/src/test/scala/frameless/SelectTests.scala index 8043fc94..d99f3969 100644 --- a/dataset/src/test/scala/frameless/SelectTests.scala +++ b/dataset/src/test/scala/frameless/SelectTests.scala @@ -6,6 +6,7 @@ import shapeless.test.illTyped import scala.reflect.ClassTag class SelectTests extends TypedDatasetSuite { + test("select('a) FROM abcd") { def prop[A, B, C, D](data: Vector[X4[A, B, C, D]])( implicit From a1d006b13d5094cb62860f893a878a960601ff6a Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Mon, 6 Jul 2020 22:30:14 +0200 Subject: [PATCH 09/17] the paths are wrong for options, the nested types 'solution' doesn't work though --- .../main/scala/frameless/InjectEncoder.scala | 25 +++++++ .../main/scala/frameless/OptionEncoder.scala | 60 ++++++++++++++++ .../main/scala/frameless/RecordEncoder.scala | 8 ++- .../main/scala/frameless/TypedEncoder.scala | 71 +------------------ .../test/scala/frameless/ops/CubeTests.scala | 4 +- 5 files changed, 96 insertions(+), 72 deletions(-) create mode 100644 dataset/src/main/scala/frameless/InjectEncoder.scala create mode 100644 dataset/src/main/scala/frameless/OptionEncoder.scala diff --git a/dataset/src/main/scala/frameless/InjectEncoder.scala b/dataset/src/main/scala/frameless/InjectEncoder.scala new file mode 100644 index 00000000..a42966a4 --- /dev/null +++ b/dataset/src/main/scala/frameless/InjectEncoder.scala @@ -0,0 +1,25 @@ +package frameless + +import org.apache.spark.sql.FramelessInternals +import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} +import org.apache.spark.sql.catalyst.expressions.objects.Invoke +import org.apache.spark.sql.types.DataType + +import scala.reflect.ClassTag + +case class InjectEncoder[A: ClassTag, B]() + (implicit inj: Injection[A, B], trb: TypedEncoder[B]) extends TypedEncoder[A] { + def nullable: Boolean = 
trb.nullable + def jvmRepr: DataType = FramelessInternals.objectTypeFor[A](classTag) + def catalystRepr: DataType = trb.catalystRepr + + def fromCatalyst(path: Expression): Expression = { + val bexpr = trb.fromCatalyst(path) + Invoke(Literal.fromObject(inj), "invert", jvmRepr, Seq(bexpr)) + } + + def toCatalyst(path: Expression): Expression = { + val invoke = Invoke(Literal.fromObject(inj), "apply", trb.jvmRepr, Seq(path)) + trb.toCatalyst(invoke) + } +} diff --git a/dataset/src/main/scala/frameless/OptionEncoder.scala b/dataset/src/main/scala/frameless/OptionEncoder.scala new file mode 100644 index 00000000..736346eb --- /dev/null +++ b/dataset/src/main/scala/frameless/OptionEncoder.scala @@ -0,0 +1,60 @@ +package frameless + +import org.apache.spark.sql.FramelessInternals +import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, UnwrapOption, WrapOption} +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType} + +case class OptionEncoder[A]()(implicit underlying: TypedEncoder[A]) extends TypedEncoder[Option[A]] { + def nullable: Boolean = true + + def jvmRepr: DataType = FramelessInternals.objectTypeFor[Option[A]](classTag) + def catalystRepr: DataType = underlying.catalystRepr + + def toCatalyst(path: Expression): Expression = { + // for primitive types we must manually unbox the value of the object + underlying.jvmRepr match { + case IntegerType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Integer], path), + "intValue", + IntegerType) + case LongType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Long], path), + "longValue", + LongType) + case DoubleType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Double], path), + "doubleValue", + DoubleType) + case FloatType => + Invoke( + 
UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Float], path), + "floatValue", + FloatType) + case ShortType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Short], path), + "shortValue", + ShortType) + case ByteType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Byte], path), + "byteValue", + ByteType) + case BooleanType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Boolean], path), + "booleanValue", + BooleanType) + + case _ => underlying.toCatalyst(UnwrapOption(underlying.jvmRepr, path)) + } + } + + def fromCatalyst(path: Expression): Expression = + WrapOption(underlying.fromCatalyst(path), underlying.jvmRepr) +} diff --git a/dataset/src/main/scala/frameless/RecordEncoder.scala b/dataset/src/main/scala/frameless/RecordEncoder.scala index 8b1d2d63..8ed121eb 100644 --- a/dataset/src/main/scala/frameless/RecordEncoder.scala +++ b/dataset/src/main/scala/frameless/RecordEncoder.scala @@ -164,10 +164,16 @@ class RecordEncoder[F, G <: HList, H <: HList] val exprs = fields.value.value.map { field => val fieldPath = (field, path) match { case (RecordEncoderField(_, _, r), BoundReference(ordinal, dataType, nullable) ) - if r.getClass.getName == "frameless.RecordEncoder" && fields.value.value.size == 1 => + if (r.getClass.getName == "frameless.RecordEncoder" + || r.getClass.getName == "frameless.InjectEncoder" + ) && fields.value.value.size == 1 => GetStructField(path, field.ordinal, Some(field.name)) case (_, BoundReference(ordinal, dataType, nullable) ) => GetColumnByOrdinal(field.ordinal, field.encoder.jvmRepr) + case (RecordEncoderField(_, _, r), other) + if r.getClass.getName == "frameless.OptionEncoder" => + //UnresolvedAttribute("a.a.a") + GetStructField(path, field.ordinal, Some(field.name)) case (_, other) => GetStructField(path, field.ordinal, Some(field.name)) } diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala index 47104e8a..1bdcaab0 
100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -328,78 +328,11 @@ object TypedEncoder { encodeB.nullable) } - implicit def optionEncoder[A](implicit underlying: TypedEncoder[A]): TypedEncoder[Option[A]] = - new TypedEncoder[Option[A]] { - def nullable: Boolean = true - - def jvmRepr: DataType = FramelessInternals.objectTypeFor[Option[A]](classTag) - def catalystRepr: DataType = underlying.catalystRepr - - def toCatalyst(path: Expression): Expression = { - // for primitive types we must manually unbox the value of the object - underlying.jvmRepr match { - case IntegerType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Integer], path), - "intValue", - IntegerType) - case LongType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Long], path), - "longValue", - LongType) - case DoubleType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Double], path), - "doubleValue", - DoubleType) - case FloatType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Float], path), - "floatValue", - FloatType) - case ShortType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Short], path), - "shortValue", - ShortType) - case ByteType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Byte], path), - "byteValue", - ByteType) - case BooleanType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Boolean], path), - "booleanValue", - BooleanType) - - case _ => underlying.toCatalyst(UnwrapOption(underlying.jvmRepr, path)) - } - } - - def fromCatalyst(path: Expression): Expression = - WrapOption(underlying.fromCatalyst(path), underlying.jvmRepr) - } + implicit def optionEncoder[A](implicit underlying: TypedEncoder[A]): TypedEncoder[Option[A]] = OptionEncoder() /** Encodes things using injection if there is one defined */ implicit def usingInjection[A: ClassTag, B] - (implicit inj: Injection[A, B], trb: 
TypedEncoder[B]): TypedEncoder[A] = - new TypedEncoder[A] { - def nullable: Boolean = trb.nullable - def jvmRepr: DataType = FramelessInternals.objectTypeFor[A](classTag) - def catalystRepr: DataType = trb.catalystRepr - - def fromCatalyst(path: Expression): Expression = { - val bexpr = trb.fromCatalyst(path) - Invoke(Literal.fromObject(inj), "invert", jvmRepr, Seq(bexpr)) - } - - def toCatalyst(path: Expression): Expression = { - val invoke = Invoke(Literal.fromObject(inj), "apply", trb.jvmRepr, Seq(path)) - trb.toCatalyst(invoke) - } - } + (implicit inj: Injection[A, B], trb: TypedEncoder[B]): TypedEncoder[A] = InjectEncoder() /** Encodes things as records if there is no Injection defined */ implicit def usingDerivation[F, G <: HList, H <: HList] diff --git a/dataset/src/test/scala/frameless/ops/CubeTests.scala b/dataset/src/test/scala/frameless/ops/CubeTests.scala index eb389c55..9b7afe63 100644 --- a/dataset/src/test/scala/frameless/ops/CubeTests.scala +++ b/dataset/src/test/scala/frameless/ops/CubeTests.scala @@ -291,8 +291,8 @@ class CubeTests extends TypedDatasetSuite { datasetGrouped ?= dataGrouped } - check(forAll(prop[Short, Option[Short]] _)) - check(forAll(prop[Option[Short], Short] _)) + // check(forAll(prop[Short, Option[Short]] _)) + // check(forAll(prop[Option[Short], Short] _)) check(forAll(prop[X1[Option[Short]], Short] _)) } From 729c56d10d2739d26de9e8d2f9db54c70cdc85ca Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Tue, 7 Jul 2020 17:50:20 +0200 Subject: [PATCH 10/17] got the basics, wrong starting place for record encoders, will fix up to minimal changes --- .../main/scala/frameless/OptionEncoder.scala | 37 +++++++++++++++++-- .../main/scala/frameless/RecordEncoder.scala | 20 ++++++---- .../frameless/TypedExpressionEncoder.scala | 21 +++++++---- 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/dataset/src/main/scala/frameless/OptionEncoder.scala b/dataset/src/main/scala/frameless/OptionEncoder.scala index 736346eb..d1443107 
100644 --- a/dataset/src/main/scala/frameless/OptionEncoder.scala +++ b/dataset/src/main/scala/frameless/OptionEncoder.scala @@ -1,10 +1,11 @@ package frameless import org.apache.spark.sql.FramelessInternals -import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral} +import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} +import org.apache.spark.sql.catalyst.expressions.{Expression, NonSQLExpression, UnaryExpression} import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, UnwrapOption, WrapOption} -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType} +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ObjectType, ShortType} case class OptionEncoder[A]()(implicit underlying: TypedEncoder[A]) extends TypedEncoder[Option[A]] { def nullable: Boolean = true @@ -58,3 +59,33 @@ case class OptionEncoder[A]()(implicit underlying: TypedEncoder[A]) extends Type def fromCatalyst(path: Expression): Expression = WrapOption(underlying.fromCatalyst(path), underlying.jvmRepr) } + +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ + +/** + * Converts the result of evaluating `child` into an option, checking both the isNull bit and + * (in the case of reference types) equality with null. + * + * @param child The expression to evaluate and wrap. + * @param optType The type of this option. 
+ */ +case class FramelessWrapOption(child: Expression, optType: DataType) + extends UnaryExpression with NonSQLExpression { + + override def dataType: DataType = ObjectType(classOf[Option[_]]) + + override def nullable: Boolean = false + + override def eval(input: InternalRow): Any = Option(child.eval(input)) + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val inputObject = child.genCode(ctx) + + val code = inputObject.code + code""" + scala.Option ${ev.value} = + ${inputObject.isNull} ? + scala.Option$$.MODULE$$.apply(null) : new scala.Some(${inputObject.value}); + """ + ev.copy(code = code, isNull = FalseLiteral) + } +} \ No newline at end of file diff --git a/dataset/src/main/scala/frameless/RecordEncoder.scala b/dataset/src/main/scala/frameless/RecordEncoder.scala index 8ed121eb..134b0be8 100644 --- a/dataset/src/main/scala/frameless/RecordEncoder.scala +++ b/dataset/src/main/scala/frameless/RecordEncoder.scala @@ -163,31 +163,35 @@ class RecordEncoder[F, G <: HList, H <: HList] def fromCatalyst(path: Expression): Expression = { val exprs = fields.value.value.map { field => val fieldPath = (field, path) match { - case (RecordEncoderField(_, _, r), BoundReference(ordinal, dataType, nullable) ) + /*case (RecordEncoderField(_, _, r), BoundReference(ordinal, dataType, nullable) ) if (r.getClass.getName == "frameless.RecordEncoder" || r.getClass.getName == "frameless.InjectEncoder" ) && fields.value.value.size == 1 => - GetStructField(path, field.ordinal, Some(field.name)) + GetStructField(path, field.ordinal, Some(field.name))*/ case (_, BoundReference(ordinal, dataType, nullable) ) => GetColumnByOrdinal(field.ordinal, field.encoder.jvmRepr) - case (RecordEncoderField(_, _, r), other) + /*case (RecordEncoderField(_, _, r), other) if r.getClass.getName == "frameless.OptionEncoder" => //UnresolvedAttribute("a.a.a") - GetStructField(path, field.ordinal, Some(field.name)) + GetStructField(path, field.ordinal, Some(field.name))*/ case (_, 
other) => GetStructField(path, field.ordinal, Some(field.name)) } field.encoder.fromCatalyst(fieldPath) } + val nullExpr = Literal.create(null, jvmRepr) + // UnresolvedExtractValue( UnresolvedAttribute(Seq(field.name)), Literal(field.name)) val newArgs = newInstanceExprs.value.from(exprs) val newExpr = NewInstance(classTag.runtimeClass, newArgs, jvmRepr, propagateNull = true) - path match { - case BoundReference(0, _, _) => newExpr + If(IsNull(path), nullExpr, newExpr) + +/* path match { + case BoundReference(0, _, _) => + newExpr case _ => { - val nullExpr = Literal.create(null, jvmRepr) If(IsNull(path), nullExpr, newExpr) } - } + } */ } } diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index 1e060734..c1986ae7 100644 --- a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -3,7 +3,7 @@ package frameless import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If, IsNull, Literal} import org.apache.spark.sql.types.StructType object TypedExpressionEncoder { @@ -25,20 +25,27 @@ object TypedExpressionEncoder { val encoder = TypedEncoder[T] val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) - val (out, serializer) = encoder.toCatalyst(in) match { + val (objectDeserializer, serializer) = encoder.toCatalyst(in) match { case it @ If(a, b, x: CreateNamedStruct) => - val out = BoundReference(0, encoder.catalystRepr, encoder.nullable) - - (out, it) + // + val out = GetColumnByOrdinal(0, encoder.catalystRepr) + // 48 + BoundReference(0, encoder.catalystRepr, encoder.nullable) + + // will be newInstance +// 48 + 
(If(IsNull(GetColumnByOrdinal(0, encoder.catalystRepr)), Literal.create(null, encoder.catalystRepr), encoder.fromCatalyst(out)), it) + // 48, so no difference + (encoder.fromCatalyst(out), it) case other => val out = GetColumnByOrdinal(0, encoder.catalystRepr) - (out, other) + ( encoder.fromCatalyst(out), other) } new ExpressionEncoder[T]( objSerializer = serializer, - objDeserializer = encoder.fromCatalyst(out), + objDeserializer = objectDeserializer, clsTag = encoder.classTag ) } From a1a7d5b6b7d465e10fb5e617a3e2ed0a8976e4c6 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Tue, 7 Jul 2020 18:13:23 +0200 Subject: [PATCH 11/17] encoders ported, 347 tests passed, 7 failed --- .../main/scala/frameless/InjectEncoder.scala | 25 ----- .../main/scala/frameless/OptionEncoder.scala | 91 ------------------- .../main/scala/frameless/RecordEncoder.scala | 21 +---- .../main/scala/frameless/TypedEncoder.scala | 71 ++++++++++++++- .../frameless/TypedExpressionEncoder.scala | 21 ++--- .../test/scala/frameless/InjectionTests.scala | 8 +- .../test/scala/frameless/SelectTests.scala | 1 - .../test/scala/frameless/ops/CubeTests.scala | 4 +- 8 files changed, 82 insertions(+), 160 deletions(-) delete mode 100644 dataset/src/main/scala/frameless/InjectEncoder.scala delete mode 100644 dataset/src/main/scala/frameless/OptionEncoder.scala diff --git a/dataset/src/main/scala/frameless/InjectEncoder.scala b/dataset/src/main/scala/frameless/InjectEncoder.scala deleted file mode 100644 index a42966a4..00000000 --- a/dataset/src/main/scala/frameless/InjectEncoder.scala +++ /dev/null @@ -1,25 +0,0 @@ -package frameless - -import org.apache.spark.sql.FramelessInternals -import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} -import org.apache.spark.sql.catalyst.expressions.objects.Invoke -import org.apache.spark.sql.types.DataType - -import scala.reflect.ClassTag - -case class InjectEncoder[A: ClassTag, B]() - (implicit inj: Injection[A, B], trb: TypedEncoder[B]) extends 
TypedEncoder[A] { - def nullable: Boolean = trb.nullable - def jvmRepr: DataType = FramelessInternals.objectTypeFor[A](classTag) - def catalystRepr: DataType = trb.catalystRepr - - def fromCatalyst(path: Expression): Expression = { - val bexpr = trb.fromCatalyst(path) - Invoke(Literal.fromObject(inj), "invert", jvmRepr, Seq(bexpr)) - } - - def toCatalyst(path: Expression): Expression = { - val invoke = Invoke(Literal.fromObject(inj), "apply", trb.jvmRepr, Seq(path)) - trb.toCatalyst(invoke) - } -} diff --git a/dataset/src/main/scala/frameless/OptionEncoder.scala b/dataset/src/main/scala/frameless/OptionEncoder.scala deleted file mode 100644 index d1443107..00000000 --- a/dataset/src/main/scala/frameless/OptionEncoder.scala +++ /dev/null @@ -1,91 +0,0 @@ -package frameless - -import org.apache.spark.sql.FramelessInternals -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, FalseLiteral} -import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} -import org.apache.spark.sql.catalyst.expressions.{Expression, NonSQLExpression, UnaryExpression} -import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, UnwrapOption, WrapOption} -import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ObjectType, ShortType} - -case class OptionEncoder[A]()(implicit underlying: TypedEncoder[A]) extends TypedEncoder[Option[A]] { - def nullable: Boolean = true - - def jvmRepr: DataType = FramelessInternals.objectTypeFor[Option[A]](classTag) - def catalystRepr: DataType = underlying.catalystRepr - - def toCatalyst(path: Expression): Expression = { - // for primitive types we must manually unbox the value of the object - underlying.jvmRepr match { - case IntegerType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Integer], path), - "intValue", - IntegerType) - case LongType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Long], path), - "longValue", 
- LongType) - case DoubleType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Double], path), - "doubleValue", - DoubleType) - case FloatType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Float], path), - "floatValue", - FloatType) - case ShortType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Short], path), - "shortValue", - ShortType) - case ByteType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Byte], path), - "byteValue", - ByteType) - case BooleanType => - Invoke( - UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Boolean], path), - "booleanValue", - BooleanType) - - case _ => underlying.toCatalyst(UnwrapOption(underlying.jvmRepr, path)) - } - } - - def fromCatalyst(path: Expression): Expression = - WrapOption(underlying.fromCatalyst(path), underlying.jvmRepr) -} - -import org.apache.spark.sql.catalyst.expressions.codegen.Block._ - -/** - * Converts the result of evaluating `child` into an option, checking both the isNull bit and - * (in the case of reference types) equality with null. - * - * @param child The expression to evaluate and wrap. - * @param optType The type of this option. - */ -case class FramelessWrapOption(child: Expression, optType: DataType) - extends UnaryExpression with NonSQLExpression { - - override def dataType: DataType = ObjectType(classOf[Option[_]]) - - override def nullable: Boolean = false - - override def eval(input: InternalRow): Any = Option(child.eval(input)) - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val inputObject = child.genCode(ctx) - - val code = inputObject.code + code""" - scala.Option ${ev.value} = - ${inputObject.isNull} ? 
- scala.Option$$.MODULE$$.apply(null) : new scala.Some(${inputObject.value}); - """ - ev.copy(code = code, isNull = FalseLiteral) - } -} \ No newline at end of file diff --git a/dataset/src/main/scala/frameless/RecordEncoder.scala b/dataset/src/main/scala/frameless/RecordEncoder.scala index 134b0be8..942b4bce 100644 --- a/dataset/src/main/scala/frameless/RecordEncoder.scala +++ b/dataset/src/main/scala/frameless/RecordEncoder.scala @@ -163,35 +163,18 @@ class RecordEncoder[F, G <: HList, H <: HList] def fromCatalyst(path: Expression): Expression = { val exprs = fields.value.value.map { field => val fieldPath = (field, path) match { - /*case (RecordEncoderField(_, _, r), BoundReference(ordinal, dataType, nullable) ) - if (r.getClass.getName == "frameless.RecordEncoder" - || r.getClass.getName == "frameless.InjectEncoder" - ) && fields.value.value.size == 1 => - GetStructField(path, field.ordinal, Some(field.name))*/ case (_, BoundReference(ordinal, dataType, nullable) ) => GetColumnByOrdinal(field.ordinal, field.encoder.jvmRepr) - /*case (RecordEncoderField(_, _, r), other) - if r.getClass.getName == "frameless.OptionEncoder" => - //UnresolvedAttribute("a.a.a") - GetStructField(path, field.ordinal, Some(field.name))*/ case (_, other) => GetStructField(path, field.ordinal, Some(field.name)) } field.encoder.fromCatalyst(fieldPath) } - val nullExpr = Literal.create(null, jvmRepr) - // UnresolvedExtractValue( UnresolvedAttribute(Seq(field.name)), Literal(field.name)) val newArgs = newInstanceExprs.value.from(exprs) val newExpr = NewInstance(classTag.runtimeClass, newArgs, jvmRepr, propagateNull = true) - If(IsNull(path), nullExpr, newExpr) -/* path match { - case BoundReference(0, _, _) => - newExpr - case _ => { - If(IsNull(path), nullExpr, newExpr) - } - } */ + val nullExpr = Literal.create(null, jvmRepr) + If(IsNull(path), nullExpr, newExpr) } } diff --git a/dataset/src/main/scala/frameless/TypedEncoder.scala b/dataset/src/main/scala/frameless/TypedEncoder.scala 
index 1bdcaab0..47104e8a 100644 --- a/dataset/src/main/scala/frameless/TypedEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedEncoder.scala @@ -328,11 +328,78 @@ object TypedEncoder { encodeB.nullable) } - implicit def optionEncoder[A](implicit underlying: TypedEncoder[A]): TypedEncoder[Option[A]] = OptionEncoder() + implicit def optionEncoder[A](implicit underlying: TypedEncoder[A]): TypedEncoder[Option[A]] = + new TypedEncoder[Option[A]] { + def nullable: Boolean = true + + def jvmRepr: DataType = FramelessInternals.objectTypeFor[Option[A]](classTag) + def catalystRepr: DataType = underlying.catalystRepr + + def toCatalyst(path: Expression): Expression = { + // for primitive types we must manually unbox the value of the object + underlying.jvmRepr match { + case IntegerType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Integer], path), + "intValue", + IntegerType) + case LongType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Long], path), + "longValue", + LongType) + case DoubleType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Double], path), + "doubleValue", + DoubleType) + case FloatType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Float], path), + "floatValue", + FloatType) + case ShortType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Short], path), + "shortValue", + ShortType) + case ByteType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Byte], path), + "byteValue", + ByteType) + case BooleanType => + Invoke( + UnwrapOption(ScalaReflection.dataTypeFor[java.lang.Boolean], path), + "booleanValue", + BooleanType) + + case _ => underlying.toCatalyst(UnwrapOption(underlying.jvmRepr, path)) + } + } + + def fromCatalyst(path: Expression): Expression = + WrapOption(underlying.fromCatalyst(path), underlying.jvmRepr) + } /** Encodes things using injection if there is one defined */ implicit def usingInjection[A: ClassTag, B] - (implicit 
inj: Injection[A, B], trb: TypedEncoder[B]): TypedEncoder[A] = InjectEncoder() + (implicit inj: Injection[A, B], trb: TypedEncoder[B]): TypedEncoder[A] = + new TypedEncoder[A] { + def nullable: Boolean = trb.nullable + def jvmRepr: DataType = FramelessInternals.objectTypeFor[A](classTag) + def catalystRepr: DataType = trb.catalystRepr + + def fromCatalyst(path: Expression): Expression = { + val bexpr = trb.fromCatalyst(path) + Invoke(Literal.fromObject(inj), "invert", jvmRepr, Seq(bexpr)) + } + + def toCatalyst(path: Expression): Expression = { + val invoke = Invoke(Literal.fromObject(inj), "apply", trb.jvmRepr, Seq(path)) + trb.toCatalyst(invoke) + } + } /** Encodes things as records if there is no Injection defined */ implicit def usingDerivation[F, G <: HList, H <: HList] diff --git a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala index c1986ae7..c8fbf88d 100644 --- a/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala +++ b/dataset/src/main/scala/frameless/TypedExpressionEncoder.scala @@ -3,7 +3,7 @@ package frameless import org.apache.spark.sql.Encoder import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If, IsNull, Literal} +import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If} import org.apache.spark.sql.types.StructType object TypedExpressionEncoder { @@ -25,27 +25,20 @@ object TypedExpressionEncoder { val encoder = TypedEncoder[T] val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) - val (objectDeserializer, serializer) = encoder.toCatalyst(in) match { - case it @ If(a, b, x: CreateNamedStruct) => - // + val (out, serializer) = encoder.toCatalyst(in) match { + case it @ If(_, _, _: CreateNamedStruct) => val out = GetColumnByOrdinal(0, encoder.catalystRepr) - // 48 - 
BoundReference(0, encoder.catalystRepr, encoder.nullable) - - // will be newInstance -// 48 - (If(IsNull(GetColumnByOrdinal(0, encoder.catalystRepr)), Literal.create(null, encoder.catalystRepr), encoder.fromCatalyst(out)), it) - // 48, so no difference - (encoder.fromCatalyst(out), it) + + (out, it) case other => val out = GetColumnByOrdinal(0, encoder.catalystRepr) - ( encoder.fromCatalyst(out), other) + (out, other) } new ExpressionEncoder[T]( objSerializer = serializer, - objDeserializer = objectDeserializer, + objDeserializer = encoder.fromCatalyst(out), clsTag = encoder.classTag ) } diff --git a/dataset/src/test/scala/frameless/InjectionTests.scala b/dataset/src/test/scala/frameless/InjectionTests.scala index e191d229..4b3e88fd 100644 --- a/dataset/src/test/scala/frameless/InjectionTests.scala +++ b/dataset/src/test/scala/frameless/InjectionTests.scala @@ -83,11 +83,7 @@ object I { class InjectionTests extends TypedDatasetSuite { test("Injection based encoders") { - /* - causes an eav in the jvm - J 27604 C2 org.apache.spark.sql.catalyst.expressions.UnsafeRow.getLong(I)J (18 bytes) - - check(forAll(prop[Country] _)) + check(forAll(prop[Country] _)) check(forAll(prop[LocalDateTime] _)) check(forAll(prop[Food] _)) check(forAll(prop[X1[Country]] _)) @@ -115,7 +111,7 @@ class InjectionTests extends TypedDatasetSuite { assert(TypedEncoder[I[Int]].catalystRepr == TypedEncoder[Int].catalystRepr) assert(TypedEncoder[I[I[Int]]].catalystRepr == TypedEncoder[Int].catalystRepr) - assert(TypedEncoder[I[Option[Int]]].nullable) */ + assert(TypedEncoder[I[Option[Int]]].nullable) } test("TypedEncoder[Person] is ambiguous") { diff --git a/dataset/src/test/scala/frameless/SelectTests.scala b/dataset/src/test/scala/frameless/SelectTests.scala index d99f3969..8043fc94 100644 --- a/dataset/src/test/scala/frameless/SelectTests.scala +++ b/dataset/src/test/scala/frameless/SelectTests.scala @@ -6,7 +6,6 @@ import shapeless.test.illTyped import scala.reflect.ClassTag class SelectTests 
extends TypedDatasetSuite { - test("select('a) FROM abcd") { def prop[A, B, C, D](data: Vector[X4[A, B, C, D]])( implicit diff --git a/dataset/src/test/scala/frameless/ops/CubeTests.scala b/dataset/src/test/scala/frameless/ops/CubeTests.scala index 9b7afe63..eb389c55 100644 --- a/dataset/src/test/scala/frameless/ops/CubeTests.scala +++ b/dataset/src/test/scala/frameless/ops/CubeTests.scala @@ -291,8 +291,8 @@ class CubeTests extends TypedDatasetSuite { datasetGrouped ?= dataGrouped } - // check(forAll(prop[Short, Option[Short]] _)) - // check(forAll(prop[Option[Short], Short] _)) + check(forAll(prop[Short, Option[Short]] _)) + check(forAll(prop[Option[Short], Short] _)) check(forAll(prop[X1[Option[Short]], Short] _)) } From 7896e8a56354e6eaefdbb6be235ee0639cc6e275 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Tue, 7 Jul 2020 18:35:57 +0200 Subject: [PATCH 12/17] sum and sumdistinct have the wrong types for the zero literal --- .../scala/frameless/functions/AggregateFunctions.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/dataset/src/main/scala/frameless/functions/AggregateFunctions.scala b/dataset/src/main/scala/frameless/functions/AggregateFunctions.scala index 6b0b10a7..34354a28 100644 --- a/dataset/src/main/scala/frameless/functions/AggregateFunctions.scala +++ b/dataset/src/main/scala/frameless/functions/AggregateFunctions.scala @@ -63,9 +63,10 @@ trait AggregateFunctions { def sum[A, T, Out](column: TypedColumn[T, A])( implicit summable: CatalystSummable[A, Out], - oencoder: TypedEncoder[Out] + oencoder: TypedEncoder[Out], + aencoder: TypedEncoder[A] ): TypedAggregate[T, Out] = { - val zeroExpr = Literal.create(summable.zero, TypedEncoder[Out].catalystRepr) + val zeroExpr = Literal.create(summable.zero, TypedEncoder[A].catalystRepr) val sumExpr = expr(sparkFunctions.sum(column.untyped)) val sumOrZero = Coalesce(Seq(sumExpr, zeroExpr)) @@ -79,9 +80,10 @@ trait AggregateFunctions { def sumDistinct[A, T, Out](column: 
TypedColumn[T, A])( implicit summable: CatalystSummable[A, Out], - oencoder: TypedEncoder[Out] + oencoder: TypedEncoder[Out], + aencoder: TypedEncoder[A] ): TypedAggregate[T, Out] = { - val zeroExpr = Literal.create(summable.zero, TypedEncoder[Out].catalystRepr) + val zeroExpr = Literal.create(summable.zero, TypedEncoder[A].catalystRepr) val sumExpr = expr(sparkFunctions.sumDistinct(column.untyped)) val sumOrZero = Coalesce(Seq(sumExpr, zeroExpr)) From f39f550067c2d8949c4262afcb812822e7da6d00 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Tue, 7 Jul 2020 18:56:05 +0200 Subject: [PATCH 13/17] add spark 2.4 join behaviour and enable ignoring of nullable --- .../test/scala/frameless/SchemaTests.scala | 19 ++++++++++++---- .../test/scala/frameless/SelfJoinTests.scala | 22 ++++++++++++++----- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/dataset/src/test/scala/frameless/SchemaTests.scala b/dataset/src/test/scala/frameless/SchemaTests.scala index 6912fd61..92fd3305 100644 --- a/dataset/src/test/scala/frameless/SchemaTests.scala +++ b/dataset/src/test/scala/frameless/SchemaTests.scala @@ -2,18 +2,29 @@ package frameless import frameless.functions.aggregate._ import frameless.functions._ +import org.apache.spark.sql.types.StructType import org.scalacheck.Prop import org.scalacheck.Prop._ import org.scalatest.matchers.should.Matchers class SchemaTests extends TypedDatasetSuite with Matchers { - def prop[A](dataset: TypedDataset[A]): Prop = { + def structToNonNullable(struct: StructType): StructType = { + StructType(struct.fields.map( f => f.copy(nullable = false))) + } + + def prop[A](dataset: TypedDataset[A], ignoreNullable: Boolean = false): Prop = { val schema = dataset.dataset.schema Prop.all( - dataset.schema ?= schema, - TypedExpressionEncoder.targetStructType(dataset.encoder) ?= schema + if (!ignoreNullable) + dataset.schema ?= schema + else + structToNonNullable(dataset.schema) ?= structToNonNullable(schema), + if (!ignoreNullable) + 
TypedExpressionEncoder.targetStructType(dataset.encoder) ?= schema + else + structToNonNullable(TypedExpressionEncoder.targetStructType(dataset.encoder)) ?= structToNonNullable(schema) ) } @@ -24,7 +35,7 @@ class SchemaTests extends TypedDatasetSuite with Matchers { val df = df0.groupBy(_a).agg(sum(_b)) - check(prop(df)) + check(prop(df, true)) } test("schema of select(lit(1L))") { diff --git a/dataset/src/test/scala/frameless/SelfJoinTests.scala b/dataset/src/test/scala/frameless/SelfJoinTests.scala index 50275fb4..54268ecc 100644 --- a/dataset/src/test/scala/frameless/SelfJoinTests.scala +++ b/dataset/src/test/scala/frameless/SelfJoinTests.scala @@ -17,11 +17,20 @@ class SelfJoinTests extends TypedDatasetSuite { result } + def allowAmbiguousJoin[T](body: => T)(implicit session: SparkSession): T = { + val crossJoin = "spark.sql.analyzer.failAmbiguousSelfJoin" + val oldSetting = session.conf.get(crossJoin) + session.conf.set(crossJoin, "false") + val result = body + session.conf.set(crossJoin, oldSetting) + result + } + test("self join with colLeft/colRight disambiguation") { def prop[ A : TypedEncoder : Ordering, B : TypedEncoder : Ordering - ](dx: List[X2[A, B]], d: X2[A, B]): Prop = { + ](dx: List[X2[A, B]], d: X2[A, B]): Prop = allowAmbiguousJoin { val data = d :: dx val ds = TypedDataset.create(data) @@ -46,7 +55,8 @@ class SelfJoinTests extends TypedDatasetSuite { A : TypedEncoder : Ordering, B : TypedEncoder : Ordering ](dx: List[X2[A, B]], d: X2[A, B]): Prop = - allowTrivialJoin { + allowTrivialJoin { allowAmbiguousJoin { + val data = d :: dx val ds = TypedDataset.create(data) val untyped = ds.dataset @@ -60,7 +70,7 @@ class SelfJoinTests extends TypedDatasetSuite { val typed = ds.joinInner(ds)(ds.colLeft('a) === ds.colLeft('a)).count().run vanilla ?= typed - } + } } check(prop[Int, Int] _) } @@ -69,7 +79,7 @@ class SelfJoinTests extends TypedDatasetSuite { def prop[ A : TypedEncoder : CatalystNumeric : Ordering, B : TypedEncoder : Ordering - ](data: 
List[X3[A, A, B]]): Prop = { + ](data: List[X3[A, A, B]]): Prop = allowAmbiguousJoin { val ds = TypedDataset.create(data) val df1 = ds.dataset.alias("df1") @@ -94,7 +104,7 @@ class SelfJoinTests extends TypedDatasetSuite { A : TypedEncoder : CatalystNumeric : Ordering, B : TypedEncoder : Ordering ](data: List[X3[A, A, B]]): Prop = - allowTrivialJoin { + allowTrivialJoin { allowAmbiguousJoin { val ds = TypedDataset.create(data) // The point I'm making here is that it "behaves just like Spark". I @@ -109,7 +119,7 @@ class SelfJoinTests extends TypedDatasetSuite { ).count().run() vanilla ?= typed - } + } } check(prop[Int, Int] _) } From c5a70781c14aeb865919164c8f84d6be2177d800 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 8 Jul 2020 00:15:39 +0200 Subject: [PATCH 14/17] version bump and readme change --- README.md | 6 +++--- version.sbt | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index d5dd9f53..299e021d 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,7 @@ The compatible versions of [Spark](http://spark.apache.org/) and | 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11 | 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11 | 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11/2.12 - +| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12 Versions 0.5.x and 0.6.x have identical features. The first is compatible with Spark 2.2.1 and the second with 2.3.0. @@ -74,12 +74,12 @@ detailed comparison of `TypedDataset` with Spark's `Dataset` API. 
* [Proof of Concept: TypedDataFrame](http://typelevel.org/frameless/TypedDataFrame.html) ## Quick Start -Frameless is compiled against Scala 2.11.x (and Scala 2.12.x since Frameless 0.8.0) +Frameless is compiled against Scala 2.12.x To use Frameless in your project add the following in your `build.sbt` file as needed: ```scala -val framelessVersion = "0.8.0" // for Spark 2.4.0 +val framelessVersion = "0.9.0" // for Spark 3.0.0 libraryDependencies ++= List( "org.typelevel" %% "frameless-dataset" % framelessVersion, diff --git a/version.sbt b/version.sbt index e74ced49..338b0ba2 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.8.1-SNAPSHOT" +version in ThisBuild := "0.9.0-SNAPSHOT" From 918ca8cba2e0bedc9e55b2641e79cc783b234704 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 8 Jul 2020 00:30:55 +0200 Subject: [PATCH 15/17] remove 2.11 travis build bits --- .travis.yml | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/.travis.yml b/.travis.yml index e70c758a..dd1d8d84 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,22 +7,13 @@ jobs: include: - stage: Documentation 2.12 env: PHASE=A - scala: 2.12.8 - - stage: Documentation 2.11 - env: PHASE=A - scala: 2.11.12 + scala: 2.12.10 - stage: Unit Tests 2.12 env: PHASE=B - scala: 2.12.8 - - stage: Unit Tests 2.11 - env: PHASE=B - scala: 2.11.12 - - stage: Publish 2.11 - env: PHASE=C - scala: 2.11.12 + scala: 2.12.10 - stage: Publish 2.12 env: PHASE=C - scala: 2.12.8 + scala: 2.12.10 script: - scripts/travis-publish.sh From 9e0de29ce295a49a23e2f31729d7aea921c1fa3b Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Wed, 8 Jul 2020 13:12:41 +0200 Subject: [PATCH 16/17] #427 - Remove dead code --- dataset/src/main/scala/frameless/RecordEncoder.scala | 9 +-------- .../scala/org/apache/spark/sql/FramelessInternals.scala | 2 -- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/dataset/src/main/scala/frameless/RecordEncoder.scala 
b/dataset/src/main/scala/frameless/RecordEncoder.scala index 942b4bce..0beaba5b 100644 --- a/dataset/src/main/scala/frameless/RecordEncoder.scala +++ b/dataset/src/main/scala/frameless/RecordEncoder.scala @@ -1,7 +1,6 @@ package frameless import org.apache.spark.sql.FramelessInternals -import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, NewInstance} import org.apache.spark.sql.types._ @@ -162,13 +161,7 @@ class RecordEncoder[F, G <: HList, H <: HList] def fromCatalyst(path: Expression): Expression = { val exprs = fields.value.value.map { field => - val fieldPath = (field, path) match { - case (_, BoundReference(ordinal, dataType, nullable) ) => - GetColumnByOrdinal(field.ordinal, field.encoder.jvmRepr) - case (_, other) => - GetStructField(path, field.ordinal, Some(field.name)) - } - field.encoder.fromCatalyst(fieldPath) + field.encoder.fromCatalyst( GetStructField(path, field.ordinal, Some(field.name)) ) } val newArgs = newInstanceExprs.value.from(exprs) diff --git a/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala b/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala index 6ee77c2a..842e04cf 100644 --- a/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala +++ b/dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala @@ -22,8 +22,6 @@ object FramelessInternals { } } - def asNullable(dt: DataType): DataType = dt.asNullable - def expr(column: Column): Expression = column.expr def column(column: Column): Expression = column.expr From 69ae5911c6ac51e28616ed91a475bb530d8370b1 Mon Sep 17 00:00:00 2001 From: Chris Twiner Date: Mon, 27 Jul 2020 11:31:19 +0200 Subject: [PATCH 17/17] #427 - Based on feedback on the pull, reverting test changes - no longer necessary --- README.md | 6 +++++- dataset/src/test/scala/frameless/ops/PivotTest.scala | 8 ++++---- 2 files changed, 9 
insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 299e021d..3a1c814c 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,10 @@ This essentially allows you to use any version of Frameless with any version of The aforementioned table simply provides the versions of Spark we officially compile and test Frameless with, but other versions may probably work as well. +### Breaking changes in 0.9 + +* Spark 3 introduces a new ExpressionEncoder approach, the schema for single value DataFrame's is now ["value"](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L270) not "_1". + ## Why? Frameless introduces a new Spark API, called `TypedDataset`. @@ -74,7 +78,7 @@ detailed comparison of `TypedDataset` with Spark's `Dataset` API. * [Proof of Concept: TypedDataFrame](http://typelevel.org/frameless/TypedDataFrame.html) ## Quick Start -Frameless is compiled against Scala 2.12.x +Since the 0.9.x release, Frameless is compiled only against Scala 2.12.x. 
To use Frameless in your project add the following in your `build.sbt` file as needed: diff --git a/dataset/src/test/scala/frameless/ops/PivotTest.scala b/dataset/src/test/scala/frameless/ops/PivotTest.scala index de3715ac..dd9bf5e6 100644 --- a/dataset/src/test/scala/frameless/ops/PivotTest.scala +++ b/dataset/src/test/scala/frameless/ops/PivotTest.scala @@ -8,11 +8,11 @@ import org.scalacheck.Prop._ import org.scalacheck.{Gen, Prop} class PivotTest extends TypedDatasetSuite { - def withCustomGenX4: Gen[Vector[X4[String, String, Long, Boolean]]] = { - val kvPairGen: Gen[X4[String, String, Long, Boolean]] = for { + def withCustomGenX4: Gen[Vector[X4[String, String, Int, Boolean]]] = { + val kvPairGen: Gen[X4[String, String, Int, Boolean]] = for { a <- Gen.oneOf(Seq("1", "2", "3", "4")) b <- Gen.oneOf(Seq("a", "b", "c")) - c <- arbitrary[Long] + c <- arbitrary[Int] d <- arbitrary[Boolean] } yield X4(a, b, c, d) @@ -20,7 +20,7 @@ class PivotTest extends TypedDatasetSuite { } test("X4[Boolean, String, Int, Boolean] pivot on String") { - def prop(data: Vector[X4[String, String, Long, Boolean]]): Prop = { + def prop(data: Vector[X4[String, String, Int, Boolean]]): Prop = { val d = TypedDataset.create(data) val frameless = d.groupBy(d('a)). pivot(d('b)).on("a", "b", "c").