Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove index sorting #2434

Merged
merged 1 commit into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion columnar/benches/bench_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn generate_columnar(card: Card, num_docs: u32) -> Column {
}

let mut wrt: Vec<u8> = Vec::new();
columnar_writer.serialize(num_docs, None, &mut wrt).unwrap();
columnar_writer.serialize(num_docs, &mut wrt).unwrap();

let reader = ColumnarReader::open(wrt).unwrap();
reader.read_columns("price").unwrap()[0]
Expand Down
2 changes: 1 addition & 1 deletion columnar/benches/bench_first_vals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fn get_test_columns() -> Columns {
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(data.len() as u32, None, &mut buffer)
.serialize(data.len() as u32, &mut buffer)
.unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion columnar/benches/bench_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ fn generate_columnar(card: Card, num_docs: u32) -> ColumnarReader {
}

let mut wrt: Vec<u8> = Vec::new();
columnar_writer.serialize(num_docs, None, &mut wrt).unwrap();
columnar_writer.serialize(num_docs, &mut wrt).unwrap();

ColumnarReader::open(wrt).unwrap()
}
Expand Down
4 changes: 1 addition & 3 deletions columnar/src/column_index/optional_index/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ fn test_optional_index_with_num_docs(num_docs: u32) {
let mut dataframe_writer = ColumnarWriter::default();
dataframe_writer.record_numerical(100, "score", 80i64);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(num_docs, None, &mut buffer)
.unwrap();
dataframe_writer.serialize(num_docs, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("score").unwrap();
Expand Down
1 change: 1 addition & 0 deletions columnar/src/columnar/merge/merge_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub enum MergeRowOrder {
Stack(StackMergeOrder),
/// Some more complex mapping, that may interleaves rows from the different readers and
/// drop rows, or do both.
/// TODO: remove ordering part here
Shuffled(ShuffleMergeOrder),
}

Expand Down
14 changes: 4 additions & 10 deletions columnar/src/columnar/merge/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn make_columnar<T: Into<NumericalValue> + HasAssociatedColumnType + Copy>(
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(vals.len() as RowId, None, &mut buffer)
.serialize(vals.len() as RowId, &mut buffer)
.unwrap();
ColumnarReader::open(buffer).unwrap()
}
Expand Down Expand Up @@ -157,9 +157,7 @@ fn make_numerical_columnar_multiple_columns(
.max()
.unwrap_or(0u32);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(num_rows, None, &mut buffer)
.unwrap();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}

Expand All @@ -182,9 +180,7 @@ fn make_byte_columnar_multiple_columns(
}
}
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(num_rows, None, &mut buffer)
.unwrap();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}

Expand All @@ -203,9 +199,7 @@ fn make_text_columnar_multiple_columns(columns: &[(&str, &[&[&str]])]) -> Column
.max()
.unwrap_or(0u32);
let mut buffer: Vec<u8> = Vec::new();
dataframe_writer
.serialize(num_rows, None, &mut buffer)
.unwrap();
dataframe_writer.serialize(num_rows, &mut buffer).unwrap();
ColumnarReader::open(buffer).unwrap()
}

Expand Down
4 changes: 2 additions & 2 deletions columnar/src/columnar/reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ mod tests {
columnar_writer.record_column_type("col1", ColumnType::Str, false);
columnar_writer.record_column_type("col2", ColumnType::U64, false);
let mut buffer = Vec::new();
columnar_writer.serialize(1, None, &mut buffer).unwrap();
columnar_writer.serialize(1, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
let columns = columnar.list_columns().unwrap();
assert_eq!(columns.len(), 2);
Expand All @@ -211,7 +211,7 @@ mod tests {
columnar_writer.record_column_type("count", ColumnType::U64, false);
columnar_writer.record_numerical(1, "count", 1u64);
let mut buffer = Vec::new();
columnar_writer.serialize(2, None, &mut buffer).unwrap();
columnar_writer.serialize(2, &mut buffer).unwrap();
let columnar = ColumnarReader::open(buffer).unwrap();
let columns = columnar.list_columns().unwrap();
assert_eq!(columns.len(), 1);
Expand Down
29 changes: 2 additions & 27 deletions columnar/src/columnar/writer/column_writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,10 @@ impl ColumnWriter {
pub(super) fn operation_iterator<'a, V: SymbolValue>(
&self,
arena: &MemoryArena,
old_to_new_ids_opt: Option<&[RowId]>,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<V>> + 'a {
buffer.clear();
self.values.read_to_end(arena, buffer);
if let Some(old_to_new_ids) = old_to_new_ids_opt {
// TODO avoid the extra deserialization / serialization.
let mut sorted_ops: Vec<(RowId, ColumnOperation<V>)> = Vec::new();
let mut new_doc = 0u32;
let mut cursor = &buffer[..];
for op in std::iter::from_fn(|| ColumnOperation::<V>::deserialize(&mut cursor)) {
if let ColumnOperation::NewDoc(doc) = &op {
new_doc = old_to_new_ids[*doc as usize];
sorted_ops.push((new_doc, ColumnOperation::NewDoc(new_doc)));
} else {
sorted_ops.push((new_doc, op));
}
}
// stable sort is crucial here.
sorted_ops.sort_by_key(|(new_doc_id, _)| *new_doc_id);
buffer.clear();
for (_, op) in sorted_ops {
buffer.extend_from_slice(op.serialize().as_ref());
}
}
let mut cursor: &[u8] = &buffer[..];
std::iter::from_fn(move || ColumnOperation::deserialize(&mut cursor))
}
Expand Down Expand Up @@ -231,11 +210,9 @@ impl NumericalColumnWriter {
pub(super) fn operation_iterator<'a>(
self,
arena: &MemoryArena,
old_to_new_ids: Option<&[RowId]>,
buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<NumericalValue>> + 'a {
self.column_writer
.operation_iterator(arena, old_to_new_ids, buffer)
self.column_writer.operation_iterator(arena, buffer)
}
}

Expand Down Expand Up @@ -277,11 +254,9 @@ impl StrOrBytesColumnWriter {
pub(super) fn operation_iterator<'a>(
&self,
arena: &MemoryArena,
old_to_new_ids: Option<&[RowId]>,
byte_buffer: &'a mut Vec<u8>,
) -> impl Iterator<Item = ColumnOperation<UnorderedId>> + 'a {
self.column_writer
.operation_iterator(arena, old_to_new_ids, byte_buffer)
self.column_writer.operation_iterator(arena, byte_buffer)
}
}

Expand Down
105 changes: 12 additions & 93 deletions columnar/src/columnar/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct SpareBuffers {
/// columnar_writer.record_str(1u32 /* doc id */, "product_name", "Apple");
/// columnar_writer.record_numerical(0u32 /* doc id */, "price", 10.5f64); //< uh oh we ended up mixing integer and floats.
/// let mut wrt: Vec<u8> = Vec::new();
/// columnar_writer.serialize(2u32, None, &mut wrt).unwrap();
/// columnar_writer.serialize(2u32, &mut wrt).unwrap();
/// ```
#[derive(Default)]
pub struct ColumnarWriter {
Expand Down Expand Up @@ -75,63 +75,6 @@ impl ColumnarWriter {
.sum::<usize>()
}

/// Returns the list of doc ids from 0..num_docs sorted by the `sort_field`
/// column.
///
/// If the column is multivalued, use the first value for scoring.
/// If no value is associated to a specific row, the document is assigned
/// the lowest possible score.
///
/// The sort applied is stable.
pub fn sort_order(&self, sort_field: &str, num_docs: RowId, reversed: bool) -> Vec<u32> {
let Some(numerical_col_writer) = self
.numerical_field_hash_map
.get::<NumericalColumnWriter>(sort_field.as_bytes())
.or_else(|| {
self.datetime_field_hash_map
.get::<NumericalColumnWriter>(sort_field.as_bytes())
})
else {
return Vec::new();
};
let mut symbols_buffer = Vec::new();
let mut values = Vec::new();
let mut start_doc_check_fill = 0;
let mut current_doc_opt: Option<RowId> = None;
// Assumption: NewDoc will never call the same doc twice and is strictly increasing between
// calls
for op in numerical_col_writer.operation_iterator(&self.arena, None, &mut symbols_buffer) {
match op {
ColumnOperation::NewDoc(doc) => {
current_doc_opt = Some(doc);
}
ColumnOperation::Value(numerical_value) => {
if let Some(current_doc) = current_doc_opt {
// Fill up with 0.0 since last doc
values.extend((start_doc_check_fill..current_doc).map(|doc| (0.0, doc)));
start_doc_check_fill = current_doc + 1;
// handle multi values
current_doc_opt = None;

let score: f32 = f64::coerce(numerical_value) as f32;
values.push((score, current_doc));
}
}
}
}
for doc in values.len() as u32..num_docs {
values.push((0.0f32, doc));
}
values.sort_by(|(left_score, _), (right_score, _)| {
if reversed {
right_score.total_cmp(left_score)
} else {
left_score.total_cmp(right_score)
}
});
values.into_iter().map(|(_score, doc)| doc).collect()
}

/// Records a column type. This is useful to bypass the coercion process,
/// makes sure the empty is present in the resulting columnar, or set
/// the `sort_values_within_row`.
Expand Down Expand Up @@ -302,12 +245,7 @@ impl ColumnarWriter {
},
);
}
pub fn serialize(
&mut self,
num_docs: RowId,
old_to_new_row_ids: Option<&[RowId]>,
wrt: &mut dyn io::Write,
) -> io::Result<()> {
pub fn serialize(&mut self, num_docs: RowId, wrt: &mut dyn io::Write) -> io::Result<()> {
let mut serializer = ColumnarSerializer::new(wrt);
let mut columns: Vec<(&[u8], ColumnType, Addr)> = self
.numerical_field_hash_map
Expand Down Expand Up @@ -358,11 +296,7 @@ impl ColumnarWriter {
serialize_bool_column(
cardinality,
num_docs,
column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
Expand All @@ -376,11 +310,7 @@ impl ColumnarWriter {
serialize_ip_addr_column(
cardinality,
num_docs,
column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
Expand All @@ -405,11 +335,8 @@ impl ColumnarWriter {
num_docs,
str_or_bytes_column_writer.sort_values_within_row,
dictionary_builder,
str_or_bytes_column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
str_or_bytes_column_writer
.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&self.arena,
&mut column_serializer,
Expand All @@ -427,11 +354,7 @@ impl ColumnarWriter {
cardinality,
num_docs,
numerical_type,
numerical_column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
numerical_column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
Expand All @@ -446,11 +369,7 @@ impl ColumnarWriter {
cardinality,
num_docs,
NumericalType::I64,
column_writer.operation_iterator(
arena,
old_to_new_row_ids,
&mut symbol_byte_buffer,
),
column_writer.operation_iterator(arena, &mut symbol_byte_buffer),
buffers,
&mut column_serializer,
)?;
Expand Down Expand Up @@ -757,7 +676,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(3), Cardinality::Full);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, None, &mut buffer)
.operation_iterator(&arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 6);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
Expand Down Expand Up @@ -786,7 +705,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(3), Cardinality::Optional);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, None, &mut buffer)
.operation_iterator(&arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 4);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(1u32)));
Expand All @@ -809,7 +728,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(2), Cardinality::Optional);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, None, &mut buffer)
.operation_iterator(&arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 2);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
Expand All @@ -828,7 +747,7 @@ mod tests {
assert_eq!(column_writer.get_cardinality(1), Cardinality::Multivalued);
let mut buffer = Vec::new();
let symbols: Vec<ColumnOperation<NumericalValue>> = column_writer
.operation_iterator(&arena, None, &mut buffer)
.operation_iterator(&arena, &mut buffer)
.collect();
assert_eq!(symbols.len(), 3);
assert!(matches!(symbols[0], ColumnOperation::NewDoc(0u32)));
Expand Down
Loading
Loading