Skip to content

Commit 970cb86

Browse files
authored
Merge pull request #5 from kdrakon/adding-serializer-providing
added new optional param `classInstance` to `Serializer` and `Deserializer` helper classes
2 parents 9f363c7 + e494ad6 commit 970cb86

File tree

6 files changed

+115
-16
lines changed

6 files changed

+115
-16
lines changed

kafka-0.10.x/src/main/scala/monix/kafka/Deserializer.scala

+18-2
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,33 @@ import language.existentials
2626
/** Wraps a Kafka `Deserializer`, provided for
2727
* convenience, since it can be implicitly fetched
2828
* from the context.
29+
*
30+
* @param className is the full package path to the Kafka `Deserializer`
31+
*
32+
* @param classType is the actual [[Class]] for [[className]]
33+
*
34+
* @param constructor creates an instance of [[classType]].
35+
* This is defaulted with a `Deserializer.Constructor[A]` function that creates a
36+
* new instance using an assumed empty constructor.
37+
* Supplying this parameter allows for manual provision of the `Deserializer`.
2938
*/
3039
final case class Deserializer[A](
3140
className: String,
32-
classType: Class[_ <: KafkaDeserializer[A]]) {
41+
classType: Class[_ <: KafkaDeserializer[A]],
42+
constructor: Deserializer.Constructor[A] = (d: Deserializer[A]) => d.classType.newInstance()) {
3343

3444
/** Creates a new instance. */
3545
def create(): KafkaDeserializer[A] =
36-
classType.newInstance()
46+
constructor(this)
3747
}
3848

3949
object Deserializer {
50+
51+
/** Alias for the function that provides an instance of
52+
* the Kafka `Deserializer`.
53+
*/
54+
type Constructor[A] = (Deserializer[A]) => KafkaDeserializer[A]
55+
4056
implicit val forStrings: Deserializer[String] =
4157
Deserializer(
4258
className = "org.apache.kafka.common.serialization.StringDeserializer",

kafka-0.10.x/src/main/scala/monix/kafka/Serializer.scala

+18-2
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,33 @@ import language.existentials
2626
/** Wraps a Kafka `Serializer`, provided for
2727
* convenience, since it can be implicitly fetched
2828
* from the context.
29+
*
30+
* @param className is the full package path to the Kafka `Serializer`
31+
*
32+
* @param classType is the actual [[Class]] for [[className]]
33+
*
34+
* @param constructor creates an instance of [[classType]].
35+
* This is defaulted with a `Serializer.Constructor[A]` function that creates a
36+
* new instance using an assumed empty constructor.
37+
* Supplying this parameter allows for manual provision of the `Serializer`.
2938
*/
3039
final case class Serializer[A](
3140
className: String,
32-
classType: Class[_ <: KafkaSerializer[A]]) {
41+
classType: Class[_ <: KafkaSerializer[A]],
42+
constructor: Serializer.Constructor[A] = (s: Serializer[A]) => s.classType.newInstance()) {
3343

3444
/** Creates a new instance. */
3545
def create(): KafkaSerializer[A] =
36-
classType.newInstance()
46+
constructor(this)
3747
}
3848

3949
object Serializer {
50+
51+
/** Alias for the function that provides an instance of
52+
* the Kafka `Serializer`.
53+
*/
54+
type Constructor[A] = (Serializer[A]) => KafkaSerializer[A]
55+
4056
implicit val forStrings: Serializer[String] =
4157
Serializer(
4258
className = "org.apache.kafka.common.serialization.StringSerializer",

kafka-0.8.x/src/main/scala/monix/kafka/Deserializer.scala

+25-6
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,41 @@ import language.existentials
2323
/** Wraps a Kafka `Decoder`, provided for
2424
* convenience, since it can be implicitly fetched
2525
* from the context.
26+
*
27+
* @param className is the full package path to the Kafka `Decoder`
28+
*
29+
* @param classType is the actual [[Class]] for [[className]]
30+
*
31+
* @param constructor creates an instance of [[classType]].
32+
* This is defaulted with a `Deserializer.Constructor[A]` function that creates a
33+
* new instance using an assumed empty or nullable constructor.
34+
* Supplying this parameter allows for manual provision of the `Decoder`.
2635
*/
2736
final case class Deserializer[A](
2837
className: String,
29-
classType: Class[_ <: KafkaDecoder[A]]) {
38+
classType: Class[_ <: KafkaDecoder[A]],
39+
constructor: Deserializer.Constructor[A] = Deserializer.reflectCreate[A] _) {
3040

3141
/** Creates a new instance. */
32-
def create(): KafkaDecoder[A] = {
33-
val constructor = classType.getDeclaredConstructors()(0)
42+
def create(): KafkaDecoder[A] =
43+
constructor(this)
44+
}
45+
46+
object Deserializer {
47+
48+
/** Alias for the function that provides an instance of
49+
* the Kafka `Decoder`.
50+
*/
51+
type Constructor[A] = (Deserializer[A]) => KafkaDecoder[A]
52+
53+
private def reflectCreate[A](d: Deserializer[A]): KafkaDecoder[A] = {
54+
val constructor = d.classType.getDeclaredConstructors()(0)
3455
constructor.getParameterCount match {
35-
case 0 => classType.newInstance()
56+
case 0 => d.classType.newInstance()
3657
case 1 => constructor.newInstance(null).asInstanceOf[KafkaDecoder[A]]
3758
}
3859
}
39-
}
4060

41-
object Deserializer {
4261
implicit val forStrings: Deserializer[String] =
4362
Deserializer(
4463
className = "kafka.serializer.StringDecoder",

kafka-0.8.x/src/main/scala/monix/kafka/Serializer.scala

+18-2
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,33 @@ import language.existentials
2424
/** Wraps a Kafka `Serializer`, provided for
2525
* convenience, since it can be implicitly fetched
2626
* from the context.
27+
*
28+
* @param className is the full package path to the Kafka `Serializer`
29+
*
30+
* @param classType is the actual [[Class]] for [[className]]
31+
*
32+
* @param constructor creates an instance of [[classType]].
33+
* This is defaulted with a `Serializer.Constructor[A]` function that creates a
34+
* new instance using an assumed empty constructor.
35+
* Supplying this parameter allows for manual provision of the `Serializer`.
2736
*/
2837
final case class Serializer[A](
2938
className: String,
30-
classType: Class[_ <: KafkaSerializer[A]]) {
39+
classType: Class[_ <: KafkaSerializer[A]],
40+
constructor: Serializer.Constructor[A] = (s: Serializer[A]) => s.classType.newInstance()) {
3141

3242
/** Creates a new instance. */
3343
def create(): KafkaSerializer[A] =
34-
classType.newInstance()
44+
constructor(this)
3545
}
3646

3747
object Serializer {
48+
49+
/** Alias for the function that provides an instance of
50+
* the Kafka `Serializer`.
51+
*/
52+
type Constructor[A] = (Serializer[A]) => KafkaSerializer[A]
53+
3854
implicit val forStrings: Serializer[String] =
3955
Serializer(
4056
className = "org.apache.kafka.common.serialization.StringSerializer",

kafka-0.9.x/src/main/scala/monix/kafka/Deserializer.scala

+18-2
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,33 @@ import language.existentials
2424
/** Wraps a Kafka `Deserializer`, provided for
2525
* convenience, since it can be implicitly fetched
2626
* from the context.
27+
*
28+
* @param className is the full package path to the Kafka `Deserializer`
29+
*
30+
* @param classType is the actual [[Class]] for [[className]]
31+
*
32+
* @param constructor creates an instance of [[classType]].
33+
* This is defaulted with a `Deserializer.Constructor[A]` function that creates a
34+
* new instance using an assumed empty constructor.
35+
* Supplying this parameter allows for manual provision of the `Deserializer`.
2736
*/
2837
final case class Deserializer[A](
2938
className: String,
30-
classType: Class[_ <: KafkaDeserializer[A]]) {
39+
classType: Class[_ <: KafkaDeserializer[A]],
40+
constructor: Deserializer.Constructor[A] = (d: Deserializer[A]) => d.classType.newInstance()) {
3141

3242
/** Creates a new instance. */
3343
def create(): KafkaDeserializer[A] =
34-
classType.newInstance()
44+
constructor(this)
3545
}
3646

3747
object Deserializer {
48+
49+
/** Alias for the function that provides an instance of
50+
* the Kafka `Deserializer`.
51+
*/
52+
type Constructor[A] = (Deserializer[A]) => KafkaDeserializer[A]
53+
3854
implicit val forStrings: Deserializer[String] =
3955
Deserializer(
4056
className = "org.apache.kafka.common.serialization.StringDeserializer",

kafka-0.9.x/src/main/scala/monix/kafka/Serializer.scala

+18-2
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,33 @@ import language.existentials
2424
/** Wraps a Kafka `Serializer`, provided for
2525
* convenience, since it can be implicitly fetched
2626
* from the context.
27+
*
28+
* @param className is the full package path to the Kafka `Serializer`
29+
*
30+
* @param classType is the actual [[Class]] for [[className]]
31+
*
32+
* @param constructor creates an instance of [[classType]].
33+
* This is defaulted with a `Serializer.Constructor[A]` function that creates a
34+
* new instance using an assumed empty constructor.
35+
* Supplying this parameter allows for manual provision of the `Serializer`.
2736
*/
2837
final case class Serializer[A](
2938
className: String,
30-
classType: Class[_ <: KafkaSerializer[A]]) {
39+
classType: Class[_ <: KafkaSerializer[A]],
40+
constructor: Serializer.Constructor[A] = (s: Serializer[A]) => s.classType.newInstance()) {
3141

3242
/** Creates a new instance. */
3343
def create(): KafkaSerializer[A] =
34-
classType.newInstance()
44+
constructor(this)
3545
}
3646

3747
object Serializer {
48+
49+
/** Alias for the function that provides an instance of
50+
* the Kafka `Serializer`.
51+
*/
52+
type Constructor[A] = (Serializer[A]) => KafkaSerializer[A]
53+
3854
implicit val forStrings: Serializer[String] =
3955
Serializer(
4056
className = "org.apache.kafka.common.serialization.StringSerializer",

0 commit comments

Comments
 (0)