Skip to content

Commit

Permalink
fix(prometheus): prometheus cannot connect to sig's metrics (#531)
Browse files Browse the repository at this point in the history
A previous commit broke our prometheus server. This reverts the portion of that commit which affects prometheus.

8345545

"refactor(echo,prometheus,build): Remove our dependency on the httpz library (#520)"
  • Loading branch information
dnut authored Feb 3, 2025
1 parent a14a5a5 commit 3e0043e
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 61 deletions.
8 changes: 8 additions & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub fn build(b: *Build) void {
const zig_cli_dep = b.dependency("zig-cli", dep_opts);
const zig_cli_module = zig_cli_dep.module("zig-cli");

const httpz_dep = b.dependency("httpz", dep_opts);
const httpz_mod = httpz_dep.module("httpz");

const zstd_dep = b.dependency("zstd", dep_opts);
const zstd_mod = zstd_dep.module("zstd");

Expand Down Expand Up @@ -75,6 +78,7 @@ pub fn build(b: *Build) void {
sig_mod.addImport("zig-network", zig_network_module);
sig_mod.addImport("base58-zig", base58_module);
sig_mod.addImport("zig-cli", zig_cli_module);
sig_mod.addImport("httpz", httpz_mod);
sig_mod.addImport("zstd", zstd_mod);
switch (blockstore_db) {
.rocksdb => sig_mod.addImport("rocksdb", rocksdb_mod),
Expand All @@ -101,6 +105,7 @@ pub fn build(b: *Build) void {
sig_exe.root_module.addOptions("build-options", build_options);

sig_exe.root_module.addImport("base58-zig", base58_module);
sig_exe.root_module.addImport("httpz", httpz_mod);
sig_exe.root_module.addImport("zig-cli", zig_cli_module);
sig_exe.root_module.addImport("zig-network", zig_network_module);
sig_exe.root_module.addImport("zstd", zstd_mod);
Expand Down Expand Up @@ -139,6 +144,7 @@ pub fn build(b: *Build) void {
unit_tests_exe.root_module.addOptions("build-options", build_options);

unit_tests_exe.root_module.addImport("base58-zig", base58_module);
unit_tests_exe.root_module.addImport("httpz", httpz_mod);
unit_tests_exe.root_module.addImport("zig-network", zig_network_module);
unit_tests_exe.root_module.addImport("zstd", zstd_mod);
switch (blockstore_db) {
Expand Down Expand Up @@ -173,6 +179,7 @@ pub fn build(b: *Build) void {

fuzz_exe.root_module.addImport("base58-zig", base58_module);
fuzz_exe.root_module.addImport("zig-network", zig_network_module);
fuzz_exe.root_module.addImport("httpz", httpz_mod);
fuzz_exe.root_module.addImport("zstd", zstd_mod);
switch (blockstore_db) {
.rocksdb => fuzz_exe.root_module.addImport("rocksdb", rocksdb_mod),
Expand Down Expand Up @@ -207,6 +214,7 @@ pub fn build(b: *Build) void {

benchmark_exe.root_module.addImport("base58-zig", base58_module);
benchmark_exe.root_module.addImport("zig-network", zig_network_module);
benchmark_exe.root_module.addImport("httpz", httpz_mod);
benchmark_exe.root_module.addImport("zstd", zstd_mod);
benchmark_exe.root_module.addImport("prettytable", pretty_table_mod);
switch (blockstore_db) {
Expand Down
4 changes: 4 additions & 0 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
.url = "https://github.com/sam701/zig-cli/archive/8c7a798c0f7fa0358d7ab41106fc872fca4cd995.tar.gz",
.hash = "1220c008492d9460c3be2b209600a948181e6efb3bf0d79a1633def499632e708f4b",
},
.httpz = .{
.url = "https://github.com/karlseguin/http.zig/archive/79dad0f0cc652830cd8e49bf3e73aa77155ad4b2.tar.gz",
.hash = "1220b8a918dfcee4fc8326ec337776e2ffd3029511c35f6b96d10aa7be98ca2faf99",
},
.zstd = .{
.url = "git+https://github.com/Syndica/zstd.zig#5095f011c1183aa67d696172795440d6a33732c9",
.hash = "122030ebe280b73693963a67ed656226a67b7f00a0a05665155da00c9fcdee90de88",
Expand Down
92 changes: 31 additions & 61 deletions src/prometheus/http.zig
Original file line number Diff line number Diff line change
@@ -1,78 +1,48 @@
const builtin = @import("builtin");
const std = @import("std");
const httpz = @import("httpz");

const Registry = @import("registry.zig").Registry;
const globalRegistry = @import("registry.zig").globalRegistry;
const DEFAULT_BUCKETS = @import("histogram.zig").DEFAULT_BUCKETS;

/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the given port.
pub fn spawnMetrics(
gpa_allocator: std.mem.Allocator,
port: u16,
) !std.Thread {
const registry = globalRegistry();
return std.Thread.spawn(.{}, servePrometheus, .{ gpa_allocator, registry, port });
}

pub fn servePrometheus(
allocator: std.mem.Allocator,
registry: *Registry(.{}),
port: u16,
) !void {
const our_ip = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, port);
var tcp = try our_ip.listen(.{
.force_nonblocking = true,
.reuse_address = true,
});
defer tcp.deinit();

while (true) {
const conn = tcp.accept() catch |err| switch (err) {
error.WouldBlock => continue,
else => |e| return e,
};

// TODO: unify this with the code for the RPC server
if (comptime builtin.target.isDarwin()) set_flags: {
const FlagsInt = @typeInfo(std.posix.O).Struct.backing_integer.?;
var flags_int: FlagsInt =
@intCast(try std.posix.fcntl(conn.stream.handle, std.posix.F.GETFL, 0));
const flags: *std.posix.O =
std.mem.bytesAsValue(std.posix.O, std.mem.asBytes(&flags_int));
if (flags.NONBLOCK == false and flags.CLOEXEC == true) break :set_flags;
flags.NONBLOCK = false;
flags.CLOEXEC = true;
_ = try std.posix.fcntl(conn.stream.handle, std.posix.F.SETFL, flags_int);
}
const endpoint = MetricsEndpoint{
.allocator = allocator,
.registry = registry,
};
var server = try httpz.ServerCtx(*const MetricsEndpoint, *const MetricsEndpoint).init(
allocator,
.{ .port = port, .address = "0.0.0.0" },
&endpoint,
);
var router = server.router();
router.get("/metrics", getMetrics);
return server.listen();
}

var read_buffer: [4096]u8 = undefined;
var http_server = std.http.Server.init(conn, &read_buffer);
var request = http_server.receiveHead() catch continue;
const MetricsEndpoint = struct {
allocator: std.mem.Allocator,
registry: *Registry(.{}),
};

if (request.head.method != .GET or
!std.mem.eql(u8, request.head.target, "/metrics") //
) {
try request.respond("", .{
.version = .@"HTTP/1.0",
.status = .not_found,
.keep_alive = false,
});
continue;
}
/// Initializes the global registry. Returns error if registry was already initialized.
/// Spawns a thread to serve the metrics over http on the given port.
pub fn spawnMetrics(gpa_allocator: std.mem.Allocator, port: u16) !std.Thread {
const registry = globalRegistry();
return std.Thread.spawn(.{}, servePrometheus, .{ gpa_allocator, registry, port });
}

var send_buffer: [4096]u8 = undefined;
var response = request.respondStreaming(.{
.send_buffer = &send_buffer,
.respond_options = .{
.version = .@"HTTP/1.0",
.status = .ok,
.keep_alive = true,
},
});
try registry.write(allocator, response.writer());
try response.end();
}
pub fn getMetrics(
self: *const MetricsEndpoint,
_: *httpz.Request,
response: *httpz.Response,
) !void {
response.content_type = .TEXT; // expected by prometheus
try self.registry.write(self.allocator, response.writer());
}

/// Runs a test prometheus endpoint with dummy data.
Expand Down

0 comments on commit 3e0043e

Please sign in to comment.