diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index f97908cffb..82e786b484 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -9,7 +9,7 @@ use crate::fastfield::FastFieldsWriter;
 use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
 use crate::indexer::segment_serializer::SegmentSerializer;
 use crate::postings::{
-    compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
+    compute_table_memory_size, serialize_postings, IndexingContext, IndexingPosition,
     PerFieldPostingsWriter, PostingsWriter,
 };
 use crate::schema::{FieldEntry, FieldType, Schema, Term, Value};
@@ -26,7 +26,7 @@ fn compute_initial_table_size(per_thread_memory_budget: usize) -> crate::Result<usize> {
     let table_memory_upper_bound = per_thread_memory_budget / 3;
     (10..20) // We cap it at 2^19 = 512K capacity.
         .map(|power| 1 << power)
-        .take_while(|capacity| compute_table_size(*capacity) < table_memory_upper_bound)
+        .take_while(|capacity| compute_table_memory_size(*capacity) < table_memory_upper_bound)
         .last()
         .ok_or_else(|| {
             crate::TantivyError::InvalidArgument(format!(
@@ -455,7 +455,7 @@ mod tests {
     fn test_hashmap_size() {
         assert_eq!(compute_initial_table_size(100_000).unwrap(), 1 << 11);
         assert_eq!(compute_initial_table_size(1_000_000).unwrap(), 1 << 14);
-        assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 17);
+        assert_eq!(compute_initial_table_size(10_000_000).unwrap(), 1 << 18);
         assert_eq!(compute_initial_table_size(1_000_000_000).unwrap(), 1 << 19);
         assert_eq!(compute_initial_table_size(4_000_000_000).unwrap(), 1 << 19);
     }
diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index aa24b4ca72..e323a29f84 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -17,7 +17,7 @@ mod serializer;
 mod skip;
 mod term_info;
 
-pub(crate) use stacker::compute_table_size;
+pub(crate) use stacker::compute_table_memory_size;
 
 pub use self::block_segment_postings::BlockSegmentPostings;
 pub(crate) use self::indexing_context::IndexingContext;
diff --git a/stacker/Cargo.toml b/stacker/Cargo.toml
index 3c51ccbaac..51b7d23507 100644
--- a/stacker/Cargo.toml
+++ b/stacker/Cargo.toml
@@ -6,5 +6,19 @@ license = "MIT"
 
 [dependencies]
 murmurhash32 = "0.3"
-byteorder = "1"
 common = { version = "0.5", path = "../common/", package = "tantivy-common" }
+criterion = "0.4.0"
+
+[[bench]]
+harness = false
+name = "crit_bench"
+path = "benches/crit_bench.rs"
+
+[[example]]
+name = "hashmap"
+path = "example/hashmap.rs"
+
+[dev-dependencies]
+rand = "0.8.5"
+zipf = "7.0.0"
+
diff --git a/stacker/benches/crit_bench.rs b/stacker/benches/crit_bench.rs
new file mode 100644
index 0000000000..1ffe476882
--- /dev/null
+++ b/stacker/benches/crit_bench.rs
@@ -0,0 +1,70 @@
+#![allow(dead_code)]
+extern crate criterion;
+
+use criterion::*;
+use rand::SeedableRng;
+use tantivy_stacker::ArenaHashMap;
+
+const ALICE: &str = include_str!("../../benches/alice.txt");
+
+fn bench_hashmap_throughput(c: &mut Criterion) {
+    let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Linear);
+
+    let mut group = c.benchmark_group("CreateHashMap");
+    group.plot_config(plot_config);
+
+    let input_name = "alice";
+    let input_bytes = ALICE.len() as u64;
+    group.throughput(Throughput::Bytes(input_bytes));
+
+    group.bench_with_input(
+        BenchmarkId::new(input_name.to_string(), input_bytes),
+        &ALICE,
+        |b, i| b.iter(|| create_hash_map(i.split_whitespace().map(|el| el.as_bytes()))),
+    );
+    // numbers
+    let input_bytes = 1_000_000 * 8 as u64;
+    group.throughput(Throughput::Bytes(input_bytes));
+
+    group.bench_with_input(
+        BenchmarkId::new("numbers".to_string(), input_bytes),
+        &(0..1_000_000u64),
+        |b, i| b.iter(|| create_hash_map(i.clone().map(|el| el.to_le_bytes()))),
+    );
+
+    // numbers zipf
+    use rand::distributions::Distribution;
+    use rand::rngs::StdRng;
+    let mut rng = StdRng::from_seed([3u8; 32]);
+    let zipf = zipf::ZipfDistribution::new(10_000, 1.03).unwrap();
+
+    let input_bytes = 1_000_000 * 8 as u64;
+    group.throughput(Throughput::Bytes(input_bytes));
+
+    group.bench_with_input(
+        BenchmarkId::new("numbers_zipf".to_string(), input_bytes),
+        &(0..1_000_000u64),
+        |b, i| b.iter(|| create_hash_map(i.clone().map(|_el| zipf.sample(&mut rng).to_le_bytes()))),
+    );
+
+    group.finish();
+}
+
+fn create_hash_map<'a, T: AsRef<[u8]>>(terms: impl Iterator<Item = T>) -> ArenaHashMap {
+    let mut map = ArenaHashMap::with_capacity(4);
+    for term in terms {
+        map.mutate_or_create(term.as_ref(), |val| {
+            if let Some(mut val) = val {
+                val += 1;
+                val
+            } else {
+                1u64
+            }
+        });
+    }
+
+    map
+}
+
+criterion_group!(block_benches, bench_hashmap_throughput,);
+criterion_main!(block_benches);
diff --git a/stacker/example/hashmap.rs b/stacker/example/hashmap.rs
new file mode 100644
index 0000000000..568392cdae
--- /dev/null
+++ b/stacker/example/hashmap.rs
@@ -0,0 +1,27 @@
+use tantivy_stacker::ArenaHashMap;
+
+const ALICE: &str = include_str!("../../benches/alice.txt");
+
+fn main() {
+    create_hash_map((0..100_000_000).map(|el| el.to_string()));
+
+    for _ in 0..1000 {
+        create_hash_map(ALICE.split_whitespace());
+    }
+}
+
+fn create_hash_map<'a, T: AsRef<str>>(terms: impl Iterator<Item = T>) -> ArenaHashMap {
+    let mut map = ArenaHashMap::with_capacity(4);
+    for term in terms {
+        map.mutate_or_create(term.as_ref().as_bytes(), |val| {
+            if let Some(mut val) = val {
+                val += 1;
+                val
+            } else {
+                1u64
+            }
+        });
+    }
+
+    map
+}
diff --git a/stacker/src/arena_hashmap.rs b/stacker/src/arena_hashmap.rs
index 91695d1783..43bf37c849 100644
--- a/stacker/src/arena_hashmap.rs
+++ b/stacker/src/arena_hashmap.rs
@@ -1,7 +1,5 @@
-use std::{iter, mem, slice};
-
-use byteorder::{ByteOrder, NativeEndian};
-use murmurhash32::murmurhash2;
+use std::iter::{Cloned, Filter};
+use std::mem;
 
 use super::{Addr, MemoryArena};
 use crate::memory_arena::store;
@@ -10,17 +8,19 @@ use crate::UnorderedId;
 /// Returns the actual memory size in bytes
 /// required to create a table with a given capacity.
 /// required to create a table of size
-pub fn compute_table_size(capacity: usize) -> usize {
+pub fn compute_table_memory_size(capacity: usize) -> usize {
     capacity * mem::size_of::<KeyValue>()
 }
 
+type HashType = u32;
+
 /// `KeyValue` is the item stored in the hash table.
 /// The key is actually a `BytesRef` object stored in an external memory arena.
 /// The `value_addr` also points to an address in the memory arena.
 #[derive(Copy, Clone)]
 struct KeyValue {
     key_value_addr: Addr,
-    hash: u32,
+    hash: HashType,
     unordered_id: UnorderedId,
 }
 
@@ -28,16 +28,21 @@ impl Default for KeyValue {
     fn default() -> Self {
         KeyValue {
             key_value_addr: Addr::null_pointer(),
-            hash: 0u32,
+            hash: 0,
             unordered_id: UnorderedId::default(),
         }
     }
 }
 
 impl KeyValue {
+    #[inline]
     fn is_empty(self) -> bool {
         self.key_value_addr.is_null()
     }
+    #[inline]
+    fn is_not_empty_ref(&self) -> bool {
+        !self.key_value_addr.is_null()
+    }
 }
 
 /// Customized `HashMap` with `&[u8]` keys
@@ -50,43 +55,47 @@ impl KeyValue {
 /// the computation of the hash of the key twice,
 /// or copying the key as long as there is no insert.
 pub struct ArenaHashMap {
-    table: Box<[KeyValue]>,
+    table: Vec<KeyValue>,
     memory_arena: MemoryArena,
     mask: usize,
-    occupied: Vec<usize>,
     len: usize,
 }
 
-struct QuadraticProbing {
-    hash: usize,
-    i: usize,
-    mask: usize,
+struct LinearProbing {
+    hash: HashType,
+    i: u32,
+    mask: u32,
 }
 
-impl QuadraticProbing {
+impl LinearProbing {
     #[inline]
-    fn compute(hash: usize, mask: usize) -> QuadraticProbing {
-        QuadraticProbing { hash, i: 0, mask }
+    fn compute(hash: HashType, mask: usize) -> LinearProbing {
+        LinearProbing {
+            hash,
+            i: 0,
+            mask: mask as u32,
+        }
     }
 
     #[inline]
     fn next_probe(&mut self) -> usize {
         self.i += 1;
-        (self.hash + self.i) & self.mask
+        ((self.hash + self.i) & self.mask) as usize
     }
 }
 
+type IterNonEmpty<'a> = Filter<Cloned<std::slice::Iter<'a, KeyValue>>, fn(&KeyValue) -> bool>;
+
 pub struct Iter<'a> {
     hashmap: &'a ArenaHashMap,
-    inner: slice::Iter<'a, usize>,
+    inner: IterNonEmpty<'a>,
 }
 
 impl<'a> Iterator for Iter<'a> {
     type Item = (&'a [u8], Addr, UnorderedId);
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.inner.next().cloned().map(move |bucket: usize| {
-            let kv = self.hashmap.table[bucket];
+        self.inner.next().map(move |kv| {
             let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
             (key, offset, kv.unordered_id)
         })
@@ -107,10 +116,9 @@ impl Default for ArenaHashMap {
     fn default() -> Self {
         let memory_arena = MemoryArena::default();
         ArenaHashMap {
-            table: Box::new([]),
+            table: Vec::new(),
             memory_arena,
             mask: 0,
-            occupied: Vec::new(),
             len: 0,
         }
     }
@@ -120,26 +128,29 @@ impl ArenaHashMap {
     pub fn with_capacity(table_size: usize) -> ArenaHashMap {
         let table_size_power_of_2 = compute_previous_power_of_two(table_size);
         let memory_arena = MemoryArena::default();
-        let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
-            .take(table_size_power_of_2)
-            .collect();
+        let table = vec![KeyValue::default(); table_size_power_of_2];
+
         ArenaHashMap {
-            table: table.into_boxed_slice(),
+            table,
             memory_arena,
             mask: table_size_power_of_2 - 1,
-            occupied: Vec::with_capacity(table_size_power_of_2 / 2),
             len: 0,
         }
     }
 
+    #[inline]
+    fn get_hash(&self, key: &[u8]) -> HashType {
+        murmurhash32::murmurhash2(key)
+    }
+
     #[inline]
     pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
         self.memory_arena.read(addr)
     }
 
     #[inline]
-    fn probe(&self, hash: u32) -> QuadraticProbing {
-        QuadraticProbing::compute(hash as usize, self.mask)
+    fn probe(&self, hash: HashType) -> LinearProbing {
+        LinearProbing::compute(hash, self.mask)
    }
 
     #[inline]
@@ -149,15 +160,16 @@ impl ArenaHashMap {
 
     #[inline]
     fn is_saturated(&self) -> bool {
-        self.table.len() <= self.occupied.len() * 3
+        self.table.len() <= self.len * 2
     }
 
     #[inline]
     fn get_key_value(&self, addr: Addr) -> (&[u8], Addr) {
         let data = self.memory_arena.slice_from(addr);
-        let key_bytes_len = NativeEndian::read_u16(data) as usize;
-        let key_bytes: &[u8] = &data[2..][..key_bytes_len];
-        (key_bytes, addr.offset(2u32 + key_bytes_len as u32))
+        let (key_bytes_len_bytes, data) = data.split_at(2);
+        let key_bytes_len = u16::from_le_bytes(key_bytes_len_bytes.try_into().unwrap());
+        let key_bytes: &[u8] = &data[..key_bytes_len as usize];
+        (key_bytes, addr.offset(2 + key_bytes_len as u32))
     }
 
     #[inline]
@@ -171,10 +183,10 @@ impl ArenaHashMap {
     }
 
     #[inline]
-    fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) -> UnorderedId {
-        self.occupied.push(bucket);
+    fn set_bucket(&mut self, hash: HashType, key_value_addr: Addr, bucket: usize) -> UnorderedId {
         let unordered_id = self.len as UnorderedId;
         self.len += 1;
+
         self.table[bucket] = KeyValue {
             key_value_addr,
             hash,
@@ -196,7 +208,11 @@ impl ArenaHashMap {
     #[inline]
     pub fn iter(&self) -> Iter<'_> {
         Iter {
-            inner: self.occupied.iter(),
+            inner: self
+                .table
+                .iter()
+                .cloned()
+                .filter(KeyValue::is_not_empty_ref),
             hashmap: self,
         }
     }
@@ -205,15 +221,13 @@ impl ArenaHashMap {
         let new_len = (self.table.len() * 2).max(1 << 13);
         let mask = new_len - 1;
         self.mask = mask;
-        let new_table = vec![KeyValue::default(); new_len].into_boxed_slice();
+        let new_table = vec![KeyValue::default(); new_len];
         let old_table = mem::replace(&mut self.table, new_table);
-        for old_pos in self.occupied.iter_mut() {
-            let key_value: KeyValue = old_table[*old_pos];
-            let mut probe = QuadraticProbing::compute(key_value.hash as usize, mask);
+        for key_value in old_table.into_iter().filter(KeyValue::is_not_empty_ref) {
+            let mut probe = LinearProbing::compute(key_value.hash, mask);
             loop {
                 let bucket = probe.next_probe();
                 if self.table[bucket].is_empty() {
-                    *old_pos = bucket;
                     self.table[bucket] = key_value;
                     break;
                 }
@@ -222,9 +236,10 @@ impl ArenaHashMap {
     }
 
     /// Get a value associated to a key.
+    #[inline]
     pub fn get<V>(&self, key: &[u8]) -> Option<V>
     where V: Copy + 'static {
-        let hash = murmurhash2(key);
+        let hash = self.get_hash(key);
         let mut probe = self.probe(hash);
         loop {
             let bucket = probe.next_probe();
@@ -261,7 +276,7 @@ impl ArenaHashMap {
         if self.is_saturated() {
             self.resize();
         }
-        let hash = murmurhash2(key);
+        let hash = self.get_hash(key);
         let mut probe = self.probe(hash);
         loop {
             let bucket = probe.next_probe();
@@ -273,11 +288,12 @@ impl ArenaHashMap {
                 let key_addr = self.memory_arena.allocate_space(num_bytes);
                 {
                     let data = self.memory_arena.slice_mut(key_addr, num_bytes);
-                    NativeEndian::write_u16(data, key.len() as u16);
+                    data[..2].copy_from_slice(&(key.len() as u16).to_le_bytes());
                     let stop = 2 + key.len();
                     data[2..stop].copy_from_slice(key);
                     store(&mut data[stop..], val);
                 }
+
                 return self.set_bucket(hash, key_addr, bucket);
             } else if kv.hash == hash {
                 if let Some(val_addr) = self.get_value_addr_if_key_match(key, kv.key_value_addr) {
diff --git a/stacker/src/lib.rs b/stacker/src/lib.rs
index add06359a4..be44f26271 100644
--- a/stacker/src/lib.rs
+++ b/stacker/src/lib.rs
@@ -2,9 +2,9 @@ mod arena_hashmap;
 mod expull;
 mod memory_arena;
 
-pub use self::arena_hashmap::{compute_table_size, ArenaHashMap};
+pub use self::arena_hashmap::{compute_table_memory_size, ArenaHashMap};
 pub use self::expull::ExpUnrolledLinkedList;
 pub use self::memory_arena::{Addr, MemoryArena};
 
 /// When adding an element in a `ArenaHashMap`, we get a unique id associated to the given key.
-pub type UnorderedId = u64;
+pub type UnorderedId = u32;
diff --git a/stacker/src/memory_arena.rs b/stacker/src/memory_arena.rs
index 2155d4a0f7..b9e9d37705 100644
--- a/stacker/src/memory_arena.rs
+++ b/stacker/src/memory_arena.rs
@@ -41,42 +41,50 @@ pub struct Addr(u32);
 
 impl Addr {
     /// Creates a null pointer.
+    #[inline]
     pub fn null_pointer() -> Addr {
         Addr(u32::MAX)
     }
 
     /// Returns the `Addr` object for `addr + offset`
+    #[inline]
     pub fn offset(self, offset: u32) -> Addr {
         Addr(self.0.wrapping_add(offset))
     }
 
+    #[inline]
     fn new(page_id: usize, local_addr: usize) -> Addr {
         Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32)
     }
 
+    #[inline]
     fn page_id(self) -> usize {
         (self.0 as usize) >> NUM_BITS_PAGE_ADDR
     }
 
+    #[inline]
     fn page_local_addr(self) -> usize {
         (self.0 as usize) & (PAGE_SIZE - 1)
     }
 
     /// Returns true if and only if the `Addr` is null.
+    #[inline]
     pub fn is_null(self) -> bool {
         self.0 == u32::MAX
     }
 }
 
+#[inline]
 pub fn store<Item: Copy + 'static>(dest: &mut [u8], val: Item) {
-    assert_eq!(dest.len(), std::mem::size_of::<Item>());
+    debug_assert_eq!(dest.len(), std::mem::size_of::<Item>());
     unsafe {
         ptr::write_unaligned(dest.as_mut_ptr() as *mut Item, val);
     }
 }
 
+#[inline]
 pub fn load<Item: Copy + 'static>(data: &[u8]) -> Item {
-    assert_eq!(data.len(), std::mem::size_of::<Item>());
+    debug_assert_eq!(data.len(), std::mem::size_of::<Item>());
     unsafe { ptr::read_unaligned(data.as_ptr() as *const Item) }
 }
 
@@ -111,6 +119,7 @@ impl MemoryArena {
         self.pages.len() * PAGE_SIZE
     }
 
+    #[inline]
     pub fn write_at<Item: Copy + 'static>(&mut self, addr: Addr, val: Item) {
         let dest = self.slice_mut(addr, std::mem::size_of::<Item>());
         store(dest, val);
@@ -121,14 +130,17 @@ impl MemoryArena {
     /// # Panics
     ///
     /// If the address is erroneous
+    #[inline]
     pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
         load(self.slice(addr, mem::size_of::<Item>()))
     }
 
+    #[inline]
     pub fn slice(&self, addr: Addr, len: usize) -> &[u8] {
         self.pages[addr.page_id()].slice(addr.page_local_addr(), len)
     }
 
+    #[inline]
     pub fn slice_from(&self, addr: Addr) -> &[u8] {
         self.pages[addr.page_id()].slice_from(addr.page_local_addr())
     }
@@ -168,14 +180,17 @@ impl Page {
         len + self.len <= PAGE_SIZE
     }
 
+    #[inline]
     fn slice(&self, local_addr: usize, len: usize) -> &[u8] {
         &self.slice_from(local_addr)[..len]
     }
 
+    #[inline]
     fn slice_from(&self, local_addr: usize) -> &[u8] {
         &self.data[local_addr..]
     }
 
+    #[inline]
     fn slice_mut(&mut self, local_addr: usize, len: usize) -> &mut [u8] {
         &mut self.data[local_addr..][..len]
     }
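Note (not part of the patch): a minimal usage sketch of the `ArenaHashMap` API exercised by this diff — `with_capacity`, `mutate_or_create`, `iter` and `read` — assuming the crate is consumed as `tantivy_stacker`, the way the new bench and example do. The input string and counter type are illustrative only.

use tantivy_stacker::ArenaHashMap;

fn main() {
    // Start small; the table grows via resize()/LinearProbing once it is saturated.
    let mut map = ArenaHashMap::with_capacity(1 << 10);
    for word in "the quick brown fox jumps over the lazy dog the".split_whitespace() {
        // The closure receives None on the first insert of a key and
        // Some(previous value) on every subsequent update.
        map.mutate_or_create(word.as_bytes(), |count: Option<u64>| {
            count.map(|c| c + 1).unwrap_or(1u64)
        });
    }
    // iter() yields (key bytes, value address, unordered id); the value itself
    // is read back from the memory arena at the returned address.
    for (key, value_addr, _unordered_id) in map.iter() {
        let count: u64 = map.read(value_addr);
        println!("{} -> {}", String::from_utf8_lossy(key), count);
    }
}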