implement committed entries pagination #440

Merged · 9 commits · Jun 4, 2021
2 changes: 1 addition & 1 deletion harness/tests/integration_cases/test_raft.rs
@@ -107,7 +107,7 @@ fn next_ents(r: &mut Raft<MemStorage>, s: &MemStorage) -> Vec<Entry> {
s.wl().append(&unstable).expect("");
r.on_persist_entries(last_idx, last_term);
}
- let ents = r.raft_log.next_entries();
+ let ents = r.raft_log.next_entries(None);
r.commit_apply(r.raft_log.committed);
ents.unwrap_or_else(Vec::new)
}
6 changes: 3 additions & 3 deletions harness/tests/integration_cases/test_raft_paper.rs
@@ -482,7 +482,7 @@ fn test_leader_commit_entry() {

assert_eq!(r.raft_log.committed, li + 1);
let wents = vec![new_entry(1, li + 1, SOME_DATA)];
- assert_eq!(r.raft_log.next_entries(), Some(wents));
+ assert_eq!(r.raft_log.next_entries(None), Some(wents));
let mut msgs = r.read_messages();
msgs.sort_by_key(|m| format!("{:?}", m));
for (i, m) in msgs.drain(..).enumerate() {
@@ -572,7 +572,7 @@ fn test_leader_commit_preceding_entries() {
empty_entry(3, li + 1),
new_entry(3, li + 2, SOME_DATA),
]);
- let g = r.raft_log.next_entries();
+ let g = r.raft_log.next_entries(None);
let wg = Some(tt);
if g != wg {
panic!("#{}: ents = {:?}, want {:?}", i, g, wg);
@@ -629,7 +629,7 @@ fn test_follower_commit_entry() {
);
}
let wents = Some(ents[..commit as usize].to_vec());
- let g = r.raft_log.next_entries();
+ let g = r.raft_log.next_entries(None);
if g != wents {
panic!("#{}: next_ents = {:?}, want {:?}", i, g, wents);
}
137 changes: 137 additions & 0 deletions harness/tests/integration_cases/test_raw_node.rs
@@ -1581,3 +1581,140 @@ fn test_async_ready_multiple_snapshot() {

raw_node.advance_apply_to(20);
}

#[test]
fn test_committed_entries_pagination() {
let l = default_logger();
let s = new_storage();
let mut raw_node = new_raw_node(1, vec![1, 2, 3], 10, 1, s, &l);

let mut entries = vec![];
for i in 2..10 {
entries.push(new_entry(1, i, None));
}
let mut msg = new_message_with_entries(3, 1, MessageType::MsgAppend, entries.to_vec());
msg.set_term(1);
msg.set_index(1);
msg.set_log_term(1);
msg.set_commit(9);
raw_node.step(msg).unwrap();

// Test unpersisted entries won't be fetched.
// NOTE: maybe it's better to allow fetching unpersisted committed entries.
let rd = raw_node.ready();
assert!(rd.committed_entries().is_empty());
assert!(raw_node.has_ready());

// Persist entries.
assert!(!rd.entries().is_empty());
raw_node.store().wl().append(rd.entries()).unwrap();

// Advance the ready, and we can get committed_entries as expected.
// Test that using 0 as `committed_entries_max_size` works as expected.
raw_node.raft.set_max_committed_size_per_ready(0);
let rd = raw_node.advance(rd);
// `MemStorage::entries` uses `util::limit_size` to limit the size of committed entries.
// So there will be at least one entry.
assert_eq!(rd.committed_entries().len(), 1);

// Fetch a `Ready` again without size limit for committed entries.
assert!(raw_node.has_ready());
raw_node.raft.set_max_committed_size_per_ready(u64::MAX);
let rd = raw_node.ready();
assert_eq!(rd.committed_entries().len(), 7);

// No more `Ready`s.
assert!(!raw_node.has_ready());
}

/// Test that, with `commit_since_index`, committed entries can be fetched correctly after a restart.
///
/// Case steps:
/// - Node learns that index 10 is committed
/// - `next_entries` returns entries [2..11) in committed_entries (but index 10 already
/// exceeds maxBytes), which isn't noticed internally by Raft
/// - Commit index gets bumped to 10
/// - The node persists the `HardState`, but crashes before applying the entries
/// - Upon restart, the storage returns the same entries, but `slice` takes a
/// different code path and removes the last entry.
/// - Raft does not emit a HardState, but when the app calls advance(), it bumps
/// its internal applied index cursor to 10 (when it should be 9)
/// - The next `Ready` asks the app to apply index 11 (omitting index 10), losing a
/// write.
#[test]
fn test_committed_entries_pagination_after_restart() {
let l = default_logger();
let s = IgnoreSizeHintMemStorage::default();
s.inner
.wl()
.apply_snapshot(new_snapshot(1, 1, vec![1, 2, 3]))
.unwrap();

let (mut entries, mut size) = (vec![], 0);
for i in 2..=10 {
let e = new_entry(1, i, Some("test data"));
size += e.compute_size() as u64;
entries.push(e);
}
s.inner.wl().append(&entries).unwrap();
s.inner.wl().mut_hard_state().commit = 10;

s.inner
.wl()
.append(&[new_entry(1, 11, Some("boom"))])
.unwrap();

let config = new_test_config(1, 10, 1);
let mut raw_node = RawNode::new(&config, s, &l).unwrap();

// `IgnoreSizeHintMemStorage` will ignore `max_committed_size_per_ready`, but
// `RaftLog::slice` won't.
raw_node.raft.set_max_committed_size_per_ready(size - 1);

let mut highest_applied = 1;
while highest_applied != 11 {
let mut rd = raw_node.ready();
let committed_entries = rd.take_committed_entries();
let next = committed_entries.first().map(|x| x.index).unwrap();
assert_eq!(highest_applied + 1, next);

highest_applied = committed_entries.last().map(|x| x.index).unwrap();
raw_node.raft.raft_log.commit_to(11);
}
}

#[derive(Default)]
struct IgnoreSizeHintMemStorage {
inner: MemStorage,
}

impl Storage for IgnoreSizeHintMemStorage {
fn initial_state(&self) -> Result<RaftState> {
self.inner.initial_state()
}

fn entries(
&self,
low: u64,
high: u64,
_max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>> {
self.inner.entries(low, high, u64::MAX)
}

fn term(&self, idx: u64) -> Result<u64> {
self.inner.term(idx)
}

fn first_index(&self) -> Result<u64> {
self.inner.first_index()
}

fn last_index(&self) -> Result<u64> {
self.inner.last_index()
}

fn snapshot(&self, request_index: u64) -> Result<Snapshot> {
self.inner.snapshot(request_index)
}
}
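
For reference, a minimal sketch (not part of this diff) of the contiguity check the restart test above depends on: every paged batch of committed entries must start at exactly `last_applied + 1`, or a paging bug like the one described in the doc comment would silently skip a write. `apply_batch` and `last_applied` are hypothetical application-side names.

```rust
use raft::eraftpb::Entry;

// Hypothetical application helper; not part of raft-rs or this PR.
fn apply_batch(last_applied: &mut u64, batch: &[Entry]) {
    if let Some(first) = batch.first() {
        // Same invariant the test asserts via `highest_applied + 1 == next`.
        assert_eq!(
            first.index,
            *last_applied + 1,
            "paged committed entries must be contiguous"
        );
        // ... apply each entry to the state machine here ...
        *last_applied = batch.last().unwrap().index;
    }
}
```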
4 changes: 4 additions & 0 deletions src/config.rs
@@ -95,6 +95,9 @@ pub struct Config {
/// Specify maximum of uncommitted entry size.
/// When this limit is reached, all proposals to append new log will be dropped
pub max_uncommitted_size: u64,

/// Max size for committed entries in a `Ready`.
pub max_committed_size_per_ready: u64,
}

impl Default for Config {
@@ -116,6 +119,7 @@ impl Default for Config {
batch_append: false,
priority: 0,
max_uncommitted_size: NO_LIMIT,
max_committed_size_per_ready: NO_LIMIT,
}
}
}
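A minimal sketch (not part of this diff) of opting into the new field when building a node; the 1 MiB value and the `build_node` helper are illustrative, not defaults introduced by this PR.

```rust
use raft::{storage::MemStorage, Config, RawNode};

// Hypothetical helper; values are illustrative.
fn build_node(logger: &slog::Logger) -> raft::Result<RawNode<MemStorage>> {
    let config = Config {
        id: 1,
        // Hand out committed entries in batches of at most ~1 MiB per Ready.
        max_committed_size_per_ready: 1024 * 1024,
        ..Default::default()
    };
    let storage = MemStorage::new_with_conf_state((vec![1], vec![]));
    RawNode::new(&config, storage, logger)
}
```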
4 changes: 1 addition & 3 deletions src/log_unstable.rs
@@ -150,20 +150,18 @@ impl Unstable {
let after = ents[0].index;
if after == self.offset + self.entries.len() as u64 {
// after is the next index in the self.entries, append directly
- self.entries.extend_from_slice(ents);
} else if after <= self.offset {
// The log is being truncated to before our current offset
// portion, so set the offset and replace the entries
self.offset = after;
self.entries.clear();
- self.entries.extend_from_slice(ents);
} else {
// truncate to after and copy to self.entries then append
let off = self.offset;
self.must_check_outofbounds(off, after);
self.entries.truncate((after - off) as usize);
- self.entries.extend_from_slice(ents);
}
+ self.entries.extend_from_slice(ents);
}

/// Returns a slice of entries between the high and low.
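A standalone illustration (using a plain `Vec<u64>` of stand-in entries instead of the real `Unstable` type) of the three branches above, now that the trailing `extend_from_slice` is shared by all of them; the names and numbers here are illustrative only.

```rust
// Stand-in for Unstable::truncate_and_append: `offset` is the index of the
// first stored entry, `after` is the index of the first incoming entry.
fn truncate_and_append(offset: &mut u64, entries: &mut Vec<u64>, after: u64, incoming: &[u64]) {
    if after == *offset + entries.len() as u64 {
        // Incoming entries directly continue the window: nothing to drop.
    } else if after <= *offset {
        // Truncation to before our window: replace everything.
        *offset = after;
        entries.clear();
    } else {
        // Overlap inside the window: drop the conflicting tail.
        entries.truncate((after - *offset) as usize);
    }
    entries.extend_from_slice(incoming);
}

fn main() {
    // Window starts at index 5 and holds stand-in entries for 5, 6 and 7.
    let (mut offset, mut entries) = (5u64, vec![5, 6, 7]);
    // New entries start at index 7, so the old entry 7 is replaced.
    truncate_and_append(&mut offset, &mut entries, 7, &[70, 80]);
    assert_eq!((offset, entries), (5, vec![5, 6, 70, 80]));
}
```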
9 changes: 9 additions & 0 deletions src/raft.rs
@@ -257,6 +257,9 @@ pub struct RaftCore<T: Storage> {

/// Track uncommitted log entry on this node
uncommitted_state: UncommittedState,

/// Max size of committed entries in a single `Ready`.
pub(crate) max_committed_size_per_ready: u64,
}

/// A struct that represents the raft consensus itself. Stores details concerning the current
@@ -361,6 +364,7 @@ impl<T: Storage> Raft<T> {
uncommitted_size: 0,
last_log_tail_index: 0,
},
max_committed_size_per_ready: c.max_committed_size_per_ready,
},
};
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
@@ -586,6 +590,11 @@ impl<T: Storage> Raft<T> {
.term(self.raft_log.applied)
.map_or(false, |t| t == self.term)
}

/// Set `max_committed_size_per_ready` to `size`.
pub fn set_max_committed_size_per_ready(&mut self, size: u64) {
self.max_committed_size_per_ready = size;
}
}

impl<T: Storage> RaftCore<T> {
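As the pagination test earlier in this PR shows, the cap can also be changed at runtime through the public `raft` handle on a `RawNode`. A small sketch; the `throttle_apply_batches` helper and the 64 KiB figure are illustrative, not part of the PR.

```rust
use raft::{storage::MemStorage, RawNode};

// Hypothetical helper around the setter added above.
fn throttle_apply_batches(raw_node: &mut RawNode<MemStorage>, apply_backlogged: bool) {
    if apply_backlogged {
        // Smaller pages while the apply loop is behind.
        raw_node.raft.set_max_committed_size_per_ready(64 * 1024);
    } else {
        // Effectively unlimited otherwise.
        raw_node.raft.set_max_committed_size_per_ready(u64::MAX);
    }
}
```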
10 changes: 5 additions & 5 deletions src/raft_log.rs
@@ -414,11 +414,11 @@ impl<T: Storage> RaftLog<T> {
}

/// Returns committed and persisted entries since max(`since_idx` + 1, first_index).
- pub fn next_entries_since(&self, since_idx: u64) -> Option<Vec<Entry>> {
+ pub fn next_entries_since(&self, since_idx: u64, max_size: Option<u64>) -> Option<Vec<Entry>> {
let offset = cmp::max(since_idx + 1, self.first_index());
let high = cmp::min(self.committed, self.persisted) + 1;
if high > offset {
- match self.slice(offset, high, None) {
+ match self.slice(offset, high, max_size) {
Ok(vec) => return Some(vec),
Err(e) => fatal!(self.unstable.logger, "{}", e),
}
Expand All @@ -429,8 +429,8 @@ impl<T: Storage> RaftLog<T> {
/// Returns all the available entries for execution.
/// If applied is smaller than the index of snapshot, it returns all committed
/// entries after the index of snapshot.
- pub fn next_entries(&self) -> Option<Vec<Entry>> {
-     self.next_entries_since(self.applied)
+ pub fn next_entries(&self, max_size: Option<u64>) -> Option<Vec<Entry>> {
+     self.next_entries_since(self.applied, max_size)
}

/// Returns whether there are committed and persisted entries since
@@ -1126,7 +1126,7 @@ mod test {
);
}

- let next_entries = raft_log.next_entries();
+ let next_entries = raft_log.next_entries(None);
if next_entries != expect_entries.map(|n| n.to_vec()) {
panic!(
"#{}: next_entries = {:?}, want {:?}",
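A minimal sketch (plain functions, not the `RaftLog` type) of how the fetch window in `next_entries_since` is derived before `max_size` trims the slice; `fetch_window` and the example numbers are illustrative.

```rust
use std::cmp;

// Entries in [max(since_idx + 1, first_index), min(committed, persisted) + 1)
// are eligible; `max_size` is then applied to that slice.
fn fetch_window(since_idx: u64, first_index: u64, committed: u64, persisted: u64) -> Option<(u64, u64)> {
    let offset = cmp::max(since_idx + 1, first_index);
    let high = cmp::min(committed, persisted) + 1;
    if high > offset {
        Some((offset, high))
    } else {
        None
    }
}

fn main() {
    // applied = 5, first_index = 3, committed = 9, persisted = 7
    // => only indexes 6 and 7 are eligible for the next Ready.
    assert_eq!(fetch_window(5, 3, 9, 7), Some((6, 8)));
}
```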
3 changes: 2 additions & 1 deletion src/raw_node.rs
@@ -413,10 +413,11 @@ impl<T: Storage> RawNode<T> {
/// Generates a LightReady that has the committed entries and messages but no commit index.
fn gen_light_ready(&mut self) -> LightReady {
let mut rd = LightReady::default();
+ let max_size = Some(self.raft.max_committed_size_per_ready);
let raft = &mut self.raft;
rd.committed_entries = raft
.raft_log
- .next_entries_since(self.commit_since_index)
+ .next_entries_since(self.commit_since_index, max_size)
.unwrap_or_default();
// Update raft uncommitted entries size
raft.reduce_uncommitted_size(&rd.committed_entries);
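A sketch of an application loop that drains committed entries one capped page at a time. It only uses calls that already appear in the tests in this PR (`has_ready`, `ready`, `take_committed_entries`, `entries`, `store`, `advance`, `committed_entries`, `advance_apply_to`); `run_apply_loop` and `apply_entry` are hypothetical application-side names.

```rust
use raft::{eraftpb::Entry, storage::MemStorage, RawNode};

// Hypothetical apply loop; error handling is elided.
fn run_apply_loop(raw_node: &mut RawNode<MemStorage>, mut apply_entry: impl FnMut(&Entry)) {
    while raw_node.has_ready() {
        let mut rd = raw_node.ready();
        let mut last_applied = None;
        // Committed entries in this Ready are already capped by
        // `max_committed_size_per_ready`.
        for e in rd.take_committed_entries() {
            apply_entry(&e);
            last_applied = Some(e.index);
        }
        // Persist new entries so they can show up as committed later on.
        if !rd.entries().is_empty() {
            raw_node.store().wl().append(rd.entries()).unwrap();
        }
        // The LightReady returned by advance() may carry another capped batch.
        let light_rd = raw_node.advance(rd);
        for e in light_rd.committed_entries() {
            apply_entry(e);
            last_applied = Some(e.index);
        }
        if let Some(idx) = last_applied {
            raw_node.advance_apply_to(idx);
        }
    }
}
```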
7 changes: 3 additions & 4 deletions src/util.rs
@@ -64,11 +64,10 @@ pub fn limit_size<T: PbMessage + Clone>(entries: &mut Vec<T>, max: Option<u64>)
.take_while(|&e| {
if size == 0 {
size += u64::from(e.compute_size());
- true
- } else {
- size += u64::from(e.compute_size());
- size <= max
+ return true;
}
+ size += u64::from(e.compute_size());
+ size <= max
})
.count();

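A standalone illustration of the rule this `limit_size` rewrite preserves: the first entry is always kept, and further entries are taken only while the running total stays within `max`. Plain `u64` sizes stand in for protobuf messages, and `limit_by_size` is an illustrative name, not the library function.

```rust
// Returns how many of `sizes` would be kept under the same rule as above.
fn limit_by_size(sizes: &[u64], max: Option<u64>) -> usize {
    let max = match max {
        Some(m) if sizes.len() > 1 => m,
        // No limit, or nothing to trim.
        _ => return sizes.len(),
    };
    let mut total = 0u64;
    sizes
        .iter()
        .take_while(|&&s| {
            if total == 0 {
                total += s;
                return true;
            }
            total += s;
            total <= max
        })
        .count()
}

fn main() {
    // Even with a limit of 0 at least one entry survives, which is why the
    // pagination test above still sees a single committed entry in that case.
    assert_eq!(limit_by_size(&[8, 8, 8], Some(0)), 1);
    assert_eq!(limit_by_size(&[8, 8, 8], Some(16)), 2);
    assert_eq!(limit_by_size(&[8, 8, 8], None), 3);
}
```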