fix(prometheus): http server exit on error (#590)
Currently, if the prometheus http server hits an error, it just prints
the error's name to stderr and then exits the thread, so metrics stop
being served. All kinds of errors can happen in there, but none of them
mean we should stop serving metrics. It's better to log the error and
keep running. I used runService from the service manager to accomplish
this, with a bit of refactoring in how the thread is spawned.
dnut authored Mar 3, 2025
1 parent b3fba1f commit 2349e19
Showing 6 changed files with 74 additions and 66 deletions.
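
In short: a bare thread spawn becomes a supervised spawn through the service manager. A condensed before/after sketch of the call site, assembled from the diff below (`logger`, `exit`, and `metrics_port` stand in for the caller's values):

```zig
// Before: a bare thread. If servePrometheus returns an error, the
// thread prints it and dies, and metrics silently stop being served.
const metrics_thread = try std.Thread.spawn(
    .{},
    servePrometheus,
    .{ allocator, globalRegistry(), metrics_port },
);

// After: spawnService wraps the call in runService, which logs any
// error and re-enters servePrometheus until `exit` is set.
const metrics_thread = try sig.utils.service_manager.spawnService(
    logger,
    exit, // *std.atomic.Value(bool), the shared shutdown flag
    "metrics endpoint",
    .{}, // default config: loop on return, restart on error
    servePrometheus,
    .{ allocator, globalRegistry(), metrics_port },
);
```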
30 changes: 19 additions & 11 deletions src/cmd.zig
@@ -30,7 +30,7 @@ const downloadSnapshotsFromGossip = sig.accounts_db.downloadSnapshotsFromGossip;
const getShredAndIPFromEchoServer = sig.net.echo.getShredAndIPFromEchoServer;
const getWallclockMs = sig.time.getWallclockMs;
const globalRegistry = sig.prometheus.globalRegistry;
const spawnMetrics = sig.prometheus.spawnMetrics;
const servePrometheus = sig.prometheus.servePrometheus;

var gpa = std.heap.GeneralPurposeAllocator(.{}){};
const gpa_allocator = if (builtin.mode == .Debug)
@@ -857,7 +857,7 @@ fn validator() !void {
&blockstore_db,
lowest_cleanup_slot,
current_config.max_shreds,
&app_base.exit,
app_base.exit,
});
defer cleanup_service_handle.join();

@@ -901,7 +901,7 @@ fn validator() !void {
const rpc_epoch_ctx_service_thread = try std.Thread.spawn(
.{},
sig.adapter.RpcEpochContextService.run,
.{ &rpc_epoch_ctx_service, &app_base.exit },
.{ &rpc_epoch_ctx_service, app_base.exit },
);

const turbine_config = current_config.turbine;
@@ -915,7 +915,7 @@
.registry = app_base.metrics_registry,
.random = prng.random(),
.my_keypair = &app_base.my_keypair,
.exit = &app_base.exit,
.exit = app_base.exit,
.gossip_table_rw = &gossip_service.gossip_table_rw,
.my_shred_version = &gossip_service.my_shred_version,
.epoch_context_mgr = &epoch_context_manager,
@@ -975,7 +975,7 @@ fn shredNetwork() !void {
const rpc_epoch_ctx_service_thread = try std.Thread.spawn(
.{},
sig.adapter.RpcEpochContextService.run,
.{ &rpc_epoch_ctx_service, &app_base.exit },
.{ &rpc_epoch_ctx_service, app_base.exit },
);

// blockstore
@@ -1017,7 +1017,7 @@ fn shredNetwork() !void {
&blockstore_db,
lowest_cleanup_slot,
current_config.max_shreds,
&app_base.exit,
app_base.exit,
});
defer cleanup_service_handle.join();

@@ -1033,7 +1033,7 @@
.registry = app_base.metrics_registry,
.random = prng.random(),
.my_keypair = &app_base.my_keypair,
.exit = &app_base.exit,
.exit = app_base.exit,
.gossip_table_rw = &gossip_service.gossip_table_rw,
.my_shred_version = &gossip_service.my_shred_version,
.epoch_context_mgr = &epoch_context_manager,
@@ -1261,7 +1261,7 @@ pub fn testTransactionSenderService() !void {
transaction_channel,
&gossip_service.gossip_table_rw,
genesis_config.epoch_schedule,
&app_base.exit,
app_base.exit,
);
const transaction_sender_handle = try std.Thread.spawn(
.{},
@@ -1280,7 +1280,7 @@ pub fn testTransactionSenderService() !void {
allocator,
transaction_channel,
rpc_client,
&app_base.exit,
app_base.exit,
app_base.logger.unscoped(),
);
// send and confirm mock transactions
@@ -1380,7 +1380,7 @@ const AppBase = struct {
my_ip: IpAddr,
my_port: u16,

exit: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
exit: *std.atomic.Value(bool),
closed: bool,

fn init(allocator: std.mem.Allocator, cmd_config: config.Cmd) !AppBase {
@@ -1389,8 +1389,14 @@
const logger = plain_logger.withScope(LOG_SCOPE);
errdefer logger.deinit();

const exit = try std.heap.c_allocator.create(std.atomic.Value(bool));
errdefer allocator.destroy(exit);
exit.* = std.atomic.Value(bool).init(false);

const metrics_registry = globalRegistry();
const metrics_thread = try spawnMetrics(allocator, cmd_config.metrics_port);
const metrics_thread = try sig.utils.service_manager.spawnService( //
plain_logger, exit, "metrics endpoint", .{}, //
servePrometheus, .{ allocator, metrics_registry, cmd_config.metrics_port });
errdefer metrics_thread.detach();

const my_keypair = try sig.identity.getOrInit(allocator, logger.unscoped());
@@ -1429,6 +1435,7 @@ const AppBase = struct {
.shred_version = my_shred_version,
.my_ip = my_ip,
.my_port = my_port,
.exit = exit,
.closed = false,
};
}
@@ -1446,6 +1453,7 @@
self.metrics_thread.detach();
self.logger.deinit();
if (self.log_file) |file| file.close();
self.allocator.destroy(self.exit);
}
};

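A side effect visible throughout cmd.zig above: `AppBase` now stores `exit` as a heap-allocated `*std.atomic.Value(bool)` instead of an inline field, and call sites pass `app_base.exit` rather than `&app_base.exit`. The reason, presumably, is that `init` now hands the exit pointer to a spawned thread before returning `AppBase` by value, so a pointer into the struct itself would not survive the return. A hypothetical, self-contained sketch of that hazard (`Server`, `serve`, and `init` are stand-ins, not code from this commit):

```zig
const std = @import("std");

const Server = struct { exit: std.atomic.Value(bool) };

fn serve(exit: *std.atomic.Value(bool)) void {
    while (!exit.load(.acquire)) {
        // ... serve requests ...
    }
}

// BAD: a thread captures a pointer into this stack frame, then the
// struct is returned by value, leaving the thread with a pointer to
// a dead stack slot. Heap-allocating the flag instead gives it a
// stable address for as long as the threads need it.
fn init() !Server {
    var server = Server{ .exit = std.atomic.Value(bool).init(false) };
    _ = try std.Thread.spawn(.{}, serve, .{&server.exit});
    return server;
}
```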
13 changes: 6 additions & 7 deletions src/fuzz.zig
@@ -9,7 +9,8 @@ const ledger_fuzz = sig.ledger.fuzz_ledger;
const ChannelPrintLogger = sig.trace.ChannelPrintLogger;
const Level = sig.trace.Level;

const spawnMetrics = sig.prometheus.spawnMetrics;
const servePrometheus = sig.prometheus.servePrometheus;
const globalRegistry = sig.prometheus.globalRegistry;

// where seeds are saved (in case of too many logs)
const SEED_FILE_PATH = sig.TEST_DATA_DIR ++ "fuzz_seeds.txt";
@@ -45,12 +46,10 @@ pub fn main() !void {
const metrics_port: u16 = 12345;

logger.info().logf("metrics port: {d}", .{metrics_port});
const metrics_thread = try spawnMetrics(
// TODO: use the GPA here, the server is just leaking because we're losing the handle
// to it and never deiniting.
std.heap.c_allocator,
metrics_port,
);
const metrics_thread = try std.Thread
// TODO: use the GPA here, the server is just leaking because we're losing the handle
// to it and never deiniting.
.spawn(.{}, servePrometheus, .{ std.heap.c_allocator, globalRegistry(), 12355 });
metrics_thread.detach();

_ = cli_args.skip();
6 changes: 5 additions & 1 deletion src/geyser/main.zig
@@ -3,6 +3,9 @@ const std = @import("std");
const sig = @import("sig");
const cli = @import("zig-cli");

const servePrometheus = sig.prometheus.servePrometheus;
const globalRegistry = sig.prometheus.globalRegistry;

pub const Config = struct {
pipe_path: []const u8 = sig.VALIDATOR_DIR ++ "geyser.pipe",
measure_rate_secs: u64 = 5,
@@ -166,7 +169,8 @@ pub fn csvDump() !void {

const logger = std_logger.logger();

const metrics_thread = try sig.prometheus.spawnMetrics(allocator, 12355);
const metrics_thread = try std.Thread
.spawn(.{}, servePrometheus, .{ allocator, globalRegistry(), 12355 });
metrics_thread.detach();
logger.info().log("spawing metrics thread on port 12355");

18 changes: 3 additions & 15 deletions src/prometheus/http.zig
@@ -1,20 +1,11 @@
const builtin = @import("builtin");
const std = @import("std");
const sig = @import("../sig.zig");

const Registry = @import("registry.zig").Registry;
const globalRegistry = @import("registry.zig").globalRegistry;
const DEFAULT_BUCKETS = @import("histogram.zig").DEFAULT_BUCKETS;

/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the given port.
pub fn spawnMetrics(
gpa_allocator: std.mem.Allocator,
port: u16,
) !std.Thread {
const registry = globalRegistry();
return std.Thread.spawn(.{}, servePrometheus, .{ gpa_allocator, registry, port });
}

pub fn servePrometheus(
allocator: std.mem.Allocator,
registry: *Registry(.{}),
@@ -97,9 +88,6 @@ pub fn main() !void {
}.run,
.{},
);
try servePrometheus(
alloc,
globalRegistry(),
12345,
);

try servePrometheus(alloc, globalRegistry(), 12345);
}
1 change: 0 additions & 1 deletion src/prometheus/lib.zig
@@ -17,4 +17,3 @@ pub const Registry = registry.Registry;

pub const globalRegistry = registry.globalRegistry;
pub const servePrometheus = http.servePrometheus;
pub const spawnMetrics = http.spawnMetrics;
72 changes: 41 additions & 31 deletions src/utils/service.zig
@@ -61,42 +61,26 @@ pub const ServiceManager = struct {
comptime function: anytype,
args: anytype,
) !void {
try self.spawnCustom(
name,
self.default_run_config,
self.default_spawn_config,
function,
args,
);
try self.spawnCustom(name, .{}, function, args);
}

/// Spawn a thread to be managed.
/// The function may be restarted periodically, according to the provided config.
fn spawnCustom(
pub fn spawnCustom(
self: *Self,
comptime name: []const u8,
run_config: ?RunConfig,
spawn_config: std.Thread.SpawnConfig,
config: struct {
run_config: ?RunConfig = null,
spawn_config: ?std.Thread.SpawnConfig = null,
},
comptime function: anytype,
args: anytype,
) !void {
const allocator = self.arena.allocator();

var thread = try std.Thread.spawn(
spawn_config,
runService,
.{
self.logger,
self.exit,
name,
run_config orelse self.default_run_config,
function,
args,
},
);

thread.setName(name) catch {};
try self.threads.append(allocator, thread);
const thread = try spawnService(self.logger, self.exit, name, .{
.run_config = config.run_config orelse self.default_run_config,
.spawn_config = config.spawn_config orelse self.default_spawn_config,
}, function, args);
try self.threads.append(self.arena.allocator(), thread);
}

/// Wait for all threads to exit, then return.
@@ -143,6 +127,31 @@ pub const ReturnHandler = struct {
log_exit: bool = true,
};

/// Spawn a thread with a looping/restart policy using runService.
/// The function may be restarted periodically, according to the provided config.
pub fn spawnService(
any_logger: anytype,
exit: *Atomic(bool),
name: []const u8,
config: struct {
run_config: RunConfig = .{},
spawn_config: std.Thread.SpawnConfig = .{},
},
function: anytype,
args: anytype,
) std.Thread.SpawnError!std.Thread {
const logger = any_logger.withScope(ServiceManager.LOG_SCOPE);
var thread = try std.Thread.spawn(
config.spawn_config,
runService,
.{ logger, exit, name, config.run_config, function, args },
);

thread.setName(name) catch logger.err().logf("failed to set name for thread '{s}'", .{name});

return thread;
}

/// Convert a short-lived task into a long-lived service by looping it,
/// or make a service resilient by restarting it on failure.
///
@@ -176,14 +185,15 @@ pub fn runService(

// identify result
if (result) |_| num_oks += 1 else |_| num_errors += 1;
const handler, const num_events, const event_name, const level_logger = if (result) |_|
.{ config.return_handler, num_oks, "return", logger.info() }
const handler, const num_events, const event_name, const level_logger, const trace //
= if (result) |_|
.{ config.return_handler, num_oks, "return", logger.info(), null }
else |_|
.{ config.error_handler, num_errors, "error", logger.warn() };
.{ config.error_handler, num_errors, "error", logger.warn(), @errorReturnTrace() };

// handle result
if (handler.log_return) {
level_logger.logf("{s} has {s}ed: {any}", .{ name, event_name, result });
level_logger.logf("{s} has {s}ed: {any} {?}", .{ name, event_name, result, trace });
}
if (handler.max_iterations) |max| if (num_events >= max) {
if (handler.set_exit_on_completion) {
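For reference, the supervision that now keeps the metrics endpoint alive reduces to roughly the following loop. This is a simplified paraphrase of runService above, not a verbatim copy; the counters, restart pacing, iteration caps, and exit-on-completion handling are elided:

```zig
while (!exit.load(.acquire)) {
    const result = @call(.auto, function, args);
    if (result) |_| {
        logger.info().logf("{s} has returned: {any}", .{ name, result });
    } else |_| {
        // The fix: log the error (with its return trace) and loop
        // again, instead of letting the thread exit and metrics go dark.
        logger.warn().logf(
            "{s} has errored: {any} {?}",
            .{ name, result, @errorReturnTrace() },
        );
    }
}
```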
