Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: fix decode issue for prefetch point plan index keys #50037

Merged
merged 5 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/printer",
"//pkg/util/resourcegrouptag",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/sys/linux",
Expand Down Expand Up @@ -117,6 +118,7 @@ go_library(
"@com_github_soheilhy_cmux//:cmux",
"@com_github_stretchr_testify//require",
"@com_github_tiancaiamao_appdash//traceapp",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@com_sourcegraph_sourcegraph_appdash_data//:appdash-data",
"@org_golang_google_grpc//:grpc",
Expand Down
32 changes: 29 additions & 3 deletions pkg/server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,13 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/resourcegrouptag"
tlsutil "github.com/pingcap/tidb/pkg/util/tls"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -1739,7 +1742,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
cc.ctx.GetSessionVars().InMultiStmts = true

// Only pre-build point plans for multi-statement query
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts)
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts, sql)
if err != nil {
for _, stmt := range stmts {
cc.onExtensionStmtEnd(stmt, false, err)
Expand Down Expand Up @@ -1815,7 +1818,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
// prefetchPointPlanKeys extracts the point keys in multi-statement query,
// use BatchGet to get the keys, so the values will be cached in the snapshot cache, save RPC call cost.
// For pessimistic transaction, the keys will be batch locked.
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode) ([]plannercore.Plan, error) {
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode, sqls string) ([]plannercore.Plan, error) {
txn, err := cc.ctx.Txn(false)
if err != nil {
return nil, err
Expand All @@ -1839,6 +1842,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key //nolint: prealloc
var rowKeys []kv.Key //nolint: prealloc
isCommonHandle := make(map[string]bool, 0)

handlePlan := func(p plannercore.PhysicalPlan, resetStmtCtxFn func()) error {
var tableID int64
Expand All @@ -1856,6 +1860,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return err1
}
idxKeys = append(idxKeys, idxKey)
isCommonHandle[string(hack.String(idxKey))] = v.TblInfo.IsCommonHandle
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tableID, v.Handle))
}
Expand All @@ -1878,6 +1883,7 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return err1
}
idxKeys = append(idxKeys, idxKey)
isCommonHandle[string(hack.String(idxKey))] = v.TblInfo.IsCommonHandle
}
} else {
for i, handle := range v.Handles {
Expand Down Expand Up @@ -1942,12 +1948,14 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}
snapshot := txn.GetSnapshot()
setResourceGroupTaggerForMultiStmtPrefetch(snapshot, sqls)
idxVals, err1 := snapshot.BatchGet(ctx, idxKeys)
if err1 != nil {
return nil, err1
}
for idxKey, idxVal := range idxVals {
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, false)
isCommonHd := isCommonHandle[idxKey]
h, err2 := tablecodec.DecodeHandleInUniqueIndexValue(idxVal, isCommonHd)
if err2 != nil {
return nil, err2
}
Expand All @@ -1971,6 +1979,24 @@ func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.Stm
return pointPlans, nil
}

func setResourceGroupTaggerForMultiStmtPrefetch(snapshot kv.Snapshot, sqls string) {
if !topsqlstate.TopSQLEnabled() {
return
}
normalized, digest := parser.NormalizeDigest(sqls)
topsql.AttachAndRegisterSQLInfo(context.Background(), normalized, digest, false)
snapshot.SetOption(kv.ResourceGroupTagger, tikvrpc.ResourceGroupTagger(func(req *tikvrpc.Request) {
if req == nil {
return
}
if len(normalized) == 0 {
return
}
req.ResourceGroupTag = resourcegrouptag.EncodeResourceGroupTag(digest, nil,
resourcegrouptag.GetResourceGroupLabelByKey(resourcegrouptag.GetFirstKeyFromRequest(req)))
}))
}

// The first return value indicates whether the call of handleStmt has no side effect and can be retried.
// Currently, the first return value is used to fall back to TiKV when TiFlash is down.
func (cc *clientConn) handleStmt(
Expand Down
12 changes: 12 additions & 0 deletions pkg/server/internal/testserverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,18 @@ func (cli *TestServerClient) RunTestMultiStatements(t *testing.T) {
// the create table + drop table statements will return errors.
dbt.MustExec("CREATE DATABASE multistmtuse")
dbt.MustExec("use multistmtuse; create table if not exists t1 (id int); drop table t1;")

// Test issue #50012
dbt.MustExec("create database if not exists test;")
dbt.MustExec("use test;")
dbt.MustExec("CREATE TABLE t (a bigint(20), b int(10), PRIMARY KEY (b, a), UNIQUE KEY uk_a (a));")
dbt.MustExec("insert into t values (1, 1);")
dbt.MustExec("begin;")
rs := dbt.MustQuery("delete from t where a = 1; select 1;")
rs.Close()
rs = dbt.MustQuery("update t set b = 2 where a = 1; select 1;")
rs.Close()
dbt.MustExec("commit;")
})
}

Expand Down