Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run asan in CI and stabilize write barrier test #120

Merged
merged 10 commits into from
Oct 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ jobs:
profile: minimal
toolchain: nightly-2021-07-28
override: true
components: rustfmt, clippy
components: rustfmt, clippy, rust-src
- name: Format
run: cargo fmt --all -- --check
- name: Clippy
run: cargo clippy --all
- name: Build
run: cargo build --verbose
- name: Run tests
run: RUST_BACKTRACE=1 cargo test --features failpoints --all --verbose
run: RUST_BACKTRACE=1 cargo test --features failpoints --all --verbose -- --nocapture
- name: Run asan tests
if: ${{ matrix.os == 'ubuntu-latest' }}
run: RUST_BACKTRACE=1 RUSTFLAGS=-Zsanitizer=address RUSTDOCFLAGS=-Zsanitizer=address cargo test -Zbuild-std --target x86_64-unknown-linux-gnu --features failpoints --verbose -- --nocapture
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ path = "examples/append_compact_purge.rs"
[dependencies]
byteorder = "1.2"
crc32fast = "1.2"
crossbeam = "0.8"
fail = "0.4"
fxhash = "0.2"
hex = "0.4"
Expand Down
35 changes: 12 additions & 23 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1308,27 +1308,20 @@ mod tests {
);
}

struct ConcurrentWritePlaygroud {
struct ConcurrentWriteContext {
engine: Arc<RaftLogEngine>,
ths: Vec<std::thread::JoinHandle<()>>,
// thread index of leader that's currently writing
leader: Option<usize>,
// thread index of leader of next write group
next_leader: Option<usize>,
}

impl ConcurrentWritePlaygroud {
impl ConcurrentWriteContext {
fn new(engine: Arc<RaftLogEngine>) -> Self {
Self {
engine,
ths: Vec::new(),
leader: None,
next_leader: None,
}
}
fn leader_write(&mut self, mut log_batch: LogBatch) {
assert!(self.next_leader.is_none());
if self.leader.is_none() {
if self.ths.is_empty() {
fail::cfg("write_barrier::leader_exit", "pause").unwrap();
let engine_clone = self.engine.clone();
self.ths.push(
Expand All @@ -1338,7 +1331,6 @@ mod tests {
})
.unwrap(),
);
self.leader = Some(self.ths.len() - 1);
}
let engine_clone = self.engine.clone();
self.ths.push(
Expand All @@ -1348,10 +1340,9 @@ mod tests {
})
.unwrap(),
);
self.next_leader = Some(self.ths.len() - 1);
}
fn follower_write(&mut self, mut log_batch: LogBatch) {
assert!(self.next_leader.is_some());
assert!(self.ths.len() == 2);
let engine_clone = self.engine.clone();
self.ths.push(
ThreadBuilder::new()
Expand All @@ -1366,8 +1357,6 @@ mod tests {
for t in self.ths.drain(..) {
t.join().unwrap();
}
self.leader = None;
self.next_leader = None;
}
}

Expand All @@ -1380,7 +1369,7 @@ mod tests {
let mut cfg = Config::default();
cfg.dir = dir.path().to_str().unwrap().to_owned();
let engine = Arc::new(RaftLogEngine::open(cfg.clone()).unwrap());
let mut playground = ConcurrentWritePlaygroud::new(engine.clone());
let mut ctx = ConcurrentWriteContext::new(engine.clone());

let some_entries = vec![
Entry::new(),
Expand All @@ -1390,18 +1379,18 @@ mod tests {
},
];

playground.leader_write(LogBatch::default());
ctx.leader_write(LogBatch::default());
let mut log_batch = LogBatch::default();
log_batch.add_entries::<Entry>(1, &some_entries).unwrap();
playground.follower_write(log_batch);
playground.join();
ctx.follower_write(log_batch);
ctx.join();

let mut log_batch = LogBatch::default();
log_batch.add_entries::<Entry>(2, &some_entries).unwrap();
playground.leader_write(log_batch);
playground.follower_write(LogBatch::default());
playground.join();
drop(playground);
ctx.leader_write(log_batch);
ctx.follower_write(LogBatch::default());
ctx.join();
drop(ctx);
drop(engine);

let engine = RaftLogEngine::open(cfg).unwrap();
Expand Down
31 changes: 0 additions & 31 deletions src/file_system.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crc32fast::Hasher;
use serde::de::{self, Unexpected, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub use crossbeam::channel::SendError as ScheduleError;
pub type HashMap<K, V> = StdHashMap<K, V, BuildHasherDefault<fxhash::FxHasher>>;

const UNIT: u64 = 1;
Expand Down
185 changes: 94 additions & 91 deletions src/write_barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,9 @@ impl<P, O> WriteBarrier<P, O> {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread::Builder as ThreadBuilder;
use std::thread::{self, Builder as ThreadBuilder};
use std::time::Duration;

#[test]
Expand Down Expand Up @@ -229,106 +228,110 @@ mod tests {
assert_eq!(leaders, 4);
}

struct WriterStats {
leader: AtomicU32,
exited: AtomicU32,
struct ConcurrentWriteContext {
barrier: Arc<WriteBarrier<u32, u32>>,

writer_seq: u32,
ths: Vec<thread::JoinHandle<()>>,
tx: mpsc::SyncSender<()>,
rx: mpsc::Receiver<()>,
}

impl WriterStats {
impl ConcurrentWriteContext {
fn new() -> Self {
WriterStats {
leader: AtomicU32::new(0),
exited: AtomicU32::new(0),
let (tx, rx) = mpsc::sync_channel(0);
Self {
barrier: Default::default(),
writer_seq: 0,
ths: Vec::new(),
tx,
rx,
}
}
}

#[test]
fn test_parallel_groups() {
let barrier: WriteBarrier<u32, u32> = Default::default();
let barrier = Arc::new(barrier);
let stats = Arc::new(WriterStats::new());

let wid = 0;
let (tx, rx) = mpsc::sync_channel(0);

let mut writer = Writer::new(&wid, false);
let mut wg = barrier.enter(&mut writer).unwrap();
for w in wg.iter_mut() {
w.set_output(7);
}

let mut ths = vec![];
for i in 0..2 {
let (barrier_clone, stats_clone, tx_clone) =
(barrier.clone(), stats.clone(), tx.clone());
ths.push(
ThreadBuilder::new()
.spawn(move || {
let wid = i + 1;
let mut writer = Writer::new(&wid, false);
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
stats_clone.leader.fetch_add(1, Ordering::Relaxed);
tx_clone.send(()).unwrap();
let mut total_writers = 0;
for w in wg.iter_mut() {
w.set_output(7);
total_writers += 1;
// 1) create `n` writers and form a new write group
// 2) current active write group finishes writing and exits
// 3) the new write group enters writing phase
fn step(&mut self, n: usize) {
let (ready_tx, ready_rx) = mpsc::channel();
if self.ths.is_empty() {
// ensure there is one active write group.
self.writer_seq += 1;
let barrier_clone = self.barrier.clone();
let tx_clone = self.tx.clone();
let ready_tx_clone = ready_tx.clone();
let seq = self.writer_seq;
self.ths.push(
ThreadBuilder::new()
.spawn(move || {
let mut writer = Writer::new(&seq, false);
ready_tx_clone.send(()).unwrap();
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
let mut idx = 0;
for w in wg.iter_mut() {
w.set_output(*w.get_payload());
idx += 1;
}
assert_eq!(idx, 1);
tx_clone.send(()).unwrap();
}
assert_eq!(total_writers, 2);
}
writer.finish();
stats_clone.exited.fetch_add(1, Ordering::Relaxed);
})
.unwrap(),
);
}

std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 0);
assert_eq!(stats.exited.load(Ordering::Relaxed), 0);
std::mem::drop(wg);
writer.finish();

std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 1);
assert_eq!(stats.exited.load(Ordering::Relaxed), 0);

for i in 0..2 {
let (barrier_clone, stats_clone, tx_clone) =
(barrier.clone(), stats.clone(), tx.clone());
ths.push(
ThreadBuilder::new()
.spawn(move || {
let wid = i + 3;
let mut writer = Writer::new(&wid, false);
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
stats_clone.leader.fetch_add(1, Ordering::Relaxed);
tx_clone.send(()).unwrap();
let mut total_writers = 0;
for w in wg.iter_mut() {
w.set_output(7);
total_writers += 1;
assert_eq!(writer.finish(), seq);
})
.unwrap(),
);
ready_rx.recv().unwrap();
}
let prev_writers = self.ths.len();
for _ in 0..n {
self.writer_seq += 1;
let barrier_clone = self.barrier.clone();
let tx_clone = self.tx.clone();
let ready_tx_clone = ready_tx.clone();
let seq = self.writer_seq;
self.ths.push(
ThreadBuilder::new()
.spawn(move || {
let mut writer = Writer::new(&seq, false);
ready_tx_clone.send(()).unwrap();
if let Some(mut wg) = barrier_clone.enter(&mut writer) {
let mut idx = 0;
for w in wg.iter_mut() {
w.set_output(*w.get_payload());
idx += 1;
}
assert_eq!(idx, n as u32);
tx_clone.send(()).unwrap();
}
assert_eq!(total_writers, 1);
}
writer.finish();
stats_clone.exited.fetch_add(1, Ordering::Relaxed);
})
.unwrap(),
);
std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 1 + i);
rx.recv().unwrap();
assert_eq!(writer.finish(), seq);
})
.unwrap(),
);
}
for _ in 0..n {
ready_rx.recv().unwrap();
}
std::thread::sleep(Duration::from_millis(5));
assert_eq!(stats.leader.load(Ordering::Relaxed), 2 + i);
// unblock current leader
self.rx.recv().unwrap();
for th in self.ths.drain(0..prev_writers) {
th.join().unwrap();
}
}

fn join(&mut self) {
self.rx.recv().unwrap();
for th in self.ths.drain(..) {
th.join().unwrap();
}
}
}

rx.recv().unwrap();
for th in ths.drain(..) {
th.join().unwrap();
#[test]
fn test_parallel_groups() {
let mut ctx = ConcurrentWriteContext::new();
for i in 1..5 {
ctx.step(i);
}
assert_eq!(stats.leader.load(Ordering::Relaxed), 1 + 2);
assert_eq!(stats.exited.load(Ordering::Relaxed), 2 + 2);
ctx.join();
}
}