Skip to content

Commit ba7fee4

Browse files
authored
in stdin: add buffer_size parameter (#2364)
* in sdtin: add buffer_size parameter Signed-off-by: Martin Dojcak <martin.dojcak@lablabs.io>
1 parent 3cc276d commit ba7fee4

File tree

2 files changed

+85
-25
lines changed

2 files changed

+85
-25
lines changed

plugins/in_stdin/in_stdin.c

+78-21
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <fluent-bit/flb_time.h>
2626
#include <fluent-bit/flb_parser.h>
2727
#include <fluent-bit/flb_error.h>
28+
#include <fluent-bit/flb_utils.h>
2829

2930
#include <msgpack.h>
3031

@@ -106,7 +107,7 @@ static int in_stdin_collect(struct flb_input_instance *ins,
106107

107108
bytes = read(ctx->fd,
108109
ctx->buf + ctx->buf_len,
109-
sizeof(ctx->buf) - ctx->buf_len - 1);
110+
ctx->buf_size - ctx->buf_len - 1);
110111
flb_plg_trace(ctx->ins, "stdin read() = %i", bytes);
111112

112113
if (bytes == 0) {
@@ -210,45 +211,97 @@ static int in_stdin_collect(struct flb_input_instance *ins,
210211
return 0;
211212
}
212213

214+
/* Read stdin config*/
215+
static int in_stdin_config_init(struct flb_in_stdin_config *ctx,
216+
struct flb_input_instance *in,
217+
struct flb_config *config)
218+
{
219+
const char *pval = NULL;
220+
221+
ctx->buf_size = DEFAULT_BUF_SIZE;
222+
ctx->buf = NULL;
223+
ctx->buf_len = 0;
224+
ctx->ins = in;
225+
226+
/* parser settings */
227+
pval = flb_input_get_property("parser", in);
228+
if (pval) {
229+
ctx->parser = flb_parser_get(pval, config);
230+
if (!ctx->parser) {
231+
flb_plg_error(ctx->ins, "requested parser '%s' not found", pval);
232+
return -1;
233+
}
234+
}
235+
236+
/* buffer size setting */
237+
pval = flb_input_get_property("buffer_size", in);
238+
if (pval != NULL) {
239+
ctx->buf_size = (size_t) flb_utils_size_to_bytes(pval);
240+
241+
if (ctx->buf_size == -1) {
242+
flb_plg_error(ctx->ins, "buffer_size '%s' is invalid", pval);
243+
return -1;
244+
}
245+
else if (ctx->buf_size < DEFAULT_BUF_SIZE) {
246+
flb_plg_error(ctx->ins, "buffer_size '%s' must be at least %i bytes",
247+
pval, DEFAULT_BUF_SIZE);
248+
return -1;
249+
}
250+
}
251+
252+
flb_plg_debug(ctx->ins, "buf_size=%zu", ctx->buf_size);
253+
return 0;
254+
}
255+
256+
static void in_stdin_config_destroy(struct flb_in_stdin_config *ctx)
257+
{
258+
if (!ctx) {
259+
return;
260+
}
261+
262+
/* release buffer */
263+
if (ctx->buf) {
264+
flb_free(ctx->buf);
265+
}
266+
flb_free(ctx);
267+
}
268+
213269
/* Initialize plugin */
214270
static int in_stdin_init(struct flb_input_instance *in,
215271
struct flb_config *config, void *data)
216272
{
217273
int fd;
218274
int ret;
219-
const char *tmp;
220275
struct flb_in_stdin_config *ctx;
221276
(void) data;
222277

223-
/* Allocate space for the configuration */
278+
/* Allocate space for the configuration context */
224279
ctx = flb_malloc(sizeof(struct flb_in_stdin_config));
225280
if (!ctx) {
226281
return -1;
227282
}
228-
ctx->buf_len = 0;
229-
ctx->ins = in;
283+
284+
/* Initialize stdin config */
285+
ret = in_stdin_config_init(ctx, in, config);
286+
if (ret < 0) {
287+
goto init_error;
288+
}
289+
290+
ctx->buf = flb_malloc(ctx->buf_size);
291+
if (!ctx->buf) {
292+
flb_errno();
293+
goto init_error;
294+
}
230295

231296
/* Clone the standard input file descriptor */
232297
fd = dup(STDIN_FILENO);
233298
if (fd == -1) {
234299
flb_errno();
235300
flb_plg_error(ctx->ins, "Could not open standard input!");
236-
flb_free(ctx);
237-
return -1;
301+
goto init_error;
238302
}
239303
ctx->fd = fd;
240304

241-
tmp = flb_input_get_property("parser", in);
242-
if (tmp) {
243-
ctx->parser = flb_parser_get(tmp, config);
244-
if (!ctx->parser) {
245-
flb_plg_error(ctx->ins, "requested parser '%s' not found", tmp);
246-
}
247-
}
248-
else {
249-
ctx->parser = NULL;
250-
}
251-
252305
/* Always initialize built-in JSON pack state */
253306
flb_pack_state_init(&ctx->pack_state);
254307
ctx->pack_state.multiple = FLB_TRUE;
@@ -263,12 +316,16 @@ static int in_stdin_init(struct flb_input_instance *in,
263316
config);
264317
if (ret == -1) {
265318
flb_plg_error(ctx->ins, "Could not set collector for STDIN input plugin");
266-
flb_free(ctx);
267-
return -1;
319+
goto init_error;
268320
}
269321
ctx->coll_fd = ret;
270322

271323
return 0;
324+
325+
init_error:
326+
in_stdin_config_destroy(ctx);
327+
328+
return -1;
272329
}
273330

274331
/* Cleanup serial input */
@@ -284,7 +341,7 @@ static int in_stdin_exit(void *in_context, struct flb_config *config)
284341
close(ctx->fd);
285342
}
286343
flb_pack_state_reset(&ctx->pack_state);
287-
flb_free(ctx);
344+
in_stdin_config_destroy(ctx);
288345

289346
return 0;
290347
}

plugins/in_stdin/in_stdin.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@
2424
#include <fluent-bit/flb_config.h>
2525
#include <fluent-bit/flb_input.h>
2626

27+
#define DEFAULT_BUF_SIZE 16000
28+
2729
/* STDIN Input configuration & context */
2830
struct flb_in_stdin_config {
29-
int fd; /* stdin file descriptor */
30-
int coll_fd; /* collector fd */
31-
int buf_len; /* read buffer length */
32-
char buf[8192 * 2]; /* read buffer: 16Kb max */
31+
int fd; /* stdin file descriptor */
32+
int coll_fd; /* collector fd */
33+
size_t buf_size; /* size of a buffer */
34+
int buf_len; /* read buffer length */
35+
char *buf; /* read buffer */
3336

3437
/* Parser / Format */
3538
struct flb_parser *parser;

0 commit comments

Comments
 (0)