Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_gelf: Port the plugin to Windows #2574

Merged
merged 1 commit into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/windows-setup.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ set(FLB_OUT_DATADOG Yes)
set(FLB_OUT_ES Yes)
set(FLB_OUT_EXIT No)
set(FLB_OUT_FORWARD Yes)
set(FLB_OUT_GELF No)
set(FLB_OUT_GELF Yes)
set(FLB_OUT_HTTP Yes)
set(FLB_OUT_INFLUXDB Yes)
set(FLB_OUT_NATS No)
Expand Down
102 changes: 68 additions & 34 deletions plugins/out_gelf/gelf.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,51 @@
* server nodes omit this field automatically.
*/

/*
* Generate a unique message ID. The upper 48-bit is milliseconds
* since the Epoch, the lower 16-bit is a random nonce.
*/
static uint64_t message_id(void)
{
uint64_t now;
uint16_t nonce;
struct flb_time tm;

if (flb_time_get(&tm) != -1) {
now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000;
}
else {
now = (uint64_t) time(NULL) * 1000;
}
nonce = (uint16_t) rand();

return (now << 16) | nonce;
}

/*
* A GELF header is 12 bytes in size. It has the following
* structure:
*
* +---+---+---+---+---+---+---+---+---+---+---+---+
* | MAGIC | MESSAGE ID |SEQ|NUM|
* +---+---+---+---+---+---+---+---+---+---+---+---+
*
* NUM is the total number of packets to send. SEQ is the
* unique sequence number for each packet (zero-indexed).
*/
#define GELF_MAGIC "\x1e\x0f"
#define GELF_HEADER_SIZE 12

static void init_chunk_header(uint8_t *buf, int count)
{
uint64_t msgid = message_id();

memcpy(buf, GELF_MAGIC, 2);
memcpy(buf + 2, &msgid, 8);
buf[10] = 0;
buf[11] = count;
}

/*
* Chunked GELF
* Prepend the following structure to your GELF message to make it chunked:
Expand All @@ -89,66 +134,45 @@
* already arrived and still arriving chunks.
* A message MUST NOT consist of more than 128 chunks.
*/

static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg,
size_t msg_size)
{
int ret;
uint8_t header[12];
uint8_t n;
size_t chunks;
size_t offset;
struct flb_time tm;
uint64_t messageid;
struct msghdr msghdr;
struct iovec iov[2];
size_t len;
uint8_t *buf = (uint8_t *) ctx->pckt_buf;

chunks = msg_size / ctx->pckt_size;
if ((msg_size % ctx->pckt_size) != 0)
if (msg_size % ctx->pckt_size != 0) {
chunks++;
}

if (chunks > 128) {
flb_plg_error(ctx->ins, "message too big: %zd bytes, too many chunks",
msg_size);
flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size);
return -1;
}

flb_time_get(&tm);

messageid = ((uint64_t)(tm.tm.tv_nsec*1000000 + tm.tm.tv_nsec) << 32) |
(uint64_t)rand_r(&(ctx->seed));

header[0] = 0x1e;
header[1] = 0x0f;
memcpy (header+2, &messageid, 8);
header[10] = chunks;

iov[0].iov_base = header;
iov[0].iov_len = 12;

memset(&msghdr, 0, sizeof(struct msghdr));
msghdr.msg_iov = iov;
msghdr.msg_iovlen = 2;
init_chunk_header(buf, chunks);

offset = 0;
for (n = 0; n < chunks; n++) {
header[11] = n;
buf[10] = n;

iov[1].iov_base = msg + offset;
if ((msg_size - offset) < ctx->pckt_size) {
iov[1].iov_len = msg_size - offset;
}
else {
iov[1].iov_len = ctx->pckt_size;
len = msg_size - offset;
if (ctx->pckt_size < len) {
len = ctx->pckt_size;
}
memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len);

ret = sendmsg(ctx->fd, &msghdr, MSG_DONTWAIT | MSG_NOSIGNAL);
ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE,
MSG_DONTWAIT | MSG_NOSIGNAL);
if (ret == -1) {
flb_errno();
}
offset += ctx->pckt_size;
}

return 0;
}

Expand Down Expand Up @@ -399,14 +423,23 @@ static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *conf
}
close(fd);
}
srand(ctx->seed);

ctx->fd = -1;
ctx->pckt_buf = NULL;

if (ctx->mode == FLB_GELF_UDP) {
ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port);
if (ctx->fd < 0) {
flb_free(ctx);
return -1;
}
ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size);
if (ctx->pckt_buf == NULL) {
flb_socket_close(ctx->fd);
flb_free(ctx);
return -1;
}
}
else {
int io_flags = FLB_IO_TCP;
Expand Down Expand Up @@ -449,6 +482,7 @@ static int cb_gelf_exit(void *data, struct flb_config *config)
flb_sds_destroy(ctx->fields.full_message_key);
flb_sds_destroy(ctx->fields.level_key);

flb_free(ctx->pckt_buf);
flb_free(ctx);

return 0;
Expand Down
1 change: 1 addition & 0 deletions plugins/out_gelf/gelf.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct flb_out_gelf_config {
flb_sockfd_t fd;

int pckt_size;
char *pckt_buf;
int compress;
unsigned int seed;

Expand Down