diff --git a/plugins/out_kinesis_firehose/firehose.c b/plugins/out_kinesis_firehose/firehose.c index 69990917457..a27e20b9579 100644 --- a/plugins/out_kinesis_firehose/firehose.c +++ b/plugins/out_kinesis_firehose/firehose.c @@ -95,6 +95,16 @@ static int cb_firehose_init(struct flb_output_instance *ins, ctx->time_key_format = DEFAULT_TIME_KEY_FORMAT; } + tmp = flb_output_get_property("log_key", ins); + if (tmp) { + ctx->log_key = tmp; + } + + if (ctx->log_key && ctx->time_key) { + flb_plg_error(ctx->ins, "'time_key' and 'log_key' can not be used together"); + goto error; + } + tmp = flb_output_get_property("endpoint", ins); if (tmp) { ctx->custom_endpoint = FLB_TRUE; @@ -371,33 +381,33 @@ static int cb_firehose_exit(void *data, struct flb_config *config) static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "region", NULL, - 0, FLB_FALSE, 0, + 0, FLB_TRUE, offsetof(struct flb_firehose, region), "The AWS region of your delivery stream" }, { FLB_CONFIG_MAP_STR, "delivery_stream", NULL, - 0, FLB_FALSE, 0, + 0, FLB_TRUE, offsetof(struct flb_firehose, delivery_stream), "Firehose delivery stream name" }, { FLB_CONFIG_MAP_STR, "time_key", NULL, - 0, FLB_FALSE, 0, + 0, FLB_TRUE, offsetof(struct flb_firehose, time_key), "Add the timestamp to the record under this key. By default the timestamp " "from Fluent Bit will not be added to records sent to Kinesis." }, { FLB_CONFIG_MAP_STR, "time_key_format", NULL, - 0, FLB_FALSE, 0, + 0, FLB_TRUE, offsetof(struct flb_firehose, time_key_format), "strftime compliant format string for the timestamp; for example, " "the default is '%Y-%m-%dT%H:%M:%S'. This option is used with time_key. " }, { FLB_CONFIG_MAP_STR, "role_arn", NULL, - 0, FLB_FALSE, 0, + 0, FLB_TRUE, offsetof(struct flb_firehose, role_arn), "ARN of an IAM role to assume (ex. for cross account access)." }, @@ -409,10 +419,20 @@ static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "sts_endpoint", NULL, - 0, FLB_FALSE, 0, + 0, FLB_TRUE, offsetof(struct flb_firehose, sts_endpoint), "Custom endpoint for the STS API." }, + { + FLB_CONFIG_MAP_STR, "log_key", NULL, + 0, FLB_TRUE, offsetof(struct flb_firehose, log_key), + "By default, the whole log record will be sent to Firehose. " + "If you specify a key name with this option, then only the value of " + "that key will be sent to Firehose. For example, if you are using " + "the Fluentd Docker log driver, you can specify `log_key log` and only " + "the log message will be sent to Firehose." + }, + /* EOF */ {0} }; diff --git a/plugins/out_kinesis_firehose/firehose_api.c b/plugins/out_kinesis_firehose/firehose_api.c index 6d8f1633ad4..50103359803 100644 --- a/plugins/out_kinesis_firehose/firehose_api.c +++ b/plugins/out_kinesis_firehose/firehose_api.c @@ -183,6 +183,17 @@ static int process_event(struct flb_firehose *ctx, struct flush *buf, return 2; } + if (ctx->log_key) { + /* + * flb_msgpack_to_json will encase the value in quotes + * We don't want that for log_key, so we ignore the first + * and last character + */ + written -= 2; + tmp_buf_ptr++; /* pass over the opening quote */ + buf->tmp_buf_offset++; + } + /* is (written + 1) because we still have to append newline */ if ((written + 1) >= MAX_EVENT_SIZE) { flb_plg_warn(ctx->ins, "[size=%zu] Discarding record which is larger than "