Skip to content

Commit

Permalink
pipe: add flb_pipe_error
Browse files Browse the repository at this point in the history
On Windows, the `flb_pipe_r` and `flb_pipe_w` macros do not set errno on
failure, meaning calling `flb_errno` in error scenarios is insufficient.
This PR adds a new macro that will check the correct place,
`WSAGetLastError`, and output a similar error message. On Linux this
will still be `flb_errno`, meaning messages should work the same as they
always did, but now on Windows we will get actual error messages.

Signed-off-by: braydonk <braydonk@google.com>
  • Loading branch information
braydonk committed Mar 4, 2025
1 parent 8e8c9d6 commit c66851c
Show file tree
Hide file tree
Showing 20 changed files with 72 additions and 45 deletions.
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ static FLB_INLINE void flb_input_return(struct flb_coro *coro) {
val = FLB_BITS_U64_SET(FLB_ENGINE_IN_CORO, ins->id);
n = flb_pipe_w(ins->ch_events[1], (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
}

flb_input_coro_prepare_destroy(input_coro);
Expand Down
7 changes: 7 additions & 0 deletions include/fluent-bit/flb_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,18 @@ static inline int flb_log_suppress_check(int log_suppress_interval, const char *
int flb_log_worker_init(struct flb_worker *worker);
int flb_log_worker_destroy(struct flb_worker *worker);
int flb_errno_print(int errnum, const char *file, int line);
int flb_WSAGetLastError_print(int errnum, const char *file, int line);

#ifdef __FLB_FILENAME__
#define flb_errno() flb_errno_print(errno, __FLB_FILENAME__, __LINE__)
#ifdef WIN32
#define flb_WSAGetLastError() flb_WSAGetLastError_print(WSAGetLastError(), __FLB_FILENAME__, __LINE__)
#endif
#else
#define flb_errno() flb_errno_print(errno, __FILE__, __LINE__)
#ifdef WIN32
#define flb_WSAGetLastError() flb_WSAGetLastError_print(WSAGetLastError(), __FILE__, __LINE__)
#endif
#endif

#endif
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
/* Notify the event loop about our return status */
n = flb_pipe_w(pipe_fd, (void *) &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
}

/*
Expand Down
3 changes: 3 additions & 0 deletions include/fluent-bit/flb_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@
#define flb_sockfd_t evutil_socket_t
#define flb_pipe_w(fd, buf, len) send(fd, buf, len, 0)
#define flb_pipe_r(fd, buf, len) recv(fd, buf, len, 0)
#define flb_pipe_error() flb_WSAGetLastError()
#define FLB_PIPE_WOULDBLOCK() (WSAGetLastError() == WSAEWOULDBLOCK)
#else
#define flb_pipefd_t int
#define flb_sockfd_t int
#define flb_pipe_w(fd, buf, len) write(fd, buf, len)
#define flb_pipe_r(fd, buf, len) read(fd, buf, len)
#define flb_pipe_error() flb_errno()
#define FLB_PIPE_WOULDBLOCK() (errno == EAGAIN || errno == EWOULDBLOCK)
#endif

Expand All @@ -43,5 +45,6 @@ int flb_pipe_close(flb_pipefd_t fd);
int flb_pipe_set_nonblocking(flb_pipefd_t fd);
ssize_t flb_pipe_read_all(int fd, void *buf, size_t count);
ssize_t flb_pipe_write_all(int fd, const void *buf, size_t count);
void flb_pipe_log_last_error();

#endif
2 changes: 1 addition & 1 deletion include/fluent-bit/flb_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ static FLB_INLINE void flb_sched_timer_cb_coro_return()
val = FLB_BITS_U64_SET(FLB_SCHED_TIMER_CORO_RETURN, stc->id);
n = flb_pipe_w(sched->ch_events[1], &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
}

flb_coro_yield(coro, FLB_TRUE);
Expand Down
6 changes: 3 additions & 3 deletions plugins/in_exec/in_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ static int in_exec_collect(struct flb_input_instance *ins,
if (ctx->oneshot == FLB_TRUE) {
ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val));
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
}
Expand Down Expand Up @@ -256,7 +256,7 @@ static int in_exec_config_read(struct flb_exec *ctx,
flb_plg_error(in, "unable to load configuration");
return -1;
}

/* filepath setting */
if (ctx->cmd == NULL) {
flb_plg_error(in, "no input 'command' was given");
Expand Down Expand Up @@ -418,7 +418,7 @@ static int in_exec_prerun(struct flb_input_instance *ins,
/* Kick the oneshot execution */
ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val));
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
return 0;
Expand Down
4 changes: 2 additions & 2 deletions plugins/in_exec_wasi/in_exec_wasi.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ static int in_exec_wasi_collect(struct flb_input_instance *ins,
ret = flb_pipe_r(ctx->ch_manager[0], &val, sizeof(val));
if (ret == -1) {
fclose(stdoutp);
flb_errno();
flb_pipe_error();
return -1;
}
}
Expand Down Expand Up @@ -404,7 +404,7 @@ static int in_exec_wasi_prerun(struct flb_input_instance *ins,
/* Kick the oneshot execution */
ret = flb_pipe_w(ctx->ch_manager[1], &val, sizeof(val));
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
return 0;
Expand Down
1 change: 1 addition & 0 deletions plugins/in_lib/in_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ static int in_lib_collect(struct flb_input_instance *ins,
flb_plg_trace(ctx->ins, "in_lib read() = %i", bytes);
if (bytes == -1) {
perror("read");
flb_pipe_error();
if (errno == -EPIPE) {
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion plugins/in_tail/tail.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static inline int consume_byte(flb_pipefd_t fd)
/* We need to consume the byte */
ret = flb_pipe_r(fd, (char *) &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down
6 changes: 3 additions & 3 deletions plugins/in_tail/tail_signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static inline int tail_signal_manager(struct flb_tail_config *ctx)
/* Insert a dummy event into the channel manager */
n = flb_pipe_w(ctx->ch_manager[1], (const char *) &val, sizeof(val));
if (n == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
else {
Expand All @@ -68,7 +68,7 @@ static inline int tail_signal_pending(struct flb_tail_config *ctx)
* notification is already pending, it's safe to ignore.
*/
if (n == -1 && !FLB_PIPE_WOULDBLOCK()) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -87,7 +87,7 @@ static inline int tail_consume_pending(struct flb_tail_config *ctx)
do {
ret = flb_pipe_r(ctx->ch_pending[0], (char *) &val, sizeof(val));
if (ret <= 0 && !FLB_PIPE_WOULDBLOCK()) {
flb_errno();
flb_pipe_error();
return -1;
}
} while (!FLB_PIPE_WOULDBLOCK());
Expand Down
8 changes: 4 additions & 4 deletions src/aws/flb_aws_credentials_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ static int read_until_block(char* name, flb_pipefd_t fd, struct readbuf* buf)
if (FLB_PIPE_WOULDBLOCK()) {
return 1;
}
flb_errno();
flb_pipe_error();
return -1;
}
else if (result == 0) { /* EOF */
Expand Down Expand Up @@ -481,7 +481,7 @@ static void exec_process_child(struct process* p)
{
while ((dup2(p->stdin_stream, STDIN_FILENO) < 0)) {
if (errno != EINTR) {
return;
return;
}
}
while ((dup2(p->stdout_stream[1], STDOUT_FILENO) < 0)) {
Expand All @@ -491,7 +491,7 @@ static void exec_process_child(struct process* p)
}
while ((dup2(p->stderr_stream, STDERR_FILENO) < 0)) {
if (errno != EINTR) {
return;
return;
}
}

Expand Down Expand Up @@ -558,7 +558,7 @@ static int read_from_process(struct process* p, struct readbuf* buf)
return -1;
}

flb_time_set(&timeout,
flb_time_set(&timeout,
(time_t) (CREDENTIAL_PROCESS_TIMEOUT_MS / MS_PER_SEC),
((long) (CREDENTIAL_PROCESS_TIMEOUT_MS % MS_PER_SEC)) * NS_PER_MS);

Expand Down
8 changes: 4 additions & 4 deletions src/flb_engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ static inline int handle_input_event(flb_pipefd_t fd, uint64_t ts,

bytes = flb_pipe_r(fd, &val, sizeof(val));
if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -484,7 +484,7 @@ static inline int handle_output_events(flb_pipefd_t fd,
bytes = flb_pipe_r(fd, &values, sizeof(values));

if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -525,7 +525,7 @@ static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config)
/* read the event */
bytes = flb_pipe_r(fd, &val, sizeof(val));
if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -1049,7 +1049,7 @@ int flb_engine_start(struct flb_config *config)
/* Read the coroutine reference */
ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *));
if (ret <= 0 || output_flush == 0) {
flb_errno();
flb_pipe_error();
continue;
}

Expand Down
14 changes: 7 additions & 7 deletions src/flb_input_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ static inline int handle_input_thread_event(flb_pipefd_t fd, struct flb_config *

bytes = flb_pipe_r(fd, &val, sizeof(val));
if (bytes == -1) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -426,7 +426,7 @@ static void input_thread(void *data)
/* Read the coroutine reference */
ret = flb_pipe_r(event->fd, &output_flush, sizeof(struct flb_output_flush *));
if (ret <= 0 || output_flush == 0) {
flb_errno();
flb_pipe_error();
continue;
}

Expand Down Expand Up @@ -518,7 +518,7 @@ int flb_input_thread_instance_pause(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -543,7 +543,7 @@ int flb_input_thread_instance_resume(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -565,7 +565,7 @@ int flb_input_thread_instance_exit(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down Expand Up @@ -731,7 +731,7 @@ int flb_input_thread_collectors_signal_start(struct flb_input_instance *ins)

ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(uint64_t));
if (ret <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand All @@ -749,7 +749,7 @@ int flb_input_thread_collectors_signal_wait(struct flb_input_instance *ins)
thi = ins->thi;
bytes = flb_pipe_r(thi->ch_parent_events[0], &val, sizeof(uint64_t));
if (bytes <= 0) {
flb_errno();
flb_pipe_error();
return -1;
}

Expand Down
2 changes: 1 addition & 1 deletion src/flb_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ int flb_lib_push(flb_ctx_t *ctx, int ffd, const void *data, size_t len)
else {
ret = flb_pipe_w(i_ins->channel[1], data, len);
if (ret == -1) {
flb_errno();
flb_pipe_error();
return -1;
}
}
Expand Down
22 changes: 19 additions & 3 deletions src/flb_log.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
#include <fluent-bit/flb_worker.h>
#include <fluent-bit/flb_mem.h>

#ifdef WIN32
#include <winsock.h>
#include <winbase.h>
#endif

#ifdef FLB_HAVE_AWS_ERROR_REPORTER
#include <fluent-bit/aws/flb_aws_error_reporter.h>

Expand All @@ -57,7 +62,7 @@ static inline int64_t flb_log_consume_signal(struct flb_log *context)
sizeof(signal_value));

if (result <= 0) {
flb_errno();
flb_pipe_error();

return -1;
}
Expand All @@ -75,7 +80,7 @@ static inline int flb_log_enqueue_signal(struct flb_log *context,
sizeof(signal_value));

if (result <= 0) {
flb_errno();
flb_pipe_error();

result = 1;
}
Expand Down Expand Up @@ -121,7 +126,7 @@ static inline int log_read(flb_pipefd_t fd, struct flb_log *log)
bytes = flb_pipe_read_all(fd, &msg, sizeof(struct log_message));

if (bytes <= 0) {
flb_errno();
flb_pipe_error();

return -1;
}
Expand Down Expand Up @@ -745,6 +750,17 @@ int flb_errno_print(int errnum, const char *file, int line)
return 0;
}

int flb_WSAGetLastError_print(int errnum, const char *file, int line)
{
#ifdef WIN32
char buf[256];
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, errnum, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
buf, sizeof(buf), NULL);
flb_error("[%s:%i WSAGetLastError=%i] %s", file, line, errnum, buf);
#endif
}

int flb_log_destroy(struct flb_log *log, struct flb_config *config)
{
/* Signal the child worker, stop working */
Expand Down
4 changes: 2 additions & 2 deletions src/flb_notification.c
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ int flb_notification_enqueue(int plugin_type,
sizeof(void *));

if (result == -1) {
flb_errno();
flb_pipe_error();

return -1;
}
Expand All @@ -291,7 +291,7 @@ int flb_notification_receive(flb_pipefd_t channel,
result = flb_pipe_r(channel, notification, sizeof(struct flb_notification *));

if (result <= 0) {
flb_errno();
flb_pipe_error();
return -1;;
}

Expand Down
2 changes: 1 addition & 1 deletion src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ int flb_output_task_flush(struct flb_task *task,
ret = flb_pipe_w(config->ch_self_events[1], &out_flush,
sizeof(struct flb_output_flush*));
if (ret == -1) {
flb_errno();
flb_pipe_error();
flb_output_flush_destroy(out_flush);
flb_task_users_dec(task, FLB_FALSE);

Expand Down
Loading

0 comments on commit c66851c

Please sign in to comment.