Skip to content

Commit 6d72ffb

Browse files
committed
upstream: new 'destroy_queue' to defer connections context destroy (#2497)
A connection context might be destroyed while the event loop still has pending events to be processed, in some cases a network exception. Destroying the context at that point might lead to corruption. The following patch implements a new queue to temporarily store the connection context, so that the 'destroy' process is deferred until all events from the event loop have been processed. Signed-off-by: Eduardo Silva <eduardo@treasure-data.com>
1 parent 57e4d42 commit 6d72ffb

File tree

2 files changed

+38
-4
lines changed

2 files changed

+38
-4
lines changed

include/fluent-bit/flb_upstream.h

+2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ struct flb_upstream {
8080
*/
8181
struct mk_list busy_queue;
8282

83+
struct mk_list destroy_queue;
84+
8385
#ifdef FLB_HAVE_TLS
8486
/* context with mbedTLS data to handle certificates and keys */
8587
struct flb_tls *tls;

src/flb_upstream.c

+36-4
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ struct flb_upstream *flb_upstream_create(struct flb_config *config,
9898

9999
mk_list_init(&u->av_queue);
100100
mk_list_init(&u->busy_queue);
101+
mk_list_init(&u->destroy_queue);
101102

102103
#ifdef FLB_HAVE_TLS
103104
u->tls = (struct flb_tls *) tls;
@@ -194,12 +195,19 @@ static int destroy_conn(struct flb_upstream_conn *u_conn)
194195
flb_socket_close(u_conn->fd);
195196
}
196197

198+
u->n_connections--;
199+
197200
/* remove connection from the queue */
198201
mk_list_del(&u_conn->_head);
199202

200-
u->n_connections--;
201-
flb_free(u_conn);
203+
/* Add node to destroy queue */
204+
mk_list_add(&u_conn->_head, &u->destroy_queue);
205+
202206

207+
/*
208+
* note: the connection context is destroyed by the engine once all events
209+
* have been processed.
210+
*/
203211
return 0;
204212
}
205213

@@ -344,7 +352,8 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
344352

345353
int err;
346354
err = flb_socket_error(conn->fd);
347-
if (!FLB_EINPROGRESS(err)) {
355+
if (!FLB_EINPROGRESS(err) && err != 0) {
356+
flb_errno();
348357
flb_debug("[upstream] KA connection #%i is in a failed state "
349358
"to: %s:%i, cleaning up",
350359
conn->fd, u->tcp_host, u->tcp_port);
@@ -357,7 +366,6 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
357366
conn->ts_assigned = time(NULL);
358367
flb_debug("[upstream] KA connection #%i to %s:%i has been assigned (recycled)",
359368
conn->fd, u->tcp_host, u->tcp_port);
360-
361369
/*
362370
* Note: since we are in a keepalive connection, the socket is already being
363371
* monitored for possible disconnections while idle. Upon re-use by the caller
@@ -494,6 +502,30 @@ int flb_upstream_conn_timeouts(struct flb_config *ctx)
494502
u_conn->fd, u->tcp_host, u->tcp_port);
495503
}
496504
}
505+
506+
}
507+
508+
return 0;
509+
}
510+
511+
int flb_upstream_conn_pending_destroy(struct flb_config *ctx)
512+
{
513+
struct mk_list *head;
514+
struct mk_list *tmp;
515+
struct mk_list *u_head;
516+
struct flb_upstream *u;
517+
struct flb_upstream_conn *u_conn;
518+
519+
/* Iterate all upstream contexts */
520+
mk_list_foreach(head, &ctx->upstreams) {
521+
u = mk_list_entry(head, struct flb_upstream, _head);
522+
523+
/* Real destroy of connections context */
524+
mk_list_foreach_safe(u_head, tmp, &u->destroy_queue) {
525+
u_conn = mk_list_entry(u_head, struct flb_upstream_conn, _head);
526+
mk_list_del(&u_conn->_head);
527+
flb_free(u_conn);
528+
}
497529
}
498530

499531
return 0;

0 commit comments

Comments
 (0)