Skip to content

Commit 901e43f

Browse files
Fujimoto Seijiedsiper
Fujimoto Seiji
authored andcommitted
in_statsd: implement the support for "statsd" protocol
This is the first cut at adding statsd support to Fluent Bit. You can use the "in_statsd" plugins as follows: $ fluent-bit -i statsd -o stdout ... now you can input metrics like: $ echo "click:10|c|@0.1" > /dev/udp/127.0.0.1/8125 $ echo "active:+10|g" > /dev/udp/127.0.0.1/8125 This plugin will parse the incoming messages and produce well- formatted records like below: {"type"=>"counter", "bucket"=>"click", "value"=>10.000000, "sample_rate"=>0.100000} {"type"=>"gauge", "bucket"=>"active", "value"=>10.000000, "incremental"=>1} With this, we can easily collect performance logs from services with statsd ouput support (like cadvisor). Signed-off-by: Fujimoto Seiji <fujimoto@clear-code.com>
1 parent 87380a1 commit 901e43f

File tree

5 files changed

+342
-0
lines changed

5 files changed

+342
-0
lines changed

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ option(FLB_IN_DUMMY "Enable Dummy input plugin" Yes)
122122
option(FLB_IN_NETIF "Enable NetworkIF input plugin" Yes)
123123
option(FLB_IN_WINLOG "Enable Windows Log input plugin" No)
124124
option(FLB_IN_COLLECTD "Enable Collectd input plugin" Yes)
125+
option(FLB_IN_STATSD "Enable StatsD input plugin" Yes)
125126
option(FLB_IN_STORAGE_BACKLOG "Enable storage backlog input plugin" Yes)
126127
option(FLB_OUT_AZURE "Enable Azure output plugin" Yes)
127128
option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes)

cmake/windows-setup.cmake

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ set(FLB_IN_DUMMY Yes)
3838
set(FLB_IN_NETIF No)
3939
set(FLB_IN_WINLOG Yes)
4040
set(FLB_IN_COLLECTD No)
41+
set(FLB_IN_STATSD No)
4142
set(FLB_IN_STORAGE_BACKLOG No)
4243

4344
# OUTPUT plugins

plugins/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ REGISTER_IN_PLUGIN("in_head")
137137
REGISTER_IN_PLUGIN("in_health")
138138
REGISTER_IN_PLUGIN("in_http")
139139
REGISTER_IN_PLUGIN("in_collectd")
140+
REGISTER_IN_PLUGIN("in_statsd")
140141
REGISTER_IN_PLUGIN("in_storage_backlog")
141142

142143
if (FLB_STREAM_PROCESSOR)

plugins/in_statsd/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
set(src
2+
statsd.c)
3+
4+
FLB_PLUGIN(in_statsd "${src}" "")

plugins/in_statsd/statsd.c

+335
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#include <fluent-bit/flb_info.h>
22+
#include <fluent-bit/flb_input.h>
23+
#include <fluent-bit/flb_utils.h>
24+
#include <fluent-bit/flb_socket.h>
25+
#include <fluent-bit/flb_pack.h>
26+
27+
#define MAX_PACKET_SIZE 65536
28+
#define DEFAULT_LISTEN "0.0.0.0"
29+
#define DEFAULT_PORT 8125
30+
31+
#define STATSD_TYPE_COUNTER 1
32+
#define STATSD_TYPE_GAUGE 2
33+
#define STATSD_TYPE_TIMER 3
34+
#define STATSD_TYPE_SET 4
35+
36+
struct flb_statsd {
37+
char *buf; /* buffer */
38+
char listen[256]; /* listening address (RFC-2181) */
39+
char port[6]; /* listening port (RFC-793) */
40+
flb_sockfd_t server_fd; /* server socket */
41+
flb_pipefd_t coll_fd; /* server handler */
42+
struct flb_input_instance *i_ins; /* input instance */
43+
};
44+
45+
/*
46+
* The "statsd_message" represents a single line in UDP packet.
47+
* It's just a bunch of pointers to ephemeral buffer.
48+
*/
49+
struct statsd_message {
50+
char *bucket;
51+
int bucket_len;
52+
char *value;
53+
int value_len;
54+
int type;
55+
double sample_rate;
56+
};
57+
58+
static void pack_string(msgpack_packer *mp_pck, char *str, size_t len)
59+
{
60+
if (len < 0) {
61+
len = strlen(str);
62+
}
63+
msgpack_pack_str(mp_pck, len);
64+
msgpack_pack_str_body(mp_pck, str, len);
65+
}
66+
67+
static int get_statsd_type(char *str)
68+
{
69+
switch (*str) {
70+
case 'g':
71+
return STATSD_TYPE_GAUGE;
72+
case 's':
73+
return STATSD_TYPE_SET;
74+
case 'c':
75+
return STATSD_TYPE_COUNTER;
76+
case 'm':
77+
if (*(str + 1) == 's') {
78+
return STATSD_TYPE_TIMER;
79+
}
80+
}
81+
return STATSD_TYPE_COUNTER;
82+
}
83+
84+
static int is_incremental(char *str)
85+
{
86+
return (*str == '+' || *str == '-');
87+
}
88+
89+
static int statsd_process_message(msgpack_packer *mp_pck,
90+
struct statsd_message *m)
91+
{
92+
msgpack_pack_array(mp_pck, 2);
93+
flb_pack_time_now(mp_pck);
94+
95+
switch (m->type) {
96+
case STATSD_TYPE_COUNTER:
97+
msgpack_pack_map(mp_pck, 4);
98+
pack_string(mp_pck, "type", 4);
99+
pack_string(mp_pck, "counter", 7);
100+
pack_string(mp_pck, "bucket", 6);
101+
pack_string(mp_pck, m->bucket, m->bucket_len);
102+
pack_string(mp_pck, "value", 5);
103+
msgpack_pack_double(mp_pck, atof(m->value));
104+
pack_string(mp_pck, "sample_rate", 11);
105+
msgpack_pack_double(mp_pck, m->sample_rate);
106+
break;
107+
case STATSD_TYPE_GAUGE:
108+
msgpack_pack_map(mp_pck, 4);
109+
pack_string(mp_pck, "type", 4);
110+
pack_string(mp_pck, "gauge", 5);
111+
pack_string(mp_pck, "bucket", 6);
112+
pack_string(mp_pck, m->bucket, m->bucket_len);
113+
pack_string(mp_pck, "value", 5);
114+
msgpack_pack_double(mp_pck, atof(m->value));
115+
pack_string(mp_pck, "incremental", 11);
116+
msgpack_pack_int(mp_pck, is_incremental(m->value));
117+
break;
118+
case STATSD_TYPE_TIMER:
119+
msgpack_pack_map(mp_pck, 4);
120+
pack_string(mp_pck, "type", 4);
121+
pack_string(mp_pck, "timer", 5);
122+
pack_string(mp_pck, "bucket", 6);
123+
pack_string(mp_pck, m->bucket, m->bucket_len);
124+
pack_string(mp_pck, "value", 5);
125+
msgpack_pack_double(mp_pck, atof(m->value));
126+
pack_string(mp_pck, "sample_rate", 11);
127+
msgpack_pack_double(mp_pck, m->sample_rate);
128+
break;
129+
case STATSD_TYPE_SET:
130+
msgpack_pack_map(mp_pck, 3);
131+
pack_string(mp_pck, "type", 4);
132+
pack_string(mp_pck, "set", 3);
133+
pack_string(mp_pck, "bucket", 6);
134+
pack_string(mp_pck, m->bucket, m->bucket_len);
135+
pack_string(mp_pck, "value", 5);
136+
pack_string(mp_pck, m->value, m->value_len);
137+
break;
138+
}
139+
return 0;
140+
}
141+
142+
static int statsd_process_line(msgpack_packer *mp_pck, char *line)
143+
{
144+
char *colon, *bar, *atmark;
145+
struct statsd_message m;
146+
147+
/*
148+
* bucket:value|type|@sample_rate
149+
* ------
150+
*/
151+
colon = strchr(line, ':');
152+
if (colon == NULL) {
153+
flb_error("[in_statsd] no bucket name found");
154+
return -1;
155+
}
156+
m.bucket = line;
157+
m.bucket_len = (colon - line);
158+
159+
/*
160+
* bucket:value|type|@sample_rate
161+
* ----
162+
*/
163+
bar = strchr(colon + 1, '|');
164+
if (bar == NULL) {
165+
flb_error("[in_statsd] no metric type found");
166+
return -1;
167+
}
168+
m.type = get_statsd_type(bar + 1);
169+
170+
/*
171+
* bucket:value|type|@sample_rate
172+
* -----
173+
*/
174+
m.value = colon + 1;
175+
m.value_len = (bar - colon - 1);
176+
177+
/*
178+
* bucket:value|type|@sample_rate
179+
* ------------
180+
*/
181+
atmark = strstr(bar + 1, "|@");
182+
if (atmark == NULL || atof(atmark + 2) == 0) {
183+
m.sample_rate = 1.0;
184+
}
185+
else {
186+
m.sample_rate = atof(atmark + 2);
187+
}
188+
189+
return statsd_process_message(mp_pck, &m);
190+
}
191+
192+
193+
static int cb_statsd_receive(struct flb_input_instance *i_ins,
194+
struct flb_config *config, void *data)
195+
{
196+
struct flb_statsd *ctx = data;
197+
char *line;
198+
int len;
199+
msgpack_packer mp_pck;
200+
msgpack_sbuffer mp_sbuf;
201+
202+
/* Receive a UDP datagram */
203+
len = recv(ctx->server_fd, ctx->buf, MAX_PACKET_SIZE - 1, 0);
204+
if (len < 0) {
205+
flb_errno();
206+
return -1;
207+
}
208+
ctx->buf[len] = '\0';
209+
210+
msgpack_sbuffer_init(&mp_sbuf);
211+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
212+
213+
/* Process all messages in buffer */
214+
line = strtok(ctx->buf, "\n");
215+
while (line) {
216+
flb_trace("[in_statsd] received a line: '%s'", line);
217+
if (statsd_process_line(&mp_pck, line) < 0) {
218+
flb_error("[in_statsd] failed to process line: '%s'", line);
219+
}
220+
line = strtok(NULL, "\n");
221+
}
222+
223+
/* Send to output */
224+
if (mp_sbuf.size > 0) {
225+
flb_input_chunk_append_raw(i_ins, NULL, 0, mp_sbuf.data, mp_sbuf.size);
226+
}
227+
msgpack_sbuffer_destroy(&mp_sbuf);
228+
229+
return 0;
230+
}
231+
232+
static int cb_statsd_init(struct flb_input_instance *i_ins,
233+
struct flb_config *config, void *data)
234+
{
235+
struct flb_statsd *ctx;
236+
char *listen;
237+
int port;
238+
239+
ctx = flb_calloc(1, sizeof(struct flb_statsd));
240+
if (!ctx) {
241+
flb_errno();
242+
return -1;
243+
}
244+
245+
ctx->buf = flb_malloc(MAX_PACKET_SIZE);
246+
if (!ctx->buf) {
247+
flb_errno();
248+
flb_free(ctx);
249+
return -1;
250+
}
251+
ctx->i_ins = i_ins;
252+
253+
/* Listening address */
254+
if (i_ins->host.listen) {
255+
listen = i_ins->host.listen;
256+
}
257+
else {
258+
listen = DEFAULT_LISTEN;
259+
}
260+
strncpy(ctx->listen, listen, sizeof(ctx->listen) - 1);
261+
262+
/* Listening port */
263+
if (i_ins->host.port) {
264+
port = i_ins->host.port;
265+
}
266+
else {
267+
port = DEFAULT_PORT;
268+
}
269+
snprintf(ctx->port, sizeof(ctx->port), "%hu", port);
270+
271+
/* Export plugin context */
272+
flb_input_set_context(i_ins, ctx);
273+
274+
/* Accepts metrics from UDP connections. */
275+
ctx->server_fd = flb_net_server_udp(ctx->port, ctx->listen);
276+
if (ctx->server_fd == -1) {
277+
flb_error("[in_statsd] can't bind to %s:%s", ctx->listen, ctx->port);
278+
flb_free(ctx->buf);
279+
flb_free(ctx);
280+
return -1;
281+
}
282+
283+
/* Set up the UDP connection callback */
284+
ctx->coll_fd = flb_input_set_collector_socket(i_ins, cb_statsd_receive,
285+
ctx->server_fd, config);
286+
if (ctx->coll_fd == -1) {
287+
flb_error("[in_statsd] cannot set up connection callback ");
288+
flb_socket_close(ctx->server_fd);
289+
flb_free(ctx->buf);
290+
flb_free(ctx);
291+
return -1;
292+
}
293+
294+
flb_info("[in_statsd] start UDP server on %s:%s", ctx->listen, ctx->port);
295+
296+
return 0;
297+
}
298+
299+
static void cb_statsd_pause(void *data, struct flb_config *config)
300+
{
301+
struct flb_statsd *ctx = data;
302+
flb_input_collector_pause(ctx->coll_fd, ctx->i_ins);
303+
}
304+
305+
static void cb_statsd_resume(void *data, struct flb_config *config)
306+
{
307+
struct flb_statsd *ctx = data;
308+
flb_input_collector_resume(ctx->coll_fd, ctx->i_ins);
309+
}
310+
311+
static int cb_statsd_exit(void *data, struct flb_config *config)
312+
{
313+
struct flb_statsd *ctx = data;
314+
315+
flb_input_collector_pause(ctx->coll_fd, ctx->i_ins);
316+
flb_socket_close(ctx->server_fd);
317+
flb_free(ctx->buf);
318+
flb_free(ctx);
319+
return 0;
320+
}
321+
322+
/* Plugin reference */
323+
struct flb_input_plugin in_statsd_plugin = {
324+
.name = "statsd",
325+
.description = "StatsD input plugin",
326+
.cb_init = cb_statsd_init,
327+
.cb_pre_run = NULL,
328+
.cb_collect = NULL,
329+
.cb_ingest = NULL,
330+
.cb_flush_buf = NULL,
331+
.cb_pause = cb_statsd_pause,
332+
.cb_resume = cb_statsd_resume,
333+
.cb_exit = cb_statsd_exit,
334+
.flags = 0
335+
};

0 commit comments

Comments
 (0)