Skip to content

Commit

Permalink
make TypedSchemaDef a proper type class
Browse files Browse the repository at this point in the history
  • Loading branch information
mjakubowski84 committed Apr 2, 2022
1 parent 2e353c8 commit 0fa717a
Show file tree
Hide file tree
Showing 15 changed files with 57 additions and 47 deletions.
21 changes: 11 additions & 10 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ ThisBuild / version := "2.4.0-SNAPSHOT"
ThisBuild / isSnapshot := false
ThisBuild / scalaVersion := twoThirteen

val akkaStreamKafkaVersion:String =if(scalaVersion == twoThirteen) {"3.0.0"} else {"2.1.1"}

ThisBuild / javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
ThisBuild / resolvers := Seq(
Opts.resolver.sonatypeReleases,
Expand Down Expand Up @@ -95,12 +93,12 @@ lazy val core = (project in file("core"))
exclude (org = "org.slf4j", name = "slf4j-api"),
"org.apache.hadoop" % "hadoop-client" % hadoopVersion % Provided,
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.7.0",

// tests
"org.mockito" % "mockito-core" % "4.3.1" % "test",
"org.scalatest" %% "scalatest" % "3.2.11" % "test,it",
"ch.qos.logback" % "logback-classic" % "1.2.10" % "test,it",
"ch.qos.logback" % "logback-classic" % "1.2.11" % "test,it",
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion % "test,it"
) ++ {
CrossVersion.partialVersion(scalaBinaryVersion.value) match {
Expand Down Expand Up @@ -161,16 +159,19 @@ lazy val fs2 = (project in file("fs2"))
lazy val examples = (project in file("examples"))
.settings(
name := "parquet4s-examples",
crossScalaVersions := supportedScalaVersions,
crossScalaVersions := Seq(twoTwelve, twoThirteen),
publish / skip := true,
publishLocal / skip := true,
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"io.github.embeddedkafka" %% "embedded-kafka" % "3.1.0",
"ch.qos.logback" % "logback-classic" % "1.2.10",
"ch.qos.logback" % "logback-classic" % "1.2.11",
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % akkaStreamKafkaVersion,
"com.github.fd4s" %% "fs2-kafka" % "2.3.0",
"com.typesafe.akka" %% "akka-stream-kafka" % {
if (scalaVersion.value == twoThirteen) { "3.0.0" }
else { "2.1.1" }
},
"com.github.fd4s" %% "fs2-kafka" % "2.4.0",
"co.fs2" %% "fs2-io" % fs2Version
),
excludeDependencies ++= Seq(
Expand Down Expand Up @@ -251,7 +252,7 @@ lazy val documentation = (project in file("site"))
.settings(
publish / skip := true,
libraryDependencies ++= Seq(
"org.scalameta" %% "mdoc" % "2.3.0",
"org.scalameta" %% "mdoc" % "2.3.1",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion,
"org.slf4j" % "slf4j-nop" % slf4jVersion,
"org.slf4j" % "log4j-over-slf4j" % slf4jVersion
Expand All @@ -260,7 +261,7 @@ lazy val documentation = (project in file("site"))
ExclusionRule("org.scala-lang.modules", "scala-collection-compat_2.13")
),
dependencyOverrides ++= Seq(
"org.scala-lang.modules" %% "scala-collection-compat" % "2.6.0"
"org.scala-lang.modules" %% "scala-collection-compat" % "2.7.0"
)
)
.dependsOn(core, akka, fs2)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.ValueImplicits.*
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType}
import org.scalatest.BeforeAndAfter
Expand Down Expand Up @@ -29,7 +28,7 @@ class CustomTypeITSpec extends AnyFlatSpec with Matchers with BeforeAndAfter wit
override def hashCode(): Int = value.hashCode
override def equals(obj: Any): Boolean =
if (!obj.isInstanceOf[SimpleClass]) false
else this.value == (obj.asInstanceOf[SimpleClass].value)
else this.value == obj.asInstanceOf[SimpleClass].value
override def toString: String = value
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.ValueImplicits.*
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType}
import org.scalatest.BeforeAndAfter
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.TypedSchemaDef
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType, Types}
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ trait ParquetSchemaResolver[T] {

}

object ParquetSchemaResolver extends SchemaDefs {
object ParquetSchemaResolver {

trait Tag[V]
type TypedSchemaDef[V] = SchemaDef & Tag[V]
type TypedSchemaDef[V] = com.github.mjakubowski84.parquet4s.TypedSchemaDef[V]

class TypedSchemaDefInvoker[V](val schema: TypedSchemaDef[V], fieldName: String) extends (() => Type) {
override def apply(): Type = schema(fieldName)
Expand Down Expand Up @@ -109,7 +108,7 @@ object ParquetSchemaResolver extends SchemaDefs {
*/
implicit def productSchemaVisitor[V](implicit resolver: ParquetSchemaResolver[V]): SchemaVisitor[V] =
(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]) =>
invoker.schema match {
invoker.schema.wrapped match {
// override fields only in generated groups (records), custom ones provided by users are not processed
case _: GroupSchemaDef if invoker.schema.metadata.contains(SchemaDef.Meta.Generated) =>
resolver.resolveSchema(cursor) match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import shapeless.LowPriority

trait ProductSchemaDefs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ trait ParquetSchemaResolver[T] {

}

object ParquetSchemaResolver extends SchemaDefs {
object ParquetSchemaResolver {

trait Tag[V]
type TypedSchemaDef[V] = SchemaDef & Tag[V]
type TypedSchemaDef[V] = com.github.mjakubowski84.parquet4s.TypedSchemaDef[V]

class TypedSchemaDefInvoker[V](val schema: TypedSchemaDef[V], fieldName: String) extends (() => Type) {
override def apply(): Type = schema(fieldName)
Expand Down Expand Up @@ -109,7 +108,7 @@ object ParquetSchemaResolver extends SchemaDefs {
*/
implicit def productSchemaVisitor[V](implicit resolver: ParquetSchemaResolver[V]): SchemaVisitor[V] =
(cursor: Cursor, invoker: TypedSchemaDefInvoker[V]) =>
invoker.schema match {
invoker.schema.wrapped match {
// override fields only in generated groups (records), custom ones provided by users are not processed
case _: GroupSchemaDef if invoker.schema.metadata.contains(SchemaDef.Meta.Generated) =>
resolver.resolveSchema(cursor) match {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import shapeless.LowPriority

trait ProductSchemaDefs {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.{TypedSchemaDef => TSD}
import org.apache.parquet.schema.*
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -31,11 +31,9 @@ trait ParquetSchemaResolver[T]:
*/
def schemaName: Option[String] = None

object ParquetSchemaResolver extends SchemaDefs:
object ParquetSchemaResolver:

final abstract class Tag[V]
opaque type Tagged[+T] = Any
type TypedSchemaDef[V] = SchemaDef & Tagged[Tag[V]]
type TypedSchemaDef[V] = TSD[V]

final abstract private[ParquetSchemaResolver] class Fields[Labels <: Tuple, Values <: Tuple]

Expand Down Expand Up @@ -90,17 +88,17 @@ object ParquetSchemaResolver extends SchemaDefs:
null
)

given defaultSchemaVisitor[V: TypedSchemaDef](using NotGiven[ParquetSchemaResolver[V]]): SchemaVisitor[V] with
given defaultSchemaVisitor[V: TSD](using NotGiven[ParquetSchemaResolver[V]]): SchemaVisitor[V] with
def onActive(cursor: Cursor, fieldName: String): Option[Type] =
Option(summon[TypedSchemaDef[V]](fieldName))
Option(summon[TSD[V]](fieldName))

/** Purpose of productSchemaVisitor is to filter product fields so that those that are used for partitioning are not
* present in final schema. It is only applied to products that are not nested in Options and collections as optional
* fields and elements of collections are not valid for partitioning.
* present in the final schema. It is only applied to products that are not nested in Options and collections - as
* optional fields and elements of collections are not valid for partitioning.
*/
given productSchemaVisitor[V <: Product: ParquetSchemaResolver: TypedSchemaDef]: SchemaVisitor[V] with
given productSchemaVisitor[V <: Product: ParquetSchemaResolver: TSD]: SchemaVisitor[V] with
def onActive(cursor: Cursor, fieldName: String): Option[Type] =
summon[TypedSchemaDef[V]] match
summon[TSD[V]].wrapped match
// override fields only in generated groups (records), custom ones provided by users are not processed
case schema: GroupSchemaDef if schema.metadata.contains(SchemaDef.Meta.Generated) =>
summon[ParquetSchemaResolver[V]].resolveSchema(cursor) match
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef

import scala.reflect.{ClassTag, classTag}
import scala.util.NotGiven

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import org.apache.parquet.schema.Type

import scala.jdk.CollectionConverters.*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.SchemaDef.Meta
import org.apache.parquet.schema.*
import org.apache.parquet.schema.LogicalTypeAnnotation.{
DateLogicalTypeAnnotation,
Expand Down Expand Up @@ -46,6 +46,8 @@ object Message {

}

/** Defines Parquet schema. Produces Parquet [[Type]] for a field of given name.
*/
trait SchemaDef {

type Self <: SchemaDef
Expand All @@ -54,7 +56,7 @@ trait SchemaDef {

def withRequired(required: Boolean): Self

def typed[V]: ParquetSchemaResolver.TypedSchemaDef[V] = this.asInstanceOf[TypedSchemaDef[V]]
def typed[V]: TypedSchemaDef[V] = TypedSchemaDef.wrap[V](this)

private[parquet4s] def metadata: Set[SchemaDef.Meta.Property]

Expand Down Expand Up @@ -185,9 +187,27 @@ private case class MapSchemaDef(key: Type, value: Type, required: Boolean, metad

}

trait SchemaDefs extends PrimitiveSchemaDefs with TimeValueSchemaDefs with ComplexSchemaDefs
/** [[SchemaDef]] for type [[V]].
*/
trait TypedSchemaDef[V] extends SchemaDef {
private[parquet4s] def wrapped: SchemaDef
}

object TypedSchemaDef extends PrimitiveSchemaDefs with TimeValueSchemaDefs with ComplexSchemaDefs {
private[parquet4s] def wrap[V](schemaDef: SchemaDef): TypedSchemaDef[V] =
new TypedSchemaDef[V] {
override val wrapped: SchemaDef = schemaDef
override type Self = TypedSchemaDef[V]
override def apply(name: String): Type = schemaDef.apply(name)
override def withRequired(required: Boolean): Self = wrap[V](schemaDef.withRequired(required))
override private[parquet4s] val metadata: Set[SchemaDef.Meta.Property] = schemaDef.metadata
override private[parquet4s] def withMetadata(meta: Meta.Property): Self =
wrap[V](schemaDef.withMetadata(meta))
}
}

trait PrimitiveSchemaDefs {

implicit val stringSchema: TypedSchemaDef[String] =
SchemaDef
.primitive(BINARY, required = false, logicalTypeAnnotation = Option(LogicalTypes.StringType))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.mjakubowski84.parquet4s

import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType}

import scala.util.Random
Expand Down
8 changes: 4 additions & 4 deletions project/DependecyVersions.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
object DependecyVersions {
val parquetVersion = "1.12.2"
val shapelessVersion = "2.3.8"
val shapelessVersion = "2.3.9"
val sparkVersion = "3.2.1"
val hadoopVersion = "3.3.1"
val slf4jVersion = "1.7.36"
val akkaVersion = "2.6.18"
val fs2Version = "3.2.5"
val catsEffectVersion = "3.3.5"
val akkaVersion = "2.6.19"
val fs2Version = "3.2.7"
val catsEffectVersion = "3.3.9"
}
2 changes: 1 addition & 1 deletion site/docs/docs/records_and_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Additionally, if you want to write your custom type, you have to define the sche

```scala mdoc:compile-only
import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType}
import com.github.mjakubowski84.parquet4s.ParquetSchemaResolver.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.TypedSchemaDef
import com.github.mjakubowski84.parquet4s.{LogicalTypes, SchemaDef}

case class CustomType(i: Int)
Expand Down

0 comments on commit 0fa717a

Please sign in to comment.