From 2349e194219fbd04ccef8d8ffe17eb2d1c047faf Mon Sep 17 00:00:00 2001 From: Drew Nutter Date: Mon, 3 Mar 2025 10:26:01 -0500 Subject: [PATCH] fix(prometheus): http server exit on error (#590) Currently, if the prometheus http server has an error, it just prints the name of the error to stderr and then exits the thread. Metrics stop being served. All kinds of errors could happen in here, but that doesn't mean we should stop serving metrics. It's better to log an error and then continue running. I used runService from the service manager to accomplish this, with a bit of refactoring in how the thread is spawned. --- src/cmd.zig | 30 ++++++++++------- src/fuzz.zig | 13 ++++---- src/geyser/main.zig | 6 +++- src/prometheus/http.zig | 18 ++--------- src/prometheus/lib.zig | 1 - src/utils/service.zig | 72 +++++++++++++++++++++++------------------ 6 files changed, 74 insertions(+), 66 deletions(-) diff --git a/src/cmd.zig b/src/cmd.zig index 304479820..7069e2d18 100644 --- a/src/cmd.zig +++ b/src/cmd.zig @@ -30,7 +30,7 @@ const downloadSnapshotsFromGossip = sig.accounts_db.downloadSnapshotsFromGossip; const getShredAndIPFromEchoServer = sig.net.echo.getShredAndIPFromEchoServer; const getWallclockMs = sig.time.getWallclockMs; const globalRegistry = sig.prometheus.globalRegistry; -const spawnMetrics = sig.prometheus.spawnMetrics; +const servePrometheus = sig.prometheus.servePrometheus; var gpa = std.heap.GeneralPurposeAllocator(.{}){}; const gpa_allocator = if (builtin.mode == .Debug) @@ -857,7 +857,7 @@ fn validator() !void { &blockstore_db, lowest_cleanup_slot, current_config.max_shreds, - &app_base.exit, + app_base.exit, }); defer cleanup_service_handle.join(); @@ -901,7 +901,7 @@ fn validator() !void { const rpc_epoch_ctx_service_thread = try std.Thread.spawn( .{}, sig.adapter.RpcEpochContextService.run, - .{ &rpc_epoch_ctx_service, &app_base.exit }, + .{ &rpc_epoch_ctx_service, app_base.exit }, ); const turbine_config = current_config.turbine; @@ -915,7 +915,7 @@ fn validator() !void { .registry = app_base.metrics_registry, .random = prng.random(), .my_keypair = &app_base.my_keypair, - .exit = &app_base.exit, + .exit = app_base.exit, .gossip_table_rw = &gossip_service.gossip_table_rw, .my_shred_version = &gossip_service.my_shred_version, .epoch_context_mgr = &epoch_context_manager, @@ -975,7 +975,7 @@ fn shredNetwork() !void { const rpc_epoch_ctx_service_thread = try std.Thread.spawn( .{}, sig.adapter.RpcEpochContextService.run, - .{ &rpc_epoch_ctx_service, &app_base.exit }, + .{ &rpc_epoch_ctx_service, app_base.exit }, ); // blockstore @@ -1017,7 +1017,7 @@ fn shredNetwork() !void { &blockstore_db, lowest_cleanup_slot, current_config.max_shreds, - &app_base.exit, + app_base.exit, }); defer cleanup_service_handle.join(); @@ -1033,7 +1033,7 @@ fn shredNetwork() !void { .registry = app_base.metrics_registry, .random = prng.random(), .my_keypair = &app_base.my_keypair, - .exit = &app_base.exit, + .exit = app_base.exit, .gossip_table_rw = &gossip_service.gossip_table_rw, .my_shred_version = &gossip_service.my_shred_version, .epoch_context_mgr = &epoch_context_manager, @@ -1261,7 +1261,7 @@ pub fn testTransactionSenderService() !void { transaction_channel, &gossip_service.gossip_table_rw, genesis_config.epoch_schedule, - &app_base.exit, + app_base.exit, ); const transaction_sender_handle = try std.Thread.spawn( .{}, @@ -1280,7 +1280,7 @@ pub fn testTransactionSenderService() !void { allocator, transaction_channel, rpc_client, - &app_base.exit, + app_base.exit, app_base.logger.unscoped(), ); // send and confirm mock transactions @@ -1380,7 +1380,7 @@ const AppBase = struct { my_ip: IpAddr, my_port: u16, - exit: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + exit: *std.atomic.Value(bool), closed: bool, fn init(allocator: std.mem.Allocator, cmd_config: config.Cmd) !AppBase { @@ -1389,8 +1389,14 @@ const AppBase = struct { const logger = plain_logger.withScope(LOG_SCOPE); errdefer logger.deinit(); + const exit = try std.heap.c_allocator.create(std.atomic.Value(bool)); + errdefer allocator.destroy(exit); + exit.* = std.atomic.Value(bool).init(false); + const metrics_registry = globalRegistry(); - const metrics_thread = try spawnMetrics(allocator, cmd_config.metrics_port); + const metrics_thread = try sig.utils.service_manager.spawnService( // + plain_logger, exit, "metrics endpoint", .{}, // + servePrometheus, .{ allocator, metrics_registry, cmd_config.metrics_port }); errdefer metrics_thread.detach(); const my_keypair = try sig.identity.getOrInit(allocator, logger.unscoped()); @@ -1429,6 +1435,7 @@ const AppBase = struct { .shred_version = my_shred_version, .my_ip = my_ip, .my_port = my_port, + .exit = exit, .closed = false, }; } @@ -1446,6 +1453,7 @@ const AppBase = struct { self.metrics_thread.detach(); self.logger.deinit(); if (self.log_file) |file| file.close(); + self.allocator.destroy(self.exit); } }; diff --git a/src/fuzz.zig b/src/fuzz.zig index 1e4c5ddc0..09b2a6c37 100644 --- a/src/fuzz.zig +++ b/src/fuzz.zig @@ -9,7 +9,8 @@ const ledger_fuzz = sig.ledger.fuzz_ledger; const ChannelPrintLogger = sig.trace.ChannelPrintLogger; const Level = sig.trace.Level; -const spawnMetrics = sig.prometheus.spawnMetrics; +const servePrometheus = sig.prometheus.servePrometheus; +const globalRegistry = sig.prometheus.globalRegistry; // where seeds are saved (in case of too many logs) const SEED_FILE_PATH = sig.TEST_DATA_DIR ++ "fuzz_seeds.txt"; @@ -45,12 +46,10 @@ pub fn main() !void { const metrics_port: u16 = 12345; logger.info().logf("metrics port: {d}", .{metrics_port}); - const metrics_thread = try spawnMetrics( - // TODO: use the GPA here, the server is just leaking because we're losing the handle - // to it and never deiniting. - std.heap.c_allocator, - metrics_port, - ); + const metrics_thread = try std.Thread + // TODO: use the GPA here, the server is just leaking because we're losing the handle + // to it and never deiniting. + .spawn(.{}, servePrometheus, .{ std.heap.c_allocator, globalRegistry(), 12355 }); metrics_thread.detach(); _ = cli_args.skip(); diff --git a/src/geyser/main.zig b/src/geyser/main.zig index 167c558e1..36e15586e 100644 --- a/src/geyser/main.zig +++ b/src/geyser/main.zig @@ -3,6 +3,9 @@ const std = @import("std"); const sig = @import("sig"); const cli = @import("zig-cli"); +const servePrometheus = sig.prometheus.servePrometheus; +const globalRegistry = sig.prometheus.globalRegistry; + pub const Config = struct { pipe_path: []const u8 = sig.VALIDATOR_DIR ++ "geyser.pipe", measure_rate_secs: u64 = 5, @@ -166,7 +169,8 @@ pub fn csvDump() !void { const logger = std_logger.logger(); - const metrics_thread = try sig.prometheus.spawnMetrics(allocator, 12355); + const metrics_thread = try std.Thread + .spawn(.{}, servePrometheus, .{ allocator, globalRegistry(), 12355 }); metrics_thread.detach(); logger.info().log("spawing metrics thread on port 12355"); diff --git a/src/prometheus/http.zig b/src/prometheus/http.zig index bcbd35152..86ad5f2b0 100644 --- a/src/prometheus/http.zig +++ b/src/prometheus/http.zig @@ -1,20 +1,11 @@ const builtin = @import("builtin"); const std = @import("std"); +const sig = @import("../sig.zig"); const Registry = @import("registry.zig").Registry; const globalRegistry = @import("registry.zig").globalRegistry; const DEFAULT_BUCKETS = @import("histogram.zig").DEFAULT_BUCKETS; -/// Initializes the global registry. Returns error if registry was already initialized. -/// Spawns a thread to serve the metrics over http on the given port. -pub fn spawnMetrics( - gpa_allocator: std.mem.Allocator, - port: u16, -) !std.Thread { - const registry = globalRegistry(); - return std.Thread.spawn(.{}, servePrometheus, .{ gpa_allocator, registry, port }); -} - pub fn servePrometheus( allocator: std.mem.Allocator, registry: *Registry(.{}), @@ -97,9 +88,6 @@ pub fn main() !void { }.run, .{}, ); - try servePrometheus( - alloc, - globalRegistry(), - 12345, - ); + + try servePrometheus(alloc, globalRegistry(), 12345); } diff --git a/src/prometheus/lib.zig b/src/prometheus/lib.zig index 4048b5d90..7f8a3d936 100644 --- a/src/prometheus/lib.zig +++ b/src/prometheus/lib.zig @@ -17,4 +17,3 @@ pub const Registry = registry.Registry; pub const globalRegistry = registry.globalRegistry; pub const servePrometheus = http.servePrometheus; -pub const spawnMetrics = http.spawnMetrics; diff --git a/src/utils/service.zig b/src/utils/service.zig index fa74438cd..afd2e3028 100644 --- a/src/utils/service.zig +++ b/src/utils/service.zig @@ -61,42 +61,26 @@ pub const ServiceManager = struct { comptime function: anytype, args: anytype, ) !void { - try self.spawnCustom( - name, - self.default_run_config, - self.default_spawn_config, - function, - args, - ); + try self.spawnCustom(name, .{}, function, args); } /// Spawn a thread to be managed. /// The function may be restarted periodically, according to the provided config. - fn spawnCustom( + pub fn spawnCustom( self: *Self, comptime name: []const u8, - run_config: ?RunConfig, - spawn_config: std.Thread.SpawnConfig, + config: struct { + run_config: ?RunConfig = null, + spawn_config: ?std.Thread.SpawnConfig = null, + }, comptime function: anytype, args: anytype, ) !void { - const allocator = self.arena.allocator(); - - var thread = try std.Thread.spawn( - spawn_config, - runService, - .{ - self.logger, - self.exit, - name, - run_config orelse self.default_run_config, - function, - args, - }, - ); - - thread.setName(name) catch {}; - try self.threads.append(allocator, thread); + const thread = try spawnService(self.logger, self.exit, name, .{ + .run_config = config.run_config orelse self.default_run_config, + .spawn_config = config.spawn_config orelse self.default_spawn_config, + }, function, args); + try self.threads.append(self.arena.allocator(), thread); } /// Wait for all threads to exit, then return. @@ -143,6 +127,31 @@ pub const ReturnHandler = struct { log_exit: bool = true, }; +/// Spawn a thread with a looping/restart policy using runService. +/// The function may be restarted periodically, according to the provided config. +pub fn spawnService( + any_logger: anytype, + exit: *Atomic(bool), + name: []const u8, + config: struct { + run_config: RunConfig = .{}, + spawn_config: std.Thread.SpawnConfig = .{}, + }, + function: anytype, + args: anytype, +) std.Thread.SpawnError!std.Thread { + const logger = any_logger.withScope(ServiceManager.LOG_SCOPE); + var thread = try std.Thread.spawn( + config.spawn_config, + runService, + .{ logger, exit, name, config.run_config, function, args }, + ); + + thread.setName(name) catch logger.err().logf("failed to set name for thread '{s}'", .{name}); + + return thread; +} + /// Convert a short-lived task into a long-lived service by looping it, /// or make a service resilient by restarting it on failure. /// @@ -176,14 +185,15 @@ pub fn runService( // identify result if (result) |_| num_oks += 1 else |_| num_errors += 1; - const handler, const num_events, const event_name, const level_logger = if (result) |_| - .{ config.return_handler, num_oks, "return", logger.info() } + const handler, const num_events, const event_name, const level_logger, const trace // + = if (result) |_| + .{ config.return_handler, num_oks, "return", logger.info(), null } else |_| - .{ config.error_handler, num_errors, "error", logger.warn() }; + .{ config.error_handler, num_errors, "error", logger.warn(), @errorReturnTrace() }; // handle result if (handler.log_return) { - level_logger.logf("{s} has {s}ed: {any}", .{ name, event_name, result }); + level_logger.logf("{s} has {s}ed: {any} {?}", .{ name, event_name, result, trace }); } if (handler.max_iterations) |max| if (num_events >= max) { if (handler.set_exit_on_completion) {