diff --git a/src/engine.rs b/src/engine.rs
index 3ccaf540..a82d0775 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -1308,27 +1308,20 @@ mod tests {
         );
     }
 
-    struct ConcurrentWritePlaygroud {
+    struct ConcurrentWriteContext {
         engine: Arc<RaftLogEngine>,
         ths: Vec<JoinHandle<()>>,
-        // thread index of leader that's currently writing
-        leader: Option<usize>,
-        // thread index of leader of next write group
-        next_leader: Option<usize>,
     }
 
-    impl ConcurrentWritePlaygroud {
+    impl ConcurrentWriteContext {
         fn new(engine: Arc<RaftLogEngine>) -> Self {
             Self {
                 engine,
                 ths: Vec::new(),
-                leader: None,
-                next_leader: None,
             }
         }
 
         fn leader_write(&mut self, mut log_batch: LogBatch) {
-            assert!(self.next_leader.is_none());
-            if self.leader.is_none() {
+            if self.ths.is_empty() {
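+                // The `pause` failpoint keeps the leader spawned below parked
+                // at write-barrier exit, so the write pushed after this block
+                // queues up as the leader of the next write group.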
                 fail::cfg("write_barrier::leader_exit", "pause").unwrap();
                 let engine_clone = self.engine.clone();
                 self.ths.push(
@@ -1338,7 +1331,6 @@ mod tests {
                     })
                     .unwrap(),
                 );
-                self.leader = Some(self.ths.len() - 1);
             }
             let engine_clone = self.engine.clone();
             self.ths.push(
@@ -1348,10 +1340,9 @@ mod tests {
                     })
                     .unwrap(),
             );
-            self.next_leader = Some(self.ths.len() - 1);
         }
 
         fn follower_write(&mut self, mut log_batch: LogBatch) {
-            assert!(self.next_leader.is_some());
+            assert!(self.ths.len() == 2);
             let engine_clone = self.engine.clone();
             self.ths.push(
                 ThreadBuilder::new()
@@ -1366,8 +1357,6 @@ mod tests {
             for t in self.ths.drain(..) {
                 t.join().unwrap();
             }
-            self.leader = None;
-            self.next_leader = None;
         }
     }
 
@@ -1380,7 +1369,7 @@ mod tests {
         let mut cfg = Config::default();
         cfg.dir = dir.path().to_str().unwrap().to_owned();
         let engine = Arc::new(RaftLogEngine::open(cfg.clone()).unwrap());
-        let mut playground = ConcurrentWritePlaygroud::new(engine.clone());
+        let mut ctx = ConcurrentWriteContext::new(engine.clone());
 
         let some_entries = vec![
             Entry::new(),
@@ -1390,18 +1379,18 @@ mod tests {
             Entry {
                 index: 1,
                 ..Default::default()
             },
         ];
 
-        playground.leader_write(LogBatch::default());
+        ctx.leader_write(LogBatch::default());
         let mut log_batch = LogBatch::default();
         log_batch.add_entries::<Entry>(1, &some_entries).unwrap();
-        playground.follower_write(log_batch);
-        playground.join();
+        ctx.follower_write(log_batch);
+        ctx.join();
 
         let mut log_batch = LogBatch::default();
         log_batch.add_entries::<Entry>(2, &some_entries).unwrap();
-        playground.leader_write(log_batch);
-        playground.follower_write(LogBatch::default());
-        playground.join();
-        drop(playground);
+        ctx.leader_write(log_batch);
+        ctx.follower_write(LogBatch::default());
+        ctx.join();
+        drop(ctx);
         drop(engine);
 
         let engine = RaftLogEngine::open(cfg).unwrap();
diff --git a/src/file_system.rs b/src/file_system.rs
deleted file mode 100644
index 3dfca5eb..00000000
--- a/src/file_system.rs
+++ /dev/null
@@ -1,31 +0,0 @@
-// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
-
-use std::io::{Read, Seek, Write};
-use std::path::Path;
-
-pub trait Readable: Seek + Read + Send + Sync {}
-impl<T: Seek + Read + Send + Sync> Readable for T {}
-
-pub trait Writable: Seek + Write + Send + Sync {}
-impl<T: Seek + Write + Send + Sync> Writable for T {}
-
-/// An overlay abstraction for accessing files.
-pub trait FileSystem: Send + Sync {
-    fn open_file_reader(
-        &self,
-        path: &Path,
-        reader: Box<dyn Readable>,
-    ) -> Result<Box<dyn Readable>, Box<dyn std::error::Error>>;
-
-    fn open_file_writer(
-        &self,
-        path: &Path,
-        writer: Box<dyn Writable>,
-    ) -> Result<Box<dyn Writable>, Box<dyn std::error::Error>>;
-
-    fn create_file_writer(
-        &self,
-        path: &Path,
-        writer: Box<dyn Writable>,
-    ) -> Result<Box<dyn Writable>, Box<dyn std::error::Error>>;
-}
diff --git a/src/write_barrier.rs b/src/write_barrier.rs
index 577b7430..86bdcf48 100644
--- a/src/write_barrier.rs
+++ b/src/write_barrier.rs
@@ -198,10 +198,9 @@ impl<P, O> WriteBarrier<P, O> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use std::sync::atomic::{AtomicU32, Ordering};
     use std::sync::mpsc;
     use std::sync::Arc;
-    use std::thread::Builder as ThreadBuilder;
+    use std::thread::{self, Builder as ThreadBuilder};
     use std::time::Duration;
 
     #[test]
@@ -229,106 +228,107 @@ mod tests {
         assert_eq!(leaders, 4);
     }
 
-    struct WriterStats {
-        leader: AtomicU32,
-        exited: AtomicU32,
+    struct ConcurrentWriteContext {
+        barrier: Arc<WriteBarrier<u32, u32>>,
+
+        writer_seq: u32,
+        ths: Vec<thread::JoinHandle<()>>,
+        tx: mpsc::SyncSender<()>,
+        rx: mpsc::Receiver<()>,
     }
 
-    impl WriterStats {
+    impl ConcurrentWriteContext {
         fn new() -> Self {
-            WriterStats {
-                leader: AtomicU32::new(0),
-                exited: AtomicU32::new(0),
+            let (tx, rx) = mpsc::sync_channel(0);
+            Self {
+                barrier: Default::default(),
+                writer_seq: 0,
+                ths: Vec::new(),
+                tx,
+                rx,
             }
         }
-    }
-
-    #[test]
-    fn test_parallel_groups() {
-        let barrier: WriteBarrier<u32, u32> = Default::default();
-        let barrier = Arc::new(barrier);
-        let stats = Arc::new(WriterStats::new());
-
-        let wid = 0;
-        let (tx, rx) = mpsc::sync_channel(0);
-        let mut writer = Writer::new(&wid, false);
-        let mut wg = barrier.enter(&mut writer).unwrap();
-        for w in wg.iter_mut() {
-            w.set_output(7);
-        }
-
-        let mut ths = vec![];
-        for i in 0..2 {
-            let (barrier_clone, stats_clone, tx_clone) =
-                (barrier.clone(), stats.clone(), tx.clone());
-            ths.push(
-                ThreadBuilder::new()
-                    .spawn(move || {
-                        let wid = i + 1;
-                        let mut writer = Writer::new(&wid, false);
-                        if let Some(mut wg) = barrier_clone.enter(&mut writer) {
-                            stats_clone.leader.fetch_add(1, Ordering::Relaxed);
-                            tx_clone.send(()).unwrap();
-                            let mut total_writers = 0;
-                            for w in wg.iter_mut() {
-                                w.set_output(7);
-                                total_writers += 1;
+
+        // 1) create `n` writers and form a new write group
+        // 2) current active write group finishes writing and exits
+        // 3) the new write group enters writing phase
+        fn step(&mut self, n: usize) {
+            if self.ths.is_empty() {
+                // ensure there is one active write group.
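+                // The leader spawned below parks on `tx.send(())`: the channel
+                // has zero capacity, so the send completes only when `step()`
+                // or `join()` performs the matching `rx.recv()`.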
+                self.writer_seq += 1;
+                let barrier_clone = self.barrier.clone();
+                let tx_clone = self.tx.clone();
+                let seq = self.writer_seq;
+                self.ths.push(
+                    ThreadBuilder::new()
+                        .spawn(move || {
+                            let mut writer = Writer::new(&seq, false);
+                            if let Some(mut wg) = barrier_clone.enter(&mut writer) {
+                                let mut idx = 0;
+                                for w in wg.iter_mut() {
+                                    w.set_output(seq + idx);
+                                    idx += 1;
+                                }
+                                assert_eq!(idx, 1);
+                                tx_clone.send(()).unwrap();
                             }
-                            assert_eq!(total_writers, 2);
-                        }
-                        writer.finish();
-                        stats_clone.exited.fetch_add(1, Ordering::Relaxed);
-                    })
-                    .unwrap(),
-            );
-        }
-
-        std::thread::sleep(Duration::from_millis(5));
-        assert_eq!(stats.leader.load(Ordering::Relaxed), 0);
-        assert_eq!(stats.exited.load(Ordering::Relaxed), 0);
-        std::mem::drop(wg);
-        writer.finish();
-
-        std::thread::sleep(Duration::from_millis(5));
-        assert_eq!(stats.leader.load(Ordering::Relaxed), 1);
-        assert_eq!(stats.exited.load(Ordering::Relaxed), 0);
-
-        for i in 0..2 {
-            let (barrier_clone, stats_clone, tx_clone) =
-                (barrier.clone(), stats.clone(), tx.clone());
-            ths.push(
-                ThreadBuilder::new()
-                    .spawn(move || {
-                        let wid = i + 3;
-                        let mut writer = Writer::new(&wid, false);
-                        if let Some(mut wg) = barrier_clone.enter(&mut writer) {
-                            stats_clone.leader.fetch_add(1, Ordering::Relaxed);
-                            tx_clone.send(()).unwrap();
-                            let mut total_writers = 0;
-                            for w in wg.iter_mut() {
-                                w.set_output(7);
-                                total_writers += 1;
+                            assert_eq!(writer.finish(), seq);
+                        })
+                        .unwrap(),
+                );
+            }
+            let prev_writers = self.ths.len();
+            let (ready_tx, ready_rx) = mpsc::channel();
+            for _ in 0..n {
+                self.writer_seq += 1;
+                let barrier_clone = self.barrier.clone();
+                let tx_clone = self.tx.clone();
+                let ready_tx_clone = ready_tx.clone();
+                let seq = self.writer_seq;
+                self.ths.push(
+                    ThreadBuilder::new()
+                        .spawn(move || {
+                            let mut writer = Writer::new(&seq, false);
+                            ready_tx_clone.send(()).unwrap();
+                            if let Some(mut wg) = barrier_clone.enter(&mut writer) {
+                                let mut idx = 0;
+                                for w in wg.iter_mut() {
+                                    w.set_output(idx);
+                                    idx += 1;
+                                }
+                                assert_eq!(idx, n as u32);
+                                tx_clone.send(()).unwrap();
                             }
-                            assert_eq!(total_writers, 1);
-                        }
-                        writer.finish();
-                        stats_clone.exited.fetch_add(1, Ordering::Relaxed);
-                    })
-                    .unwrap(),
-            );
-            std::thread::sleep(Duration::from_millis(5));
-            assert_eq!(stats.leader.load(Ordering::Relaxed), 1 + i);
-            rx.recv().unwrap();
+                            writer.finish();
+                        })
+                        .unwrap(),
+                );
+            }
+            for _ in 0..n {
+                ready_rx.recv().unwrap();
+            }
             std::thread::sleep(Duration::from_millis(5));
-            assert_eq!(stats.leader.load(Ordering::Relaxed), 2 + i);
+            // unblock current leader
+            self.rx.recv().unwrap();
+            for th in self.ths.drain(0..prev_writers) {
+                th.join().unwrap();
+            }
+        }
+
+        fn join(&mut self) {
+            self.rx.recv().unwrap();
+            for th in self.ths.drain(..) {
+                th.join().unwrap();
+            }
         }
-        rx.recv().unwrap();
-        for th in ths.drain(..) {
-            th.join().unwrap();
+    }
+
+    #[test]
+    fn test_parallel_groups() {
+        let mut ctx = ConcurrentWriteContext::new();
+        for i in 1..5 {
+            ctx.step(i);
         }
-        assert_eq!(stats.leader.load(Ordering::Relaxed), 1 + 2);
-        assert_eq!(stats.exited.load(Ordering::Relaxed), 2 + 2);
+        ctx.join();
     }
 }
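Reviewer note: both test harnesses rely on the same synchronization idiom, a zero-capacity std::sync::mpsc::sync_channel used as a rendezvous. A write-group leader that calls send(()) stays parked inside the barrier until the coordinating thread performs the matching recv(), which is what lets these tests hold one write group open while the next one forms. A minimal standalone sketch of that pattern (the leader/coordinator naming is illustrative, not from this patch):

    use std::sync::mpsc;
    use std::thread;

    fn main() {
        // Capacity 0 makes `send` block until a matching `recv` is ready.
        let (tx, rx) = mpsc::sync_channel::<()>(0);

        let leader = thread::spawn(move || {
            // ... perform the group write ...
            tx.send(()).unwrap(); // parks here; the "write group" stays active
            // ... exit the write barrier, letting the next group proceed ...
        });

        // The coordinator unblocks the leader only when the test is ready.
        rx.recv().unwrap();
        leader.join().unwrap();
    }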