Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor_sampling: new trace sampling processor #10029

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmake/plugins_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ DEFINE_OPTION(FLB_IN_EBPF "Enable Linux eBPF input plugin"
DEFINE_OPTION(FLB_PROCESSOR_CONTENT_MODIFIER "Enable content modifier processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_LABELS "Enable metrics label manipulation processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_METRICS_SELECTOR "Enable metrics selector processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_SQL "Enable SQL processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_OPENTELEMETRY_ENVELOPE "Enable OpenTelemetry envelope processor" ON)
DEFINE_OPTION(FLB_PROCESSOR_SAMPLING "Enable sampling processor" ON)

# Filters
# =======
Expand Down
24 changes: 18 additions & 6 deletions include/fluent-bit/flb_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ struct flb_processor_plugin {

int (*cb_process_traces) (struct flb_processor_instance *,
struct ctrace *,
struct ctrace **,
const char *,
int);

Expand All @@ -179,6 +180,7 @@ struct flb_processor_instance {
char *alias; /* alias name */
void *context; /* Instance local context */
void *data;
struct flb_processor_unit *pu; /* processor unit linked to */
struct flb_processor_plugin *p; /* original plugin */
struct mk_list properties; /* config properties */
struct mk_list *config_map; /* configuration map */
Expand Down Expand Up @@ -228,12 +230,10 @@ int flb_processor_unit_set_property(struct flb_processor_unit *pu, const char *k
int flb_processors_load_from_config_format_group(struct flb_processor *proc, struct flb_cf_group *g);

/* Processor plugin instance */

struct flb_processor_instance *flb_processor_instance_create(
struct flb_config *config,
int event_type,
const char *name,
void *data);
struct flb_processor_instance *flb_processor_instance_create(struct flb_config *config,
struct flb_processor_unit *pu,
int event_type,
const char *name, void *data);

void flb_processor_instance_destroy(
struct flb_processor_instance *ins);
Expand Down Expand Up @@ -274,4 +274,16 @@ static inline int flb_processor_instance_config_map_set(
return flb_config_map_set(&ins->properties, ins->config_map, context);
}

static inline
struct flb_input_instance *flb_processor_get_input_instance(struct flb_processor_unit *pu)
{
struct flb_processor *processor;
struct flb_input_instance *ins;

processor = (struct flb_processor *) pu->parent;
ins = (struct flb_input_instance *) processor->data;

return ins;
}

#endif
3 changes: 2 additions & 1 deletion plugins/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,9 @@ REGISTER_IN_PLUGIN("in_random")
REGISTER_PROCESSOR_PLUGIN("processor_content_modifier")
REGISTER_PROCESSOR_PLUGIN("processor_labels")
REGISTER_PROCESSOR_PLUGIN("processor_metrics_selector")
REGISTER_PROCESSOR_PLUGIN("processor_sql")
REGISTER_PROCESSOR_PLUGIN("processor_opentelemetry_envelope")
REGISTER_PROCESSOR_PLUGIN("processor_sampling")
REGISTER_PROCESSOR_PLUGIN("processor_sql")

# OUTPUTS
# =======
Expand Down
5 changes: 3 additions & 2 deletions plugins/processor_content_modifier/cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ static int cb_process_logs(struct flb_processor_instance *ins,
}

static int cb_process_traces(struct flb_processor_instance *ins,
struct ctrace *traces_context,
struct ctrace *in_ctr,
struct ctrace **out_ctr,
const char *tag,
int tag_len)
{
Expand All @@ -93,7 +94,7 @@ static int cb_process_traces(struct flb_processor_instance *ins,
}
ctx = ins->context;

ret = cm_traces_process(ins, ctx, traces_context, tag, tag_len);
ret = cm_traces_process(ins, ctx, in_ctr, out_ctr, tag, tag_len);
return ret;

}
Expand Down
1 change: 1 addition & 0 deletions plugins/processor_content_modifier/cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ int cm_logs_process(struct flb_processor_instance *ins,
int cm_traces_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct ctrace *traces_context,
struct ctrace **out_traces_context,
const char *tag, int tag_len);

int cm_metrics_process(struct flb_processor_instance *ins,
Expand Down
3 changes: 3 additions & 0 deletions plugins/processor_content_modifier/cm_traces.c
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ static int traces_hash_attributes(struct content_modifier_ctx *ctx, struct ctrac
int cm_traces_process(struct flb_processor_instance *ins,
struct content_modifier_ctx *ctx,
struct ctrace *traces_context,
struct ctrace **out_traces_context,
const char *tag, int tag_len)
{
int ret = -1;
Expand Down Expand Up @@ -587,6 +588,8 @@ int cm_traces_process(struct flb_processor_instance *ins,
ret = traces_convert_attributes(ctx, traces_context, ctx->key, ctx->converted_type);
}

*out_traces_context = traces_context;

if (ret != 0) {
return FLB_PROCESSOR_FAILURE;
}
Expand Down
11 changes: 11 additions & 0 deletions plugins/processor_sampling/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
set(src
sampling.c
sampling_conf.c
sampling_span_registry.c

# types of sampling
sampling_test.c
sampling_probabilistic.c
)

FLB_PLUGIN(processor_sampling "${src}" "")
193 changes: 193 additions & 0 deletions plugins/processor_sampling/sampling.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */

/* Fluent Bit
* ==========
* Copyright (C) 2015-2025 The Fluent Bit Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fluent-bit/flb_processor_plugin.h>
#include <fluent-bit/flb_processor.h>
#include <fluent-bit/flb_hash_table.h>

#include "sampling.h"
#include "sampling_span_registry.h"

static int clean_empty_resource_spans(struct ctrace *ctr)
{
int count = 0;
struct cfl_list *head;
struct cfl_list *head_scope_span;
struct cfl_list *tmp;
struct cfl_list *tmp_scope_span;
struct ctrace_resource_span *resource_span;
struct ctrace_scope_span *scope_span;

cfl_list_foreach_safe(head, tmp, &ctr->resource_spans) {
resource_span = cfl_list_entry(head, struct ctrace_resource_span, _head);

/* iterate scope spans */
cfl_list_foreach_safe(head_scope_span, tmp_scope_span, &resource_span->scope_spans) {
scope_span = cfl_list_entry(head_scope_span, struct ctrace_scope_span, _head);
if (cfl_list_is_empty(&scope_span->spans)) {
ctr_scope_span_destroy(scope_span);
}
}

/* check if resource span is now empty */
if (cfl_list_is_empty(&resource_span->scope_spans)) {
cfl_list_del(&resource_span->_head);
ctr_resource_span_destroy(resource_span);
count++;
}
}

return count;
}

static void debug_trace(struct sampling *ctx, struct ctrace *ctr, int is_before)
{
char tmp[128];
struct sampling_span_registry *reg = NULL;

reg = sampling_span_registry_create();
if (!reg) {
return;
}

sampling_span_registry_add_trace(ctx, reg, ctr);
if (is_before) {
snprintf(tmp, sizeof(tmp) - 1, "Debug sampling '%s' (%p): before", ctx->type_str, ctr);
sampling_span_registry_print(ctx, reg, tmp);
}
else {
snprintf(tmp, sizeof(tmp) - 1, "Debug sampling '%s' (%p): after", ctx->type_str, ctr);
sampling_span_registry_print(ctx, reg, tmp);
}

sampling_span_registry_destroy(reg);
}

static int cb_process_traces(struct flb_processor_instance *ins,
struct ctrace *in_ctr,
struct ctrace **out_ctr,
const char *tag,
int tag_len)
{
int ret;
int count;
struct sampling *ctx = ins->context;

/* just a quick check for developers */
if (!ctx->plugin->cb_do_sampling) {
flb_plg_error(ins, "unimplemented sampling callback for type '%s'", ctx->type_str);
return -1;
}

if (ctx->debug_mode) {
debug_trace(ctx, in_ctr, FLB_TRUE);
}

/* do sampling: the callback will modify the ctrace context */
ret = ctx->plugin->cb_do_sampling(ctx, ctx->plugin_context, in_ctr, out_ctr);

if (ctx->debug_mode) {
debug_trace(ctx, *out_ctr, FLB_FALSE);
}

/* check if the ctrace context has empty resource spans */
count = clean_empty_resource_spans(*out_ctr);
flb_plg_debug(ins, "cleaned %i empty resource spans", count);

return ret;
}

/* register the sampling plugins available */
static void sampling_plugin_register(struct sampling *ctx)
{
cfl_list_add(&sampling_probabilistic_plugin._head, &ctx->plugins);
}

static int cb_init(struct flb_processor_instance *processor_instance,
void *source_plugin_instance,
int source_plugin_type,
struct flb_config *config)
{
int ret;
struct sampling *ctx;

/* create main plugin context */
ctx = sampling_config_create(processor_instance, config);
if (!ctx) {
return FLB_PROCESSOR_FAILURE;
}
processor_instance->context = (void *) ctx;

/* register plugins */
sampling_plugin_register(ctx);

ret = sampling_config_process_rules(config, ctx);
if (ret == -1) {
flb_plg_error(processor_instance, "failed to parse sampling rules");
flb_free(ctx);
return -1;
}

/* initialize the backend plugin */
ret = ctx->plugin->cb_init(config, ctx);

return FLB_PROCESSOR_SUCCESS;
}

static int cb_exit(struct flb_processor_instance *processor_instance, void *data)
{
if (processor_instance != NULL && data != NULL) {
sampling_config_destroy(processor_instance->config, data);
}

return FLB_PROCESSOR_SUCCESS;
}

static struct flb_config_map config_map[] = {
{
FLB_CONFIG_MAP_STR, "type", NULL,
0, FLB_TRUE, offsetof(struct sampling, type_str),
"Type of the sampling processor"
},
{
FLB_CONFIG_MAP_BOOL, "debug", "false",
0, FLB_TRUE, offsetof(struct sampling, debug_mode),
"Enable debug mode where it prints the trace and it spans"
},
{
FLB_CONFIG_MAP_VARIANT, "rules", NULL,
0, FLB_TRUE, offsetof(struct sampling, rules),
"Sampling rules, these are defined by the sampling processor/type"
},

/* EOF */
{0}
};

struct flb_processor_plugin processor_sampling_plugin = {
.name = "sampling",
.description = "Sampling",
.cb_init = cb_init,
.cb_process_logs = NULL,
.cb_process_metrics = NULL,
.cb_process_traces = cb_process_traces,
.cb_exit = cb_exit,
.config_map = config_map,
.flags = 0
};
Loading
Loading