Skip to content

Commit b8053b7

Browse files
committed
out_kafka: new option 'queue_full_retries' (#2553)
When the librdkafka queue is full and Fluent Bit cannot ingest more data, it does a 'local retry' up to a maximum of 10 times, waiting one second between each retry. But in some cases, like the one exposed in #2553, that is not enough. This patch exposes a new configuration property called 'queue_full_retries' that configures the maximum number of times the retry is attempted. Note that this limit can now be disabled by setting a value of '0' or 'false'. Signed-off-by: Eduardo Silva <eduardo@treasure-data.com>
1 parent d0348ab commit b8053b7

File tree

3 files changed

+32
-9
lines changed

3 files changed

+32
-9
lines changed

plugins/out_kafka/kafka.c

+10-3
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,14 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
266266
}
267267

268268
retry:
269-
if (queue_full_retries >= 10) {
269+
/*
270+
* If the local rdkafka queue is full, we retry up to 'queue_full_retries'
271+
* times set by the configuration (default: 10). If the configuration
272+
* property was set to '0' or 'false', we don't impose a limit. Use that
273+
* value at your own risk.
274+
*/
275+
if (ctx->queue_full_retries > 0 &&
276+
queue_full_retries >= ctx->queue_full_retries) {
270277
if (ctx->format == FLB_KAFKA_FMT_JSON) {
271278
flb_free(out_buf);
272279
}
@@ -294,8 +301,8 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
294301
* otherwise let the caller issue a main retry against the engine.
295302
*/
296303
if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
297-
flb_plg_warn(ctx->ins, "internal queue is full, "
298-
"retrying in one second");
304+
flb_plg_warn(ctx->ins,
305+
"internal queue is full, retrying in one second");
299306

300307
/*
301308
* If the queue is full, first make sure to discard any further

plugins/out_kafka/kafka_config.c

+13
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,19 @@ struct flb_kafka *flb_kafka_conf_create(struct flb_output_instance *ins,
186186
}
187187
}
188188

189+
/* Config: queue_full_retries */
190+
tmp = flb_output_get_property("queue_full_retries", ins);
191+
if (!tmp) {
192+
ctx->queue_full_retries = FLB_KAFKA_QUEUE_FULL_RETRIES;
193+
}
194+
else {
195+
/* set number of retries: note that a value of zero means retry forever */
196+
ctx->queue_full_retries = atoi(tmp);
197+
if (ctx->queue_full_retries < 0) {
198+
ctx->queue_full_retries = 0;
199+
}
200+
}
201+
189202
/* Config Gelf_Timestamp_Key */
190203
tmp = flb_output_get_property("gelf_timestamp_key", ins);
191204
if (tmp) {

plugins/out_kafka/kafka_config.h

+9-6
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@
2626

2727
#include "rdkafka.h"
2828

29-
#define FLB_KAFKA_FMT_JSON 0
30-
#define FLB_KAFKA_FMT_MSGP 1
31-
#define FLB_KAFKA_FMT_GELF 2
32-
#define FLB_KAFKA_BROKERS "127.0.0.1"
33-
#define FLB_KAFKA_TOPIC "fluent-bit"
34-
#define FLB_KAFKA_TS_KEY "@timestamp"
29+
#define FLB_KAFKA_FMT_JSON 0
30+
#define FLB_KAFKA_FMT_MSGP 1
31+
#define FLB_KAFKA_FMT_GELF 2
32+
#define FLB_KAFKA_BROKERS "127.0.0.1"
33+
#define FLB_KAFKA_TOPIC "fluent-bit"
34+
#define FLB_KAFKA_TS_KEY "@timestamp"
35+
#define FLB_KAFKA_QUEUE_FULL_RETRIES 10
3536

3637
/* rdkafka log levels based on syslog(3) */
3738
#define FLB_KAFKA_LOG_EMERG 0
@@ -93,6 +94,8 @@ struct flb_kafka {
9394

9495
int dynamic_topic;
9596

97+
int queue_full_retries;
98+
9699
/* Internal */
97100
rd_kafka_t *producer;
98101
rd_kafka_conf_t *conf;

0 commit comments

Comments
 (0)