Skip to content

Commit 25788fd

Browse files
cakperAvasil
authored andcommitted
Add generic map of properties to Producer/Consumer config (#45)
(cherry picked from commit 78b90c4)
1 parent 7c3b43d commit 25788fd

File tree

13 files changed

+224
-49
lines changed

13 files changed

+224
-49
lines changed

AUTHORS

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,7 @@ Alex Gryzlov
88
https://github.com/clayrat
99

1010
Piotr Gawryś
11-
https://github.com/Avasil
11+
https://github.com/Avasil
12+
13+
Kacper Gunia
14+
https://github.com/cakper

kafka-0.10.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ import scala.concurrent.duration._
196196
* @param observableCommitOrder is the `monix.observable.commit.order` setting.
197197
* Specifies when the commit should happen, like before we receive the
198198
* acknowledgement from downstream, or afterwards.
199+
*
200+
* @param properties map of other properties that will be passed to
201+
* the underlying kafka client. Any properties not explicitly handled
202+
* by this object can be set via the map, but in case of a duplicate
203+
* a value set on the case class will overwrite value set via properties.
199204
*/
200205
final case class KafkaConsumerConfig(
201206
bootstrapServers: List[String],
@@ -237,9 +242,10 @@ final case class KafkaConsumerConfig(
237242
retryBackoffTime: FiniteDuration,
238243
observableCommitType: ObservableCommitType,
239244
observableCommitOrder: ObservableCommitOrder,
240-
observableSeekToEndOnStart: Boolean) {
245+
observableSeekToEndOnStart: Boolean,
246+
properties: Map[String, String]) {
241247

242-
def toMap: Map[String,String] = Map(
248+
def toMap: Map[String, String] = properties ++ Map(
243249
"bootstrap.servers" -> bootstrapServers.mkString(","),
244250
"fetch.min.bytes" -> fetchMinBytes.toString,
245251
"group.id" -> groupId,
@@ -415,7 +421,8 @@ object KafkaConsumerConfig {
415421
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
416422
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
417423
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
418-
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart")
424+
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
425+
properties = Map.empty
419426
)
420427
}
421428
}

kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala

+18-9
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package monix.kafka
1818

1919
import java.io.File
2020
import java.util.Properties
21+
2122
import com.typesafe.config.{Config, ConfigFactory}
2223
import monix.kafka.config._
24+
2325
import scala.concurrent.duration._
2426

2527
/** The Kafka Producer config.
@@ -179,6 +181,11 @@ import scala.concurrent.duration._
179181
* @param monixSinkParallelism is the `monix.producer.sink.parallelism`
180182
* setting indicating how many requests the [[KafkaProducerSink]]
181183
* can execute in parallel.
184+
*
185+
* @param properties map of other properties that will be passed to
186+
* the underlying kafka client. Any properties not explicitly handled
187+
* by this object can be set via the map, but in case of a duplicate
188+
* a value set on the case class will overwrite value set via properties.
182189
*/
183190
case class KafkaProducerConfig(
184191
bootstrapServers: List[String],
@@ -215,15 +222,10 @@ case class KafkaProducerConfig(
215222
metricReporters: List[String],
216223
metricsNumSamples: Int,
217224
metricsSampleWindow: FiniteDuration,
218-
monixSinkParallelism: Int) {
225+
monixSinkParallelism: Int,
226+
properties: Map[String, String]) {
219227

220-
def toProperties: Properties = {
221-
val props = new Properties()
222-
for ((k,v) <- toMap; if v != null) props.put(k,v)
223-
props
224-
}
225-
226-
def toMap: Map[String,String] = Map(
228+
def toMap: Map[String, String] = properties ++ Map(
227229
"bootstrap.servers" -> bootstrapServers.mkString(","),
228230
"acks" -> acks.id,
229231
"buffer.memory" -> bufferMemoryInBytes.toString,
@@ -259,6 +261,12 @@ case class KafkaProducerConfig(
259261
"metrics.num.samples" -> metricsNumSamples.toString,
260262
"metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString
261263
)
264+
265+
def toProperties: Properties = {
266+
val props = new Properties()
267+
for ((k,v) <- toMap; if v != null) props.put(k,v)
268+
props
269+
}
262270
}
263271

264272
object KafkaProducerConfig {
@@ -385,7 +393,8 @@ object KafkaProducerConfig {
385393
metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList,
386394
metricsNumSamples = config.getInt("metrics.num.samples"),
387395
metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis,
388-
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism")
396+
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"),
397+
properties = Map.empty
389398
)
390399
}
391400
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package monix.kafka
2+
3+
import org.scalatest.FunSuite
4+
5+
class ConfigTest extends FunSuite {
6+
test("overwrite properties with values from producer config") {
7+
val config =
8+
KafkaProducerConfig.default.copy(
9+
bootstrapServers = List("localhost:9092"),
10+
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))
11+
12+
assert(
13+
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
14+
)
15+
}
16+
17+
test("overwrite properties with values from consumer config") {
18+
val config =
19+
KafkaConsumerConfig.default.copy(
20+
bootstrapServers = List("localhost:9092"),
21+
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))
22+
23+
assert(
24+
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
25+
)
26+
}
27+
}

kafka-0.11.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ import scala.concurrent.duration._
196196
* @param observableCommitOrder is the `monix.observable.commit.order` setting.
197197
* Specifies when the commit should happen, like before we receive the
198198
* acknowledgement from downstream, or afterwards.
199+
*
200+
* @param properties map of other properties that will be passed to
201+
* the underlying kafka client. Any properties not explicitly handled
202+
* by this object can be set via the map, but in case of a duplicate
203+
* a value set on the case class will overwrite value set via properties.
199204
*/
200205
final case class KafkaConsumerConfig(
201206
bootstrapServers: List[String],
@@ -237,9 +242,10 @@ final case class KafkaConsumerConfig(
237242
retryBackoffTime: FiniteDuration,
238243
observableCommitType: ObservableCommitType,
239244
observableCommitOrder: ObservableCommitOrder,
240-
observableSeekToEndOnStart: Boolean) {
245+
observableSeekToEndOnStart: Boolean,
246+
properties: Map[String, String]) {
241247

242-
def toMap: Map[String,String] = Map(
248+
def toMap: Map[String, String] = properties ++ Map(
243249
"bootstrap.servers" -> bootstrapServers.mkString(","),
244250
"fetch.min.bytes" -> fetchMinBytes.toString,
245251
"group.id" -> groupId,
@@ -415,7 +421,8 @@ object KafkaConsumerConfig {
415421
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
416422
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
417423
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
418-
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart")
424+
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
425+
properties = Map.empty
419426
)
420427
}
421428
}

kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala

+18-9
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package monix.kafka
1818

1919
import java.io.File
2020
import java.util.Properties
21+
2122
import com.typesafe.config.{Config, ConfigFactory}
2223
import monix.kafka.config._
24+
2325
import scala.concurrent.duration._
2426

2527
/** The Kafka Producer config.
@@ -179,6 +181,11 @@ import scala.concurrent.duration._
179181
* @param monixSinkParallelism is the `monix.producer.sink.parallelism`
180182
* setting indicating how many requests the [[KafkaProducerSink]]
181183
* can execute in parallel.
184+
*
185+
* @param properties map of other properties that will be passed to
186+
* the underlying kafka client. Any properties not explicitly handled
187+
* by this object can be set via the map, but in case of a duplicate
188+
* a value set on the case class will overwrite value set via properties.
182189
*/
183190
case class KafkaProducerConfig(
184191
bootstrapServers: List[String],
@@ -215,15 +222,10 @@ case class KafkaProducerConfig(
215222
metricReporters: List[String],
216223
metricsNumSamples: Int,
217224
metricsSampleWindow: FiniteDuration,
218-
monixSinkParallelism: Int) {
225+
monixSinkParallelism: Int,
226+
properties: Map[String, String]) {
219227

220-
def toProperties: Properties = {
221-
val props = new Properties()
222-
for ((k,v) <- toMap; if v != null) props.put(k,v)
223-
props
224-
}
225-
226-
def toMap: Map[String,String] = Map(
228+
def toMap: Map[String, String] = properties ++ Map(
227229
"bootstrap.servers" -> bootstrapServers.mkString(","),
228230
"acks" -> acks.id,
229231
"buffer.memory" -> bufferMemoryInBytes.toString,
@@ -259,6 +261,12 @@ case class KafkaProducerConfig(
259261
"metrics.num.samples" -> metricsNumSamples.toString,
260262
"metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString
261263
)
264+
265+
def toProperties: Properties = {
266+
val props = new Properties()
267+
for ((k,v) <- toMap; if v != null) props.put(k,v)
268+
props
269+
}
262270
}
263271

264272
object KafkaProducerConfig {
@@ -385,7 +393,8 @@ object KafkaProducerConfig {
385393
metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList,
386394
metricsNumSamples = config.getInt("metrics.num.samples"),
387395
metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis,
388-
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism")
396+
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"),
397+
properties = Map.empty
389398
)
390399
}
391400
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package monix.kafka
2+
3+
import org.scalatest.FunSuite
4+
5+
class ConfigTest extends FunSuite {
6+
test("overwrite properties with values from producer config") {
7+
val config =
8+
KafkaProducerConfig.default.copy(
9+
bootstrapServers = List("localhost:9092"),
10+
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))
11+
12+
assert(
13+
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
14+
)
15+
}
16+
17+
test("overwrite properties with values from consumer config") {
18+
val config =
19+
KafkaConsumerConfig.default.copy(
20+
bootstrapServers = List("localhost:9092"),
21+
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))
22+
23+
assert(
24+
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
25+
)
26+
}
27+
}

kafka-0.9.x/src/main/scala/monix/kafka/KafkaConsumerConfig.scala

+10-3
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ import scala.concurrent.duration._
192192
* @param observableCommitOrder is the `monix.observable.commit.order` setting.
193193
* Specifies when the commit should happen, like before we receive the
194194
* acknowledgement from downstream, or afterwards.
195+
*
196+
* @param properties map of other properties that will be passed to
197+
* the underlying kafka client. Any properties not explicitly handled
198+
* by this object can be set via the map, but in case of a duplicate
199+
* a value set on the case class will overwrite value set via properties.
195200
*/
196201
final case class KafkaConsumerConfig(
197202
bootstrapServers: List[String],
@@ -231,9 +236,10 @@ final case class KafkaConsumerConfig(
231236
retryBackoffTime: FiniteDuration,
232237
observableCommitType: ObservableCommitType,
233238
observableCommitOrder: ObservableCommitOrder,
234-
observableSeekToEndOnStart: Boolean) {
239+
observableSeekToEndOnStart: Boolean,
240+
properties: Map[String, String]) {
235241

236-
def toMap: Map[String,String] = Map(
242+
def toMap: Map[String, String] = properties ++ Map(
237243
"bootstrap.servers" -> bootstrapServers.mkString(","),
238244
"fetch.min.bytes" -> fetchMinBytes.toString,
239245
"group.id" -> groupId,
@@ -405,7 +411,8 @@ object KafkaConsumerConfig {
405411
retryBackoffTime = config.getInt("retry.backoff.ms").millis,
406412
observableCommitType = ObservableCommitType(config.getString("monix.observable.commit.type")),
407413
observableCommitOrder = ObservableCommitOrder(config.getString("monix.observable.commit.order")),
408-
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart")
414+
observableSeekToEndOnStart = config.getBoolean("monix.observable.seekEnd.onStart"),
415+
properties = Map.empty
409416
)
410417
}
411418
}

kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerConfig.scala

+18-9
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package monix.kafka
1818

1919
import java.io.File
2020
import java.util.Properties
21+
2122
import com.typesafe.config.{Config, ConfigFactory}
2223
import monix.kafka.config._
24+
2325
import scala.concurrent.duration._
2426

2527
/** The Kafka Producer config.
@@ -175,6 +177,11 @@ import scala.concurrent.duration._
175177
* @param monixSinkParallelism is the `monix.producer.sink.parallelism`
176178
* setting indicating how many requests the [[KafkaProducerSink]]
177179
* can execute in parallel.
180+
*
181+
* @param properties map of other properties that will be passed to
182+
* the underlying kafka client. Any properties not explicitly handled
183+
* by this object can be set via the map, but in case of a duplicate
184+
* a value set on the case class will overwrite value set via properties.
178185
*/
179186
case class KafkaProducerConfig(
180187
bootstrapServers: List[String],
@@ -210,15 +217,10 @@ case class KafkaProducerConfig(
210217
metricReporters: List[String],
211218
metricsNumSamples: Int,
212219
metricsSampleWindow: FiniteDuration,
213-
monixSinkParallelism: Int) {
220+
monixSinkParallelism: Int,
221+
properties: Map[String, String]) {
214222

215-
def toProperties: Properties = {
216-
val props = new Properties()
217-
for ((k,v) <- toMap; if v != null) props.put(k,v)
218-
props
219-
}
220-
221-
def toMap: Map[String,String] = Map(
223+
def toMap: Map[String, String] = properties ++ Map(
222224
"bootstrap.servers" -> bootstrapServers.mkString(","),
223225
"acks" -> acks.id,
224226
"buffer.memory" -> bufferMemoryInBytes.toString,
@@ -253,6 +255,12 @@ case class KafkaProducerConfig(
253255
"metrics.num.samples" -> metricsNumSamples.toString,
254256
"metrics.sample.window.ms" -> metricsSampleWindow.toMillis.toString
255257
)
258+
259+
def toProperties: Properties = {
260+
val props = new Properties()
261+
for ((k,v) <- toMap; if v != null) props.put(k,v)
262+
props
263+
}
256264
}
257265

258266
object KafkaProducerConfig {
@@ -378,7 +386,8 @@ object KafkaProducerConfig {
378386
metricReporters = config.getString("metric.reporters").trim.split("\\s*,\\s*").toList,
379387
metricsNumSamples = config.getInt("metrics.num.samples"),
380388
metricsSampleWindow = config.getInt("metrics.sample.window.ms").millis,
381-
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism")
389+
monixSinkParallelism = config.getInt("monix.producer.sink.parallelism"),
390+
properties = Map.empty
382391
)
383392
}
384393
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package monix.kafka
2+
3+
import org.scalatest.FunSuite
4+
5+
class ConfigTest extends FunSuite {
6+
test("overwrite properties with values from producer config") {
7+
val config =
8+
KafkaProducerConfig.default.copy(
9+
bootstrapServers = List("localhost:9092"),
10+
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))
11+
12+
assert(
13+
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
14+
)
15+
}
16+
17+
test("overwrite properties with values from consumer config") {
18+
val config =
19+
KafkaConsumerConfig.default.copy(
20+
bootstrapServers = List("localhost:9092"),
21+
properties = Map("bootstrap.servers" -> "127.0.0.1:9092"))
22+
23+
assert(
24+
config.toProperties.getProperty("bootstrap.servers") == "localhost:9092"
25+
)
26+
}
27+
}

0 commit comments

Comments
 (0)