executor: control Chunk size for TopN&Sort (#9364)
qw4990 authored Feb 21, 2019
1 parent 46c1cf1 commit 7a24081
Showing 2 changed files with 188 additions and 6 deletions.
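Background for this diff: TiDB executors exchange rows in Chunks, and a parent can hint how many rows it actually needs before calling a child's Next. This commit wires that hint into SortExec and TopNExec: their emit loops stop at req.IsFull() instead of always padding the chunk to maxChunkSize, and TopN caps what it reads from its child at the remaining total limit. Below is a minimal, self-contained model of the protocol; the names mirror the chunk API (SetRequiredRows, IsFull) but chunkModel, buffered, and idx are illustrative stand-ins, not the real TiDB types.

package main

import "fmt"

// chunkModel stands in for chunk.Chunk with just the pieces this
// commit relies on: SetRequiredRows, IsFull, and row appends.
type chunkModel struct {
	rows         int
	requiredRows int
}

// SetRequiredRows mirrors the real semantics: an out-of-range hint
// falls back to maxChunkSize, so IsFull degenerates to the old
// "chunk is at capacity" check.
func (c *chunkModel) SetRequiredRows(required, maxChunkSize int) {
	if required <= 0 || required > maxChunkSize {
		required = maxChunkSize
	}
	c.requiredRows = required
}

// IsFull reports whether the required-rows hint has been satisfied.
func (c *chunkModel) IsFull() bool { return c.rows >= c.requiredRows }

func (c *chunkModel) AppendRow() { c.rows++ }

func main() {
	// A sorted executor has 100 buffered rows, but the caller only
	// needs 3 right now: the fill loop stops at 3, not at maxChunkSize.
	buffered, idx := 100, 0
	chk := &chunkModel{}
	chk.SetRequiredRows(3, 1024)
	for !chk.IsFull() && idx < buffered { // the loop shape this commit introduces
		chk.AppendRow()
		idx++
	}
	fmt.Println(chk.rows) // prints 3
}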
183 changes: 183 additions & 0 deletions executor/executor_required_rows_test.go
@@ -16,16 +16,19 @@ package executor
import (
	"context"
	"fmt"
	"math"
	"math/rand"

	"github.com/cznic/mathutil"
	. "github.com/pingcap/check"
	"github.com/pingcap/parser/mysql"
	"github.com/pingcap/tidb/expression"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/memory"
	"github.com/pingcap/tidb/util/mock"
)

@@ -184,5 +187,185 @@ func defaultCtx() sessionctx.Context {
	ctx := mock.NewContext()
	ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
	ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
	ctx.GetSessionVars().MemQuotaSort = variable.DefTiDBMemQuotaSort
	ctx.GetSessionVars().StmtCtx.MemTracker = memory.NewTracker("", ctx.GetSessionVars().MemQuotaQuery)
	return ctx
}

func (s *testExecSuite) TestSortRequiredRows(c *C) {
	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
	testCases := []struct {
		totalRows      int
		groupBy        []int
		requiredRows   []int
		expectedRows   []int
		expectedRowsDS []int
	}{
		{
			totalRows:      10,
			groupBy:        []int{0},
			requiredRows:   []int{1, 5, 3, 10},
			expectedRows:   []int{1, 5, 3, 1},
			expectedRowsDS: []int{10, 0},
		},
		{
			totalRows:      10,
			groupBy:        []int{0, 1},
			requiredRows:   []int{1, 5, 3, 10},
			expectedRows:   []int{1, 5, 3, 1},
			expectedRowsDS: []int{10, 0},
		},
		{
			totalRows:      maxChunkSize + 1,
			groupBy:        []int{0},
			requiredRows:   []int{1, 5, 3, 10, maxChunkSize},
			expectedRows:   []int{1, 5, 3, 10, (maxChunkSize + 1) - 1 - 5 - 3 - 10},
			expectedRowsDS: []int{maxChunkSize, 1, 0},
		},
		{
			totalRows:      3*maxChunkSize + 1,
			groupBy:        []int{0},
			requiredRows:   []int{1, 5, 3, 10, maxChunkSize},
			expectedRows:   []int{1, 5, 3, 10, maxChunkSize},
			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, 1, 0},
		},
	}

	for _, testCase := range testCases {
		sctx := defaultCtx()
		ctx := context.Background()
		ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
		byItems := make([]*plannercore.ByItems, 0, len(testCase.groupBy))
		for _, groupBy := range testCase.groupBy {
			col := ds.Schema().Columns[groupBy]
			byItems = append(byItems, &plannercore.ByItems{Expr: col})
		}
		exec := buildSortExec(sctx, byItems, ds)
		c.Assert(exec.Open(ctx), IsNil)
		chk := exec.newFirstChunk()
		for i := range testCase.requiredRows {
			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
			c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
		}
		c.Assert(ds.checkNumNextCalled(), IsNil)
	}
}
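Reading the test table: requiredRows lists the hint set on the chunk before each Next call on the Sort executor, expectedRows the rows each of those calls must return, and expectedRowsDS the batch sizes Sort is allowed to demand from its child, call by call, ending in a 0 for the pull that observes exhaustion. Since Sort must buffer its whole input before emitting anything, expectedRowsDS always drains the source up front; only the output side is shaped by the hints. newRequiredRowsDataSource is defined elsewhere in this test file; the following compilable sketch shows the behavior it plausibly has (mockSource and its fields are assumptions, not the real helper).

package main

import "fmt"

// mockSource models a required-rows-aware test data source: each Next
// serves at most the hinted number of rows and records the batch size,
// so a test can compare the log against expectedRowsDS.
type mockSource struct {
	remaining int
	batches   []int // one entry per Next call
}

// Next hands out min(required, remaining) rows and logs the count.
func (m *mockSource) Next(required int) int {
	n := required
	if m.remaining < n {
		n = m.remaining
	}
	m.remaining -= n
	m.batches = append(m.batches, n)
	return n
}

func main() {
	src := &mockSource{remaining: 10}
	src.Next(1024) // Sort drains the whole input in one capped pull
	src.Next(1024) // one more pull returns 0, signalling exhaustion
	fmt.Println(src.batches) // [10 0], matching expectedRowsDS of the first case
}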

func buildSortExec(sctx sessionctx.Context, byItems []*plannercore.ByItems, src Executor) Executor {
	sortExec := SortExec{
		baseExecutor: newBaseExecutor(sctx, src.Schema(), "", src),
		ByItems:      byItems,
		schema:       src.Schema(),
	}
	return &sortExec
}

func (s *testExecSuite) TestTopNRequiredRows(c *C) {
	maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
	testCases := []struct {
		totalRows      int
		topNOffset     int
		topNCount      int
		groupBy        []int
		requiredRows   []int
		expectedRows   []int
		expectedRowsDS []int
	}{
		{
			totalRows:      10,
			topNOffset:     0,
			topNCount:      10,
			groupBy:        []int{0},
			requiredRows:   []int{1, 1, 1, 1, 10},
			expectedRows:   []int{1, 1, 1, 1, 6},
			expectedRowsDS: []int{10, 0},
		},
		{
			totalRows:      100,
			topNOffset:     15,
			topNCount:      11,
			groupBy:        []int{0},
			requiredRows:   []int{1, 1, 1, 1, 10},
			expectedRows:   []int{1, 1, 1, 1, 7},
			expectedRowsDS: []int{26, 100 - 26, 0},
		},
		{
			totalRows:      100,
			topNOffset:     95,
			topNCount:      10,
			groupBy:        []int{0},
			requiredRows:   []int{1, 2, 3, 10},
			expectedRows:   []int{1, 2, 2, 0},
			expectedRowsDS: []int{100, 0, 0},
		},
		{
			totalRows:      maxChunkSize + 20,
			topNOffset:     1,
			topNCount:      5,
			groupBy:        []int{0, 1},
			requiredRows:   []int{1, 3, 7, 10},
			expectedRows:   []int{1, 3, 1, 0},
			expectedRowsDS: []int{6, maxChunkSize, 14, 0},
		},
		{
			totalRows:      maxChunkSize + maxChunkSize + 20,
			topNOffset:     maxChunkSize + 10,
			topNCount:      8,
			groupBy:        []int{0, 1},
			requiredRows:   []int{1, 2, 3, 5, 7},
			expectedRows:   []int{1, 2, 3, 2, 0},
			expectedRowsDS: []int{maxChunkSize, 18, maxChunkSize, 2, 0},
		},
		{
			totalRows:      maxChunkSize*5 + 10,
			topNOffset:     maxChunkSize*5 + 20,
			topNCount:      10,
			groupBy:        []int{0, 1},
			requiredRows:   []int{1, 2, 3},
			expectedRows:   []int{0, 0, 0},
			expectedRowsDS: []int{maxChunkSize, maxChunkSize, maxChunkSize, maxChunkSize, maxChunkSize, 10, 0, 0},
		},
		{
			totalRows:      maxChunkSize + maxChunkSize + 10,
			topNOffset:     10,
			topNCount:      math.MaxInt64,
			groupBy:        []int{0, 1},
			requiredRows:   []int{1, 2, 3, maxChunkSize, maxChunkSize},
			expectedRows:   []int{1, 2, 3, maxChunkSize, maxChunkSize - 1 - 2 - 3},
			expectedRowsDS: []int{maxChunkSize, maxChunkSize, 10, 0, 0},
		},
	}

	for _, testCase := range testCases {
		sctx := defaultCtx()
		ctx := context.Background()
		ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
		byItems := make([]*plannercore.ByItems, 0, len(testCase.groupBy))
		for _, groupBy := range testCase.groupBy {
			col := ds.Schema().Columns[groupBy]
			byItems = append(byItems, &plannercore.ByItems{Expr: col})
		}
		exec := buildTopNExec(sctx, testCase.topNOffset, testCase.topNCount, byItems, ds)
		c.Assert(exec.Open(ctx), IsNil)
		chk := exec.newFirstChunk()
		for i := range testCase.requiredRows {
			chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
			c.Assert(exec.Next(ctx, chunk.NewRecordBatch(chk)), IsNil)
			c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
		}
		c.Assert(ds.checkNumNextCalled(), IsNil)
	}
}
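The DS-side expectations follow from totalLimit = topNOffset + topNCount: TopN first buffers exactly totalLimit rows from its child (in maxChunkSize-capped pulls), then scans the remaining input to maintain the current top rows, and finally serves the window [topNOffset, topNOffset+topNCount) to the caller according to its hints. A worked trace of the second case, with the numbers taken from the table above and hypothetical variable names:

package main

import "fmt"

func main() {
	// Second case above: totalRows=100, topNOffset=15, topNCount=11.
	totalLimit := 15 + 11        // 26 rows must be buffered before TopN can answer
	firstPull := totalLimit      // expectedRowsDS[0]: the first child Next is hinted to exactly 26
	restPull := 100 - totalLimit // expectedRowsDS[1]: the rest is scanned to keep the top 26
	served := 1 + 1 + 1 + 1      // the first four hints each yield one row of the 11-row window
	lastNext := 11 - served      // 7: all the final Next(requiredRows=10) can still return
	fmt.Println(firstPull, restPull, lastNext) // 26 74 7
}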

func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*plannercore.ByItems, src Executor) Executor {
	sortExec := SortExec{
		baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src),
		ByItems:      byItems,
		schema:       src.Schema(),
	}
	return &TopNExec{
		SortExec: sortExec,
		limit:    &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)},
	}
}
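Note that buildTopNExec wires a TopNExec as a SortExec plus a plannercore.PhysicalLimit; Offset + Count form the totalLimit that loadChunksUntilTotalLimit in sort.go (below) keeps buffered. That is why the math.MaxInt64 case above reads its entire input on the data-source side, just like plain Sort: its totalLimit exceeds totalRows, so the new hint never shrinks a pull.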
11 changes: 5 additions & 6 deletions executor/sort.go
@@ -19,7 +19,7 @@ import (
"sort"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
@@ -93,10 +93,7 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 		sort.Slice(e.rowPtrs, e.keyColumnsLess)
 		e.fetched = true
 	}
-	for req.NumRows() < e.maxChunkSize {
-		if e.Idx >= len(e.rowPtrs) {
-			return nil
-		}
+	for !req.IsFull() && e.Idx < len(e.rowPtrs) {
 		rowPtr := e.rowPtrs[e.Idx]
 		req.AppendRow(e.rowChunks.GetRow(rowPtr))
 		e.Idx++
@@ -265,7 +262,7 @@ func (e *TopNExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
 	if e.Idx >= len(e.rowPtrs) {
 		return nil
 	}
-	for req.NumRows() < e.maxChunkSize && e.Idx < len(e.rowPtrs) {
+	for !req.IsFull() && e.Idx < len(e.rowPtrs) {
 		row := e.rowChunks.GetRow(e.rowPtrs[e.Idx])
 		req.AppendRow(row)
 		e.Idx++
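Both Next loops change the same way: the old condition kept appending until the chunk reached maxChunkSize, ignoring any required-rows hint from the parent, and SortExec additionally returned early from inside the loop. With !req.IsFull() the loop stops as soon as the hint is satisfied (IsFull falls back to the maxChunkSize bound when no hint is set), and the early return collapses into the e.Idx < len(e.rowPtrs) condition.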
@@ -280,6 +277,8 @@ func (e *TopNExec) loadChunksUntilTotalLimit(ctx context.Context) error {
 	e.rowChunks.GetMemTracker().SetLabel("rowChunks")
 	for uint64(e.rowChunks.Len()) < e.totalLimit {
 		srcChk := e.children[0].newFirstChunk()
+		// adjust required rows by total limit
+		srcChk.SetRequiredRows(int(e.totalLimit-uint64(e.rowChunks.Len())), e.maxChunkSize)
 		err := e.children[0].Next(ctx, chunk.NewRecordBatch(srcChk))
 		if err != nil {
 			return errors.Trace(err)
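These two added lines are the child-facing half of the change: previously loadChunksUntilTotalLimit always pulled full chunks, so a TopN that only needs totalLimit = 26 rows would still drag maxChunkSize rows out of its child on the first call. Hinting totalLimit - rowChunks.Len() (which SetRequiredRows caps at maxChunkSize) makes the first pull in the second TopN test case exactly 26; and if the child ever returned short, say only 20 of those rows, the next iteration of the loop would ask for just the missing 6.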