add RegexPhraseQuery
RegexPhraseQuery supports phrase queries in which each term may be a regex or a wildcard pattern. E.g. a query with wildcards:
"b* b* wolf" matches "big bad wolf"
Slop is supported as well:
"b* wolf"~2 matches "big bad wolf"

Regex queries may match a large number of terms, and we still need to
track which term hit in order to load its positions.
The phrase query algorithm therefore groups terms of similar frequency
together in the union, so that whole groups can be pre-filtered early.
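A self-contained sketch of that grouping idea (the 10x bucketing rule and all names here are illustrative, not the committed algorithm):

    /// Partition (term, doc_freq) pairs into groups of similar frequency so the
    /// union of each group can be checked, and rejected, as a whole before the
    /// per-term positions are loaded.
    fn group_terms_by_freq(mut terms: Vec<(String, u32)>) -> Vec<Vec<(String, u32)>> {
        terms.sort_by_key(|&(_, freq)| freq);
        let mut groups: Vec<Vec<(String, u32)>> = Vec::new();
        let mut group_min_freq = 0u32;
        for (term, freq) in terms {
            // open a new group once the frequency jumps past 10x the group minimum
            if groups.is_empty() || freq > group_min_freq.max(1) * 10 {
                group_min_freq = freq;
                groups.push(Vec::new());
            }
            groups.last_mut().unwrap().push((term, freq));
        }
        groups
    }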

This PR comes with some new data structures:

SimpleUnion - A union docset over a list of docsets. It does no
caching and is therefore well suited to access patterns with lots of skipping
(phrase search in particular, but intersections in general).
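A minimal sketch of such a non-caching union over sorted docid lists (stand-in types; the committed SimpleUnion operates on tantivy DocSets):

    const TERMINATED: u32 = u32::MAX; // sentinel, mirroring tantivy's TERMINATED

    /// A union that keeps no block cache: the current doc is recomputed as the
    /// minimum over the children, so frequent seeks stay cheap.
    struct SimpleUnion {
        docsets: Vec<std::vec::IntoIter<u32>>, // stand-ins for real DocSets
        current: Vec<u32>,                     // current doc of each child
    }

    impl SimpleUnion {
        fn new(lists: Vec<Vec<u32>>) -> Self {
            let mut docsets: Vec<_> = lists.into_iter().map(|l| l.into_iter()).collect();
            let current = docsets
                .iter_mut()
                .map(|d| d.next().unwrap_or(TERMINATED))
                .collect();
            SimpleUnion { docsets, current }
        }

        fn doc(&self) -> u32 {
            self.current.iter().copied().min().unwrap_or(TERMINATED)
        }

        fn advance(&mut self) -> u32 {
            let doc = self.doc();
            if doc == TERMINATED {
                return TERMINATED;
            }
            // advance every child currently sitting on `doc`
            for (i, cur) in self.current.iter_mut().enumerate() {
                if *cur == doc {
                    *cur = self.docsets[i].next().unwrap_or(TERMINATED);
                }
            }
            self.doc()
        }
    }

    fn main() {
        let mut union = SimpleUnion::new(vec![vec![1, 5, 9], vec![2, 5, 12]]);
        let mut docs = vec![union.doc()];
        while union.advance() != TERMINATED {
            docs.push(union.doc());
        }
        assert_eq!(docs, vec![1, 2, 5, 9, 12]);
    }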

LoadedPostings - Like SegmentPostings, but with all docs and positions loaded
into memory. SegmentPostings uses 1840 bytes per instance with its caches,
which is equivalent to 460 docids.
LoadedPostings is used for terms which have fewer than 100 docs, and only
to reduce memory consumption.
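A sketch of how that cutoff could be applied inside the crate (the `TermPostings` enum and helper are illustrative, not part of this commit; the 100-doc threshold and byte counts are the ones stated above):

    use crate::postings::{LoadedPostings, SegmentPostings};

    enum TermPostings {
        /// Few docs: fully decoded, roughly 4 bytes per docid plus positions.
        Loaded(LoadedPostings),
        /// Many docs: streaming decoder with a fixed ~1840-byte footprint.
        Segment(SegmentPostings),
    }

    fn postings_for_term(mut postings: SegmentPostings) -> TermPostings {
        if postings.doc_freq() < 100 {
            TermPostings::Loaded(LoadedPostings::load(&mut postings))
        } else {
            TermPostings::Segment(postings)
        }
    }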

BitSetPostingUnion - Creates a `Postings` that uses a bitset for docid
hits and the underlying docsets for positions. The bitset is the precalculated
union of the docsets.
In the RegexPhraseQuery there is a size limit of 512 docsets per PreAggregatedUnion
before a new one is created.
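An illustrative shape of the idea with stand-in types (the committed BitSetPostingUnion operates on tantivy's BitSet and real docsets):

    /// Docid iteration is answered by the precalculated bitset union; the
    /// per-term docsets are consulted only when positions are actually needed.
    struct BitSetPostingUnion {
        bits: Vec<u64>,                      // precalculated union of all docids
        postings: Vec<Vec<(u32, Vec<u32>)>>, // per-term (doc, positions) stand-ins
    }

    impl BitSetPostingUnion {
        fn contains(&self, doc: u32) -> bool {
            let word = (doc / 64) as usize;
            word < self.bits.len() && (self.bits[word] >> (doc % 64)) & 1 == 1
        }

        /// Gather positions lazily, and only for docs the bitset accepted.
        fn positions(&self, doc: u32, out: &mut Vec<u32>) {
            out.clear();
            for term_postings in &self.postings {
                if let Ok(idx) = term_postings.binary_search_by_key(&doc, |&(d, _)| d) {
                    out.extend_from_slice(&term_postings[idx].1);
                }
            }
            out.sort_unstable();
        }
    }

The 512-docset cap mentioned above bounds how many posting lists a single union has to probe for any accepted doc.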

Renamed Union to BufferedUnionScorer.
Added proptests to test different union types.
PSeitz committed Oct 10, 2024
1 parent 2f5a269 commit 028a938
Showing 20 changed files with 1,634 additions and 256 deletions.
2 changes: 1 addition & 1 deletion src/core/tests.rs
@@ -421,7 +421,7 @@ fn test_non_text_json_term_freq() {
let mut term = Term::from_field_json_path(field, "tenant_id", false);
term.append_type_and_fast_value(75i64);

-let postings = inv_idx
+let mut postings = inv_idx
.read_postings(&term, IndexRecordOption::WithFreqsAndPositions)
.unwrap()
.unwrap();
5 changes: 5 additions & 0 deletions src/index/inverted_index_reader.rs
@@ -66,6 +66,11 @@ impl InvertedIndexReader {
self.termdict.get(term.serialized_value_bytes())
}

+/// Returns the term info associated with the str term.
+pub fn get_term_info_str(&self, term: &str) -> io::Result<Option<TermInfo>> {
+self.termdict.get(term.as_bytes())
+}
+
/// Return the term dictionary datastructure.
pub fn terms(&self) -> &TermDictionary {
&self.termdict
2 changes: 1 addition & 1 deletion src/postings/block_segment_postings.rs
@@ -410,7 +410,7 @@ mod tests {

#[test]
fn test_empty_postings_doc_term_freq_returns_0() {
-let postings = SegmentPostings::empty();
+let mut postings = SegmentPostings::empty();
assert_eq!(postings.term_freq(), 1);
}

153 changes: 153 additions & 0 deletions src/postings/loaded_postings.rs
@@ -0,0 +1,153 @@
use crate::docset::{DocSet, TERMINATED};
use crate::postings::{Postings, SegmentPostings};
use crate::DocId;

/// `LoadedPostings` is a `DocSet` and `Postings` implementation.
/// It is used to represent the postings of a term in memory.
/// It is suitable if there are few documents for a term.
///
/// It exists mainly to reduce memory usage.
/// `SegmentPostings` uses 1840 bytes per instance due to its caches.
/// If you need to keep many terms around with few docs, it's cheaper to load all the
/// postings in memory.
///
/// This is relevant for `RegexPhraseQuery`, which may have a lot of
/// terms.
/// E.g. 100_000 terms would need 184MB due to SegmentPostings.
pub struct LoadedPostings {
doc_ids: Vec<DocId>,
position_offsets: Vec<u32>,
positions: Vec<u32>,
cursor: usize,
}

impl LoadedPostings {
/// Creates a new `LoadedPostings` from a `SegmentPostings`.
///
/// It will also preload positions, if positions are available in the SegmentPostings.
pub fn load(segment_postings: &mut SegmentPostings) -> LoadedPostings {
let num_docs = segment_postings.doc_freq() as usize;
let mut doc_ids = Vec::with_capacity(num_docs);
let mut positions = Vec::with_capacity(num_docs);
let mut position_offsets = Vec::with_capacity(num_docs);
while segment_postings.doc() != TERMINATED {
position_offsets.push(positions.len() as u32);
doc_ids.push(segment_postings.doc());
segment_postings.append_positions_with_offset(0, &mut positions);
segment_postings.advance();
}
position_offsets.push(positions.len() as u32);
LoadedPostings {
doc_ids,
positions,
position_offsets,
cursor: 0,
}
}
}

impl From<(Vec<DocId>, Vec<Vec<u32>>)> for LoadedPostings {
fn from(doc_ids: (Vec<DocId>, Vec<Vec<u32>>)) -> LoadedPostings {
let mut position_offsets = Vec::new();
let mut positions = Vec::new();
for pos in &doc_ids.1 {
position_offsets.push(positions.len() as u32);
positions.extend_from_slice(pos);
}
position_offsets.push(positions.len() as u32);
LoadedPostings {
doc_ids: doc_ids.0,
positions,
position_offsets,
cursor: 0,
}
}
}

impl DocSet for LoadedPostings {
fn advance(&mut self) -> DocId {
self.cursor += 1;
if self.cursor >= self.doc_ids.len() {
self.cursor = self.doc_ids.len();
return TERMINATED;
}
self.doc()
}

fn doc(&self) -> DocId {
if self.cursor == self.doc_ids.len() {
return TERMINATED;
}
self.doc_ids[self.cursor]
}

fn size_hint(&self) -> u32 {
self.doc_ids.len() as u32
}
}
impl Postings for LoadedPostings {
fn term_freq(&mut self) -> u32 {
let start = self.position_offsets[self.cursor] as usize;
let stop = self.position_offsets[self.cursor + 1] as usize;
(stop - start) as u32
}

fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let start = self.position_offsets[self.cursor] as usize;
let stop = self.position_offsets[self.cursor + 1] as usize;
for pos in &self.positions[start..stop] {
output.push(*pos + offset);
}
}
}

#[cfg(test)]
pub mod tests {

use super::*;

#[test]
pub fn test_vec_postings() {
let doc_ids: Vec<DocId> = (0u32..1024u32).map(|e| e * 3).collect();
let mut postings = LoadedPostings::from((doc_ids, vec![]));
assert_eq!(postings.doc(), 0u32);
assert_eq!(postings.advance(), 3u32);
assert_eq!(postings.doc(), 3u32);
assert_eq!(postings.seek(14u32), 15u32);
assert_eq!(postings.doc(), 15u32);
assert_eq!(postings.seek(300u32), 300u32);
assert_eq!(postings.doc(), 300u32);
assert_eq!(postings.seek(6000u32), TERMINATED);
}

#[test]
pub fn test_vec_postings2() {
let doc_ids: Vec<DocId> = (0u32..1024u32).map(|e| e * 3).collect();
let mut positions = Vec::new();
positions.resize(1024, Vec::new());
positions[0] = vec![1u32, 2u32, 3u32];
positions[1] = vec![30u32];
positions[2] = vec![10u32];
positions[4] = vec![50u32];
let mut postings = LoadedPostings::from((doc_ids, positions));

let load = |postings: &mut LoadedPostings| {
let mut loaded_positions = Vec::new();
postings.positions(loaded_positions.as_mut());
loaded_positions
};
assert_eq!(postings.doc(), 0u32);
assert_eq!(load(&mut postings), vec![1u32, 2u32, 3u32]);

assert_eq!(postings.advance(), 3u32);
assert_eq!(postings.doc(), 3u32);

assert_eq!(load(&mut postings), vec![30u32]);

assert_eq!(postings.seek(14u32), 15u32);
assert_eq!(postings.doc(), 15u32);
assert_eq!(postings.seek(300u32), 300u32);
assert_eq!(postings.doc(), 300u32);
assert_eq!(postings.seek(6000u32), TERMINATED);
}
}
2 changes: 2 additions & 0 deletions src/postings/mod.rs
@@ -8,6 +8,7 @@ mod block_segment_postings
pub(crate) mod compression;
mod indexing_context;
mod json_postings_writer;
+mod loaded_postings;
mod per_field_postings_writer;
mod postings;
mod postings_writer;
@@ -17,6 +18,7 @@ mod serializer;
mod skip;
mod term_info;

+pub(crate) use loaded_postings::LoadedPostings;
pub(crate) use stacker::compute_table_memory_size;

pub use self::block_segment_postings::BlockSegmentPostings;
21 changes: 19 additions & 2 deletions src/postings/postings.rs
@@ -12,16 +12,33 @@ use crate::docset::DocSet;
/// for merging segments or for testing.
pub trait Postings: DocSet + 'static {
/// The number of times the term appears in the document.
-fn term_freq(&self) -> u32;
+fn term_freq(&mut self) -> u32;

/// Returns the positions offsetted with a given value.
/// It is not necessary to clear the `output` before calling this method.
/// The output vector will be resized to the `term_freq`.
-fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>);
+fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
+output.clear();
+self.append_positions_with_offset(offset, output);
+}

+/// Returns the positions offsetted with a given value.
+/// Data will be appended to the output.
+fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>);

/// Returns the positions of the term in the given document.
/// The output vector will be resized to the `term_freq`.
fn positions(&mut self, output: &mut Vec<u32>) {
self.positions_with_offset(0u32, output);
}
}

+impl Postings for Box<dyn Postings> {
+fn term_freq(&mut self) -> u32 {
+(**self).term_freq()
+}
+
+fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
+(**self).append_positions_with_offset(offset, output);
+}
+}
15 changes: 7 additions & 8 deletions src/postings/segment_postings.rs
@@ -223,7 +223,7 @@ impl Postings for SegmentPostings {
/// # Panics
///
/// Will panic if called without having called advance before.
-fn term_freq(&self) -> u32 {
+fn term_freq(&mut self) -> u32 {
debug_assert!(
// Here we do not use the len of `freqs()`
// because it is actually ok to request for the freq of doc
@@ -237,8 +237,9 @@
self.block_cursor.freq(self.cur)
}

-fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
+fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq();
+let prev_len = output.len();
if let Some(position_reader) = self.position_reader.as_mut() {
debug_assert!(
!self.block_cursor.freqs().is_empty(),
Expand All @@ -249,15 +250,13 @@ impl Postings for SegmentPostings {
.iter()
.cloned()
.sum::<u32>() as u64);
-output.resize(term_freq as usize, 0u32);
-position_reader.read(read_offset, &mut output[..]);
+output.resize(prev_len + term_freq as usize, 0u32);
+position_reader.read(read_offset, &mut output[prev_len..]);
let mut cum = offset;
-for output_mut in output.iter_mut() {
+for output_mut in output[prev_len..].iter_mut() {
cum += *output_mut;
*output_mut = cum;
}
-} else {
-output.clear();
}
}
}
@@ -289,7 +288,7 @@ mod tests {

#[test]
fn test_empty_postings_doc_term_freq_returns_0() {
-let postings = SegmentPostings::empty();
+let mut postings = SegmentPostings::empty();
assert_eq!(postings.term_freq(), 1);
}

13 changes: 13 additions & 0 deletions src/query/automaton_weight.rs
@@ -6,6 +6,7 @@ use tantivy_fst::Automaton;

use super::phrase_prefix_query::prefix_end;
use crate::index::SegmentReader;
+use crate::postings::TermInfo;
use crate::query::{BitSetDocSet, ConstScorer, Explanation, Scorer, Weight};
use crate::schema::{Field, IndexRecordOption};
use crate::termdict::{TermDictionary, TermStreamer};
@@ -64,6 +65,18 @@

term_stream_builder.into_stream()
}

+/// Returns the term infos that match the automaton
+pub fn get_term_infos(&self, reader: &SegmentReader) -> crate::Result<Vec<TermInfo>> {
+let inverted_index = reader.inverted_index(self.field)?;
+let term_dict = inverted_index.terms();
+let mut term_stream = self.automaton_stream(term_dict)?;
+let mut term_infos = Vec::new();
+while term_stream.advance() {
+term_infos.push(term_stream.value().clone());
+}
+Ok(term_infos)
+}
}

impl<A> Weight for AutomatonWeight<A>
4 changes: 2 additions & 2 deletions src/query/boolean_query/block_wand.rs
@@ -308,7 +308,7 @@

use crate::query::score_combiner::SumCombiner;
use crate::query::term_query::TermScorer;
-use crate::query::{Bm25Weight, Scorer, Union};
+use crate::query::{Bm25Weight, BufferedUnionScorer, Scorer};
use crate::{DocId, DocSet, Score, TERMINATED};

struct Float(Score);
@@ -371,7 +371,7 @@
fn compute_checkpoints_manual(term_scorers: Vec<TermScorer>, n: usize) -> Vec<(DocId, Score)> {
let mut heap: BinaryHeap<Float> = BinaryHeap::with_capacity(n);
let mut checkpoints: Vec<(DocId, Score)> = Vec::new();
-let mut scorer = Union::build(term_scorers, SumCombiner::default);
+let mut scorer = BufferedUnionScorer::build(term_scorers, SumCombiner::default);

let mut limit = Score::MIN;
loop {
19 changes: 12 additions & 7 deletions src/query/boolean_query/boolean_weight.rs
@@ -9,8 +9,8 @@ use crate::query::score_combiner::{DoNothingCombiner, ScoreCombiner};
use crate::query::term_query::TermScorer;
use crate::query::weight::{for_each_docset_buffered, for_each_pruning_scorer, for_each_scorer};
use crate::query::{
-intersect_scorers, EmptyScorer, Exclude, Explanation, Occur, RequiredOptionalScorer, Scorer,
-Union, Weight,
+intersect_scorers, BufferedUnionScorer, EmptyScorer, Exclude, Explanation, Occur,
+RequiredOptionalScorer, Scorer, Weight,
};
use crate::{DocId, Score};

@@ -65,14 +65,17 @@
// Block wand is only available if we read frequencies.
return SpecializedScorer::TermUnion(scorers);
} else {
-return SpecializedScorer::Other(Box::new(Union::build(
+return SpecializedScorer::Other(Box::new(BufferedUnionScorer::build(
scorers,
score_combiner_fn,
)));
}
}
}
-SpecializedScorer::Other(Box::new(Union::build(scorers, score_combiner_fn)))
+SpecializedScorer::Other(Box::new(BufferedUnionScorer::build(
+scorers,
+score_combiner_fn,
+)))
}

fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
@@ -81,7 +84,7 @@
) -> Box<dyn Scorer> {
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
-let union_scorer = Union::build(term_scorers, score_combiner_fn);
+let union_scorer = BufferedUnionScorer::build(term_scorers, score_combiner_fn);
Box::new(union_scorer)
}
SpecializedScorer::Other(scorer) => scorer,
@@ -296,7 +299,7 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombiner>
let scorer = self.complex_scorer(reader, 1.0, &self.score_combiner_fn)?;
match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
-let mut union_scorer = Union::build(term_scorers, &self.score_combiner_fn);
+let mut union_scorer =
+BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn);
for_each_scorer(&mut union_scorer, callback);
}
SpecializedScorer::Other(mut scorer) => {
Expand All @@ -316,7 +320,8 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombin

match scorer {
SpecializedScorer::TermUnion(term_scorers) => {
-let mut union_scorer = Union::build(term_scorers, &self.score_combiner_fn);
+let mut union_scorer =
+BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn);
for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
}
SpecializedScorer::Other(mut scorer) => {
