diff --git a/storage/reads/merge.go b/storage/reads/merge.go index 46930a52db2..b28b92126ba 100644 --- a/storage/reads/merge.go +++ b/storage/reads/merge.go @@ -121,49 +121,93 @@ func (h *resultSetHeap) Pop() interface{} { return item } +// MergedStringIterator merges multiple storage.StringIterators into one. +// It sorts and deduplicates adjacent values, so the output is sorted iff all inputs are sorted. +// If all inputs are not sorted, then output order and deduplication are undefined and unpleasant. type MergedStringIterator struct { - iterators []cursors.StringIterator - uniqueValues map[string]struct{} - nextValue string + heap stringIteratorHeap + nextValue string } // API compatibility var _ cursors.StringIterator = (*MergedStringIterator)(nil) func NewMergedStringIterator(iterators []cursors.StringIterator) *MergedStringIterator { - return &MergedStringIterator{ - iterators: iterators, - uniqueValues: make(map[string]struct{}), + nonEmptyIterators := make([]cursors.StringIterator, 0, len(iterators)) + + for _, iterator := range iterators { + // All iterators must be Next()'d so that their Value() methods return a meaningful value, and sort properly. + if iterator.Next() { + nonEmptyIterators = append(nonEmptyIterators, iterator) + } + } + + msi := &MergedStringIterator{ + heap: stringIteratorHeap{iterators: nonEmptyIterators}, } + heap.Init(&msi.heap) + + return msi } -func (mr *MergedStringIterator) Next() bool { - // TODO assume that each iterator is sorted, and iterate in sorted order - // https://github.com/influxdata/influxdb/issues/13440 - for len(mr.iterators) > 0 { - iterator := mr.iterators[0] +func (msi *MergedStringIterator) Next() bool { + for msi.heap.Len() > 0 { + iterator := msi.heap.iterators[0] + + haveNext := false + if proposedNextValue := iterator.Value(); proposedNextValue != msi.nextValue { // Skip dupes. + msi.nextValue = proposedNextValue + haveNext = true + } - for iterator.Next() { - mr.nextValue = iterator.Value() - if _, found := mr.uniqueValues[mr.nextValue]; !found { - mr.uniqueValues[mr.nextValue] = struct{}{} - return true - } + if iterator.Next() { + // iterator.Value() has changed, so re-order that iterator within the heap + heap.Fix(&msi.heap, 0) + } else { + // iterator is drained, so remove it from the heap + heap.Pop(&msi.heap) } - // This iterator exhausted; move on to next iterator. - mr.iterators[0] = nil - mr.iterators = mr.iterators[1:] + if haveNext { + return true + } } - mr.uniqueValues = nil return false } -func (mr *MergedStringIterator) Value() string { - return mr.nextValue +func (msi *MergedStringIterator) Value() string { + return msi.nextValue } -func (mr *MergedStringIterator) Stats() cursors.CursorStats { +func (msi *MergedStringIterator) Stats() cursors.CursorStats { return cursors.CursorStats{} } + +type stringIteratorHeap struct { + iterators []cursors.StringIterator +} + +func (h stringIteratorHeap) Len() int { + return len(h.iterators) +} + +func (h stringIteratorHeap) Less(i, j int) bool { + return h.iterators[i].Value() < h.iterators[j].Value() +} + +func (h *stringIteratorHeap) Swap(i, j int) { + h.iterators[i], h.iterators[j] = h.iterators[j], h.iterators[i] +} + +func (h *stringIteratorHeap) Push(x interface{}) { + h.iterators = append(h.iterators, x.(cursors.StringIterator)) +} + +func (h *stringIteratorHeap) Pop() interface{} { + n := len(h.iterators) + item := h.iterators[n-1] + h.iterators[n-1] = nil + h.iterators = h.iterators[:n-1] + return item +} diff --git a/storage/reads/merge_test.go b/storage/reads/merge_test.go index e3685912aff..fddc70e0541 100644 --- a/storage/reads/merge_test.go +++ b/storage/reads/merge_test.go @@ -115,19 +115,20 @@ func TestNewMergedStringIterator(t *testing.T) { { name: "simple", iterators: []cursors.StringIterator{ - newMockStringIterator("foo", "bar"), + newMockStringIterator("bar", "foo"), }, - expectedValues: []string{"foo", "bar"}, + expectedValues: []string{"bar", "foo"}, }, { name: "duplicates", iterators: []cursors.StringIterator{ - newMockStringIterator("foo"), - newMockStringIterator("bar", "bar"), - newMockStringIterator("foo"), - newMockStringIterator("baz", "qux"), + newMockStringIterator("c"), + newMockStringIterator("b", "b"), // This kind of duplication is not explicitly documented, but works. + newMockStringIterator("a", "c"), + newMockStringIterator("b", "d"), + newMockStringIterator("0", "a", "b", "e"), }, - expectedValues: []string{"foo", "bar", "baz", "qux"}, + expectedValues: []string{"0", "a", "b", "c", "d", "e"}, }, }