Various memory leak fixes in the error path & misc (#572)
InKryption authored Feb 25, 2025
Parent: 4fbe993 · Commit: 2500686
Showing 6 changed files with 99 additions and 63 deletions.
src/accountsdb/db.zig (14 changes: 9 additions & 5 deletions)
@@ -2636,8 +2636,8 @@ pub const AccountsDB = struct {
         };
         const archive_file_name_bounded = archive_info.snapshotArchiveName();
         const archive_file_name = archive_file_name_bounded.constSlice();
-        self.logger.info().logf("Generating full snapshot '{s}' (full path: {s}).", .{
-            archive_file_name, sig.utils.fmt.tryRealPath(self.snapshot_dir, archive_file_name),
+        self.logger.info().logf("Generating full snapshot '{s}' (full path: {1s}/{0s}).", .{
+            archive_file_name, sig.utils.fmt.tryRealPath(self.snapshot_dir, "."),
         });
         break :blk try self.snapshot_dir.createFile(archive_file_name, .{ .read = true });
     };
@@ -2853,9 +2853,13 @@ pub const AccountsDB = struct {
         };
         const archive_file_name_bounded = archive_info.snapshotArchiveName();
         const archive_file_name = archive_file_name_bounded.constSlice();
-        self.logger.info().logf("Generating incremental snapshot '{s}' (full path: {s}).", .{
-            archive_file_name, sig.utils.fmt.tryRealPath(self.snapshot_dir, archive_file_name),
-        });
+        self.logger.info().logf(
+            "Generating incremental snapshot '{0s}' (full path: {1s}/{0s}).",
+            .{
+                archive_file_name,
+                sig.utils.fmt.tryRealPath(self.snapshot_dir, "."),
+            },
+        );
         break :blk try self.snapshot_dir.createFile(archive_file_name, .{ .read = true });
     };
     defer archive_file.close();
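Note: both hunks fix the logged "full path". tryRealPath can only resolve a path that already exists, and the archive has not been created yet at log time, so the new code resolves the directory via "." and appends the file name in the format string. A minimal sketch of the positional format specifiers that make that argument reuse possible (the paths and names below are illustrative, not from the repository):

const std = @import("std");

pub fn main() !void {
    // "{0s}" and "{1s}" address arguments by index, so a single
    // argument can appear several times in one format string.
    var buf: [128]u8 = undefined;
    const line = try std.fmt.bufPrint(
        &buf,
        "Generating snapshot '{0s}' (full path: {1s}/{0s}).",
        .{ "snapshot.tar.zst", "/tmp/snapshots" },
    );
    // prints: Generating snapshot 'snapshot.tar.zst' (full path: /tmp/snapshots/snapshot.tar.zst).
    std.debug.print("{s}\n", .{line});
}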
src/accountsdb/fuzz.zig (28 changes: 11 additions & 17 deletions)
@@ -4,11 +4,9 @@ const zstd = @import("zstd");
 
 const AccountsDB = sig.accounts_db.AccountsDB;
 const StandardErrLogger = sig.trace.ChannelPrintLogger;
-const Level = sig.trace.Level;
 const Account = sig.core.Account;
 const Slot = sig.core.time.Slot;
 const Pubkey = sig.core.pubkey.Pubkey;
-const BankFields = sig.accounts_db.snapshots.BankFields;
 const FullSnapshotFileInfo = sig.accounts_db.snapshots.FullSnapshotFileInfo;
 const IncrementalSnapshotFileInfo = sig.accounts_db.snapshots.IncrementalSnapshotFileInfo;
 const AccountDataHandle = sig.accounts_db.buffer_pool.AccountDataHandle;
@@ -18,7 +16,7 @@ pub const TrackedAccount = struct {
     slot: u64,
     data: [32]u8,
 
-    pub fn initRandom(random: std.rand.Random, slot: Slot) !TrackedAccount {
+    pub fn initRandom(random: std.rand.Random, slot: Slot) TrackedAccount {
         var data: [32]u8 = undefined;
         random.bytes(&data);
         return .{
@@ -63,25 +61,24 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
 
     var std_logger = try StandardErrLogger.init(.{
         .allocator = allocator,
-        .max_level = Level.debug,
+        .max_level = .debug,
        .max_buffer = 1 << 20,
     });
     defer std_logger.deinit();
 
     const logger = std_logger.logger();
 
     const use_disk = random.boolean();
-
-    var test_data_dir = try std.fs.cwd().makeOpenPath(sig.FUZZ_DATA_DIR, .{});
-    defer test_data_dir.close();
+    var fuzz_data_dir = try std.fs.cwd().makeOpenPath(sig.FUZZ_DATA_DIR, .{});
+    defer fuzz_data_dir.close();
 
     const snapshot_dir_name = "accountsdb";
-    var snapshot_dir = try test_data_dir.makeOpenPath(snapshot_dir_name, .{});
+    var snapshot_dir = try fuzz_data_dir.makeOpenPath(snapshot_dir_name, .{});
     defer snapshot_dir.close();
     defer {
         // NOTE: sometimes this can take a long time so we print when we start and finish
         std.debug.print("deleting snapshot dir...\n", .{});
-        test_data_dir.deleteTreeMinStackSize(snapshot_dir_name) catch |err| {
+        fuzz_data_dir.deleteTreeMinStackSize(snapshot_dir_name) catch |err| {
             std.debug.print(
                 "failed to delete snapshot dir ('{s}'): {}\n",
                 .{ sig.utils.fmt.tryRealPath(snapshot_dir, "."), err },
@@ -133,9 +130,6 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
     defer tracked_accounts.deinit();
     try tracked_accounts.ensureTotalCapacity(10_000);
 
-    var random_bank_fields = try BankFields.initRandom(allocator, random, 1 << 8);
-    defer random_bank_fields.deinit(allocator);
-
     const zstd_compressor = try zstd.Compressor.init(.{});
     defer zstd_compressor.deinit();
 
@@ -172,7 +166,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
     for (&accounts, &pubkeys, 0..) |*account, *pubkey, i| {
         errdefer for (accounts[0..i]) |prev_account| prev_account.deinit(allocator);
 
-        var tracked_account = try TrackedAccount.initRandom(random, slot);
+        var tracked_account = TrackedAccount.initRandom(random, slot);
 
         const existing_pubkey = random.boolean();
         if ((existing_pubkey and tracked_accounts.count() > 0) or
@@ -277,8 +271,8 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
             true,
         );
         logger.info().logf(
-            "fuzz[validate]: unpacked full snapshot at slot: {}",
-            .{full_snapshot_info.slot},
+            "fuzz[validate]: unpacked full snapshot '{s}'",
+            .{full_archive_name},
         );
 
         break :full full_snapshot_file_info;
@@ -320,8 +314,8 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
             true,
         );
         logger.info().logf(
-            "fuzz[validate]: unpacked inc snapshot at slot: {}",
-            .{inc_snapshot_info.slot},
+            "fuzz[validate]: unpacked inc snapshot '{s}'",
+            .{inc_archive_name},
        );
 
         break :inc inc_snapshot_file_info;
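Note: besides dropping the now-unused Level and BankFields imports, this file makes TrackedAccount.initRandom infallible, so call sites lose their try, and logs archive names instead of slots. A standalone sketch of the infallible-constructor pattern (reconstructed types, assuming the std.rand naming used elsewhere in this diff):

const std = @import("std");

pub const TrackedAccount = struct {
    slot: u64,
    data: [32]u8,

    // Returns TrackedAccount rather than !TrackedAccount: filling a fixed
    // buffer from a PRNG has no failure path, so callers need no `try`.
    pub fn initRandom(random: std.rand.Random, slot: u64) TrackedAccount {
        var data: [32]u8 = undefined;
        random.bytes(&data);
        return .{ .slot = slot, .data = data };
    }
};

test "initRandom is infallible" {
    var prng = std.rand.DefaultPrng.init(92);
    const account = TrackedAccount.initRandom(prng.random(), 7);
    try std.testing.expectEqual(@as(u64, 7), account.slot);
}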
src/accountsdb/index.zig (43 changes: 31 additions & 12 deletions)
@@ -88,6 +88,7 @@ pub const AccountIndex = struct {
         number_of_shards: usize,
     ) !Self {
         const logger = logger_.withScope(LOG_SCOPE);
+
         const reference_allocator: ReferenceAllocator = switch (allocator_config) {
             .ram => |ram| blk: {
                 logger.info().logf("using ram memory for account index", .{});
@@ -96,12 +97,26 @@
             .disk => |disk| blk: {
                 var index_dir = try disk.accountsdb_dir.makeOpenPath("index", .{});
                 errdefer index_dir.close();
+
+                logger.info().logf(
+                    "using disk memory (@{s}) for account index",
+                    .{sig.utils.fmt.tryRealPath(index_dir, ".")},
+                );
+
                 const disk_allocator = try allocator.create(DiskMemoryAllocator);
-                disk_allocator.* = .{ .dir = index_dir, .logger = logger.withScope(@typeName(DiskMemoryAllocator)) };
-                logger.info().logf("using disk memory (@{s}) for account index", .{sig.utils.fmt.tryRealPath(index_dir, ".")});
-                break :blk .{ .disk = .{ .dma = disk_allocator, .ptr_allocator = allocator } };
+                errdefer allocator.destroy(disk_allocator);
+                disk_allocator.* = .{
+                    .dir = index_dir,
+                    .logger = logger.withScope(@typeName(DiskMemoryAllocator)),
+                };
+
+                break :blk .{ .disk = .{
+                    .dma = disk_allocator,
+                    .ptr_allocator = allocator,
+                } };
             },
         };
+        errdefer reference_allocator.deinit();
 
         const reference_manager = try sig.utils.allocators.RecycleBuffer(
             AccountRef,
@@ -111,6 +126,7 @@ pub const AccountIndex = struct {
             .memory_allocator = reference_allocator.get(),
             .records_allocator = allocator,
         });
+        errdefer reference_manager.destroy();
 
         return .{
             .allocator = allocator,
Expand All @@ -131,8 +147,7 @@ pub const AccountIndex = struct {
slot_reference_map.deinit();
}

self.reference_manager.deinit();
self.allocator.destroy(self.reference_manager);
self.reference_manager.destroy();
self.reference_allocator.deinit();
}

@@ -353,7 +368,10 @@ pub const AccountIndex = struct {
             reference_memory,
             .{},
         );
-        try self.reference_manager.memory.append(references);
+        try self.reference_manager.memory.append(
+            self.reference_manager.memory_allocator,
+            references,
+        );
         self.reference_manager.capacity += references.len;
 
         // update the pointers of the references
@@ -367,7 +385,7 @@ pub const AccountIndex = struct {
         // load the records
         self.logger.info().log("loading manager records");
         const records = try sig.bincode.readFromSlice(
-            self.reference_manager.records.allocator,
+            self.reference_manager.records_allocator,
             @TypeOf(self.reference_manager.records),
             records_memory,
             .{},
@@ -730,11 +748,12 @@ pub const ReferenceAllocator = union(Tag) {
         };
     }
 
-    pub fn deinit(self: *ReferenceAllocator) void {
-        switch (self.*) {
-            .disk => {
-                self.disk.dma.dir.close();
-                self.disk.ptr_allocator.destroy(self.disk.dma);
+    pub fn deinit(self: ReferenceAllocator) void {
+        switch (self) {
+            .disk => |disk| {
+                var dir = disk.dma.dir;
+                dir.close();
+                disk.ptr_allocator.destroy(disk.dma);
             },
             .ram => {},
         }
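Note: the index.zig changes share one shape: every resource acquired in init (the index dir, the disk allocator, the reference allocator, the reference manager) now has an errdefer immediately after its acquisition, so a failure at any later try unwinds everything acquired before it instead of leaking. A minimal sketch of the pattern with a hypothetical Thing type:

const std = @import("std");

const Thing = struct { buf: []u8 };

// Each acquisition is followed directly by its errdefer, so if a later
// `try` fails, earlier resources are released in reverse order.
fn createThing(allocator: std.mem.Allocator, n: usize) !*Thing {
    const thing = try allocator.create(Thing);
    errdefer allocator.destroy(thing); // runs only on an error return below

    const buf = try allocator.alloc(u8, n);
    errdefer allocator.free(buf);

    thing.* = .{ .buf = buf };
    return thing; // success: no errdefer fires
}

test "createThing hands ownership to the caller" {
    const allocator = std.testing.allocator;
    const thing = try createThing(allocator, 16);
    defer {
        allocator.free(thing.buf);
        allocator.destroy(thing);
    }
}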
src/trace/log.zig (8 changes: 7 additions & 1 deletion)
@@ -127,13 +127,17 @@ pub const ChannelPrintLogger = struct {
 
     pub fn init(config: Config) !*Self {
         const max_buffer = config.max_buffer orelse return error.MaxBufferNotSet;
+
         const recycle_fba = try config.allocator.create(RecycleFBA(.{}));
+        errdefer config.allocator.destroy(recycle_fba);
         recycle_fba.* = try RecycleFBA(.{}).init(.{
             .records_allocator = config.allocator,
             .bytes_allocator = config.allocator,
         }, max_buffer);
+        errdefer recycle_fba.deinit();
 
         const self = try config.allocator.create(Self);
+        errdefer config.allocator.destroy(self);
         self.* = .{
             .allocator = config.allocator,
             .log_allocator = recycle_fba.allocator(),
@@ -146,11 +150,13 @@ pub const ChannelPrintLogger = struct {
         };
 
         self.handle = try std.Thread.spawn(.{}, run, .{self});
+        errdefer comptime unreachable;
+
         return self;
     }
 
     pub fn deinit(self: *Self) void {
-        if (self.handle) |*handle| {
+        if (self.handle) |handle| {
             std.time.sleep(std.time.ns_per_ms * 5);
             self.exit.store(true, .seq_cst);
             handle.join();
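Note: init now pairs each allocation with an errdefer, and `errdefer comptime unreachable;` after the thread spawn asserts that nothing below it can return an error: if a fallible call is ever added after the spawn, the errdefer becomes reachable on the error path and compilation fails rather than leaking a running thread. A minimal sketch of the idiom (hypothetical function):

const std = @import("std");

fn makeBuffers(allocator: std.mem.Allocator) ![2][]u8 {
    const first = try allocator.alloc(u8, 32);
    errdefer allocator.free(first); // reachable: the next `try` can fail

    const second = try allocator.alloc(u8, 32);
    errdefer comptime unreachable; // nothing below may fail; if it can, this is a compile error

    return .{ first, second };
}

test "makeBuffers" {
    const bufs = try makeBuffers(std.testing.allocator);
    defer {
        for (bufs) |buf| std.testing.allocator.free(buf);
    }
}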
src/utils/allocators.zig (59 changes: 33 additions & 26 deletions)
@@ -30,19 +30,21 @@ pub fn RecycleBuffer(comptime T: type, default_init: T, config: struct {
     std.debug.assert(config.min_split_size > 0);
 
     return struct {
-        // records are used to keep track of the memory blocks
-        records: std.ArrayList(Record),
-        // memory holds blocks of memory ([]T) that can be allocated/deallocated
-        memory: std.ArrayList([]T),
-        // allocator used to alloc the memory blocks
+        records_allocator: std.mem.Allocator,
+        /// records are used to keep track of the memory blocks
+        records: std.ArrayListUnmanaged(Record),
+        /// allocator used to alloc the memory blocks
         memory_allocator: std.mem.Allocator,
-        // total number of T elements we have in memory
+        /// memory holds blocks of memory ([]T) that can be allocated/deallocated
+        memory: std.ArrayListUnmanaged([]T),
+        /// total number of T elements we have in memory
         capacity: u64,
-        // the maximum contiguous capacity we have in memory
-        // NOTE: since we support multiple memory slices, this tells us the max single alloc size
+        /// the maximum contiguous capacity we have in memory
+        /// NOTE: since we support multiple memory slices, this tells us the max single alloc size
         max_continguous_capacity: u64,
-        // for thread safety
+        /// for thread safety
         mux: std.Thread.Mutex = .{},
+        const Self = @This();
 
         // NOTE: we use the global_index to support fast loading the state
         pub const Record = struct {
@@ -59,38 +61,43 @@ pub fn RecycleBuffer(comptime T: type, default_init: T, config: struct {
                 .default_value = &.{},
             };
         };
-        const Self = @This();
 
         const AllocatorConfig = struct {
-            memory_allocator: std.mem.Allocator,
             records_allocator: std.mem.Allocator,
+            memory_allocator: std.mem.Allocator,
         };
 
         pub fn init(allocator_config: AllocatorConfig) Self {
             return .{
-                .records = std.ArrayList(Record).init(allocator_config.records_allocator),
-                .memory = std.ArrayList([]T).init(allocator_config.records_allocator),
+                .records_allocator = allocator_config.records_allocator,
+                .records = .{},
+                .memory_allocator = allocator_config.memory_allocator,
+                .memory = .{},
                 .capacity = 0,
                 .max_continguous_capacity = 0,
             };
         }
 
-        pub fn create(allocator_config: AllocatorConfig) !*Self {
-            const self = try allocator_config.records_allocator.create(Self);
-            self.* = Self.init(allocator_config);
-            return self;
-        }
-
         pub fn deinit(self: *Self) void {
             if (config.thread_safe) self.mux.lock();
             defer if (config.thread_safe) self.mux.unlock();
 
             for (self.memory.items) |block| {
                 self.memory_allocator.free(block);
             }
-            self.memory.deinit();
-            self.records.deinit();
+            self.memory.deinit(self.memory_allocator);
+            self.records.deinit(self.records_allocator);
         }
 
+        pub fn create(allocator_config: AllocatorConfig) !*Self {
+            const self = try allocator_config.records_allocator.create(Self);
+            self.* = Self.init(allocator_config);
+            return self;
+        }
+
+        pub fn destroy(self: *Self) void {
+            self.deinit();
+            self.records_allocator.destroy(self);
+        }
 
         /// append a block of N elements to the manager
@@ -104,8 +111,8 @@ pub fn RecycleBuffer(comptime T: type, default_init: T, config: struct {
         pub fn expandCapacityUnsafe(self: *Self, n: u64) std.mem.Allocator.Error!void {
             if (n == 0) return;
 
-            try self.records.ensureUnusedCapacity(1);
-            try self.memory.ensureUnusedCapacity(1);
+            try self.records.ensureUnusedCapacity(self.records_allocator, 1);
+            try self.memory.ensureUnusedCapacity(self.memory_allocator, 1);
 
             const buf = try self.memory_allocator.alloc(T, n);
             // NOTE: we do this here so bincode serialization can work correctly
@@ -273,7 +280,7 @@ pub fn RecycleBuffer(comptime T: type, default_init: T, config: struct {
                 record.len = used_len;
                 // add new unused record to the list
                 // NOTE: errors here are unreachable because if we hit OOM, were left in a bad state
-                self.records.append(.{
+                self.records.append(self.records_allocator, .{
                     .is_free = true,
                     .buf = split_buf,
                     .global_index = record.global_index + used_len,
@@ -993,13 +1000,13 @@ test "recycle buffer: save and load" {
     _ = try sig.bincode.writeToSlice(records_memory, allocator.records.items, .{});
 
     // read from slice to records
-    const records = try sig.bincode.readFromSlice(
+    var records = try sig.bincode.readFromSlice(
         backing_allocator,
         @TypeOf(allocator.records),
         records_memory,
         .{},
     );
-    defer records.deinit();
+    defer records.deinit(backing_allocator);
 
     for (records.items) |*record| {
         record.buf = references[record.global_index..][0..record.len];
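Note: RecycleBuffer moves from std.ArrayList to std.ArrayListUnmanaged and stores records_allocator and memory_allocator as explicit fields, which is what allows records and memory blocks to come from different allocators (and what the new create/destroy pair relies on). Unmanaged containers take the allocator at each mutating call instead of carrying it. A small sketch of the API difference (standalone; assumes a pre-0.15 Zig where the managed std.ArrayList still exists):

const std = @import("std");

test "managed vs unmanaged ArrayList" {
    const allocator = std.testing.allocator;

    // Managed: the list stores its allocator at init time.
    var managed = std.ArrayList(u32).init(allocator);
    defer managed.deinit();
    try managed.append(1);

    // Unmanaged: the allocator is passed at every mutating call, so the
    // struct is smaller and the owner decides which allocator backs it.
    var unmanaged: std.ArrayListUnmanaged(u32) = .{};
    defer unmanaged.deinit(allocator);
    try unmanaged.append(allocator, 1);

    try std.testing.expectEqual(managed.items[0], unmanaged.items[0]);
}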
src/utils/thread.zig (10 changes: 8 additions & 2 deletions)
@@ -205,11 +205,17 @@ pub fn HomogeneousThreadPool(comptime TaskType: type) type {
         num_threads: u32,
         num_tasks: u64,
     ) !Self {
+        var tasks = try std.ArrayList(TaskAdapter).initCapacity(allocator, num_tasks);
+        errdefer tasks.deinit();
+
+        var results = try std.ArrayList(TaskResult).initCapacity(allocator, num_tasks);
+        errdefer results.deinit();
+
         return .{
             .allocator = allocator,
             .pool = ThreadPool.init(.{ .max_threads = num_threads }),
-            .tasks = try std.ArrayList(TaskAdapter).initCapacity(allocator, num_tasks),
-            .results = try std.ArrayList(TaskResult).initCapacity(allocator, num_tasks),
+            .tasks = tasks,
+            .results = results,
         };
     }
 
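Note: this last fix is the same leak in miniature: with two fallible initCapacity calls inside one struct literal, a failure in the second leaks the first, because there is no named value yet to attach an errdefer to. Hoisting each into a local with its own errdefer closes that window. A sketch with a hypothetical Pair type:

const std = @import("std");

const Pair = struct {
    a: std.ArrayList(u8),
    b: std.ArrayList(u8),

    // If `b`'s initCapacity failed inside the struct literal, `a`'s buffer
    // would leak; the locals' errdefers free `a` when `b`'s allocation fails.
    fn init(allocator: std.mem.Allocator, n: usize) !Pair {
        const a = try std.ArrayList(u8).initCapacity(allocator, n);
        errdefer a.deinit();

        const b = try std.ArrayList(u8).initCapacity(allocator, n);
        errdefer b.deinit();

        return .{ .a = a, .b = b };
    }
};

test "Pair.init" {
    const pair = try Pair.init(std.testing.allocator, 8);
    defer {
        pair.a.deinit();
        pair.b.deinit();
    }
}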
