Skip to content

Commit f1e9199

Browse files
turettnxmcqueen
authored andcommitted
out_es: support nanosecond timestamp precision (fluent#2544)
Starting in Elasticsearch 7, a "date_nanos" data type was added, increasing timestamp precision from milliseconds to nanoseconds. This patch adds a "Time_Key_Nanos" option which tells the ElasticSearch output plugin to send 9 decimal places instead of 3 to ElasticSearch. Tests are included, and a patch to document the new option will be submitted shortly. Signed-off-by: Neal Turett <nturett@evoforge.org> Signed-off-by: xmcqueen <bmcqueen@linkedin.com>
1 parent 3cec5df commit f1e9199

File tree

3 files changed

+81
-13
lines changed

3 files changed

+81
-13
lines changed

plugins/out_es/es.c

+13-9
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,6 @@ static int elasticsearch_format(struct flb_config *config,
329329
flb_time_pop_from_msgpack(&tms, &result, &obj);
330330
}
331331

332-
/*
333-
* Timestamp: Elasticsearch only support fractional seconds in
334-
* milliseconds unit, not nanoseconds, so we take our nsec value and
335-
* change it representation.
336-
*/
337-
tms.tm.tv_nsec = (tms.tm.tv_nsec / 1000000);
338-
339332
map = root.via.array.ptr[1];
340333
map_size = map.via.map.size;
341334

@@ -387,8 +380,14 @@ static int elasticsearch_format(struct flb_config *config,
387380
gmtime_r(&tms.tm.tv_sec, &tm);
388381
s = strftime(time_formatted, sizeof(time_formatted) - 1,
389382
ctx->time_key_format, &tm);
390-
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
391-
".%03" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
383+
if (ctx->time_key_nanos) {
384+
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
385+
".%09" PRIu64 "Z", (uint64_t) tms.tm.tv_nsec);
386+
} else {
387+
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
388+
".%03" PRIu64 "Z",
389+
(uint64_t) tms.tm.tv_nsec / 1000000);
390+
}
392391

393392
s += len;
394393
msgpack_pack_str(&tmp_pck, s);
@@ -845,6 +844,11 @@ static struct flb_config_map config_map[] = {
845844
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_format),
846845
NULL
847846
},
847+
{
848+
FLB_CONFIG_MAP_BOOL, "time_key_nanos", "false",
849+
0, FLB_TRUE, offsetof(struct flb_elasticsearch, time_key_nanos),
850+
NULL
851+
},
848852
{
849853
FLB_CONFIG_MAP_BOOL, "include_tag_key", "false",
850854
0, FLB_TRUE, offsetof(struct flb_elasticsearch, include_tag_key),

plugins/out_es/es.h

+3
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ struct flb_elasticsearch {
9191
/* time key format */
9292
flb_sds_t time_key_format;
9393

94+
/* time key nanoseconds */
95+
int time_key_nanos;
96+
9497
/* include_tag_key */
9598
int include_tag_key;
9699
flb_sds_t tag_key;

tests/runtime/out_elasticsearch.c

+65-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,19 @@ static void cb_check_logstash_format(void *ctx, int ffd,
3434
flb_free(res_data);
3535
}
3636

37+
static void cb_check_logstash_format_nanos(void *ctx, int ffd,
38+
int res_ret, void *res_data, size_t res_size,
39+
void *data)
40+
{
41+
char *p;
42+
char *out_js = res_data;
43+
char *index_line = "\"@timestamp\":\"2015-11-24T22:15:40.000000000Z\"";
44+
45+
p = strstr(out_js, index_line);
46+
TEST_CHECK(p != NULL);
47+
flb_free(res_data);
48+
}
49+
3750
static void cb_check_tag_key(void *ctx, int ffd,
3851
int res_ret, void *res_data, size_t res_size,
3952
void *data)
@@ -152,6 +165,53 @@ void flb_test_logstash_format()
152165
flb_destroy(ctx);
153166
}
154167

168+
void flb_test_logstash_format_nanos()
169+
{
170+
int ret;
171+
int size = sizeof(JSON_ES) - 1;
172+
flb_ctx_t *ctx;
173+
int in_ffd;
174+
int out_ffd;
175+
176+
/* Create context, flush every second (some checks omitted here) */
177+
ctx = flb_create();
178+
flb_service_set(ctx, "flush", "1", "grace", "1", NULL);
179+
180+
/* Lib input mode */
181+
in_ffd = flb_input(ctx, (char *) "lib", NULL);
182+
flb_input_set(ctx, in_ffd, "tag", "test", NULL);
183+
184+
/* Elasticsearch output */
185+
out_ffd = flb_output(ctx, (char *) "es", NULL);
186+
flb_output_set(ctx, out_ffd,
187+
"match", "test",
188+
NULL);
189+
190+
/* Override defaults of index and type */
191+
flb_output_set(ctx, out_ffd,
192+
"logstash_format", "on",
193+
"logstash_prefix", "prefix",
194+
"logstash_dateformat", "%Y-%m-%d",
195+
"time_key_nanos", "on",
196+
NULL);
197+
198+
/* Enable test mode */
199+
ret = flb_output_set_test(ctx, out_ffd, "formatter",
200+
cb_check_logstash_format_nanos,
201+
NULL, NULL);
202+
203+
/* Start */
204+
ret = flb_start(ctx);
205+
TEST_CHECK(ret == 0);
206+
207+
/* Ingest data sample */
208+
flb_lib_push(ctx, in_ffd, (char *) JSON_ES, size);
209+
210+
sleep(2);
211+
flb_stop(ctx);
212+
flb_destroy(ctx);
213+
}
214+
155215
void flb_test_tag_key()
156216
{
157217
int ret;
@@ -243,9 +303,10 @@ void flb_test_replace_dots()
243303

244304
/* Test list */
245305
TEST_LIST = {
246-
{"index_type" , flb_test_index_type },
247-
{"logstash_format", flb_test_logstash_format },
248-
{"tag_key" , flb_test_tag_key },
249-
{"replace_dots" , flb_test_replace_dots },
306+
{"index_type" , flb_test_index_type },
307+
{"logstash_format" , flb_test_logstash_format },
308+
{"logstash_format_nanos", flb_test_logstash_format_nanos },
309+
{"tag_key" , flb_test_tag_key },
310+
{"replace_dots" , flb_test_replace_dots },
250311
{NULL, NULL}
251312
};

0 commit comments

Comments
 (0)