Skip to content

Commit b365cc5

Browse files
committed
in sdtin: add buffer_size parameter
1 parent db5c22b commit b365cc5

File tree

2 files changed

+77
-23
lines changed

2 files changed

+77
-23
lines changed

plugins/in_stdin/in_stdin.c

+70-19
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <fluent-bit/flb_time.h>
2626
#include <fluent-bit/flb_parser.h>
2727
#include <fluent-bit/flb_error.h>
28+
#include <fluent-bit/flb_utils.h>
2829

2930
#include <msgpack.h>
3031

@@ -106,7 +107,7 @@ static int in_stdin_collect(struct flb_input_instance *ins,
106107

107108
bytes = read(ctx->fd,
108109
ctx->buf + ctx->buf_len,
109-
sizeof(ctx->buf) - ctx->buf_len - 1);
110+
ctx->buf_size - ctx->buf_len - 1);
110111
flb_plg_trace(ctx->ins, "stdin read() = %i", bytes);
111112

112113
if (bytes == 0) {
@@ -210,45 +211,91 @@ static int in_stdin_collect(struct flb_input_instance *ins,
210211
return 0;
211212
}
212213

214+
/* Read config file and*/
215+
static int in_stdin_config_init(struct flb_in_stdin_config *ctx,
216+
struct flb_input_instance *in,
217+
struct flb_config *config)
218+
{
219+
const char *pval = NULL;
220+
221+
/* parser settings */
222+
pval = flb_input_get_property("parser", in);
223+
if (pval) {
224+
ctx->parser = flb_parser_get(pval, config);
225+
if (!ctx->parser) {
226+
flb_plg_error(ctx->ins, "requested parser '%s' not found", pval);
227+
}
228+
}
229+
else {
230+
ctx->parser = NULL;
231+
}
232+
233+
/* buffer size setting */
234+
pval = flb_input_get_property("buffer_size", in);
235+
if (pval != NULL && flb_utils_size_to_bytes(pval) > 0) {
236+
ctx->buf_size = flb_utils_size_to_bytes(pval);
237+
}
238+
else {
239+
ctx->buf_size = DEFAULT_BUF_SIZE;
240+
}
241+
242+
flb_plg_debug(ctx->ins, "buf_size=%zu", ctx->buf_size);
243+
return 0;
244+
}
245+
246+
static void in_stdin_config_destroy(struct flb_in_stdin_config *ctx)
247+
{
248+
if (!ctx) {
249+
return;
250+
}
251+
252+
/* release buffer */
253+
if (ctx->buf) {
254+
flb_free(ctx->buf);
255+
}
256+
flb_free(ctx);
257+
}
258+
213259
/* Initialize plugin */
214260
static int in_stdin_init(struct flb_input_instance *in,
215261
struct flb_config *config, void *data)
216262
{
217263
int fd;
218264
int ret;
219-
const char *tmp;
220265
struct flb_in_stdin_config *ctx;
221266
(void) data;
222267

223-
/* Allocate space for the configuration */
268+
/* Allocate space for the configuration context */
224269
ctx = flb_malloc(sizeof(struct flb_in_stdin_config));
225270
if (!ctx) {
226271
return -1;
227272
}
273+
274+
ctx->buf = NULL;
228275
ctx->buf_len = 0;
229276
ctx->ins = in;
230277

278+
/* Initialize stdin config */
279+
ret = in_stdin_config_init(ctx, in, config);
280+
if (ret < 0) {
281+
goto init_error;
282+
}
283+
284+
ctx->buf = flb_malloc(ctx->buf_size);
285+
if (!ctx->buf) {
286+
flb_errno();
287+
goto init_error;
288+
}
289+
231290
/* Clone the standard input file descriptor */
232291
fd = dup(STDIN_FILENO);
233292
if (fd == -1) {
234293
flb_errno();
235294
flb_plg_error(ctx->ins, "Could not open standard input!");
236-
flb_free(ctx);
237-
return -1;
295+
goto init_error;
238296
}
239297
ctx->fd = fd;
240298

241-
tmp = flb_input_get_property("parser", in);
242-
if (tmp) {
243-
ctx->parser = flb_parser_get(tmp, config);
244-
if (!ctx->parser) {
245-
flb_plg_error(ctx->ins, "requested parser '%s' not found", tmp);
246-
}
247-
}
248-
else {
249-
ctx->parser = NULL;
250-
}
251-
252299
/* Always initialize built-in JSON pack state */
253300
flb_pack_state_init(&ctx->pack_state);
254301
ctx->pack_state.multiple = FLB_TRUE;
@@ -263,12 +310,16 @@ static int in_stdin_init(struct flb_input_instance *in,
263310
config);
264311
if (ret == -1) {
265312
flb_plg_error(ctx->ins, "Could not set collector for STDIN input plugin");
266-
flb_free(ctx);
267-
return -1;
313+
goto init_error;
268314
}
269315
ctx->coll_fd = ret;
270316

271317
return 0;
318+
319+
init_error:
320+
in_stdin_config_destroy(ctx);
321+
322+
return -1;
272323
}
273324

274325
/* Cleanup serial input */
@@ -284,7 +335,7 @@ static int in_stdin_exit(void *in_context, struct flb_config *config)
284335
close(ctx->fd);
285336
}
286337
flb_pack_state_reset(&ctx->pack_state);
287-
flb_free(ctx);
338+
in_stdin_config_destroy(ctx);
288339

289340
return 0;
290341
}

plugins/in_stdin/in_stdin.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@
2424
#include <fluent-bit/flb_config.h>
2525
#include <fluent-bit/flb_input.h>
2626

27+
#define DEFAULT_BUF_SIZE 16384
28+
2729
/* STDIN Input configuration & context */
2830
struct flb_in_stdin_config {
29-
int fd; /* stdin file descriptor */
30-
int coll_fd; /* collector fd */
31-
int buf_len; /* read buffer length */
32-
char buf[8192 * 2]; /* read buffer: 16Kb max */
31+
int fd; /* stdin file descriptor */
32+
int coll_fd; /* collector fd */
33+
size_t buf_size; /* size of a buffer */
34+
int buf_len; /* read buffer length */
35+
char *buf; /* read buffer */
3336

3437
/* Parser / Format */
3538
struct flb_parser *parser;

0 commit comments

Comments
 (0)