Skip to content

Commit

Permalink
fix(storage): Assume sorted StringIterators, and retain sorted order (#…
Browse files Browse the repository at this point in the history
…13491)

* fix(storage): Assume sorted StringIterators, and retain sorted order

* embed struct field

* improve MergedStringIterator efficiency
  • Loading branch information
jacobmarble authored and stuartcarnie committed Apr 22, 2019
1 parent ad761ea commit 42bcbb1
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 31 deletions.
92 changes: 68 additions & 24 deletions storage/reads/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,49 +121,93 @@ func (h *resultSetHeap) Pop() interface{} {
return item
}

// MergedStringIterator merges multiple storage.StringIterators into one.
// It sorts and deduplicates adjacent values, so the output is sorted iff all inputs are sorted.
// If all inputs are not sorted, then output order and deduplication are undefined and unpleasant.
type MergedStringIterator struct {
iterators []cursors.StringIterator
uniqueValues map[string]struct{}
nextValue string
heap stringIteratorHeap
nextValue string
}

// API compatibility
var _ cursors.StringIterator = (*MergedStringIterator)(nil)

func NewMergedStringIterator(iterators []cursors.StringIterator) *MergedStringIterator {
return &MergedStringIterator{
iterators: iterators,
uniqueValues: make(map[string]struct{}),
nonEmptyIterators := make([]cursors.StringIterator, 0, len(iterators))

for _, iterator := range iterators {
// All iterators must be Next()'d so that their Value() methods return a meaningful value, and sort properly.
if iterator.Next() {
nonEmptyIterators = append(nonEmptyIterators, iterator)
}
}

msi := &MergedStringIterator{
heap: stringIteratorHeap{iterators: nonEmptyIterators},
}
heap.Init(&msi.heap)

return msi
}

func (mr *MergedStringIterator) Next() bool {
// TODO assume that each iterator is sorted, and iterate in sorted order
// https://github.com/influxdata/influxdb/issues/13440
for len(mr.iterators) > 0 {
iterator := mr.iterators[0]
func (msi *MergedStringIterator) Next() bool {
for msi.heap.Len() > 0 {
iterator := msi.heap.iterators[0]

haveNext := false
if proposedNextValue := iterator.Value(); proposedNextValue != msi.nextValue { // Skip dupes.
msi.nextValue = proposedNextValue
haveNext = true
}

for iterator.Next() {
mr.nextValue = iterator.Value()
if _, found := mr.uniqueValues[mr.nextValue]; !found {
mr.uniqueValues[mr.nextValue] = struct{}{}
return true
}
if iterator.Next() {
// iterator.Value() has changed, so re-order that iterator within the heap
heap.Fix(&msi.heap, 0)
} else {
// iterator is drained, so remove it from the heap
heap.Pop(&msi.heap)
}

// This iterator exhausted; move on to next iterator.
mr.iterators[0] = nil
mr.iterators = mr.iterators[1:]
if haveNext {
return true
}
}

mr.uniqueValues = nil
return false
}

func (mr *MergedStringIterator) Value() string {
return mr.nextValue
func (msi *MergedStringIterator) Value() string {
return msi.nextValue
}

func (mr *MergedStringIterator) Stats() cursors.CursorStats {
func (msi *MergedStringIterator) Stats() cursors.CursorStats {
return cursors.CursorStats{}
}

type stringIteratorHeap struct {
iterators []cursors.StringIterator
}

func (h stringIteratorHeap) Len() int {
return len(h.iterators)
}

func (h stringIteratorHeap) Less(i, j int) bool {
return h.iterators[i].Value() < h.iterators[j].Value()
}

func (h *stringIteratorHeap) Swap(i, j int) {
h.iterators[i], h.iterators[j] = h.iterators[j], h.iterators[i]
}

func (h *stringIteratorHeap) Push(x interface{}) {
h.iterators = append(h.iterators, x.(cursors.StringIterator))
}

func (h *stringIteratorHeap) Pop() interface{} {
n := len(h.iterators)
item := h.iterators[n-1]
h.iterators[n-1] = nil
h.iterators = h.iterators[:n-1]
return item
}
15 changes: 8 additions & 7 deletions storage/reads/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,20 @@ func TestNewMergedStringIterator(t *testing.T) {
{
name: "simple",
iterators: []cursors.StringIterator{
newMockStringIterator("foo", "bar"),
newMockStringIterator("bar", "foo"),
},
expectedValues: []string{"foo", "bar"},
expectedValues: []string{"bar", "foo"},
},
{
name: "duplicates",
iterators: []cursors.StringIterator{
newMockStringIterator("foo"),
newMockStringIterator("bar", "bar"),
newMockStringIterator("foo"),
newMockStringIterator("baz", "qux"),
newMockStringIterator("c"),
newMockStringIterator("b", "b"), // This kind of duplication is not explicitly documented, but works.
newMockStringIterator("a", "c"),
newMockStringIterator("b", "d"),
newMockStringIterator("0", "a", "b", "e"),
},
expectedValues: []string{"foo", "bar", "baz", "qux"},
expectedValues: []string{"0", "a", "b", "c", "d", "e"},
},
}

Expand Down

0 comments on commit 42bcbb1

Please sign in to comment.