Skip to content

Commit

Permalink
Add '-F <file>' to read config from file and default to ccloud's conf…
Browse files Browse the repository at this point in the history
…ig file (#130)
  • Loading branch information
edenhill authored Feb 2, 2018
1 parent 9d515e1 commit 5a7d3ba
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 42 deletions.
249 changes: 207 additions & 42 deletions kafkacat.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <stdlib.h>
#include <stdarg.h>
#include <signal.h>
#include <ctype.h>

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -912,7 +913,7 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
"General options:\n"
" -C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode\n"
#if ENABLE_KAFKACONSUMER
" -G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)\n"
" -G <group-id> Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)\n"
" Expects a list of topics to subscribe to\n"
#endif
" -t <topic> Topic to consume from, produce to, "
Expand All @@ -925,6 +926,11 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
" -E Do not exit on non fatal error\n"
" -K <delim> Key delimiter (same format as -D)\n"
" -c <cnt> Limit message count\n"
" -F <config-file> Read configuration properties from file,\n"
" file format is \"property=value\".\n"
" Default config file:\n"
" ~/.ccloud/config - Confluent Cloud settings.\n"
" Use `-F none` to disable default search.\n"
" -X list List available librdkafka configuration "
"properties\n"
" -X prop=val Set librdkafka configuration property.\n"
Expand All @@ -939,7 +945,7 @@ static void RD_NORETURN usage (const char *argv0, int exitcode,
" -h Print usage help\n"
"\n"
"Producer options:\n"
" -z snappy|gzip Message compression. Default: none\n"
" -z snappy|gzip|lz4 Message compression. Default: none\n"
" -p -1 Use random partitioner\n"
" -D <delim> Delimiter to split input into messages\n"
" -K <delim> Delimiter to split input key and message\n"
Expand Down Expand Up @@ -1144,6 +1150,190 @@ static void conf_dump (void) {
}


/**
* @brief Try setting a config property. Provides "topic." fallthru.
*
* @returns -1 on failure or 0 on success.
*/
static int try_conf_set (const char *name, const char *val,
char *errstr, size_t errstr_size) {
rd_kafka_conf_res_t res = RD_KAFKA_CONF_UNKNOWN;

/* Try "topic." prefixed properties on topic
* conf first, and then fall through to global if
* it didnt match a topic configuration property. */
if (!strncmp(name, "topic.", strlen("topic.")))
res = rd_kafka_topic_conf_set(conf.rkt_conf,
name+
strlen("topic."),
val,
errstr, errstr_size);

if (res == RD_KAFKA_CONF_UNKNOWN)
res = rd_kafka_conf_set(conf.rk_conf, name, val,
errstr, errstr_size);

if (res != RD_KAFKA_CONF_OK)
return -1;

if (!strcmp(name, "metadata.broker.list") ||
!strcmp(name, "bootstrap.servers"))
conf.flags |= CONF_F_BROKERS_SEEN;

if (!strcmp(name, "api.version.request"))
conf.flags |= CONF_F_APIVERREQ_USER;

/* Interception */
#if RD_KAFKA_VERSION >= 0x00090000
if (!strcmp(name, "quota.support.enable"))
rd_kafka_conf_set_throttle_cb(conf.rk_conf,
throttle_cb);
#endif


return 0;
}

/**
* @brief Intercept configuration properties and try to identify
* incompatible properties that needs to be converted to librdkafka
* configuration properties.
*
* @returns -1 on failure, 0 if the property was not handled,
* or 1 if it was handled.
*/
static int try_java_conf_set (const char *name, const char *val,
char *errstr, size_t errstr_size) {
if (!strcmp(name, "ssl.endpoint.identification.algorithm"))
return 1; /* SSL server verification:
* not supported by librdkafka: ignore for now */

if (!strcmp(name, "sasl.jaas.config")) {
char sasl_user[128], sasl_pass[128];
if (sscanf(val,
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%[^\"]\" password=\"%[^\"]\"",
sasl_user, sasl_pass) == 2) {
if (try_conf_set("sasl.username", sasl_user,
errstr, errstr_size) == -1 ||
try_conf_set("sasl.password", sasl_pass,
errstr, errstr_size) == -1)
return -1;
return 1;
}
}

return 0;
}


/**
* @brief Read config file, fail terminally if fatal is true, else
* fail silently.
*
* @returns 0 on success or -1 on failure (unless fatal is true
* in which case the app will have exited).
*/
static int read_conf_file (const char *path, int fatal) {
FILE *fp;
char buf[512];
int line = 0;

if (!(fp = fopen(path, "r"))) {
if (fatal)
KC_FATAL("Failed to open %s: %s",
path, strerror(errno));
return -1;
}

if (!fatal)
KC_INFO(1, "Reading configuration from file %s\n", path);

while (fgets(buf, sizeof(buf), fp)) {
char *s = buf;
char *t;
char errstr[512];
int r;

line++;

/* Left-trim */
while (isspace(*s))
s++;

/* Right-trim and remove newline */
t = s + strlen(s) - 1;
while (t > s && isspace(*t)) {
*t = 0;
t--;
}

/* Ignore Empty line */
if (!*s)
continue;

/* Ignore comments */
if (*s == '#')
continue;

/* Strip escapes for \: \= which can be encountered in
* Java configs (see comment below) */
while ((t = strstr(s, "\\:"))) {
memmove(t, t+1, strlen(t+1)+1); /* overwrite \: */
*t = ':'; /* reinsert : */
}
while ((t = strstr(s, "\\="))) {
memmove(t, t+1, strlen(t+1)+1); /* overwrite \= */
*t = '='; /* reinsert : */
}

/* Parse prop=value */
if (!(t = strchr(s, '=')) || t == s)
KC_FATAL("%s:%d: expected property=value\n",
path, line);

*t = 0;
t++;

/**
* Attempt to support Java client configuration files,
* such as the ccloud config.
* There are some quirks with unnecessary escaping with \
* that we remove, as well as parsing special configuration
* properties that don't match librdkafka's.
*/
r = try_java_conf_set(s, t, errstr, sizeof(errstr));
if (r == -1)
KC_FATAL("%s:%d: %s (java config conversion)\n",
path, line, errstr);
else if (r == 1)
continue; /* Handled */

if (try_conf_set(s, t, errstr, sizeof(errstr)) == -1)
KC_FATAL("%s:%d: %s\n", path, line, errstr);
}

fclose(fp);

return 0;
}


static void read_default_conf_files (void) {
#ifdef _MSC_VER
return;
#else
char path[512];
const char *home;

if (!(home = getenv("HOME")))
return;

snprintf(path, sizeof(path), "%s/.ccloud/config", home);

read_conf_file(path, 0/*not fatal*/);
#endif
}

/**
* Parse command line arguments
*/
Expand All @@ -1155,11 +1345,10 @@ static void argparse (int argc, char **argv,
const char *delim = "\n";
const char *key_delim = NULL;
char tmp_fmt[64];
int conf_brokers_seen = 0;
int do_conf_dump = 0;

while ((opt = getopt(argc, argv,
"PCG:LQt:p:b:z:o:eED:K:k:Od:qvX:c:Tuf:ZlVh"
"PCG:LQt:p:b:z:o:eED:K:k:Od:qvF:X:c:Tuf:ZlVh"
#if ENABLE_JSON
"J"
#endif
Expand Down Expand Up @@ -1196,7 +1385,7 @@ static void argparse (int argc, char **argv,
break;
case 'b':
conf.brokers = optarg;
conf_brokers_seen++;
conf.flags |= CONF_F_BROKERS_SEEN;
break;
case 'z':
if (rd_kafka_conf_set(conf.rk_conf,
Expand Down Expand Up @@ -1275,10 +1464,16 @@ static void argparse (int argc, char **argv,
case 'u':
setbuf(stdout, NULL);
break;
case 'F':
conf.flags |= CONF_F_NO_CONF_SEARCH;
if (!strcmp(optarg, "-") || !strcmp(optarg, "none"))
break;
else
read_conf_file(optarg, 1);
break;
case 'X':
{
char *name, *val;
rd_kafka_conf_res_t res;

if (!strcmp(optarg, "list") ||
!strcmp(optarg, "help")) {
Expand All @@ -1303,42 +1498,9 @@ static void argparse (int argc, char **argv,
*val = '\0';
val++;

if (!strcmp(name, "metadata.broker.list") ||
!strcmp(name, "bootstrap.servers"))
conf_brokers_seen++;

res = RD_KAFKA_CONF_UNKNOWN;
/* Try "topic." prefixed properties on topic
* conf first, and then fall through to global if
* it didnt match a topic configuration property. */
if (!strncmp(name, "topic.", strlen("topic.")))
res = rd_kafka_topic_conf_set(conf.rkt_conf,
name+
strlen("topic."),
val,
errstr,
sizeof(errstr));

if (res == RD_KAFKA_CONF_UNKNOWN) {
res = rd_kafka_conf_set(conf.rk_conf, name, val,
errstr, sizeof(errstr));
}

if (res != RD_KAFKA_CONF_OK)
if (try_conf_set(name, val,
errstr, sizeof(errstr)) == -1)
KC_FATAL("%s", errstr);

/* Interception */
#if RD_KAFKA_VERSION >= 0x00090000
if (!strcmp(name, "quota.support.enable"))
rd_kafka_conf_set_throttle_cb(conf.rk_conf,
throttle_cb);
#endif

if (!strcmp(name, "api.version.request"))
conf.flags |= CONF_F_APIVERREQ_USER;



}
break;

Expand All @@ -1356,13 +1518,16 @@ static void argparse (int argc, char **argv,
}
}

if (!(conf.flags & CONF_F_NO_CONF_SEARCH))
read_default_conf_files();

/* Dump configuration and exit, if so desired. */
if (do_conf_dump) {
conf_dump();
exit(0);
}

if (!conf_brokers_seen)
if (!(conf.flags & CONF_F_BROKERS_SEEN))
usage(argv[0], 1, "-b <broker,..> missing", 0);

/* Decide mode if not specified */
Expand Down
2 changes: 2 additions & 0 deletions kafkacat.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ struct conf {
#define CONF_F_LINE 0x20 /* Read files in line mode when producing */
#define CONF_F_APIVERREQ 0x40 /* Enable api.version.request=true */
#define CONF_F_APIVERREQ_USER 0x80 /* User set api.version.request */
#define CONF_F_NO_CONF_SEARCH 0x100 /* Disable default config file search */
#define CONF_F_BROKERS_SEEN 0x200 /* Brokers have been configured */
int delim;
int key_delim;

Expand Down

0 comments on commit 5a7d3ba

Please sign in to comment.