Skip to content

Commit 642b72f

Browse files
committed
WIP: WAL sparse writes
1 parent 9ca16bb commit 642b72f

File tree

5 files changed

+86
-23
lines changed

5 files changed

+86
-23
lines changed

docs/internals/COMPACTION.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ the live index replication. NB the snapshot sender process may need to call
243243
into the leader process to get read plans as entries _could_ be in the memtable.
244244

245245
#### How to work out which live indexes the follower needs
246-
246+
WA
247247
Gnarly example:
248248

249249
Follower term indexes:
@@ -287,4 +287,6 @@ for decision making.
287287

288288
WAL needs to accept sparse writes without a higher snapshot idx (snap install)
289289
WAL needs to accept contiguous writes with a higher snap idx with and without live indexes
290-
290+
WAL will send ra_seq of entries written in a WAL
291+
SegWriter needs to flush the live indexes preceding the snapshot index which
292+
_should_ be covered in the sparse sequence of written indexes.

src/ra_log.erl

+3-3
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ commit_tx(#?MODULE{cfg = #cfg{uid = UId,
336336
{WalCommands, Num} =
337337
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) ->
338338
Cmd = {ttb, term_to_iovec(Cmd0)},
339-
WalC = {append, WriterId, Tid, Idx, Term, Cmd},
339+
WalC = {append, WriterId, Tid, Idx-1, Idx, Term, Cmd},
340340
{[WalC | WC], N+1}
341341
end, {[], 0}, Entries),
342342

@@ -1176,11 +1176,11 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
11761176
{WalCommands, Num} =
11771177
lists:foldl(fun ({Idx, Term, Cmd0}, {WC, N}) ->
11781178
Cmd = {ttb, term_to_iovec(Cmd0)},
1179-
WalC = {append, WriterId, Tid, Idx, Term, Cmd},
1179+
WalC = {append, WriterId, Tid, Idx-1, Idx, Term, Cmd},
11801180
{[WalC | WC], N+1}
11811181
end, {[], 0}, Entries),
11821182

1183-
[{_, _, _, LastIdx, LastTerm, _} | _] = WalCommands,
1183+
[{_, _, _, _PrevIdx, LastIdx, LastTerm, _} | _] = WalCommands,
11841184
{_, Mt} = ra_mt:commit(Mt0),
11851185
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
11861186
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),

src/ra_log_segment_writer.erl

+9-9
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,13 @@ start_link(#{name := Name} = Config) ->
6666

6767
%% Hands a set of memtable ranges, keyed by ra uid, to the segment writer.
%%
%% When there are no tables and no WAL file (`undefined') this is a no-op.
%% Otherwise the request is sent as an asynchronous cast and `ok' is returned
%% immediately.
%%
%% The WAL file argument may be `undefined' (matched explicitly by the first
%% clause), so the spec allows it alongside `string()'.
-spec accept_mem_tables(atom() | pid(),
                        #{ra_uid() => [{ets:tid(), ra:range()}]},
                        string() | undefined) -> ok.
accept_mem_tables(_SegmentWriter, Tables, undefined)
  when map_size(Tables) == 0 ->
    %% nothing to flush and no WAL file to associate with
    ok;
accept_mem_tables(SegmentWriter, UIdTidRanges, WalFile)
  when is_map(UIdTidRanges) ->
    gen_server:cast(SegmentWriter, {mem_tables, UIdTidRanges, WalFile}).
7776

7877
-spec truncate_segments(atom() | pid(), ra_uid(), ra_log:segment_ref()) -> ok.
7978
truncate_segments(SegWriter, Who, SegRef) ->
@@ -135,9 +134,10 @@ segments_for(UId, #state{data_dir = DataDir}) ->
135134
Dir = filename:join(DataDir, ra_lib:to_list(UId)),
136135
segment_files(Dir).
137136

138-
handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
139-
system = System} = State) ->
140-
ok = counters:add(State#state.counter, ?C_MEM_TABLES, map_size(Ranges)),
137+
handle_cast({mem_tables, UIdTidRanges, WalFile},
138+
#state{data_dir = Dir,
139+
system = System} = State) ->
140+
ok = counters:add(State#state.counter, ?C_MEM_TABLES, map_size(UIdTidRanges)),
141141
#{names := Names} = ra_system:fetch(System),
142142
Degree = erlang:system_info(schedulers),
143143
%% TODO: refactor to make better use of time where each uid has an
@@ -156,7 +156,7 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
156156
ok = ra_log_ets:delete_mem_tables(Names, UId),
157157
Acc
158158
end
159-
end, [], Ranges),
159+
end, [], UIdTidRanges),
160160

161161
T1 = erlang:monotonic_time(),
162162
_ = [begin

src/ra_log_wal.erl

+29-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
-export([
1919
write/6,
20+
write/7,
2021
write_batch/2,
2122
last_writer_seq/2,
2223
force_roll_over/1]).
@@ -140,7 +141,8 @@
140141
-export_type([wal_conf/0]).
141142

142143
%% An append instruction for the WAL:
%% `{append, WriterId, MemTableTid, PrevIndex, Index, Term, Cmd}'.
%% PrevIndex is the index the writer wrote immediately before Index; it is
%% `Index - 1' for contiguous writes and may be -1 when Index is 0.
-type wal_command() ::
    {append, writer_id(), ets:tid(), PrevIndex :: ra:index() | -1,
     Index :: ra:index(), Term :: ra_term(), wal_cmd()}.
144146

145147
-type wal_op() :: {cast, wal_command()} |
146148
{call, from(), wal_command()}.
@@ -149,10 +151,23 @@
149151
%% Writes a single entry assuming it directly follows the previous one,
%% i.e. the previous index is taken to be `Idx - 1'. Delegates to write/7.
-spec write(atom() | pid(), writer_id(), ets:tid(), ra_index(), ra_term(),
            wal_cmd()) ->
    {ok, pid()} | {error, wal_down}.
write(Wal, From, MtTid, Idx, Term, Cmd) ->
    %% "normal" operation where we assume a contiguous sequence;
    %% this may be removed at some point
    PrevIdx = Idx - 1,
    write(Wal, From, MtTid, PrevIdx, Idx, Term, Cmd).
158+
159+
%% Writes a single entry, stating explicitly which index the writer wrote
%% before it. PrevIdx must be an integer strictly less than Idx; the gap
%% between the two is what allows non-contiguous (sparse) writes.
%% The append command is sent to the WAL via named_cast/2.
-spec write(atom() | pid(), writer_id(), ets:tid(),
            PrevIndex :: ra:index() | -1,
            Index :: ra_index(),
            Term :: ra_term(),
            wal_cmd()) ->
    {ok, pid()} | {error, wal_down}.
write(Wal, {_, _} = From, MtTid, PrevIdx, Idx, Term, Cmd)
  when is_integer(Idx),
       is_integer(PrevIdx),
       is_integer(Term),
       PrevIdx < Idx ->
    Append = {append, From, MtTid, PrevIdx, Idx, Term, Cmd},
    named_cast(Wal, Append).
156171

157172
-spec write_batch(Wal :: atom() | pid(), [wal_command()]) ->
158173
{ok, pid()} | {error, wal_down}.
@@ -476,7 +491,7 @@ write_data({UId, Pid} = Id, MtTid, Idx, Term, Data0, Trunc, SmallestIndex,
476491
end.
477492

478493

479-
handle_msg({append, {UId, Pid} = Id, MtTid, Idx, Term, Entry},
494+
handle_msg({append, {UId, Pid} = Id, MtTid, PrevIdx0, Idx, Term, Entry},
480495
#state{conf = Conf,
481496
writers = Writers} = State0) ->
482497
SmallestIdx = smallest_live_index(Conf, UId),
@@ -487,13 +502,17 @@ handle_msg({append, {UId, Pid} = Id, MtTid, Idx, Term, Entry},
487502
_ when Idx < SmallestIdx ->
488503
%% the smallest live index for the last snapshot is higher than
489504
%% this index, just drop it
490-
PrevIdx = SmallestIdx - 1,
491-
State0#state{writers = Writers#{UId => {in_seq, PrevIdx}}};
505+
LastIdx = SmallestIdx - 1,
506+
State0#state{writers = Writers#{UId => {in_seq, LastIdx}}};
492507
{ok, {_, PrevIdx}}
493-
when Idx =< PrevIdx + 1 orelse
508+
when PrevIdx0 =< PrevIdx orelse
494509
Trunc ->
510+
%% if the passed in previous index is less than or equal to the last written
511+
%% index (gap detection) _or_ it is a truncation
512+
%% then we can proceed and write the entry
495513
write_data(Id, MtTid, Idx, Term, Entry, Trunc, SmallestIdx, State0);
496514
error ->
515+
%% no state for the UId is known so go ahead and write
497516
write_data(Id, MtTid, Idx, Term, Entry, false, SmallestIdx, State0);
498517
{ok, {out_of_seq, _}} ->
499518
% writer is out of seq, simply drop the write
@@ -528,6 +547,8 @@ incr_batch(#batch{num_writes = Writes,
528547
%% The Tid and term is the same so add to current batch_writer
529548
Range = ra_range:extend(Idx, ra_range:truncate(SmallestIdx - 1,
530549
Range0)),
550+
%% TODO: range needs to become a ra_seq so that we can
551+
%% capture sparse writes correctly
531552
Waiting0#{Pid => W#batch_writer{range = Range,
532553
smallest_live_idx = SmallestIdx,
533554
term = Term

test/ra_log_wal_SUITE.erl

+41-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ all() ->
2424
all_tests() ->
2525
[
2626
basic_log_writes,
27+
sparse_writes,
2728
wal_filename_upgrade,
2829
same_uid_different_process,
2930
consecutive_terms_in_batch_should_result_in_two_written_events,
@@ -61,7 +62,8 @@ groups() ->
6162
[
6263
{default, [], all_tests()},
6364
%% uses fsync instead of the default fdatasync
64-
{fsync, [], all_tests()},
65+
%% just testing that the configuration and dispatch works
66+
{fsync, [], [basic_log_writes]},
6567
{no_sync, [], all_tests()}
6668
].
6769

@@ -148,6 +150,44 @@ basic_log_writes(Config) ->
148150
meck:unload(),
149151
ok.
150152

153+
%% Verifies that the WAL accepts a non-contiguous (sparse) write.
%%
%% Index 12 is written first (previous index 11), then index 15 with a
%% previous index of 12, leaving a gap at 13 and 14. After a forced roll-over
%% the test expects a mem_tables cast reporting the range {12, 15} for the
%% writer's memtable.
%%
%% No interactive debugger hooks or long sleeps are used here: the test only
%% synchronises on await_written/3 and on the `receive ... after' timeout, so
%% it can run unattended.
sparse_writes(Config) ->
    meck:new(ra_log_segment_writer, [passthrough]),
    meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
    Conf = ?config(wal_conf, Config),
    {UId, _} = WriterId = ?config(writer_id, Config),
    Tid = ets:new(?FUNCTION_NAME, []),
    {ok, Pid} = ra_log_wal:start_link(Conf),
    %% first write: previous index 11, index 12
    {ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 11, 12, 1, "value"),
    ok = await_written(WriterId, 1, {12, 12}),
    %% sparse write: previous index 12, index 15 (13 and 14 are skipped)
    {ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 15, 1, "value2"),
    ok = await_written(WriterId, 1, {15, 15}),
    ra_log_wal:force_roll_over(Pid),
    receive
        {'$gen_cast',
         {mem_tables, #{UId := [{Tid, {12, 15}}]}, _}} ->
            ok
    after 5000 ->
              flush(),
              ct:fail("receiving mem table ranges timed out")
    end,
    proc_lib:stop(Pid),
    meck:unload(),
    ok.
181+
182+
%% Placeholder test case: not implemented yet, so it fails unconditionally,
%% reporting its own name.
sparse_write_same_batch(_Config) ->
    TestCase = ?FUNCTION_NAME,
    ct:fail("~s", [TestCase]).
184+
185+
%% Placeholder test case: not implemented yet, so it fails unconditionally,
%% reporting its own name.
sparse_write_recover(_Config) ->
    TestCase = ?FUNCTION_NAME,
    ct:fail("~s", [TestCase]).
187+
188+
%% Placeholder test case: not implemented yet, so it fails unconditionally,
%% reporting its own name.
sparse_write_overwrite(_Config) ->
    TestCase = ?FUNCTION_NAME,
    ct:fail("~s", [TestCase]).
190+
151191
wal_filename_upgrade(Config) ->
152192
meck:new(ra_log_segment_writer, [passthrough]),
153193
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),

0 commit comments

Comments
 (0)