stabilize write barrier test
Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie committed Oct 19, 2021
1 parent a3470d4 commit 4def866
Showing 3 changed files with 103 additions and 145 deletions.
35 changes: 12 additions & 23 deletions src/engine.rs
@@ -1308,27 +1308,20 @@ mod tests {
);
}

struct ConcurrentWritePlaygroud {
struct ConcurrentWriteContext {
engine: Arc<RaftLogEngine>,
ths: Vec<std::thread::JoinHandle<()>>,
// thread index of leader that's currently writing
leader: Option<usize>,
// thread index of leader of next write group
next_leader: Option<usize>,
}

impl ConcurrentWritePlaygroud {
impl ConcurrentWriteContext {
fn new(engine: Arc<RaftLogEngine>) -> Self {
Self {
engine,
ths: Vec::new(),
leader: None,
next_leader: None,
}
}
fn leader_write(&mut self, mut log_batch: LogBatch) {
assert!(self.next_leader.is_none());
if self.leader.is_none() {
if self.ths.is_empty() {
fail::cfg("write_barrier::leader_exit", "pause").unwrap();
let engine_clone = self.engine.clone();
self.ths.push(
@@ -1338,7 +1331,6 @@ mod tests {
})
.unwrap(),
);
self.leader = Some(self.ths.len() - 1);
}
let engine_clone = self.engine.clone();
self.ths.push(
@@ -1348,10 +1340,9 @@ mod tests {
})
.unwrap(),
);
self.next_leader = Some(self.ths.len() - 1);
}
fn follower_write(&mut self, mut log_batch: LogBatch) {
assert!(self.next_leader.is_some());
assert!(self.ths.len() == 2);
let engine_clone = self.engine.clone();
self.ths.push(
ThreadBuilder::new()
@@ -1366,8 +1357,6 @@ mod tests {
for t in self.ths.drain(..) {
t.join().unwrap();
}
self.leader = None;
self.next_leader = None;
}
}

@@ -1380,7 +1369,7 @@ mod tests {
let mut cfg = Config::default();
cfg.dir = dir.path().to_str().unwrap().to_owned();
let engine = Arc::new(RaftLogEngine::open(cfg.clone()).unwrap());
let mut playground = ConcurrentWritePlaygroud::new(engine.clone());
let mut ctx = ConcurrentWriteContext::new(engine.clone());

let some_entries = vec![
Entry::new(),
@@ -1390,18 +1379,18 @@ mod tests {
},
];

playground.leader_write(LogBatch::default());
ctx.leader_write(LogBatch::default());
let mut log_batch = LogBatch::default();
log_batch.add_entries::<Entry>(1, &some_entries).unwrap();
playground.follower_write(log_batch);
playground.join();
ctx.follower_write(log_batch);
ctx.join();

let mut log_batch = LogBatch::default();
log_batch.add_entries::<Entry>(2, &some_entries).unwrap();
playground.leader_write(log_batch);
playground.follower_write(LogBatch::default());
playground.join();
drop(playground);
ctx.leader_write(log_batch);
ctx.follower_write(LogBatch::default());
ctx.join();
drop(ctx);
drop(engine);

let engine = RaftLogEngine::open(cfg).unwrap();
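
The engine-side test above keeps the first write group's leader parked by configuring the fail point "write_barrier::leader_exit" with the "pause" action, so every writer spawned afterwards queues up behind it as part of the next group. Below is a minimal, standalone sketch of that pause/resume pattern using the fail crate; the fail point name "leader_exit", the main function, and the sleep are illustrative assumptions (the real fail point lives inside the engine's write path), and the crate's "failpoints" feature is assumed to be enabled.

use fail::fail_point;
use std::thread;
use std::time::Duration;

fn leader_exit() {
    // Stand-in for the hook inside the engine's write path; the real code
    // uses the name "write_barrier::leader_exit".
    fail_point!("leader_exit");
}

fn main() {
    // Park every thread that reaches the fail point.
    fail::cfg("leader_exit", "pause").unwrap();

    let th = thread::spawn(|| {
        // Blocks inside fail_point! until the "pause" action is removed,
        // which is how the test holds the current write group open.
        leader_exit();
    });

    // While the leader is parked, any writers spawned here queue up
    // behind it and will form the next write group.
    thread::sleep(Duration::from_millis(10));

    // Lift the pause; the parked thread resumes and exits the barrier.
    fail::remove("leader_exit");
    th.join().unwrap();
}
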
31 changes: 0 additions & 31 deletions src/file_system.rs

This file was deleted.

182 changes: 91 additions & 91 deletions src/write_barrier.rs
@@ -198,10 +198,9 @@ impl<P, O> WriteBarrier<P, O> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::Builder as ThreadBuilder;
use std::thread::{self, Builder as ThreadBuilder};
use std::time::Duration;

#[test]
@@ -229,106 +228,107 @@ mod tests {
assert_eq!(leaders, 4);
}

struct WriterStats {
leader: AtomicU32,
exited: AtomicU32,
struct ConcurrentWriteContext {
barrier: Arc<WriteBarrier<u32, u32>>,

writer_seq: u32,
ths: Vec<thread::JoinHandle<()>>,
tx: mpsc::SyncSender<()>,
rx: mpsc::Receiver<()>,
}

impl WriterStats {
impl ConcurrentWriteContext {
fn new() -> Self {
WriterStats {
leader: AtomicU32::new(0),
exited: AtomicU32::new(0),
let (tx, rx) = mpsc::sync_channel(0);
Self {
barrier: Default::default(),
writer_seq: 0,
ths: Vec::new(),
tx,
rx,
}
}
}

#[test]
fn test_parallel_groups() {
let barrier: WriteBarrier<u32, u32> = Default::default();
let barrier = Arc::new(barrier);
let stats = Arc::new(WriterStats::new());

let wid = 0;
let (tx, rx) = mpsc::sync_channel(0);

let mut writer = Writer::new(&wid, false);
let mut wg = barrier.enter(&mut writer).unwrap();
for w in wg.iter_mut() {
w.set_output(7);
}

let mut ths = vec![];
for i in 0..2 {
let (barrier_clone, stats_clone, tx_clone) =
(barrier.clone(), stats.clone(), tx.clone());
ths.push(
ThreadBuilder::new()
.spawn(move || {
let wid = i + 1;
let mut writer = Writer::new(&wid, false);
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
stats_clone.leader.fetch_add(1, Ordering::Relaxed);
tx_clone.send(()).unwrap();
let mut total_writers = 0;
for w in wg.iter_mut() {
w.set_output(7);
total_writers += 1;
// 1) create `n` writers and form a new write group
// 2) current active write group finishes writing and exits
// 3) the new write group enters writing phase
fn step(&mut self, n: usize) {
if self.ths.is_empty() {
// ensure there is one active write group.
self.writer_seq += 1;
let barrier_clone = self.barrier.clone();
let tx_clone = self.tx.clone();
let seq = self.writer_seq;
self.ths.push(
ThreadBuilder::new()
.spawn(move || {
let mut writer = Writer::new(&seq, false);
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
let mut idx = 0;
for w in wg.iter_mut() {
w.set_output(seq + idx);
idx += 1;
}
assert_eq!(idx, 1);
tx_clone.send(()).unwrap();
}
assert_eq!(total_writers, 2);
}
writer.finish();
stats_clone.exited.fetch_add(1, Ordering::Relaxed);
})
.unwrap(),
);
}

std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 0);
assert_eq!(stats.exited.load(Ordering::Relaxed), 0);
std::mem::drop(wg);
writer.finish();

std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 1);
assert_eq!(stats.exited.load(Ordering::Relaxed), 0);

for i in 0..2 {
let (barrier_clone, stats_clone, tx_clone) =
(barrier.clone(), stats.clone(), tx.clone());
ths.push(
ThreadBuilder::new()
.spawn(move || {
let wid = i + 3;
let mut writer = Writer::new(&wid, false);
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
stats_clone.leader.fetch_add(1, Ordering::Relaxed);
tx_clone.send(()).unwrap();
let mut total_writers = 0;
for w in wg.iter_mut() {
w.set_output(7);
total_writers += 1;
assert_eq!(writer.finish(), seq);
})
.unwrap(),
);
}
let prev_writers = self.ths.len();
let (ready_tx, ready_rx) = mpsc::channel();
for _ in 0..n {
self.writer_seq += 1;
let barrier_clone = self.barrier.clone();
let tx_clone = self.tx.clone();
let ready_tx_clone = ready_tx.clone();
let seq = self.writer_seq;
self.ths.push(
ThreadBuilder::new()
.spawn(move || {
let mut writer = Writer::new(&seq, false);
ready_tx_clone.send(()).unwrap();
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
let mut idx = 0;
for w in wg.iter_mut() {
w.set_output(idx);
idx += 1;
}
assert_eq!(idx, n as u32);
tx_clone.send(()).unwrap();
}
assert_eq!(total_writers, 1);
}
writer.finish();
stats_clone.exited.fetch_add(1, Ordering::Relaxed);
})
.unwrap(),
);
std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 1 + i);
rx.recv().unwrap();
writer.finish();
})
.unwrap(),
);
}
for _ in 0..n {
ready_rx.recv().unwrap();
}
std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 2 + i);
// unblock current leader
self.rx.recv().unwrap();
for th in self.ths.drain(0..prev_writers) {
th.join().unwrap();
}
}

fn join(&mut self) {
self.rx.recv().unwrap();
for th in self.ths.drain(..) {
th.join().unwrap();
}
}
}

rx.recv().unwrap();
for th in ths.drain(..) {
th.join().unwrap();
#[test]
fn test_parallel_groups() {
let mut ctx = ConcurrentWriteContext::new();
for i in 1..5 {
ctx.step(i);
}
assert_eq!(stats.leader.load(Ordering::Relaxed), 1 + 2);
assert_eq!(stats.exited.load(Ordering::Relaxed), 2 + 2);
ctx.join();
}
}
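
Both the old and the new write_barrier tests coordinate through mpsc::sync_channel(0). A bound of zero makes the channel a rendezvous: send blocks until the matching recv, so the leader's tx.send(()) issued inside the write group keeps that group open until step() or join() calls rx.recv() on the main thread. Below is a minimal standalone sketch of that rendezvous behaviour, independent of the Writer and WriteBarrier types above; the thread body and comments are illustrative, not the test itself.

use std::sync::mpsc;
use std::thread;

fn main() {
    // A zero-capacity channel is a rendezvous point: send() parks the
    // sender until the receiver calls recv(), and vice versa.
    let (tx, rx) = mpsc::sync_channel::<()>(0);

    let th = thread::spawn(move || {
        // Stands in for the leader signalling from inside its write group;
        // the thread stays "inside the group" until the main thread receives.
        tx.send(()).unwrap();
        // Only after the rendezvous would the leader-exit path run.
    });

    // Until this recv(), the spawned thread is parked in send(), which is
    // how the test keeps one write group active while spawning the next.
    rx.recv().unwrap();
    th.join().unwrap();
}
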
