From 6d5235fcec6c888140a7b6d3157a1bcbd22d8634 Mon Sep 17 00:00:00 2001 From: Faith Chikwekwe Date: Thu, 18 Mar 2021 03:01:40 -0700 Subject: [PATCH] feat(query): enable min/max pushdown --- cmd/storectl/generate/generate.go | 1 + go.mod | 2 + go.sum | 1 + .../influxdb/min_max_influxdb_test.flux | 23 + query/stdlib/influxdata/influxdb/rules.go | 2 +- storage/flux/table.gen.go | 977 ++++++++++++------ storage/flux/table.gen.go.tmpl | 209 ++-- storage/flux/table.go | 11 +- 8 files changed, 846 insertions(+), 380 deletions(-) create mode 100644 cmd/storectl/generate/generate.go create mode 100644 query/stdlib/influxdata/influxdb/min_max_influxdb_test.flux diff --git a/cmd/storectl/generate/generate.go b/cmd/storectl/generate/generate.go new file mode 100644 index 00000000000..eb8347795a7 --- /dev/null +++ b/cmd/storectl/generate/generate.go @@ -0,0 +1 @@ +package generate diff --git a/go.mod b/go.mod index 7e9b9818f27..ab070d93d8d 100644 --- a/go.mod +++ b/go.mod @@ -127,3 +127,5 @@ require ( replace github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db => github.com/influxdata/arrow/go/arrow v0.0.0-20200917142114-986e413c1705 replace github.com/nats-io/nats-streaming-server v0.11.2 => github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803 + +replace github.com/influxdata/flux => /Users/faith/code/flux diff --git a/go.sum b/go.sum index cc213930693..5de3709c5f5 100644 --- a/go.sum +++ b/go.sum @@ -338,6 +338,7 @@ github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQt github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803 h1:LpaVAM5Www2R7M0GJAxAdL3swBvmna8Pyzw6F7o+j04= github.com/influxdata/nats-streaming-server v0.11.3-0.20201112040610-c277f7560803/go.mod h1:qgAMR6M9EokX+R5X7jUQfubwBdS1tBIl4yVJ3shhcWk= +github.com/influxdata/pkg-config v0.2.5/go.mod 
h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/pkg-config v0.2.6 h1:GQFKw3m3OmmPMze9n75ZVVtNu4LJ2MJolHbxvg4AAvg= github.com/influxdata/pkg-config v0.2.6/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8= diff --git a/query/stdlib/influxdata/influxdb/min_max_influxdb_test.flux b/query/stdlib/influxdata/influxdb/min_max_influxdb_test.flux new file mode 100644 index 00000000000..8a2ddf6206d --- /dev/null +++ b/query/stdlib/influxdata/influxdb/min_max_influxdb_test.flux @@ -0,0 +1,23 @@ +package influxdb_test + +import "testing/expect" + +option now = () => (2030-01-01T00:00:00Z) + +testcase push_down_min_bare extends "flux/planner/group_min_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + "PushDownRangeRule": 1, + "PushDownFilterRule": 1, + ]) + group_min_test.group_min_bare() +} + +testcase push_down_max_bare extends "flux/planner/group_max_test" { + expect.planner(rules: [ + "PushDownGroupAggregateRule": 1, + "PushDownRangeRule": 1, + "PushDownFilterRule": 1, + ]) + group_max_test.group_max_bare() +} diff --git a/query/stdlib/influxdata/influxdb/rules.go b/query/stdlib/influxdata/influxdb/rules.go index ce00d2fde75..cf1b87ce51f 100644 --- a/query/stdlib/influxdata/influxdb/rules.go +++ b/query/stdlib/influxdata/influxdb/rules.go @@ -30,7 +30,7 @@ func init() { PushDownWindowAggregateByTimeRule{}, PushDownBareAggregateRule{}, GroupWindowAggregateTransposeRule{}, - // PushDownGroupAggregateRule{}, + PushDownGroupAggregateRule{}, ) plan.RegisterLogicalRules( MergeFiltersRule{}, diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 76d0b984391..6cca1e10b7f 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -289,7 +289,7 @@ func (t *floatWindowTable) advance() bool { if !ok { return false } - values := t.mergeValues(stop.Int64Values()) + vals := t.mergeValues(stop.Int64Values()) // 
Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used @@ -305,12 +305,12 @@ func (t *floatWindowTable) advance() bool { cr.cols[timeColIdx] = start stop.Release() } - cr.cols[valueColIdx] = values + cr.cols[valueColIdx] = vals t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop - cr.cols[valueColIdxWithoutTime] = values + cr.cols[valueColIdxWithoutTime] = vals } t.appendTags(cr) return true @@ -465,31 +465,31 @@ func (t *floatEmptyWindowSelectorTable) advance() bool { return false } - values := t.arrowBuilder() - values.Resize(storage.MaxPointsPerBlock) + vals := t.arrowBuilder() + vals.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: - start := t.startTimes(values) + start := t.startTimes(vals) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: - stop := t.stopTimes(values) + stop := t.stopTimes(vals) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: - start, stop, time := t.startStopTimes(values) + start, stop, time := t.startStopTimes(vals) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } - cr.cols[valueColIdx] = values.NewFloat64Array() + cr.cols[valueColIdx] = vals.NewFloat64Array() t.appendTags(cr) return true } @@ -711,11 +711,11 @@ func (t *floatGroupTable) advance() bool { return false } var arr *cursors.FloatArray - var len int + var length int for { arr = t.cur.Next() - len = arr.Len() - if len > 0 { + length = arr.Len() + if length > 0 { break } if !t.advanceCursor() { @@ -729,7 +729,7 @@ func (t *floatGroupTable) advance() bool { // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
- colReader := t.allocateBuffer(len) + colReader := t.allocateBuffer(length) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) @@ -737,20 +737,17 @@ func (t *floatGroupTable) advance() bool { return true } - aggregate, err := determineFloatAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeFloatAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []float64{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -758,7 +755,7 @@ func (t *floatGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -767,23 +764,116 @@ func (t *floatGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]float64{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type floatAggregateMethod func([]int64, []float64) (int64, float64) +type FloatAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) -// determineFloatAggregateMethod returns the method for aggregating -// returned points within the same group. 
The incoming points are the -// ones returned for each series and the method returned here will + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []float64, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, float64, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type floatSelectorMethod func(int64, float64, []int64, []float64, int) int + +// The selector accumulator tracks currently-selected item. +type floatSelectorAccumulator struct { + selector floatSelectorMethod + + ts int64 + v float64 + tags [][]byte +} + +func (a *floatSelectorAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *floatSelectorAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *floatSelectorAccumulator) Result() (int64, float64, [][]byte) { + return a.ts, a.v, a.tags +} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. 
+type floatAggregateMethod func(float64, []float64, int) float64 + +type floatAggregateAccumulator struct { + aggregate floatAggregateMethod + accum float64 + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *floatAggregateAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *floatAggregateAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *floatAggregateAccumulator) Result() (int64, float64, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +// makeFloatAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
-func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (floatAggregateMethod, error) { +func makeFloatAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (FloatAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorFirstGroupsFloat}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorLastGroupsFloat}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -793,15 +883,15 @@ func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (float case datatypes.AggregateTypeSum: - return aggregateSumGroupsFloat, nil + return &floatAggregateAccumulator{aggregate: aggregateSumGroupsFloat}, nil case datatypes.AggregateTypeMin: - return aggregateMinGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorMinGroupsFloat}, nil case datatypes.AggregateTypeMax: - return aggregateMaxGroupsFloat, nil + return &floatSelectorAccumulator{selector: selectorMaxGroupsFloat}, nil default: return nil, &errors.Error{ @@ -811,72 +901,63 @@ func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (float } } -func aggregateMinGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -func aggregateMaxGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) 
int { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v < values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. - -func aggregateSumGroupsFloat(_ []int64, values []float64) (int64, float64) { - var sum float64 - for _, v := range values { - sum += v +func aggregateSumGroupsFloat(sum float64, values []float64, i int) float64 { + for ; i < len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } -func aggregateFirstGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsFloat(timestamps []int64, values []float64) (int64, float64) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *floatGroupTable) advanceCursor() bool { @@ -1185,7 +1266,7 @@ func (t *integerWindowTable) advance() bool { if !ok { return false } - values := t.mergeValues(stop.Int64Values()) + vals := 
t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used @@ -1201,12 +1282,12 @@ func (t *integerWindowTable) advance() bool { cr.cols[timeColIdx] = start stop.Release() } - cr.cols[valueColIdx] = values + cr.cols[valueColIdx] = vals t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop - cr.cols[valueColIdxWithoutTime] = values + cr.cols[valueColIdxWithoutTime] = vals } t.appendTags(cr) return true @@ -1361,31 +1442,31 @@ func (t *integerEmptyWindowSelectorTable) advance() bool { return false } - values := t.arrowBuilder() - values.Resize(storage.MaxPointsPerBlock) + vals := t.arrowBuilder() + vals.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: - start := t.startTimes(values) + start := t.startTimes(vals) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: - stop := t.stopTimes(values) + stop := t.stopTimes(vals) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: - start, stop, time := t.startStopTimes(values) + start, stop, time := t.startStopTimes(vals) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } - cr.cols[valueColIdx] = values.NewInt64Array() + cr.cols[valueColIdx] = vals.NewInt64Array() t.appendTags(cr) return true } @@ -1607,11 +1688,11 @@ func (t *integerGroupTable) advance() bool { return false } var arr *cursors.IntegerArray - var len int + var length int for { arr = t.cur.Next() - len = arr.Len() - if len > 0 { + length = arr.Len() + if length > 0 { break } if !t.advanceCursor() { @@ -1625,7 +1706,7 @@ func (t *integerGroupTable) advance() bool { // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
- colReader := t.allocateBuffer(len) + colReader := t.allocateBuffer(length) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) @@ -1633,20 +1714,17 @@ func (t *integerGroupTable) advance() bool { return true } - aggregate, err := determineIntegerAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeIntegerAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []int64{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -1654,7 +1732,7 @@ func (t *integerGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -1663,38 +1741,131 @@ func (t *integerGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]int64{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type integerAggregateMethod func([]int64, []int64) (int64, int64) +type IntegerAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. 
+ AccumulateMore(timestamps []int64, values []int64, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, int64, [][]byte) +} -// determineIntegerAggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type integerSelectorMethod func(int64, int64, []int64, []int64, int) int + +// The selector accumulator tracks currently-selected item. +type integerSelectorAccumulator struct { + selector integerSelectorMethod + + ts int64 + v int64 + tags [][]byte +} + +func (a *integerSelectorAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *integerSelectorAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *integerSelectorAccumulator) Result() (int64, int64, [][]byte) { + return a.ts, a.v, a.tags +} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, 
and returns the result. +type integerAggregateMethod func(int64, []int64, int) int64 + +type integerAggregateAccumulator struct { + aggregate integerAggregateMethod + accum int64 + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *integerAggregateAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *integerAggregateAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *integerAggregateAccumulator) Result() (int64, int64, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +// makeIntegerAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
-func determineIntegerAggregateMethod(agg datatypes.Aggregate_AggregateType) (integerAggregateMethod, error) { +func makeIntegerAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (IntegerAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorFirstGroupsInteger}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorLastGroupsInteger}, nil case datatypes.AggregateTypeCount: - return aggregateCountGroupsInteger, nil + return &integerAggregateAccumulator{aggregate: aggregateCountGroupsInteger}, nil case datatypes.AggregateTypeSum: - return aggregateSumGroupsInteger, nil + return &integerAggregateAccumulator{aggregate: aggregateSumGroupsInteger}, nil case datatypes.AggregateTypeMin: - return aggregateMinGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorMinGroupsInteger}, nil case datatypes.AggregateTypeMax: - return aggregateMaxGroupsInteger, nil + return &integerSelectorAccumulator{selector: selectorMaxGroupsInteger}, nil default: return nil, &errors.Error{ @@ -1704,76 +1875,67 @@ func determineIntegerAggregateMethod(agg datatypes.Aggregate_AggregateType) (int } } -func aggregateMinGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -func aggregateMaxGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroupsInteger(ts int64, v int64, timestamps 
[]int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v < values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. - -func aggregateCountGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - return aggregateSumGroupsInteger(timestamps, values) +func aggregateCountGroupsInteger(accum int64, values []int64, i int) int64 { + return aggregateSumGroupsInteger(accum, values, i) } -func aggregateSumGroupsInteger(_ []int64, values []int64) (int64, int64) { - var sum int64 - for _, v := range values { - sum += v +func aggregateSumGroupsInteger(sum int64, values []int64, i int) int64 { + for ; i < len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } -func aggregateFirstGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsInteger(timestamps []int64, values []int64) (int64, int64) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + 
ts = timestamps[i] } } - return timestamp, value + return index } func (t *integerGroupTable) advanceCursor() bool { @@ -2080,7 +2242,7 @@ func (t *unsignedWindowTable) advance() bool { if !ok { return false } - values := t.mergeValues(stop.Int64Values()) + vals := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used @@ -2096,12 +2258,12 @@ func (t *unsignedWindowTable) advance() bool { cr.cols[timeColIdx] = start stop.Release() } - cr.cols[valueColIdx] = values + cr.cols[valueColIdx] = vals t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop - cr.cols[valueColIdxWithoutTime] = values + cr.cols[valueColIdxWithoutTime] = vals } t.appendTags(cr) return true @@ -2256,31 +2418,31 @@ func (t *unsignedEmptyWindowSelectorTable) advance() bool { return false } - values := t.arrowBuilder() - values.Resize(storage.MaxPointsPerBlock) + vals := t.arrowBuilder() + vals.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: - start := t.startTimes(values) + start := t.startTimes(vals) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: - stop := t.stopTimes(values) + stop := t.stopTimes(vals) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: - start, stop, time := t.startStopTimes(values) + start, stop, time := t.startStopTimes(vals) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } - cr.cols[valueColIdx] = values.NewUint64Array() + cr.cols[valueColIdx] = vals.NewUint64Array() t.appendTags(cr) return true } @@ -2502,11 +2664,11 @@ func (t *unsignedGroupTable) advance() bool { return false } var arr *cursors.UnsignedArray - var len int + var length int for { arr = t.cur.Next() - len = arr.Len() - if len > 0 { + length = arr.Len() + if 
length > 0 { break } if !t.advanceCursor() { @@ -2520,7 +2682,7 @@ func (t *unsignedGroupTable) advance() bool { // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. - colReader := t.allocateBuffer(len) + colReader := t.allocateBuffer(length) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) @@ -2528,20 +2690,17 @@ func (t *unsignedGroupTable) advance() bool { return true } - aggregate, err := determineUnsignedAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeUnsignedAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []uint64{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -2549,7 +2708,7 @@ func (t *unsignedGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -2558,23 +2717,116 @@ func (t *unsignedGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]uint64{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type unsignedAggregateMethod func([]int64, []uint64) (int64, uint64) +type UnsignedAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. 
Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, uint64, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type unsignedSelectorMethod func(int64, uint64, []int64, []uint64, int) int + +// The selector accumulator tracks currently-selected item. +type unsignedSelectorAccumulator struct { + selector unsignedSelectorMethod + + ts int64 + v uint64 + tags [][]byte +} + +func (a *unsignedSelectorAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *unsignedSelectorAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] -// determineUnsignedAggregateMethod returns the method for aggregating -// returned points within the same group. 
The incoming points are the -// ones returned for each series and the method returned here will + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *unsignedSelectorAccumulator) Result() (int64, uint64, [][]byte) { + return a.ts, a.v, a.tags +} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. +type unsignedAggregateMethod func(uint64, []uint64, int) uint64 + +type unsignedAggregateAccumulator struct { + aggregate unsignedAggregateMethod + accum uint64 + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. + tags [][]byte +} + +func (a *unsignedAggregateAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *unsignedAggregateAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *unsignedAggregateAccumulator) Result() (int64, uint64, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +// makeUnsignedAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
-func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (unsignedAggregateMethod, error) { +func makeUnsignedAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (UnsignedAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorFirstGroupsUnsigned}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorLastGroupsUnsigned}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -2584,15 +2836,15 @@ func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (un case datatypes.AggregateTypeSum: - return aggregateSumGroupsUnsigned, nil + return &unsignedAggregateAccumulator{aggregate: aggregateSumGroupsUnsigned}, nil case datatypes.AggregateTypeMin: - return aggregateMinGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorMinGroupsUnsigned}, nil case datatypes.AggregateTypeMax: - return aggregateMaxGroupsUnsigned, nil + return &unsignedSelectorAccumulator{selector: selectorMaxGroupsUnsigned}, nil default: return nil, &errors.Error{ @@ -2602,72 +2854,63 @@ func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (un } } -func aggregateMinGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -func aggregateMaxGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func 
selectorMaxGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. - -func aggregateSumGroupsUnsigned(_ []int64, values []uint64) (int64, uint64) { - var sum uint64 - for _, v := range values { - sum += v +func aggregateSumGroupsUnsigned(sum uint64, values []uint64, i int) uint64 { + for ; i < len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } -func aggregateFirstGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *unsignedGroupTable) advanceCursor() bool { @@ -2974,7 +3217,7 @@ func (t *stringWindowTable) advance() bool { if !ok { 
return false } - values := t.mergeValues(stop.Int64Values()) + vals := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used @@ -2990,12 +3233,12 @@ func (t *stringWindowTable) advance() bool { cr.cols[timeColIdx] = start stop.Release() } - cr.cols[valueColIdx] = values + cr.cols[valueColIdx] = vals t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop - cr.cols[valueColIdxWithoutTime] = values + cr.cols[valueColIdxWithoutTime] = vals } t.appendTags(cr) return true @@ -3150,31 +3393,31 @@ func (t *stringEmptyWindowSelectorTable) advance() bool { return false } - values := t.arrowBuilder() - values.Resize(storage.MaxPointsPerBlock) + vals := t.arrowBuilder() + vals.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: - start := t.startTimes(values) + start := t.startTimes(vals) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: - stop := t.stopTimes(values) + stop := t.stopTimes(vals) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: - start, stop, time := t.startStopTimes(values) + start, stop, time := t.startStopTimes(vals) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } - cr.cols[valueColIdx] = values.NewBinaryArray() + cr.cols[valueColIdx] = vals.NewBinaryArray() t.appendTags(cr) return true } @@ -3396,11 +3639,11 @@ func (t *stringGroupTable) advance() bool { return false } var arr *cursors.StringArray - var len int + var length int for { arr = t.cur.Next() - len = arr.Len() - if len > 0 { + length = arr.Len() + if length > 0 { break } if !t.advanceCursor() { @@ -3414,7 +3657,7 @@ func (t *stringGroupTable) advance() bool { // additional slices. 
If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. - colReader := t.allocateBuffer(len) + colReader := t.allocateBuffer(length) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) @@ -3422,20 +3665,17 @@ func (t *stringGroupTable) advance() bool { return true } - aggregate, err := determineStringAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeStringAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []string{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -3443,7 +3683,7 @@ func (t *stringGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -3452,23 +3692,84 @@ func (t *stringGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]string{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type stringAggregateMethod func([]int64, []string) (int64, string) +type StringAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. 
+ AccumulateFirst(timestamps []int64, values []string, tags [][]byte) -// determineStringAggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []string, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, string, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type stringSelectorMethod func(int64, string, []int64, []string, int) int + +// The selector accumulator tracks currently-selected item. +type stringSelectorAccumulator struct { + selector stringSelectorMethod + + ts int64 + v string + tags [][]byte +} + +func (a *stringSelectorAccumulator) AccumulateFirst(timestamps []int64, values []string, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *stringSelectorAccumulator) AccumulateMore(timestamps []int64, values []string, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *stringSelectorAccumulator) Result() (int64, string, [][]byte) { + return a.ts, a.v, a.tags +} + +// makeStringAggregateAccumulator 
returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. -func determineStringAggregateMethod(agg datatypes.Aggregate_AggregateType) (stringAggregateMethod, error) { +func makeStringAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (StringAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsString, nil + return &stringSelectorAccumulator{selector: selectorFirstGroupsString}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsString, nil + return &stringSelectorAccumulator{selector: selectorLastGroupsString}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -3505,36 +3806,30 @@ func determineStringAggregateMethod(agg datatypes.Aggregate_AggregateType) (stri } } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateFirstGroupsString(timestamps []int64, values []string) (int64, string) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsString(timestamps []int64, values []string) (int64, string) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *stringGroupTable) advanceCursor() bool { @@ -3841,7 +4136,7 @@ func (t *booleanWindowTable) advance() bool { if !ok { return false } - values := t.mergeValues(stop.Int64Values()) + vals := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. 
If the buffer is still being used @@ -3857,12 +4152,12 @@ func (t *booleanWindowTable) advance() bool { cr.cols[timeColIdx] = start stop.Release() } - cr.cols[valueColIdx] = values + cr.cols[valueColIdx] = vals t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop - cr.cols[valueColIdxWithoutTime] = values + cr.cols[valueColIdxWithoutTime] = vals } t.appendTags(cr) return true @@ -4017,31 +4312,31 @@ func (t *booleanEmptyWindowSelectorTable) advance() bool { return false } - values := t.arrowBuilder() - values.Resize(storage.MaxPointsPerBlock) + vals := t.arrowBuilder() + vals.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: - start := t.startTimes(values) + start := t.startTimes(vals) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: - stop := t.stopTimes(values) + stop := t.stopTimes(vals) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: - start, stop, time := t.startStopTimes(values) + start, stop, time := t.startStopTimes(vals) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } - cr.cols[valueColIdx] = values.NewBooleanArray() + cr.cols[valueColIdx] = vals.NewBooleanArray() t.appendTags(cr) return true } @@ -4263,11 +4558,11 @@ func (t *booleanGroupTable) advance() bool { return false } var arr *cursors.BooleanArray - var len int + var length int for { arr = t.cur.Next() - len = arr.Len() - if len > 0 { + length = arr.Len() + if length > 0 { break } if !t.advanceCursor() { @@ -4281,7 +4576,7 @@ func (t *booleanGroupTable) advance() bool { // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
- colReader := t.allocateBuffer(len) + colReader := t.allocateBuffer(length) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) @@ -4289,20 +4584,17 @@ func (t *booleanGroupTable) advance() bool { return true } - aggregate, err := determineBooleanAggregateMethod(t.gc.Aggregate().Type) + aggregate, err := makeBooleanAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []bool{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -4310,7 +4602,7 @@ func (t *booleanGroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -4319,23 +4611,84 @@ func (t *booleanGroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]bool{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type booleanAggregateMethod func([]int64, []bool) (int64, bool) +type BooleanAggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []bool, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. 
+ AccumulateMore(timestamps []int64, values []bool, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, bool, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type booleanSelectorMethod func(int64, bool, []int64, []bool, int) int -// determineBooleanAggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will +// The selector accumulator tracks currently-selected item. +type booleanSelectorAccumulator struct { + selector booleanSelectorMethod + + ts int64 + v bool + tags [][]byte +} + +func (a *booleanSelectorAccumulator) AccumulateFirst(timestamps []int64, values []bool, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *booleanSelectorAccumulator) AccumulateMore(timestamps []int64, values []bool, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *booleanSelectorAccumulator) Result() (int64, bool, [][]byte) { + return a.ts, a.v, a.tags +} + +// makeBooleanAggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. 
The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. -func determineBooleanAggregateMethod(agg datatypes.Aggregate_AggregateType) (booleanAggregateMethod, error) { +func makeBooleanAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (BooleanAggregateAccumulator, error) { switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroupsBoolean, nil + return &booleanSelectorAccumulator{selector: selectorFirstGroupsBoolean}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroupsBoolean, nil + return &booleanSelectorAccumulator{selector: selectorLastGroupsBoolean}, nil case datatypes.AggregateTypeCount: return nil, &errors.Error{ @@ -4372,36 +4725,30 @@ func determineBooleanAggregateMethod(agg datatypes.Aggregate_AggregateType) (boo } } -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
- -func aggregateFirstGroupsBoolean(timestamps []int64, values []bool) (int64, bool) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroupsBoolean(timestamps []int64, values []bool) (int64, bool) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *booleanGroupTable) advanceCursor() bool { diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index e6c168967c7..d7b320f2a06 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -17,6 +17,7 @@ import ( "github.com/influxdata/influxdb/v2/storage/reads/datatypes" storage "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/tsdb/cursors" + "github.com/influxdata/influxdb/v2/storage/reads/datatypes" ) {{range .}} // @@ -733,20 +734,17 @@ func (t *{{.name}}GroupTable) advance() bool { return true } - aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type) + aggregate, err := make{{.Name}}AggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps, values := []int64{ts}, []{{.Type}}{v} + aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if 
arr.Len() > 0 { - ts, v := aggregate(arr.Timestamps, arr.Values) - timestamps = append(timestamps, ts) - values = append(values, v) + aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } @@ -754,7 +752,7 @@ func (t *{{.name}}GroupTable) advance() bool { break } } - timestamp, value := aggregate(timestamps, values) + timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { @@ -763,26 +761,123 @@ func (t *{{.name}}GroupTable) advance() bool { } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]{{.Type}}{value}) } - t.appendTags(colReader) + t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } -type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}}) +type {{.Name}}AggregateAccumulator interface { + // AccumulateFirst receives an initial array of items to select from. + // It selects an item and stores the state. Afterwards, more data can + // be supplied with AccumulateMore and the results can be requested at + // any time. Without a call to AccumulateFirst the results are not + // defined. + AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) + + // AccumulateMore receives additional array elements to select from. + AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) + + // Result returns the item selected from the data received so far. + Result() (int64, {{.Type}}, [][]byte) +} + +// The selector method takes a ( timestamp, value ) pair, a +// ( []timestamp, []value ) pair, and a starting index. It applies the selector +// to the single value and the array, starting at the supplied index. It +// returns -1 if the single value is selected and a non-negative value if an +// item from the array is selected. +type {{.name}}SelectorMethod func(int64, {{.Type}}, []int64, []{{.Type}}, int) (int) + +// The selector accumulator tracks currently-selected item. 
+type {{.name}}SelectorAccumulator struct { + selector {{.name}}SelectorMethod + + ts int64 + v {{.Type}} + tags [][]byte +} + +func (a *{{.name}}SelectorAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) { + index := a.selector(timestamps[0], values[0], timestamps, values, 1) + if index < 0 { + a.ts = timestamps[0] + a.v = values[0] + } else { + a.ts = timestamps[index] + a.v = values[index] + } + a.tags = make([][]byte, len(tags)) + copy(a.tags, tags) +} + +func (a *{{.name}}SelectorAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) { + index := a.selector(a.ts, a.v, timestamps, values, 0) + if index >= 0 { + a.ts = timestamps[index] + a.v = values[index] + + if len(tags) > cap(a.tags) { + a.tags = make([][]byte, len(tags)) + } else { + a.tags = a.tags[:len(tags)] + } + copy(a.tags, tags) + } +} + +func (a *{{.name}}SelectorAccumulator) Result() (int64, {{.Type}}, [][]byte) { + return a.ts, a.v, a.tags +} + +{{if and (ne .Name "Boolean") (ne .Name "String")}} + +// The aggregate method takes a value, an array of values, and a starting +// index, applies an aggregate operation over the value and the array, starting +// at the given index, and returns the result. +type {{.name}}AggregateMethod func({{.Type}}, []{{.Type}}, int) ({{.Type}}) + +type {{.name}}AggregateAccumulator struct { + aggregate {{.name}}AggregateMethod + accum {{.Type}} + + // For pure aggregates it doesn't matter what we return for tags, but + // we need to satisfy the interface. We will just return the most + // recently seen tags. 
+ tags [][]byte +} + +func (a *{{.name}}AggregateAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) { + a.accum = a.aggregate(values[0], values, 1) + a.tags = tags +} + +func (a *{{.name}}AggregateAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) { + a.accum = a.aggregate(a.accum, values, 0) + a.tags = tags +} + +// For group aggregates (non-selectors), the timestamp is always math.MaxInt64. +// their final result does not contain _time, so this timestamp value can be +// anything and it won't matter. +func (a *{{.name}}AggregateAccumulator) Result() (int64, {{.Type}}, [][]byte) { + return math.MaxInt64, a.accum, a.tags +} + +{{end}} -// determine{{.Name}}AggregateMethod returns the method for aggregating -// returned points within the same group. The incoming points are the -// ones returned for each series and the method returned here will +// make{{.Name}}AggregateAccumulator returns the interface implementation for +// aggregating returned points within the same group. The incoming points are +// the ones returned for each series and the struct returned here will // aggregate the aggregates. 
-func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){ +func make{{.Name}}AggregateAccumulator(agg datatypes.Aggregate_AggregateType) ({{.Name}}AggregateAccumulator, error){ switch agg { case datatypes.AggregateTypeFirst: - return aggregateFirstGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorFirstGroups{{.Name}}}, nil case datatypes.AggregateTypeLast: - return aggregateLastGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorLastGroups{{.Name}}}, nil case datatypes.AggregateTypeCount: {{if eq .Name "Integer"}} - return aggregateCountGroups{{.Name}}, nil + return &{{.name}}AggregateAccumulator{aggregate: aggregateCountGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -791,7 +886,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ {{end}} case datatypes.AggregateTypeSum: {{if and (ne .Name "Boolean") (ne .Name "String")}} - return aggregateSumGroups{{.Name}}, nil + return &{{.name}}AggregateAccumulator{aggregate: aggregateSumGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -800,7 +895,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ {{end}} case datatypes.AggregateTypeMin: {{if and (ne .Name "Boolean") (ne .Name "String")}} - return aggregateMinGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorMinGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -809,7 +904,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ {{end}} case datatypes.AggregateTypeMax: {{if and (ne .Name "Boolean") (ne .Name "String")}} - return aggregateMaxGroups{{.Name}}, nil + return &{{.name}}SelectorAccumulator{selector: selectorMaxGroups{{.Name}}}, nil {{else}} return nil, &errors.Error { Code: errors.EInvalid, @@ -825,82 +920,74 @@ func 
determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({ } {{if and (ne .Name "Boolean") (ne .Name "String")}} -func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - value := values[0] - timestamp := timestamps[0] +func selectorMinGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) { + index := -1 - for i := 1; i < len(values); i++ { - if value > values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } {{end}} {{if and (ne .Name "Boolean") (ne .Name "String")}} -func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - value := values[0] - timestamp := timestamps[0] +func selectorMaxGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) { + index := -1 - for i := 1; i < len(values); i++ { - if value < values[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if v > values[i] { + index = i + v = values[i] } } - return timestamp, value + return index } {{end}} -// For group count and sum, the timestamp here is always math.MaxInt64. -// their final result does not contain _time, so this timestamp value can be anything -// and it won't matter. 
{{if eq .Name "Integer"}} -func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - return aggregateSumGroups{{.Name}}(timestamps, values) +func aggregateCountGroups{{.Name}}(accum {{.Type}}, values []{{.Type}}, i int) ({{.Type}}) { + return aggregateSumGroups{{.Name}}(accum, values, i) } {{end}} {{if and (ne .Name "Boolean") (ne .Name "String")}} -func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) { - var sum {{.Type}} - for _, v := range values { - sum += v +func aggregateSumGroups{{.Name}}(sum {{.Type}}, values []{{.Type}}, i int) ({{.Type}}) { + for ; i< len(values); i++ { + sum += values[i] } - return math.MaxInt64, sum + return sum } {{end}} -func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - value := values[0] - timestamp := timestamps[0] +func selectorFirstGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp > timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts > timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } -func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { - value := values[0] - timestamp := timestamps[0] +func selectorLastGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) { + index := -1 - for i := 1; i < len(values); i++ { - if timestamp < timestamps[i] { - value = values[i] - timestamp = timestamps[i] + for ; i < len(values); i++ { + if ts < timestamps[i] { + index = i + ts = timestamps[i] } } - return timestamp, value + return index } func (t *{{.name}}GroupTable) advanceCursor() bool { diff --git a/storage/flux/table.go b/storage/flux/table.go index 51738024474..4df88db5f39 100644 --- a/storage/flux/table.go +++ b/storage/flux/table.go @@ 
-211,16 +211,21 @@ func (t *table) readTags(tags models.Tags) { } } -// appendTags fills the colBufs for the tag columns with the tag value. -func (t *table) appendTags(cr *colReader) { +// appendTheseTags fills the colBufs for the tag columns with the given tag values. +func (t *table) appendTheseTags(cr *colReader, tags [][]byte) { for j := range t.cols { - v := t.tags[j] + v := tags[j] if v != nil { cr.cols[j] = t.cache.GetTag(string(v), cr.l, t.alloc) } } } +// appendTags fills the colBufs for the tag columns with the tag values from the table structure. +func (t *table) appendTags(cr *colReader) { + t.appendTheseTags(cr, t.tags) +} + // appendBounds fills the colBufs for the time bounds func (t *table) appendBounds(cr *colReader) { start, stop := t.cache.GetBounds(t.bounds, cr.l, t.alloc)