From bf7113b0e12bb3e55cac334cf02cdad2a639dd22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Thu, 6 Feb 2025 11:52:58 +0100 Subject: [PATCH 1/7] threaded-source: support rawmsg-size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- lib/logthrsource/logthrsourcedrv.c | 2 ++ lib/logthrsource/logthrsourcedrv.h | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/lib/logthrsource/logthrsourcedrv.c b/lib/logthrsource/logthrsourcedrv.c index e495fccf7..81e94c293 100644 --- a/lib/logthrsource/logthrsourcedrv.c +++ b/lib/logthrsource/logthrsourcedrv.c @@ -236,6 +236,8 @@ log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThrea self->super.super.free_fn = log_threaded_source_worker_free; self->super.wakeup = _worker_wakeup; + self->super.metrics.raw_bytes_enabled = driver->raw_bytes_metrics_enabled; + self->worker_index = worker_index; } diff --git a/lib/logthrsource/logthrsourcedrv.h b/lib/logthrsource/logthrsourcedrv.h index cf204426c..7e36c475b 100644 --- a/lib/logthrsource/logthrsourcedrv.h +++ b/lib/logthrsource/logthrsourcedrv.h @@ -78,6 +78,8 @@ struct _LogThreadedSourceDriver gchar *transport_name; gsize transport_name_len; + gboolean raw_bytes_metrics_enabled; + void (*format_stats_key)(LogThreadedSourceDriver *self, StatsClusterKeyBuilder *kb); LogThreadedSourceWorker *(*worker_construct)(LogThreadedSourceDriver *self, gint worker_index); }; @@ -116,6 +118,12 @@ log_threaded_source_driver_get_parse_options(LogDriver *s) return &self->worker_options.parse_options; } +static inline void +log_threaded_source_enable_raw_bytes_metrics(LogThreadedSourceDriver *self) +{ + self->raw_bytes_metrics_enabled = TRUE; +} + /* Worker */ void log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThreadedSourceDriver *driver, From 9a79340e818a853eb23dce3ee47c317bf85ef986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Thu, 6 Feb 2025 11:53:16 +0100 Subject: [PATCH 2/7] python: add set_recvd_rawmsg_size() method to LogMessage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- modules/python/python-logmsg.c | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/modules/python/python-logmsg.c b/modules/python/python-logmsg.c index 4f5e9c995..31893bde3 100644 --- a/modules/python/python-logmsg.c +++ b/modules/python/python-logmsg.c @@ -375,6 +375,20 @@ py_log_message_set_source_ipaddress(PyLogMessage *self, PyObject *args, PyObject Py_RETURN_TRUE; } +static PyObject * +py_log_message_set_recvd_rawmsg_size(PyLogMessage *self, PyObject *args, PyObject *kwrds) +{ + gulong rawmsg_size; + + static const gchar *kwlist[] = {"rawmsg_size", NULL}; + if (!PyArg_ParseTupleAndKeywords(args, kwrds, "k", (gchar **) kwlist, &rawmsg_size)) + return NULL; + + log_msg_set_recvd_rawmsg_size(self->msg, rawmsg_size); + + Py_RETURN_NONE; +} + static PyObject * py_log_message_set_timestamp(PyLogMessage *self, PyObject *args, PyObject *kwrds) { @@ -466,6 +480,7 @@ static PyMethodDef py_log_message_methods[] = { "set_pri", (PyCFunction)py_log_message_set_pri, METH_VARARGS | METH_KEYWORDS, "Set syslog priority" }, { "get_pri", (PyCFunction)py_log_message_get_pri, METH_VARARGS | METH_KEYWORDS, "Get syslog priority" }, { "set_source_ipaddress", (PyCFunction)py_log_message_set_source_ipaddress, METH_VARARGS | METH_KEYWORDS, "Set source address" }, + { "set_recvd_rawmsg_size", (PyCFunction)py_log_message_set_recvd_rawmsg_size, METH_VARARGS | METH_KEYWORDS, "Set raw message size" }, { "set_timestamp", (PyCFunction)py_log_message_set_timestamp, METH_VARARGS | METH_KEYWORDS, "Set timestamp" }, { "get_timestamp", (PyCFunction)py_log_message_get_timestamp, METH_VARARGS | METH_KEYWORDS, "Get timestamp" }, { "set_bookmark", (PyCFunction)py_log_message_set_bookmark, METH_VARARGS | METH_KEYWORDS, "Set bookmark" }, From 61cc3d56975a3533eb7754df5889e86181b1da7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Thu, 6 Feb 2025 11:53:33 +0100 Subject: [PATCH 3/7] python: enable input_event_bytes_total metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- modules/python/python-fetcher.c | 2 ++ modules/python/python-source.c | 2 ++ 2 files changed, 4 insertions(+) diff --git a/modules/python/python-fetcher.c b/modules/python/python-fetcher.c index bb43723d9..13ff4175d 100644 --- a/modules/python/python-fetcher.c +++ b/modules/python/python-fetcher.c @@ -675,6 +675,8 @@ python_fetcher_new(GlobalConfig *cfg) self->super.super.worker_options.super.stats_level = STATS_LEVEL0; self->super.super.worker_options.super.stats_source = stats_register_type("python"); + log_threaded_source_enable_raw_bytes_metrics(&self->super.super); + self->super.fetch = python_fetcher_fetch; python_binding_init_instance(&self->binding); diff --git a/modules/python/python-source.c b/modules/python/python-source.c index f643fc69c..9f421468a 100644 --- a/modules/python/python-source.c +++ b/modules/python/python-source.c @@ -766,6 +766,8 @@ python_sd_new(GlobalConfig *cfg) self->super.worker_options.super.stats_source = stats_register_type("python"); self->super.worker_construct = _construct_worker; + log_threaded_source_enable_raw_bytes_metrics(&self->super); + self->post_message = _post_message_blocking; python_binding_init_instance(&self->binding); From e725c1bc05c9ab543e4c26deb357ef472a9f7f09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Thu, 6 Feb 2025 11:53:50 +0100 Subject: [PATCH 4/7] webhook: calculate rawmsg size MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- modules/python-modules/syslogng/modules/webhook/source.py | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/python-modules/syslogng/modules/webhook/source.py b/modules/python-modules/syslogng/modules/webhook/source.py index 4f5418eaa..d6fb936ec 100644 --- a/modules/python-modules/syslogng/modules/webhook/source.py +++ b/modules/python-modules/syslogng/modules/webhook/source.py @@ -54,6 +54,7 @@ async def post(self, **path_arguments) -> None: def _construct_msg(self, request, path_arguments) -> LogMessage: msg = LogMessage(self.request.body) + msg.set_recvd_rawmsg_size(len(self.request.body)) for key, value in request.query_arguments.items(): value = value[0] if len(value) == 1 else value From 96833b99362a12e363d945a127f65eb0b4a6f685 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Thu, 6 Feb 2025 11:54:40 +0100 Subject: [PATCH 5/7] otel: support input_event_bytes_total metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- modules/grpc/otel/otel-source-services.hpp | 6 ++++++ modules/grpc/otel/otel-source.cpp | 3 +++ 2 files changed, 9 insertions(+) diff --git a/modules/grpc/otel/otel-source-services.hpp b/modules/grpc/otel/otel-source-services.hpp index 91ad61a9f..2e69253a6 100644 --- a/modules/grpc/otel/otel-source-services.hpp +++ b/modules/grpc/otel/otel-source-services.hpp @@ -123,6 +123,8 @@ syslogng::grpc::otel::TraceServiceCall::Proceed(bool ok) } LogMessage *msg = log_msg_new_empty(); + log_msg_set_recvd_rawmsg_size(msg, span.ByteSizeLong()); + ProtobufParser::store_raw_metadata(msg, ctx.peer(), resource, resource_spans_schema_url, scope, scope_spans_schema_url); ProtobufParser::store_raw(msg, span); @@ -180,6 +182,8 @@ syslogng::grpc::otel::LogsServiceCall::Proceed(bool ok) } LogMessage *msg = log_msg_new_empty(); + log_msg_set_recvd_rawmsg_size(msg, log_record.ByteSizeLong()); + if (ProtobufParser::is_syslog_ng_log_record(resource, resource_logs_schema_url, scope, scope_logs_schema_url)) { @@ -245,6 +249,8 @@ syslogng::grpc::otel::MetricsServiceCall::Proceed(bool ok) } LogMessage *msg = log_msg_new_empty(); + log_msg_set_recvd_rawmsg_size(msg, metric.ByteSizeLong()); + ProtobufParser::store_raw_metadata(msg, ctx.peer(), resource, resource_metrics_schema_url, scope, scope_metrics_schema_url); ProtobufParser::store_raw(msg, metric); diff --git a/modules/grpc/otel/otel-source.cpp b/modules/grpc/otel/otel-source.cpp index ca5dd4f6d..f1383773a 100644 --- a/modules/grpc/otel/otel-source.cpp +++ b/modules/grpc/otel/otel-source.cpp @@ -175,6 +175,9 @@ LogDriver * otel_sd_new(GlobalConfig *cfg) { GrpcSourceDriver *self = grpc_sd_new(cfg, "opentelemetry", "otlp"); + + log_threaded_source_enable_raw_bytes_metrics(&self->super); + self->cpp = new SourceDriver(self); return &self->super.super.super; } From f4da9374aa31e0527e793e16320cd16a91f75c0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Thu, 6 Feb 2025 11:59:43 +0100 Subject: [PATCH 6/7] news: add feature #494 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- news/feature-494.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 news/feature-494.md diff --git a/news/feature-494.md b/news/feature-494.md new file mode 100644 index 000000000..4ebd1c4a6 --- /dev/null +++ b/news/feature-494.md @@ -0,0 +1 @@ +`webhook()`,`opentelemetry()` sources: support `input_event_bytes` metrics From 501791812883359e6a1f457125e4289e44a606d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A1szl=C3=B3=20V=C3=A1rady?= Date: Sat, 8 Feb 2025 15:49:43 +0100 Subject: [PATCH 7/7] lib/modules: enable event_bytes_total unconditionally MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: László Várady --- lib/logreader.c | 1 - lib/logsource.c | 10 +++------- lib/logsource.h | 1 - lib/logthrdest/logthrdestdrv.c | 9 +++------ lib/logthrdest/logthrdestdrv.h | 2 -- lib/logthrsource/logthrsourcedrv.c | 3 --- lib/logthrsource/logthrsourcedrv.h | 8 -------- modules/grpc/common/grpc-dest.cpp | 1 - modules/grpc/otel/otel-source.cpp | 3 --- modules/http/http.c | 1 - modules/python/python-dest.c | 2 -- modules/python/python-fetcher.c | 2 -- modules/python/python-source.c | 2 -- 13 files changed, 6 insertions(+), 39 deletions(-) diff --git a/lib/logreader.c b/lib/logreader.c index 1eb73ce40..6570bac89 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -766,7 +766,6 @@ log_reader_new(GlobalConfig *cfg) self->super.super.free_fn = log_reader_free; self->super.wakeup = log_reader_wakeup; self->super.schedule_dynamic_window_realloc = _schedule_dynamic_window_realloc; - self->super.metrics.raw_bytes_enabled = TRUE; self->handshake_in_progress = TRUE; log_reader_init_watches(self); g_mutex_init(&self->pending_close_lock); diff --git a/lib/logsource.c b/lib/logsource.c index 69116e3fe..11f6cbabe 100644 --- a/lib/logsource.c +++ b/lib/logsource.c @@ -495,11 +495,8 @@ _register_counters(LogSource *self) stats_unlock(); - if (self->metrics.raw_bytes_enabled) - { - level = log_pipe_is_internal(&self->super) ? STATS_LEVEL3 : STATS_LEVEL1; - _register_raw_bytes_stats(self, level); - } + level = log_pipe_is_internal(&self->super) ? STATS_LEVEL3 : STATS_LEVEL1; + _register_raw_bytes_stats(self, level); } gboolean @@ -522,8 +519,7 @@ log_source_init(LogPipe *s) static void _unregister_counters(LogSource *self) { - if (self->metrics.raw_bytes_enabled) - _unregister_raw_bytes_stats(self); + _unregister_raw_bytes_stats(self); stats_lock(); diff --git a/lib/logsource.h b/lib/logsource.h index 8d67ebedb..8f338ea66 100644 --- a/lib/logsource.h +++ b/lib/logsource.h @@ -89,7 +89,6 @@ struct _LogSource StatsClusterKey *recvd_messages_key; StatsCounterItem *recvd_messages; - gboolean raw_bytes_enabled; StatsClusterKey *recvd_bytes_key; StatsByteCounter recvd_bytes; diff --git a/lib/logthrdest/logthrdestdrv.c b/lib/logthrdest/logthrdestdrv.c index 83a0d15c3..7b517c116 100644 --- a/lib/logthrdest/logthrdestdrv.c +++ b/lib/logthrdest/logthrdestdrv.c @@ -871,12 +871,9 @@ _register_worker_stats(LogThreadedDestWorker *self) stats_cluster_key_builder_add_label(kb, stats_cluster_label("id", self->owner->super.super.id ? : "")); _format_stats_key(self->owner, kb); - if (self->owner->metrics.raw_bytes_enabled) - { - stats_cluster_key_builder_set_name(kb, "output_event_bytes_total"); - self->metrics.output_event_bytes_sc_key = stats_cluster_key_builder_build_single(kb); - stats_byte_counter_init(&self->metrics.written_bytes, self->metrics.output_event_bytes_sc_key, level, SBCP_KIB); - } + stats_cluster_key_builder_set_name(kb, "output_event_bytes_total"); + self->metrics.output_event_bytes_sc_key = stats_cluster_key_builder_build_single(kb); + stats_byte_counter_init(&self->metrics.written_bytes, self->metrics.output_event_bytes_sc_key, level, SBCP_KIB); } stats_cluster_key_builder_pop(kb); diff --git a/lib/logthrdest/logthrdestdrv.h b/lib/logthrdest/logthrdestdrv.h index 3ed6eab76..0456cd760 100644 --- a/lib/logthrdest/logthrdestdrv.h +++ b/lib/logthrdest/logthrdestdrv.h @@ -145,8 +145,6 @@ struct _LogThreadedDestDriver StatsCounterItem *written_messages; StatsCounterItem *output_event_retries; - gboolean raw_bytes_enabled; - StatsAggregator *max_message_size; StatsAggregator *average_messages_size; StatsAggregator *max_batch_size; diff --git a/lib/logthrsource/logthrsourcedrv.c b/lib/logthrsource/logthrsourcedrv.c index 81e94c293..5c5bb1fac 100644 --- a/lib/logthrsource/logthrsourcedrv.c +++ b/lib/logthrsource/logthrsourcedrv.c @@ -235,9 +235,6 @@ log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThrea self->super.super.init = _worker_init; self->super.super.free_fn = log_threaded_source_worker_free; self->super.wakeup = _worker_wakeup; - - self->super.metrics.raw_bytes_enabled = driver->raw_bytes_metrics_enabled; - self->worker_index = worker_index; } diff --git a/lib/logthrsource/logthrsourcedrv.h b/lib/logthrsource/logthrsourcedrv.h index 7e36c475b..cf204426c 100644 --- a/lib/logthrsource/logthrsourcedrv.h +++ b/lib/logthrsource/logthrsourcedrv.h @@ -78,8 +78,6 @@ struct _LogThreadedSourceDriver gchar *transport_name; gsize transport_name_len; - gboolean raw_bytes_metrics_enabled; - void (*format_stats_key)(LogThreadedSourceDriver *self, StatsClusterKeyBuilder *kb); LogThreadedSourceWorker *(*worker_construct)(LogThreadedSourceDriver *self, gint worker_index); }; @@ -118,12 +116,6 @@ log_threaded_source_driver_get_parse_options(LogDriver *s) return &self->worker_options.parse_options; } -static inline void -log_threaded_source_enable_raw_bytes_metrics(LogThreadedSourceDriver *self) -{ - self->raw_bytes_metrics_enabled = TRUE; -} - /* Worker */ void log_threaded_source_worker_init_instance(LogThreadedSourceWorker *self, LogThreadedSourceDriver *driver, diff --git a/modules/grpc/common/grpc-dest.cpp b/modules/grpc/common/grpc-dest.cpp index 6241e8349..af175975a 100644 --- a/modules/grpc/common/grpc-dest.cpp +++ b/modules/grpc/common/grpc-dest.cpp @@ -269,7 +269,6 @@ grpc_dd_new(GlobalConfig *cfg, const gchar *stats_name) self->super.worker.construct = _construct_worker; self->super.stats_source = stats_register_type(stats_name); self->super.format_stats_key = _format_stats_key; - self->super.metrics.raw_bytes_enabled = TRUE; return self; } diff --git a/modules/grpc/otel/otel-source.cpp b/modules/grpc/otel/otel-source.cpp index f1383773a..ca5dd4f6d 100644 --- a/modules/grpc/otel/otel-source.cpp +++ b/modules/grpc/otel/otel-source.cpp @@ -175,9 +175,6 @@ LogDriver * otel_sd_new(GlobalConfig *cfg) { GrpcSourceDriver *self = grpc_sd_new(cfg, "opentelemetry", "otlp"); - - log_threaded_source_enable_raw_bytes_metrics(&self->super); - self->cpp = new SourceDriver(self); return &self->super.super.super; } diff --git a/modules/http/http.c b/modules/http/http.c index 9f7621351..8011cf477 100644 --- a/modules/http/http.c +++ b/modules/http/http.c @@ -512,7 +512,6 @@ http_dd_new(GlobalConfig *cfg) self->super.super.super.super.free_fn = http_dd_free; self->super.super.super.super.generate_persist_name = _format_persist_name; self->super.format_stats_key = _format_stats_key; - self->super.metrics.raw_bytes_enabled = TRUE; self->super.stats_source = stats_register_type("http"); self->super.worker.construct = http_dw_new; diff --git a/modules/python/python-dest.c b/modules/python/python-dest.c index 3524aee88..d3371c477 100644 --- a/modules/python/python-dest.c +++ b/modules/python/python-dest.c @@ -657,8 +657,6 @@ python_dd_new(GlobalConfig *cfg) self->super.super.super.super.free_fn = python_dd_free; self->super.super.super.super.generate_persist_name = python_dd_format_persist_name; - self->super.metrics.raw_bytes_enabled = TRUE; - self->super.worker.connect = python_dd_connect; self->super.worker.disconnect = python_dd_disconnect; self->super.worker.insert = python_dd_insert; diff --git a/modules/python/python-fetcher.c b/modules/python/python-fetcher.c index 13ff4175d..bb43723d9 100644 --- a/modules/python/python-fetcher.c +++ b/modules/python/python-fetcher.c @@ -675,8 +675,6 @@ python_fetcher_new(GlobalConfig *cfg) self->super.super.worker_options.super.stats_level = STATS_LEVEL0; self->super.super.worker_options.super.stats_source = stats_register_type("python"); - log_threaded_source_enable_raw_bytes_metrics(&self->super.super); - self->super.fetch = python_fetcher_fetch; python_binding_init_instance(&self->binding); diff --git a/modules/python/python-source.c b/modules/python/python-source.c index 9f421468a..f643fc69c 100644 --- a/modules/python/python-source.c +++ b/modules/python/python-source.c @@ -766,8 +766,6 @@ python_sd_new(GlobalConfig *cfg) self->super.worker_options.super.stats_source = stats_register_type("python"); self->super.worker_construct = _construct_worker; - log_threaded_source_enable_raw_bytes_metrics(&self->super); - self->post_message = _post_message_blocking; python_binding_init_instance(&self->binding);