|
75 | 75 | * server nodes omit this field automatically.
|
76 | 76 | */
|
77 | 77 |
|
| 78 | +/* |
| 79 | + * Generate a unique message ID. The upper 48-bit is milliseconds |
| 80 | + * since the Epoch, the lower 16-bit is a random nonce. |
| 81 | + */ |
| 82 | +static uint64_t message_id(void) |
| 83 | +{ |
| 84 | + uint64_t now; |
| 85 | + uint16_t nonce; |
| 86 | + struct flb_time tm; |
| 87 | + |
| 88 | + if (flb_time_get(&tm) != -1) { |
| 89 | + now = (uint64_t) tm.tm.tv_sec * 1000 + tm.tm.tv_nsec / 1000000; |
| 90 | + } |
| 91 | + else { |
| 92 | + now = (uint64_t) time(NULL) * 1000; |
| 93 | + } |
| 94 | + nonce = (uint16_t) rand(); |
| 95 | + |
| 96 | + return (now << 16) | nonce; |
| 97 | +} |
| 98 | + |
| 99 | +/* |
| 100 | + * A GELF header is 12 bytes in size. It has the following |
| 101 | + * structure: |
| 102 | + * |
| 103 | + * +---+---+---+---+---+---+---+---+---+---+---+---+ |
| 104 | + * | MAGIC | MESSAGE ID |SEQ|NUM| |
| 105 | + * +---+---+---+---+---+---+---+---+---+---+---+---+ |
| 106 | + * |
| 107 | + * NUM is the total number of packets to send. SEQ is the |
| 108 | + * unique sequence number for each packet (zero-indexed). |
| 109 | + */ |
| 110 | +#define GELF_MAGIC "\x1e\x0f" |
| 111 | +#define GELF_HEADER_SIZE 12 |
| 112 | + |
| 113 | +static void init_chunk_header(uint8_t *buf, int count) |
| 114 | +{ |
| 115 | + uint64_t msgid = message_id(); |
| 116 | + |
| 117 | + memcpy(buf, GELF_MAGIC, 2); |
| 118 | + memcpy(buf + 2, &msgid, 8); |
| 119 | + buf[10] = 0; |
| 120 | + buf[11] = count; |
| 121 | +} |
| 122 | + |
78 | 123 | /*
|
79 | 124 | * Chunked GELF
|
80 | 125 | * Prepend the following structure to your GELF message to make it chunked:
|
|
89 | 134 | * already arrived and still arriving chunks.
|
90 | 135 | * A message MUST NOT consist of more than 128 chunks.
|
91 | 136 | */
|
92 |
| - |
93 | 137 | static int gelf_send_udp_chunked(struct flb_out_gelf_config *ctx, void *msg,
|
94 | 138 | size_t msg_size)
|
95 | 139 | {
|
96 | 140 | int ret;
|
97 |
| - uint8_t header[12]; |
98 | 141 | uint8_t n;
|
99 | 142 | size_t chunks;
|
100 | 143 | size_t offset;
|
101 |
| - struct flb_time tm; |
102 |
| - uint64_t messageid; |
103 |
| - struct msghdr msghdr; |
104 |
| - struct iovec iov[2]; |
| 144 | + size_t len; |
| 145 | + uint8_t *buf = (uint8_t *) ctx->pckt_buf; |
105 | 146 |
|
106 | 147 | chunks = msg_size / ctx->pckt_size;
|
107 |
| - if ((msg_size % ctx->pckt_size) != 0) |
| 148 | + if (msg_size % ctx->pckt_size != 0) { |
108 | 149 | chunks++;
|
| 150 | + } |
109 | 151 |
|
110 | 152 | if (chunks > 128) {
|
111 |
| - flb_plg_error(ctx->ins, "message too big: %zd bytes, too many chunks", |
112 |
| - msg_size); |
| 153 | + flb_plg_error(ctx->ins, "message too big: %zd bytes", msg_size); |
113 | 154 | return -1;
|
114 | 155 | }
|
115 | 156 |
|
116 |
| - flb_time_get(&tm); |
117 |
| - |
118 |
| - messageid = ((uint64_t)(tm.tm.tv_nsec*1000000 + tm.tm.tv_nsec) << 32) | |
119 |
| - (uint64_t)rand_r(&(ctx->seed)); |
120 |
| - |
121 |
| - header[0] = 0x1e; |
122 |
| - header[1] = 0x0f; |
123 |
| - memcpy (header+2, &messageid, 8); |
124 |
| - header[10] = chunks; |
125 |
| - |
126 |
| - iov[0].iov_base = header; |
127 |
| - iov[0].iov_len = 12; |
128 |
| - |
129 |
| - memset(&msghdr, 0, sizeof(struct msghdr)); |
130 |
| - msghdr.msg_iov = iov; |
131 |
| - msghdr.msg_iovlen = 2; |
| 157 | + init_chunk_header(buf, chunks); |
132 | 158 |
|
133 | 159 | offset = 0;
|
134 | 160 | for (n = 0; n < chunks; n++) {
|
135 |
| - header[11] = n; |
| 161 | + buf[10] = n; |
136 | 162 |
|
137 |
| - iov[1].iov_base = msg + offset; |
138 |
| - if ((msg_size - offset) < ctx->pckt_size) { |
139 |
| - iov[1].iov_len = msg_size - offset; |
140 |
| - } |
141 |
| - else { |
142 |
| - iov[1].iov_len = ctx->pckt_size; |
| 163 | + len = msg_size - offset; |
| 164 | + if (ctx->pckt_size < len) { |
| 165 | + len = ctx->pckt_size; |
143 | 166 | }
|
| 167 | + memcpy(buf + GELF_HEADER_SIZE, (char *) msg + offset, len); |
144 | 168 |
|
145 |
| - ret = sendmsg(ctx->fd, &msghdr, MSG_DONTWAIT | MSG_NOSIGNAL); |
| 169 | + ret = send(ctx->fd, buf, len + GELF_HEADER_SIZE, |
| 170 | + MSG_DONTWAIT | MSG_NOSIGNAL); |
146 | 171 | if (ret == -1) {
|
147 | 172 | flb_errno();
|
148 | 173 | }
|
149 | 174 | offset += ctx->pckt_size;
|
150 | 175 | }
|
151 |
| - |
152 | 176 | return 0;
|
153 | 177 | }
|
154 | 178 |
|
@@ -399,14 +423,23 @@ static int cb_gelf_init(struct flb_output_instance *ins, struct flb_config *conf
|
399 | 423 | }
|
400 | 424 | close(fd);
|
401 | 425 | }
|
| 426 | + srand(ctx->seed); |
402 | 427 |
|
403 | 428 | ctx->fd = -1;
|
| 429 | + ctx->pckt_buf = NULL; |
| 430 | + |
404 | 431 | if (ctx->mode == FLB_GELF_UDP) {
|
405 | 432 | ctx->fd = flb_net_udp_connect(ins->host.name, ins->host.port);
|
406 | 433 | if (ctx->fd < 0) {
|
407 | 434 | flb_free(ctx);
|
408 | 435 | return -1;
|
409 | 436 | }
|
| 437 | + ctx->pckt_buf = flb_malloc(GELF_HEADER_SIZE + ctx->pckt_size); |
| 438 | + if (ctx->pckt_buf == NULL) { |
| 439 | + flb_socket_close(ctx->fd); |
| 440 | + flb_free(ctx); |
| 441 | + return -1; |
| 442 | + } |
410 | 443 | }
|
411 | 444 | else {
|
412 | 445 | int io_flags = FLB_IO_TCP;
|
@@ -449,6 +482,7 @@ static int cb_gelf_exit(void *data, struct flb_config *config)
|
449 | 482 | flb_sds_destroy(ctx->fields.full_message_key);
|
450 | 483 | flb_sds_destroy(ctx->fields.level_key);
|
451 | 484 |
|
| 485 | + flb_free(ctx->pckt_buf); |
452 | 486 | flb_free(ctx);
|
453 | 487 |
|
454 | 488 | return 0;
|
|
0 commit comments