From 4ae5be190b0017675deedd9af8b80d8868e03f0e Mon Sep 17 00:00:00 2001
From: xufei
Date: Fri, 17 Mar 2023 09:50:39 +0800
Subject: [PATCH] planner: add 2pb logic and fix some bugs for partitionTopN
 (#42334)

ref pingcap/tidb#39792, close pingcap/tidb#42321
---
 .../testdata/derive_topn_from_window_out.json |  6 ++--
 planner/core/explain.go                       | 28 +++++++++++--------
 planner/core/logical_plans.go                 |  1 +
 planner/core/plan_to_pb.go                    |  8 ++++++
 planner/core/resolve_indices.go               | 23 +++++++++++++++
 planner/core/task.go                          |  4 +--
 6 files changed, 53 insertions(+), 17 deletions(-)

diff --git a/planner/core/casetest/testdata/derive_topn_from_window_out.json b/planner/core/casetest/testdata/derive_topn_from_window_out.json
index c96e47f30b924..671ae700e0ebf 100644
--- a/planner/core/casetest/testdata/derive_topn_from_window_out.json
+++ b/planner/core/casetest/testdata/derive_topn_from_window_out.json
@@ -371,7 +371,7 @@
       "  └─Window 1.00 root row_number()->Column#4 over(partition by test.t.b rows between current row and current row)",
       "    └─Sort 1.00 root test.t.b",
       "      └─TableReader 1.00 root data:Limit",
-      "        └─Limit 1.00 cop[tikv] offset:0, count:1",
+      "        └─Limit 1.00 cop[tikv] partition by test.t.b, offset:0, count:1",
       "          └─TableFullScan 1.00 cop[tikv] table:t keep order:false, stats:pseudo"
     ],
     "Res": [
@@ -403,7 +403,7 @@
       "  └─Window 3.00 root row_number()->Column#4 over(partition by test.t.b rows between current row and current row)",
       "    └─Sort 3.00 root test.t.b",
       "      └─TableReader 3.00 root data:Limit",
-      "        └─Limit 3.00 cop[tikv] offset:0, count:3",
+      "        └─Limit 3.00 cop[tikv] partition by test.t.b, offset:0, count:3",
       "          └─Selection 3.00 cop[tikv] ge(test.t.a, 2)",
       "            └─TableFullScan 9.00 cop[tikv] table:t keep order:false, stats:pseudo"
     ],
@@ -452,7 +452,7 @@
       "  └─Window 1.00 root row_number()->Column#4 over(partition by test.td.b rows between current row and current row)",
       "    └─Sort 1.00 root test.td.b",
       "      └─TableReader 1.00 root data:Limit",
-      "        └─Limit 1.00 cop[tikv] offset:0, count:1",
+      "        └─Limit 1.00 cop[tikv] partition by test.td.b, offset:0, count:1",
       "          └─TableFullScan 1.00 cop[tikv] table:td keep order:false, stats:pseudo"
     ],
     "Res": [
diff --git a/planner/core/explain.go b/planner/core/explain.go
index 17cc1f505e72d..446ee5e311e48 100644
--- a/planner/core/explain.go
+++ b/planner/core/explain.go
@@ -364,12 +364,14 @@ func (p *PhysicalSort) ExplainInfo() string {
 
 // ExplainInfo implements Plan interface.
 func (p *PhysicalLimit) ExplainInfo() string {
-	var str strings.Builder
-	str.WriteString("offset:")
-	str.WriteString(strconv.FormatUint(p.Offset, 10))
-	str.WriteString(", count:")
-	str.WriteString(strconv.FormatUint(p.Count, 10))
-	return str.String()
+	buffer := bytes.NewBufferString("")
+	if len(p.GetPartitionBy()) > 0 {
+		buffer = explainPartitionBy(buffer, p.GetPartitionBy(), false)
+		fmt.Fprintf(buffer, ", offset:%v, count:%v", p.Offset, p.Count)
+	} else {
+		fmt.Fprintf(buffer, "offset:%v, count:%v", p.Offset, p.Count)
+	}
+	return buffer.String()
 }
 
 // ExplainInfo implements Plan interface.
@@ -960,12 +962,14 @@ func (lt *LogicalTopN) ExplainInfo() string {
 
 // ExplainInfo implements Plan interface.
 func (p *LogicalLimit) ExplainInfo() string {
-	var str strings.Builder
-	str.WriteString("offset:")
-	str.WriteString(strconv.FormatUint(p.Offset, 10))
-	str.WriteString(", count:")
-	str.WriteString(strconv.FormatUint(p.Count, 10))
-	return str.String()
+	buffer := bytes.NewBufferString("")
+	if len(p.GetPartitionBy()) > 0 {
+		buffer = explainPartitionBy(buffer, p.GetPartitionBy(), false)
+		fmt.Fprintf(buffer, ", offset:%v, count:%v", p.Offset, p.Count)
+	} else {
+		fmt.Fprintf(buffer, "offset:%v, count:%v", p.Offset, p.Count)
+	}
+	return buffer.String()
 }
 
 // ExplainInfo implements Plan interface.
diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go
index 935c3ad887591..29c5c09e09f73 100644
--- a/planner/core/logical_plans.go
+++ b/planner/core/logical_plans.go
@@ -163,6 +163,7 @@ type LogicalJoin struct {
 	rightPreferJoinType uint
 
 	EqualConditions []*expression.ScalarFunction
+	// NAEQConditions means null-aware equal conditions, which are used for null-aware semi joins.
 	NAEQConditions  []*expression.ScalarFunction
 	LeftConditions  expression.CNFExprs
 	RightConditions expression.CNFExprs
diff --git a/planner/core/plan_to_pb.go b/planner/core/plan_to_pb.go
index 45952ac7e0006..a51ced1664b75 100644
--- a/planner/core/plan_to_pb.go
+++ b/planner/core/plan_to_pb.go
@@ -180,6 +180,9 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti
 	for _, item := range p.ByItems {
 		topNExec.OrderBy = append(topNExec.OrderBy, expression.SortByItemToPB(sc, client, item.Expr, item.Desc))
 	}
+	for _, item := range p.PartitionBy {
+		topNExec.PartitionBy = append(topNExec.PartitionBy, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
+	}
 	executorID := ""
 	if storeType == kv.TiFlash {
 		var err error
@@ -194,10 +197,15 @@ func (p *PhysicalTopN) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*ti
 
 // ToPB implements PhysicalPlan ToPB interface.
 func (p *PhysicalLimit) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) {
+	sc := ctx.GetSessionVars().StmtCtx
+	client := ctx.GetClient()
 	limitExec := &tipb.Limit{
 		Limit: p.Count,
 	}
 	executorID := ""
+	for _, item := range p.PartitionBy {
+		limitExec.PartitionBy = append(limitExec.PartitionBy, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
+	}
 	if storeType == kv.TiFlash {
 		var err error
 		limitExec.Child, err = p.children[0].ToPB(ctx, storeType)
diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go
index a38644ec07564..f3cb210cf5f88 100644
--- a/planner/core/resolve_indices.go
+++ b/planner/core/resolve_indices.go
@@ -584,6 +584,29 @@ func (p *PhysicalTopN) ResolveIndices() (err error) {
 			return err
 		}
 	}
+	for i, item := range p.PartitionBy {
+		newCol, err := item.Col.ResolveIndices(p.children[0].Schema())
+		if err != nil {
+			return err
+		}
+		p.PartitionBy[i].Col = newCol.(*expression.Column)
+	}
 	return
 }
+
+// ResolveIndices implements Plan interface.
+func (p *PhysicalLimit) ResolveIndices() (err error) {
+	err = p.basePhysicalPlan.ResolveIndices()
+	if err != nil {
+		return err
+	}
+	for i, item := range p.PartitionBy {
+		newCol, err := item.Col.ResolveIndices(p.children[0].Schema())
+		if err != nil {
+			return err
+		}
+		p.PartitionBy[i].Col = newCol.(*expression.Column)
+	}
+	return
+}
 
diff --git a/planner/core/task.go b/planner/core/task.go
index 163faa9b492f9..6b4d1370d9153 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -836,7 +836,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
 		// Strictly speaking, for the row count of stats, we should multiply newCount with "regionNum",
 		// but "regionNum" is unknown since the copTask can be a double read, so we ignore it now.
 		stats := deriveLimitStats(childProfile, float64(newCount))
-		pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
+		pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.ctx, stats, p.blockOffset)
 		cop = attachPlan2Task(pushedDownLimit, cop).(*copTask)
 		// Don't use clone() so that Limit and its children share the same schema. Otherwise the virtual generated column may not be resolved right.
 		pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
@@ -851,7 +851,7 @@ func (p *PhysicalLimit) attach2Task(tasks ...task) task {
 		for _, partialScan := range cop.idxMergePartPlans {
 			childProfile := partialScan.statsInfo()
 			stats := deriveLimitStats(childProfile, float64(newCount))
-			pushedDownLimit := PhysicalLimit{Count: newCount}.Init(p.ctx, stats, p.blockOffset)
+			pushedDownLimit := PhysicalLimit{PartitionBy: newPartitionBy, Count: newCount}.Init(p.ctx, stats, p.blockOffset)
 			pushedDownLimit.SetChildren(partialScan)
 			pushedDownLimit.SetSchema(pushedDownLimit.children[0].Schema())
 			limitChildren = append(limitChildren, pushedDownLimit)
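
Both ToPB hunks above inline the same loop: each partition-by column is cloned and serialized with expression.SortByItemToPB, exactly as ORDER BY items already are. A minimal sketch of that shared pattern as a standalone helper follows; the helper name partitionByToPB is hypothetical, and the stmtctx, kv, property, and tipb types are assumed from the surrounding code, not defined by this patch:

// partitionByToPB is a hypothetical helper capturing the loop that
// PhysicalTopN.ToPB and PhysicalLimit.ToPB each inline above: clone every
// partition-by column and serialize it with the same SortByItemToPB call
// that already handles ORDER BY items.
func partitionByToPB(sc *stmtctx.StatementContext, client kv.Client, partitionBy []property.SortItem) []*tipb.ByItem {
	byItems := make([]*tipb.ByItem, 0, len(partitionBy))
	for _, item := range partitionBy {
		// Clone the column so the executor-owned protobuf does not alias
		// the planner's column objects.
		byItems = append(byItems, expression.SortByItemToPB(sc, client, item.Col.Clone(), item.Desc))
	}
	return byItems
}

Under that sketch, each ToPB method would reduce to a single assignment, e.g. topNExec.PartitionBy = partitionByToPB(sc, client, p.PartitionBy).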
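The testdata hunks show the user-visible effect of the new ExplainInfo: the pushed-down operator line gains a partition-by clause, e.g. "Limit 1.00 cop[tikv] partition by test.t.b, offset:0, count:1" instead of "Limit 1.00 cop[tikv] offset:0, count:1". Below is a runnable stand-in for the two branches; the real code delegates to explainPartitionBy, whose exact formatting is assumed here:

package main

import (
	"bytes"
	"fmt"
)

// sortItem is a simplified stand-in for the planner's partition-by item;
// only the rendered column name matters for this demo.
type sortItem struct{ col string }

// explainLimit mirrors the control flow of the patched PhysicalLimit.ExplainInfo:
// partition columns are printed first when present, then offset and count.
func explainLimit(partitionBy []sortItem, offset, count uint64) string {
	buffer := bytes.NewBufferString("")
	if len(partitionBy) > 0 {
		buffer.WriteString("partition by ")
		for i, item := range partitionBy {
			if i > 0 {
				buffer.WriteString(", ")
			}
			buffer.WriteString(item.col)
		}
		fmt.Fprintf(buffer, ", offset:%v, count:%v", offset, count)
	} else {
		fmt.Fprintf(buffer, "offset:%v, count:%v", offset, count)
	}
	return buffer.String()
}

func main() {
	fmt.Println(explainLimit(nil, 0, 1))                      // offset:0, count:1
	fmt.Println(explainLimit([]sortItem{{"test.t.b"}}, 0, 1)) // partition by test.t.b, offset:0, count:1
}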