Skip to content

Commit

Permalink
server: fix decode issue for prefetch point plan index keys (#50037) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 27, 2024
1 parent 97894f2 commit 01a5573
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
2 changes: 2 additions & 0 deletions server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ go_library(
"//util/pdapi",
"//util/printer",
"//util/replayer",
"//util/resourcegrouptag",
"//util/sqlexec",
"//util/sys/linux",
"//util/timeutil",
Expand Down Expand Up @@ -121,6 +122,7 @@ go_library(
"@com_github_tiancaiamao_appdash//traceapp",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_sourcegraph_sourcegraph_appdash_data//:appdash-data",
"@org_golang_google_grpc//:grpc",
Expand Down
32 changes: 29 additions & 3 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,13 @@ import (
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
tlsutil "github.com/pingcap/tidb/util/tls"
"github.com/pingcap/tidb/util/topsql"
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
"github.com/pingcap/tidb/util/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1865,7 +1868,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
cc.ctx.GetSessionVars().InMultiStmts = true

// Only pre-build point plans for multi-statement query
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts)
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts, sql)
if err != nil {
for _, stmt := range stmts {
cc.onExtensionStmtEnd(stmt, false, err)
Expand Down Expand Up @@ -1941,7 +1944,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
// prefetchPointPlanKeys extracts the point keys in multi-statement query,
// use BatchGet to get the keys, so the values will be cached in the snapshot cache, save RPC call cost.
// For pessimistic transaction, the keys will be batch locked.
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode) ([]plannercore.Plan, error) {
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode, sqls string) ([]plannercore.Plan, error) {
txn, err := cc.ctx.Txn(false)
if err != nil {
return nil, err
Expand All @@ -1965,6 +1968,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key //nolint: prealloc
var rowKeys []kv.Key //nolint: prealloc
isCommonHandle := make(map[string]bool, 0)

handlePlan := func(p plannercore.PhysicalPlan, resetStmtCtxFn func()) error {
var tableID int64
Expand All @@ -1982,6 +1986,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return err1
}
idxKeys = append(idxKeys, idxKey)
isCommonHandle[string(hack.String(idxKey))] = v.TblInfo.IsCommonHandle
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tableID, v.Handle))
}
Expand All @@ -2004,6 +2009,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return err1
}
idxKeys = append(idxKeys, idxKey)
isCommonHandle[string(hack.String(idxKey))] = v.TblInfo.IsCommonHandle
}
} else {
for i, handle := range v.Handles {
Expand Down Expand Up @@ -2068,12 +2074,14 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}
snapshot := txn.GetSnapshot()
setResourceGroupTaggerForMultiStmtPrefetch(snapshot, sqls)
idxVals, err1 := snapshot.BatchGet(ctx, idxKeys)
if err1 != nil {
return nil, err1
}
for idxKey, idxVal := range idxVals {
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, false)
isCommonHd := isCommonHandle[idxKey]
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, isCommonHd)
if err2 != nil {
return nil, err2
}
Expand All @@ -2097,6 +2105,24 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

func setResourceGroupTaggerForMultiStmtPrefetch(snapshot kv.Snapshot, sqls string) {
if !topsqlstate.TopSQLEnabled() {
return
}
normalized, digest := parser.NormalizeDigest(sqls)
topsql.AttachAndRegisterSQLInfo(context.Background(), normalized, digest, false)
snapshot.SetOption(kv.ResourceGroupTagger, tikvrpc.ResourceGroupTagger(func(req *tikvrpc.Request) {
if req == nil {
return
}
if len(normalized) == 0 {
return
}
req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil,
resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
}))
}

// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
// Currently, the first return value is used to fall back to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(ctx context.Context, stmt ast.StmtNode, warns []stmtctx.SQLWarn, lastStmt bool) (bool, error) {
Expand Down
12 changes: 12 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,18 @@ func (cli *testServerClient) runTestMultiStatements(t *testing.T) {
// the create table + drop table statements will return errors.
dbt.MustExec("CREATE DATABASE multistmtuse")
dbt.MustExec("use multistmtuse; create table if not exists t1 (id int); drop table t1;")

// Test issue #50012
dbt.MustExec("create database if not exists test;")
dbt.MustExec("use test;")
dbt.MustExec("CREATE TABLE t (a bigint(20), b int(10), PRIMARY KEY (b, a), UNIQUE KEY uk_a (a));")
dbt.MustExec("insert into t values (1, 1);")
dbt.MustExec("begin;")
rs := dbt.MustQuery("delete from t where a = 1; select 1;")
rs.Close()
rs = dbt.MustQuery("update t set b = 2 where a = 1; select 1;")
rs.Close()
dbt.MustExec("commit;")
})
}

Expand Down

0 comments on commit 01a5573

Please sign in to comment.