Skip to content

Commit

Permalink
Merge pull request #62 from rbino/chore/tb-0.16.29
Browse files Browse the repository at this point in the history
chore: update TigerBeetle to 0.16.29
  • Loading branch information
rbino authored Feb 25, 2025
2 parents d6ad8e1 + 8803986 commit 167ee40
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 45 deletions.
4 changes: 2 additions & 2 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
// internet connectivity.
.dependencies = .{
.tigerbeetle = .{
.url = "https://github.com/tigerbeetle/tigerbeetle/archive/refs/tags/0.16.27.tar.gz",
.hash = "1220799ff2ee1f14db1e01385a39d3c6668bfffb19cb5103907c4576272faf82eddf",
.url = "https://github.com/tigerbeetle/tigerbeetle/archive/refs/tags/0.16.29.tar.gz",
.hash = "12206daa08ef3fdaef5decc6479c52467eb1be63dc13ccbdfc3b8bd04050c7f50e56",
},
},
.paths = .{
Expand Down
74 changes: 45 additions & 29 deletions src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ const batch = @import("batch.zig");
const Batch = batch.Batch;
const BatchResource = batch.BatchResource;

pub const Client = tb_client.tb_client_t;
pub const ClientResource = Resource(Client, client_resource_deinit_fn);
const ClientInterface = tb_client.ClientInterface;
pub const ClientResource = Resource(*ClientInterface, client_resource_deinit_fn);

const RequestContext = struct {
caller_pid: beam.Pid,
Expand All @@ -27,27 +27,36 @@ const RequestContext = struct {
};

pub fn init(env: beam.Env, cluster_id: u128, addresses: []const u8) beam.Term {
const client: tb_client.tb_client_t = tb_client.init(
const client = beam.general_purpose_allocator.create(tb_client.ClientInterface) catch {
return beam.make_error_atom(env, "out_of_memory");
};
tb_client.init(
beam.general_purpose_allocator,
client,
cluster_id,
addresses,
@intFromPtr(beam.alloc_env()),
on_completion,
) catch |err| switch (err) {
error.Unexpected => return beam.make_error_atom(env, "unexpected"),
error.OutOfMemory => return beam.make_error_atom(env, "out_of_memory"),
error.AddressInvalid => return beam.make_error_atom(env, "invalid_address"),
error.AddressLimitExceeded => return beam.make_error_atom(env, "address_limit_exceeded"),
error.SystemResources => return beam.make_error_atom(env, "system_resources"),
error.NetworkSubsystemFailed => return beam.make_error_atom(env, "network_subsystem"),
) catch |err| {
// TODO: do some refactoring to allow using errdefer
beam.general_purpose_allocator.destroy(client);
switch (err) {
error.Unexpected => return beam.make_error_atom(env, "unexpected"),
error.OutOfMemory => return beam.make_error_atom(env, "out_of_memory"),
error.AddressInvalid => return beam.make_error_atom(env, "invalid_address"),
error.AddressLimitExceeded => return beam.make_error_atom(env, "address_limit_exceeded"),
error.SystemResources => return beam.make_error_atom(env, "system_resources"),
error.NetworkSubsystemFailed => return beam.make_error_atom(env, "network_subsystem"),
}
};

const client_resource = ClientResource.init(client) catch |err|
switch (err) {
error.OutOfMemory => {
// Deinit the client
// TODO: do some refactoring to allow using errdefer
tb_client.deinit(client);
client.deinit() catch unreachable;
beam.general_purpose_allocator.destroy(client);
return beam.make_error_atom(env, "out_of_memory");
},
};
Expand All @@ -57,14 +66,14 @@ pub fn init(env: beam.Env, cluster_id: u128, addresses: []const u8) beam.Term {
return beam.make_ok_term(env, term_handle);
}

fn OperationBatchItemType(comptime operation: tb_client.tb_operation_t) type {
fn OperationBatchItemType(comptime operation: tb_client.Operation) type {
return switch (operation) {
.create_accounts => Account,
.create_transfers => Transfer,
.lookup_accounts, .lookup_transfers => u128,
.get_account_transfers, .get_account_balances => AccountFilter,
.query_accounts, .query_transfers => @panic("TODO"),
.pulse => unreachable,
.pulse, .get_events => unreachable,
};
}

Expand All @@ -82,7 +91,7 @@ pub const lookup_transfers = get_submit_fn(.lookup_transfers);
pub const get_account_transfers = get_submit_fn(.get_account_transfers);
pub const get_account_balances = get_submit_fn(.get_account_balances);

fn get_submit_fn(comptime operation: tb_client.tb_operation_t) (fn (
fn get_submit_fn(comptime operation: tb_client.Operation) (fn (
env: beam.Env,
client_resource: beam.Term,
payload_term: beam.Term,
Expand Down Expand Up @@ -111,7 +120,7 @@ fn get_submit_fn(comptime operation: tb_client.tb_operation_t) (fn (
}

fn submit(
comptime operation: tb_client.tb_operation_t,
comptime operation: tb_client.Operation,
env: beam.Env,
client_resource: ClientResource,
payload_resource: BatchResource(OperationBatchItemType(operation)),
Expand All @@ -120,7 +129,7 @@ fn submit(
const client = client_resource.value();
const payload = payload_resource.ptr_const();

const packet = beam.general_purpose_allocator.create(tb_client.tb_packet_t) catch {
const packet = beam.general_purpose_allocator.create(tb_client.Packet) catch {
return error.OutOfMemory;
};
errdefer beam.general_purpose_allocator.destroy(packet);
Expand Down Expand Up @@ -150,26 +159,29 @@ fn submit(
ctx.payload_raw_obj = payload_resource.raw_ptr;
ctx.client_raw_obj = client_resource.raw_ptr;

packet.operation = @intFromEnum(operation);
packet.data = payload.items.ptr;
packet.data_size = @sizeOf(Item) * payload.len;
packet.user_data = ctx;
packet.status = .ok;
packet.* = .{
.user_data = ctx,
.operation = @intFromEnum(operation),
.data = payload.items.ptr,
.data_size = @sizeOf(Item) * payload.len,
.user_tag = 0,
.status = undefined,
};

tb_client.submit(client, packet);
client.submit(packet) catch |err| switch (err) {
error.ClientInvalid => return beam.make_error_atom(env, "client_closed"),
};

return beam.make_ok_term(env, ref);
}

fn on_completion(
context: usize,
client: tb_client.tb_client_t,
packet: *tb_client.tb_packet_t,
packet: *tb_client.Packet,
timestamp: u64,
result_ptr: ?[*]const u8,
result_len: u32,
) callconv(.C) void {
_ = client;
_ = timestamp;
const ctx: *RequestContext = @ptrCast(@alignCast(packet.user_data.?));
defer beam.general_purpose_allocator.destroy(ctx);
Expand Down Expand Up @@ -206,10 +218,14 @@ fn on_completion(

fn client_resource_deinit_fn(_: beam.Env, ptr: ?*anyopaque) callconv(.C) void {
if (ptr) |p| {
const cl: *Client = @ptrCast(@alignCast(p));
defer tb_client.deinit(cl.*);

const completion_ctx = tb_client.completion_context(cl.*);
const client: *ClientInterface = @ptrCast(@alignCast(p));
// The client was already closed, we just return
defer client.deinit() catch {};

const completion_ctx = client.completion_context() catch |err| switch (err) {
// The client was already closed, we just return
error.ClientInvalid => return,
};
const env: beam.Env = @ptrFromInt(completion_ctx);
beam.free_env(env);
} else unreachable;
Expand Down
2 changes: 1 addition & 1 deletion src/config.zig
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub const vsr_options = .{
.release = "0.16.27",
.release = "0.16.29",
.release_client_min = "0.15.3",
.git_commit = null,
.hash_log_mode = .none,
Expand Down
1 change: 0 additions & 1 deletion src/tigerbeetlex.zig
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ const AccountBatch = account_batch.AccountBatch;
const IdBatch = id_batch.IdBatch;
const TransferBatch = transfer_batch.TransferBatch;
const AccountFilterBatch = account_filter_batch.TransferBatch;
const Client = client.Client;

const ClientResource = client.ClientResource;
const AccountBatchResource = account_batch.AccountBatchResource;
Expand Down
26 changes: 14 additions & 12 deletions tools/elixir_bindings.zig
Original file line number Diff line number Diff line change
Expand Up @@ -728,10 +728,11 @@ fn emit_response_module(
, .{});

{
const operation_info = @typeInfo(tb_client.tb_operation_t).Enum;
const operation_info = @typeInfo(tb_client.Operation).Enum;
inline for (operation_info.fields) |field| {
const result_type = StateMachine.ResultType(@enumFromInt(field.value));
if (result_type == void) continue;
const operation: tb_client.Operation = @enumFromInt(field.value);
if (operation == .pulse or operation == .get_events) continue;
const result_type = StateMachine.ResultType(operation);

try buffer.writer().print(
\\ - `{[operation_name]s}`: a list of `%TigerBeetlex.{[result_module]s}{{}}`
Expand All @@ -749,9 +750,9 @@ fn emit_response_module(
, .{});

{
const packet_status_info = @typeInfo(tb_client.tb_packet_status_t).Enum;
const packet_status_info = @typeInfo(tb_client.PacketStatus).Enum;
inline for (packet_status_info.fields) |field| {
const status: tb_client.tb_packet_status_t = @enumFromInt(field.value);
const status: tb_client.PacketStatus = @enumFromInt(field.value);
if (status == .ok) {
try buffer.writer().print(
\\ def decode({{{[ok_value]}, operation, batch}}) do
Expand All @@ -778,10 +779,11 @@ fn emit_response_module(
}

{
const operation_info = @typeInfo(tb_client.tb_operation_t).Enum;
const operation_info = @typeInfo(tb_client.Operation).Enum;
inline for (operation_info.fields) |field| {
const result_type = StateMachine.ResultType(@enumFromInt(field.value));
if (result_type == void) continue;
const operation: tb_client.Operation = @enumFromInt(field.value);
if (operation == .pulse or operation == .get_events) continue;
const result_type = StateMachine.ResultType(operation);

try buffer.writer().print(
\\ defp build_result_list({[operation_value]}, batch) when rem(bit_size(batch), {[result_bit_size]}) == 0 do
Expand All @@ -801,7 +803,7 @@ fn emit_response_module(
}

{
const packet_status_info = @typeInfo(tb_client.tb_packet_status_t).Enum;
const packet_status_info = @typeInfo(tb_client.PacketStatus).Enum;

try buffer.writer().print(
\\ @doc false
Expand All @@ -828,7 +830,7 @@ fn emit_response_module(
}

{
const operation_info = @typeInfo(tb_client.tb_operation_t).Enum;
const operation_info = @typeInfo(tb_client.Operation).Enum;

try buffer.writer().print(
\\ @doc false
Expand All @@ -838,8 +840,8 @@ fn emit_response_module(
, .{});

inline for (operation_info.fields) |field| {
const result_type = StateMachine.ResultType(@enumFromInt(field.value));
if (result_type == void) continue;
const operation: tb_client.Operation = @enumFromInt(field.value);
if (operation == .pulse or operation == .get_events) continue;

try buffer.writer().print(
\\ {[error_name]s}: {[error_value]},
Expand Down

0 comments on commit 167ee40

Please sign in to comment.