Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webhook/opentelemetry: input_event_bytes metrics #494

Merged
merged 7 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion lib/logreader.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,6 @@ log_reader_new(GlobalConfig *cfg)
self->super.super.free_fn = log_reader_free;
self->super.wakeup = log_reader_wakeup;
self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc;
self->super.metrics.raw_bytes_enabled = TRUE;
self->handshake_in_progress = TRUE;
log_reader_init_watches(self);
g_mutex_init(&self->pending_close_lock);
Expand Down
10 changes: 3 additions & 7 deletions lib/logsource.c
Original file line number Diff line number Diff line change
Expand Up @@ -495,11 +495,8 @@ _register_counters(LogSource *self)

stats_unlock();

if (self->metrics.raw_bytes_enabled)
{
level = log_pipe_is_internal(&self->super) ? STATS_LEVEL3 : STATS_LEVEL1;
_register_raw_bytes_stats(self, level);
}
level = log_pipe_is_internal(&self->super) ? STATS_LEVEL3 : STATS_LEVEL1;
_register_raw_bytes_stats(self, level);
}

gboolean
Expand All @@ -522,8 +519,7 @@ log_source_init(LogPipe *s)
static void
_unregister_counters(LogSource *self)
{
if (self->metrics.raw_bytes_enabled)
_unregister_raw_bytes_stats(self);
_unregister_raw_bytes_stats(self);

stats_lock();

Expand Down
1 change: 0 additions & 1 deletion lib/logsource.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ struct _LogSource
StatsClusterKey *recvd_messages_key;
StatsCounterItem *recvd_messages;

gboolean raw_bytes_enabled;
StatsClusterKey *recvd_bytes_key;
StatsByteCounter recvd_bytes;

Expand Down
9 changes: 3 additions & 6 deletions lib/logthrdest/logthrdestdrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -871,12 +871,9 @@ _register_worker_stats(LogThreadedDestWorker *self)
stats_cluster_key_builder_add_label(kb, stats_cluster_label("id", self->owner->super.super.id ? : ""));
_format_stats_key(self->owner, kb);

if (self->owner->metrics.raw_bytes_enabled)
{
stats_cluster_key_builder_set_name(kb, "output_event_bytes_total");
self->metrics.output_event_bytes_sc_key = stats_cluster_key_builder_build_single(kb);
stats_byte_counter_init(&self->metrics.written_bytes, self->metrics.output_event_bytes_sc_key, level, SBCP_KIB);
}
stats_cluster_key_builder_set_name(kb, "output_event_bytes_total");
self->metrics.output_event_bytes_sc_key = stats_cluster_key_builder_build_single(kb);
stats_byte_counter_init(&self->metrics.written_bytes, self->metrics.output_event_bytes_sc_key, level, SBCP_KIB);
}
stats_cluster_key_builder_pop(kb);

Expand Down
2 changes: 0 additions & 2 deletions lib/logthrdest/logthrdestdrv.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,6 @@ struct _LogThreadedDestDriver
StatsCounterItem *written_messages;
StatsCounterItem *output_event_retries;

gboolean raw_bytes_enabled;

StatsAggregator *max_message_size;
StatsAggregator *average_messages_size;
StatsAggregator *max_batch_size;
Expand Down
1 change: 0 additions & 1 deletion lib/logthrsource/logthrsourcedrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThrea
self->super.super.init = _worker_init;
self->super.super.free_fn = log_threaded_source_worker_free;
self->super.wakeup = _worker_wakeup;

self->worker_index = worker_index;
}

Expand Down
1 change: 0 additions & 1 deletion modules/grpc/common/grpc-dest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ grpc_dd_new(GlobalConfig *cfg, const gchar *stats_name)
self->super.worker.construct = _construct_worker;
self->super.stats_source = stats_register_type(stats_name);
self->super.format_stats_key = _format_stats_key;
self->super.metrics.raw_bytes_enabled = TRUE;

return self;
}
6 changes: 6 additions & 0 deletions modules/grpc/otel/otel-source-services.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ syslogng::grpc::otel::TraceServiceCall::Proceed(bool ok)
}

LogMessage *msg = log_msg_new_empty();
log_msg_set_recvd_rawmsg_size(msg, span.ByteSizeLong());

ProtobufParser::store_raw_metadata(msg, ctx.peer(), resource, resource_spans_schema_url, scope,
scope_spans_schema_url);
ProtobufParser::store_raw(msg, span);
Expand Down Expand Up @@ -180,6 +182,8 @@ syslogng::grpc::otel::LogsServiceCall::Proceed(bool ok)
}

LogMessage *msg = log_msg_new_empty();
log_msg_set_recvd_rawmsg_size(msg, log_record.ByteSizeLong());

if (ProtobufParser::is_syslog_ng_log_record(resource, resource_logs_schema_url, scope,
scope_logs_schema_url))
{
Expand Down Expand Up @@ -245,6 +249,8 @@ syslogng::grpc::otel::MetricsServiceCall::Proceed(bool ok)
}

LogMessage *msg = log_msg_new_empty();
log_msg_set_recvd_rawmsg_size(msg, metric.ByteSizeLong());

ProtobufParser::store_raw_metadata(msg, ctx.peer(), resource, resource_metrics_schema_url, scope,
scope_metrics_schema_url);
ProtobufParser::store_raw(msg, metric);
Expand Down
1 change: 0 additions & 1 deletion modules/http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,6 @@ http_dd_new(GlobalConfig *cfg)
self->super.super.super.super.free_fn = http_dd_free;
self->super.super.super.super.generate_persist_name = _format_persist_name;
self->super.format_stats_key = _format_stats_key;
self->super.metrics.raw_bytes_enabled = TRUE;
self->super.stats_source = stats_register_type("http");
self->super.worker.construct = http_dw_new;

Expand Down
1 change: 1 addition & 0 deletions modules/python-modules/syslogng/modules/webhook/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async def post(self, **path_arguments) -> None:

def _construct_msg(self, request, path_arguments) -> LogMessage:
msg = LogMessage(self.request.body)
msg.set_recvd_rawmsg_size(len(self.request.body))

for key, value in request.query_arguments.items():
value = value[0] if len(value) == 1 else value
Expand Down
2 changes: 0 additions & 2 deletions modules/python/python-dest.c
Original file line number Diff line number Diff line change
Expand Up @@ -657,8 +657,6 @@ python_dd_new(GlobalConfig *cfg)
self->super.super.super.super.free_fn = python_dd_free;
self->super.super.super.super.generate_persist_name = python_dd_format_persist_name;

self->super.metrics.raw_bytes_enabled = TRUE;

self->super.worker.connect = python_dd_connect;
self->super.worker.disconnect = python_dd_disconnect;
self->super.worker.insert = python_dd_insert;
Expand Down
15 changes: 15 additions & 0 deletions modules/python/python-logmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,20 @@ py_log_message_set_source_ipaddress(PyLogMessage *self, PyObject *args, PyObject
Py_RETURN_TRUE;
}

static PyObject *
py_log_message_set_recvd_rawmsg_size(PyLogMessage *self, PyObject *args, PyObject *kwrds)
{
gulong rawmsg_size;

static const gchar *kwlist[] = {"rawmsg_size", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwrds, "k", (gchar **) kwlist, &rawmsg_size))
return NULL;

log_msg_set_recvd_rawmsg_size(self->msg, rawmsg_size);

Py_RETURN_NONE;
}

static PyObject *
py_log_message_set_timestamp(PyLogMessage *self, PyObject *args, PyObject *kwrds)
{
Expand Down Expand Up @@ -466,6 +480,7 @@ static PyMethodDef py_log_message_methods[] =
{ "set_pri", (PyCFunction)py_log_message_set_pri, METH_VARARGS | METH_KEYWORDS, "Set syslog priority" },
{ "get_pri", (PyCFunction)py_log_message_get_pri, METH_VARARGS | METH_KEYWORDS, "Get syslog priority" },
{ "set_source_ipaddress", (PyCFunction)py_log_message_set_source_ipaddress, METH_VARARGS | METH_KEYWORDS, "Set source address" },
{ "set_recvd_rawmsg_size", (PyCFunction)py_log_message_set_recvd_rawmsg_size, METH_VARARGS | METH_KEYWORDS, "Set raw message size" },
{ "set_timestamp", (PyCFunction)py_log_message_set_timestamp, METH_VARARGS | METH_KEYWORDS, "Set timestamp" },
{ "get_timestamp", (PyCFunction)py_log_message_get_timestamp, METH_VARARGS | METH_KEYWORDS, "Get timestamp" },
{ "set_bookmark", (PyCFunction)py_log_message_set_bookmark, METH_VARARGS | METH_KEYWORDS, "Set bookmark" },
Expand Down
1 change: 1 addition & 0 deletions news/feature-494.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`webhook()`,`opentelemetry()` sources: support `input_event_bytes` metrics