Skip to content

Commit

Permalink
Merge pull request #61 from rbino/feat/get_account_balances_transfers
Browse files Browse the repository at this point in the history
Add  `get_account_balances` and `get_account_transfers`
  • Loading branch information
rbino authored Feb 25, 2025
2 parents ddd1e9c + bcd8f24 commit d6ad8e1
Show file tree
Hide file tree
Showing 10 changed files with 491 additions and 5 deletions.
75 changes: 75 additions & 0 deletions lib/tigerbeetlex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule TigerBeetlex do
end

alias TigerBeetlex.AccountBatch
alias TigerBeetlex.AccountFilterBatch
alias TigerBeetlex.IDBatch
alias TigerBeetlex.NifAdapter
alias TigerBeetlex.TransferBatch
Expand Down Expand Up @@ -186,6 +187,80 @@ defmodule TigerBeetlex do
NifAdapter.create_transfers(client.ref, transfer_batch.ref)
end

@doc """
Fetch a list of historical `%TigerBeetlex.AccountBalance{}` for a given `%TigerBeetlex.Account{}`.
Only accounts created with the `history` flag set retain historical balances. This is off by default.
`client` is a `%TigerBeetlex{}` client.
`account_filter_batch` is a `TigerBeetlex.AccountFilterBatch` struct.
The function returns a ref which can be used to match the received response message.
The response message has this format:
{:tigerbeetlex_response, request_ref, response}
Where `request_ref` is the same `ref` returned when this function was called and `response` is
a response that can be decoded using `TigerBeetlex.Response.decode/1`.
The value returned from `TigerBeetlex.Response.decode(response)` will either be
`{:error, reason}` or `{:ok, results}`, where `results` is a list of `%TigerBeetlex.AccountBalance{}`
structs.
## Examples
batch = TigerBeetlex.AccountFilterBatch.new!(%AccountFilter{id: <<42::128>>})
{:ok, ref} = TigerBeetlex.get_account_balances(client, batch)
receive do
{:tigerbeetlex_response, ^ref, response} -> TigerBeetlex.Response.decode(response)
end
#=> {:ok, [%TigerBeetlex.AccountBalance{}]}
"""
def get_account_balances(%__MODULE__{} = client, %AccountFilterBatch{} = batch) do
NifAdapter.get_account_balances(client.ref, batch.ref)
end

@doc """
Fetch a list of `%TigerBeetlex.Transfer{}` involving a `%TigerBeetlex.Account{}`.
`client` is a `%TigerBeetlex{}` client.
`account_filter_batch` is a `TigerBeetlex.AccountFilterBatch` struct.
The function returns a ref which can be used to match the received response message.
The response message has this format:
{:tigerbeetlex_response, request_ref, response}
Where `request_ref` is the same `ref` returned when this function was called and `response` is
a response that can be decoded using `TigerBeetlex.Response.decode/1`.
The value returned from `TigerBeetlex.Response.decode(response)` will either be
`{:error, reason}` or `{:ok, results}`, where `results` is a list of `%TigerBeetlex.Transfer{}`
structs.
## Examples
batch = TigerBeetlex.AccountFilterBatch.new!(%AccountFilter{id: <<42::128>>})
{:ok, ref} = TigerBeetlex.get_account_balances(client, batch)
receive do
{:tigerbeetlex_response, ^ref, response} -> TigerBeetlex.Response.decode(response)
end
#=> {:ok, [%TigerBeetlex.Transfer{}]}
"""
def get_account_transfers(%__MODULE__{} = client, %AccountFilterBatch{} = batch) do
NifAdapter.get_account_transfers(client.ref, batch.ref)
end

@doc """
Lookup a batch of accounts.
Expand Down
47 changes: 47 additions & 0 deletions lib/tigerbeetlex/account_filter_batch.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule TigerBeetlex.AccountFilterBatch do
@moduledoc """
Account Filter Batch creation and manipulation.
"""

use TypedStruct

typedstruct do
field :ref, reference(), enforce: true
end

alias TigerBeetlex.AccountFilter
alias TigerBeetlex.AccountFilterBatch
alias TigerBeetlex.BatchFullError
alias TigerBeetlex.InvalidBatchError
alias TigerBeetlex.NifAdapter
alias TigerBeetlex.OutOfMemoryError
alias TigerBeetlex.Types

@doc """
Creates a new account filter batch.
"""
@spec new(filter :: AccountFilter.t()) ::
{:ok, t()} | {:error, Types.create_batch_error()} | {:error, Types.append_error()}
def new(%AccountFilter{} = filter) do
binary = AccountFilter.to_binary(filter)
# 1 is the only valid value here - since that's what tigerbeetle expects
# https://docs.tigerbeetle.com/reference/requests/#batching-events
with {:ok, ref} <- NifAdapter.create_account_filter_batch(1),
:ok <- NifAdapter.append_account_filter(ref, binary) do
{:ok, %AccountFilterBatch{ref: ref}}
end
end

@doc """
Creates a new account filter batch, rasing in case of an error.
"""
@spec new!(filter :: AccountFilter.t()) :: t()
def new!(%AccountFilter{} = filter) do
case new(filter) do
{:ok, batch} -> batch
{:error, :out_of_memory} -> raise OutOfMemoryError
{:error, :invalid_batch} -> raise InvalidBatchError
{:error, :batch_full} -> raise BatchFullError
end
end
end
47 changes: 47 additions & 0 deletions lib/tigerbeetlex/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule TigerBeetlex.Connection do

alias TigerBeetlex.{
AccountBatch,
AccountFilterBatch,
IDBatch,
Receiver,
TransferBatch,
Expand Down Expand Up @@ -278,6 +279,52 @@ defmodule TigerBeetlex.Connection do
|> GenServer.call({:lookup_transfers, id_batch})
end

@doc """
Fetch a list of historical `%TigerBeetlex.AccountBalance{}` for a given `%TigerBeetlex.Account{}`.
Only accounts created with the `history` flag set retain historical balances. This is off by default.
`name` is the same atom that was passed in the `:name` option in `start_link/1`.
`account_filter_batch` is a `TigerBeetlex.AccountFilterBatch` struct.
If successful, the function returns `{:ok, results}` where `results` is a list of
`%TigerBeetlex.AccountBalance{}` structs.
## Examples
batch = TigerBeetlex.AccountFilterBatch.new!(%AccountFilter{id: <<42::128>>})
TigerBeetlex.Connection.get_account_balances(:tb, batch)
#=> {:ok, [%TigerBeetlex.AccountBalance{}]}
"""
def get_account_balances(name, %AccountFilterBatch{} = account_filter_batch) do
via_tuple(name)
|> GenServer.call({:get_account_balances, account_filter_batch})
end

@doc """
Fetch a list of `%TigerBeetlex.Transfer{}` involving a `%TigerBeetlex.Account{}`.
`name` is the same atom that was passed in the `:name` option in `start_link/1`.
`account_filter_batch` is a `TigerBeetlex.AccountFilterBatch` struct.
If successful, the function returns `{:ok, results}` where `results` is a list of
`%TigerBeetlex.Transfer{}` structs.
## Examples
batch = TigerBeetlex.AccountFilterBatch.new!(%AccountFilter{id: <<42::128>>})
TigerBeetlex.Connection.get_account_transfers(:tb, batch)
#=> {:ok, [%TigerBeetlex.Transfer{}]}
"""
def get_account_transfers(name, %AccountFilterBatch{} = account_filter_batch) do
via_tuple(name)
|> GenServer.call({:get_account_transfers, account_filter_batch})
end

defp via_tuple(name) do
{:via, PartitionSupervisor, {name, self()}}
end
Expand Down
19 changes: 19 additions & 0 deletions lib/tigerbeetlex/nif_adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,25 @@ defmodule TigerBeetlex.NifAdapter do
{:ok, Types.transfer_batch()} | {:error, Types.create_batch_error()}
def create_transfer_batch(_capacity), do: :erlang.nif_error(:nif_not_loaded)

@spec get_account_transfers(client :: Types.client(), batch :: Types.account_filter_batch()) ::
{:ok, reference()} | {:error, Types.lookup_transfers_error()}
def get_account_transfers(_client, _id), do: :erlang.nif_error(:nif_not_loaded)

@spec get_account_balances(client :: Types.client(), batch :: Types.account_filter_batch()) ::
{:ok, reference()} | {:error, Types.lookup_accounts_error()}
def get_account_balances(_client, _id), do: :erlang.nif_error(:nif_not_loaded)

@spec create_account_filter_batch(capacity :: non_neg_integer()) ::
{:ok, Types.account_filter_batch()} | {:error, Types.create_batch_error()}
def create_account_filter_batch(_capacity), do: :erlang.nif_error(:nif_not_loaded)

@spec append_account_filter(
batch :: Types.account_filter_batch(),
binary :: Types.account_filter_binary()
) ::
:ok | {:error, Types.append_error()}
def append_account_filter(_batch, _binary), do: :erlang.nif_error(:nif_not_loaded)

@spec append_transfer(
transfer_batch :: Types.transfer_batch(),
transfer_binary :: Types.transfer_binary()
Expand Down
8 changes: 8 additions & 0 deletions lib/tigerbeetlex/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ defmodule TigerBeetlex.Receiver do
send_request(from, :lookup_transfers, [state.client, id_batch], state)
end

def handle_call({:get_account_balances, id_batch}, from, state) do
send_request(from, :get_account_balances, [state.client, id_batch], state)
end

def handle_call({:get_account_transfers, id_batch}, from, state) do
send_request(from, :get_account_transfers, [state.client, id_batch], state)
end

defp send_request(from, function, arguments, state) do
case apply(TigerBeetlex, function, arguments) do
{:ok, ref} ->
Expand Down
4 changes: 4 additions & 0 deletions lib/tigerbeetlex/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ defmodule TigerBeetlex.Types do

@type account_binary :: <<_::1024>>

@type account_filter_batch :: reference()

@type account_filter_binary :: <<_::1024>>

@type transfer_batch :: reference()

@type transfer_binary :: <<_::1024>>
Expand Down
35 changes: 35 additions & 0 deletions src/account_filter_batch.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const std = @import("std");
const assert = std.debug.assert;

const batch = @import("batch.zig");
const beam = @import("beam.zig");
const scheduler = beam.scheduler;

const tb = @import("vsr").tigerbeetle;
const AccountFilter = tb.AccountFilter;
pub const AccountFilterBatch = batch.Batch(AccountFilter);
pub const AccountFilterBatchResource = batch.BatchResource(AccountFilter);

pub fn create(env: beam.Env, capacity: u32) beam.Term {
return batch.create(AccountFilter, env, capacity) catch |err| switch (err) {
error.OutOfMemory => return beam.make_error_atom(env, "out_of_memory"),
};
}

pub fn append(
env: beam.Env,
transfer_batch_resource: AccountFilterBatchResource,
transfer_bytes: []const u8,
) !beam.Term {
if (transfer_bytes.len != @sizeOf(AccountFilter)) return beam.raise_badarg(env);

return batch.append(
AccountFilter,
env,
transfer_batch_resource,
transfer_bytes,
) catch |err| switch (err) {
error.BatchFull => beam.make_error_atom(env, "batch_full"),
error.LockFailed => return error.Yield,
};
}
6 changes: 5 additions & 1 deletion src/client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const Resource = resource.Resource;

const tb = @import("vsr").tigerbeetle;
const tb_client = @import("vsr").tb_client;
const AccountFilter = tb.AccountFilter;
const Account = tb.Account;
const Transfer = tb.Transfer;

Expand Down Expand Up @@ -61,7 +62,8 @@ fn OperationBatchItemType(comptime operation: tb_client.tb_operation_t) type {
.create_accounts => Account,
.create_transfers => Transfer,
.lookup_accounts, .lookup_transfers => u128,
.get_account_transfers, .get_account_balances, .query_accounts, .query_transfers => @panic("TODO"),
.get_account_transfers, .get_account_balances => AccountFilter,
.query_accounts, .query_transfers => @panic("TODO"),
.pulse => unreachable,
};
}
Expand All @@ -77,6 +79,8 @@ pub const create_accounts = get_submit_fn(.create_accounts);
pub const create_transfers = get_submit_fn(.create_transfers);
pub const lookup_accounts = get_submit_fn(.lookup_accounts);
pub const lookup_transfers = get_submit_fn(.lookup_transfers);
pub const get_account_transfers = get_submit_fn(.get_account_transfers);
pub const get_account_balances = get_submit_fn(.get_account_balances);

fn get_submit_fn(comptime operation: tb_client.tb_operation_t) (fn (
env: beam.Env,
Expand Down
8 changes: 8 additions & 0 deletions src/tigerbeetlex.zig
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ const account_batch = @import("account_batch.zig");
const client = @import("client.zig");
const id_batch = @import("id_batch.zig");
const transfer_batch = @import("transfer_batch.zig");
const account_filter_batch = @import("account_filter_batch.zig");
const AccountBatch = account_batch.AccountBatch;
const IdBatch = id_batch.IdBatch;
const TransferBatch = transfer_batch.TransferBatch;
const AccountFilterBatch = account_filter_batch.TransferBatch;
const Client = client.Client;

const ClientResource = client.ClientResource;
const AccountBatchResource = account_batch.AccountBatchResource;
const IdBatchResource = id_batch.IdBatchResource;
const TransferBatchResource = transfer_batch.TransferBatchResource;
const AccountFilterBatchResource = account_filter_batch.AccountFilterBatchResource;

// Needed to configure VSR
pub const vsr_options = @import("config").vsr_options;
Expand All @@ -31,6 +34,10 @@ var exported_nifs = [_]nif.FunctionEntry{
nif.wrap("create_transfers", client.create_transfers),
nif.wrap("lookup_accounts", client.lookup_accounts),
nif.wrap("lookup_transfers", client.lookup_transfers),
nif.wrap("get_account_balances", client.get_account_balances),
nif.wrap("get_account_transfers", client.get_account_transfers),
nif.wrap("create_account_filter_batch", account_filter_batch.create),
nif.wrap("append_account_filter", account_filter_batch.append),
nif.wrap("create_account_batch", account_batch.create),
nif.wrap("append_account", account_batch.append),
nif.wrap("fetch_account", account_batch.fetch),
Expand All @@ -50,6 +57,7 @@ fn nif_load(env: beam.Env, _: [*c]?*anyopaque, _: beam.Term) callconv(.C) c_int
AccountBatchResource.create_type(env, "TigerBeetlex.AccountBatch");
IdBatchResource.create_type(env, "TigerBeetlex.IdBatch");
TransferBatchResource.create_type(env, "TigerBeetlex.TransferBatch");
AccountFilterBatchResource.create_type(env, "TigerBeetlex.AccountFilterBatch");
return 0;
}

Expand Down
Loading

0 comments on commit d6ad8e1

Please sign in to comment.