Skip to content

Commit 6bbd16e

Browse files
fujimotosxmcqueen
authored andcommitted
out_gelf: port the plugin to Windows (fluent#2574)
With this patch, the GELF Output plugin can be compiled and linked on Windows. This commit also fixes a few bugs in GELF plugin: * The message header was not constructed properly. In particular, 11-12th bytes were filled in the reversed order. * The message id generation was bogus (e.g. it did "tv_nsec*1000000 + tm.tm.tv_nsec" to generate a timestamp) Fix these glitches and add detailed documentation to each function. Signed-off-by: Fujimoto Seiji <fujimoto@ceptord.net>
1 parent ad24e80 commit 6bbd16e

File tree

3 files changed

+70
-35
lines changed

3 files changed

+70
-35
lines changed

cmake/windows-setup.cmake

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ set(FLB_OUT_DATADOG Yes)
4949
set(FLB_OUT_ES Yes)
5050
set(FLB_OUT_EXIT No)
5151
set(FLB_OUT_FORWARD Yes)
52-
set(FLB_OUT_GELF No)
52+
set(FLB_OUT_GELF Yes)
5353
set(FLB_OUT_HTTP Yes)
5454
set(FLB_OUT_INFLUXDB Yes)
5555
set(FLB_OUT_NATS No)

plugins/out_gelf/gelf.c

+68-34
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,51 @@
7575
* server nodes omit this field automatically.
7676
*/
7777

78+
/*
79+
* Generate a unique message ID. The upper 48-bit is milliseconds
80+
* since the Epoch, the lower 16-bit is a random nonce.
81+
*/
82+
static uint64_t message_id(void)
83+
{
84+
uint64_t now;
85+
uint16_t nonce;
86+
struct flb_time tm;
87+
88+
if (flb_time_get(&tm) != -1) {
89+
now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000;
90+
}
91+
else {
92+
now = (uint64_t) time(NULL) * 1000;
93+
}
94+
nonce = (uint16_t) rand();
95+
96+
return (now << 16) | nonce;
97+
}
98+
99+
/*
100+
* A GELF header is 12 bytes in size. It has the following
101+
* structure:
102+
*
103+
* +---+---+---+---+---+---+---+---+---+---+---+---+
104+
* | MAGIC | MESSAGE ID |SEQ|NUM|
105+
* +---+---+---+---+---+---+---+---+---+---+---+---+
106+
*
107+
* NUM is the total number of packets to send. SEQ is the
108+
* unique sequence number for each packet (zero-indexed).
109+
*/
110+
#define GELF_MAGIC "\x1e\x0f"
111+
#define GELF_HEADER_SIZE 12
112+
113+
static void init_chunk_header(uint8_t *buf, int count)
114+
{
115+
uint64_t msgid = message_id();
116+
117+
memcpy(buf, GELF_MAGIC, 2);
118+
memcpy(buf + 2, &msgid, 8);
119+
buf[10] = 0;
120+
buf[11] = count;
121+
}
122+
78123
/*
79124
* Chunked GELF
80125
* Prepend the following structure to your GELF message to make it chunked:
@@ -89,66 +134,45 @@
89134
* already arrived and still arriving chunks.
90135
* A message MUST NOT consist of more than 128 chunks.
91136
*/
92-
93137
static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg,
94138
size_t msg_size)
95139
{
96140
int ret;
97-
uint8_t header[12];
98141
uint8_t n;
99142
size_t chunks;
100143
size_t offset;
101-
struct flb_time tm;
102-
uint64_t messageid;
103-
struct msghdr msghdr;
104-
struct iovec iov[2];
144+
size_t len;
145+
uint8_t *buf = (uint8_t *) ctx->pckt_buf;
105146

106147
chunks = msg_size / ctx->pckt_size;
107-
if ((msg_size % ctx->pckt_size) != 0)
148+
if (msg_size % ctx->pckt_size != 0) {
108149
chunks++;
150+
}
109151

110152
if (chunks > 128) {
111-
flb_plg_error(ctx->ins, "message too big: %zd bytes, too many chunks",
112-
msg_size);
153+
flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size);
113154
return -1;
114155
}
115156

116-
flb_time_get(&tm);
117-
118-
messageid = ((uint64_t)(tm.tm.tv_nsec*1000000 + tm.tm.tv_nsec) << 32) |
119-
(uint64_t)rand_r(&(ctx->seed));
120-
121-
header[0] = 0x1e;
122-
header[1] = 0x0f;
123-
memcpy (header+2, &messageid, 8);
124-
header[10] = chunks;
125-
126-
iov[0].iov_base = header;
127-
iov[0].iov_len = 12;
128-
129-
memset(&msghdr, 0, sizeof(struct msghdr));
130-
msghdr.msg_iov = iov;
131-
msghdr.msg_iovlen = 2;
157+
init_chunk_header(buf, chunks);
132158

133159
offset = 0;
134160
for (n = 0; n < chunks; n++) {
135-
header[11] = n;
161+
buf[10] = n;
136162

137-
iov[1].iov_base = msg + offset;
138-
if ((msg_size - offset) < ctx->pckt_size) {
139-
iov[1].iov_len = msg_size - offset;
140-
}
141-
else {
142-
iov[1].iov_len = ctx->pckt_size;
163+
len = msg_size - offset;
164+
if (ctx->pckt_size < len) {
165+
len = ctx->pckt_size;
143166
}
167+
memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len);
144168

145-
ret = sendmsg(ctx->fd, &msghdr, MSG_DONTWAIT | MSG_NOSIGNAL);
169+
ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE,
170+
MSG_DONTWAIT | MSG_NOSIGNAL);
146171
if (ret == -1) {
147172
flb_errno();
148173
}
149174
offset += ctx->pckt_size;
150175
}
151-
152176
return 0;
153177
}
154178

@@ -399,14 +423,23 @@ static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *conf
399423
}
400424
close(fd);
401425
}
426+
srand(ctx->seed);
402427

403428
ctx->fd = -1;
429+
ctx->pckt_buf = NULL;
430+
404431
if (ctx->mode == FLB_GELF_UDP) {
405432
ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port);
406433
if (ctx->fd < 0) {
407434
flb_free(ctx);
408435
return -1;
409436
}
437+
ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size);
438+
if (ctx->pckt_buf == NULL) {
439+
flb_socket_close(ctx->fd);
440+
flb_free(ctx);
441+
return -1;
442+
}
410443
}
411444
else {
412445
int io_flags = FLB_IO_TCP;
@@ -449,6 +482,7 @@ static int cb_gelf_exit(void *data, struct flb_config *config)
449482
flb_sds_destroy(ctx->fields.full_message_key);
450483
flb_sds_destroy(ctx->fields.level_key);
451484

485+
flb_free(ctx->pckt_buf);
452486
flb_free(ctx);
453487

454488
return 0;

plugins/out_gelf/gelf.h

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct flb_out_gelf_config {
3535
flb_sockfd_t fd;
3636

3737
int pckt_size;
38+
char *pckt_buf;
3839
int compress;
3940
unsigned int seed;
4041

0 commit comments

Comments
 (0)