From 18b5df5b7d784c1699f020b3ac62e61850bdb0c2 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:08:12 +0800 Subject: [PATCH] This is an automated cherry-pick of #58054 Signed-off-by: ti-chi-bot --- DEPS.bzl | 10 +++ go.mod | 7 ++ go.sum | 11 +++ pkg/ddl/column_change_test.go | 56 ++++++++++++-- pkg/ddl/column_test.go | 82 ++++++++++++++++++++- pkg/ddl/db_integration_test.go | 25 ++++--- pkg/ddl/ddl_worker_test.go | 4 +- pkg/ddl/index_change_test.go | 4 +- pkg/executor/set.go | 3 +- pkg/executor/test/executor/executor_test.go | 3 +- pkg/executor/test/writetest/BUILD.bazel | 3 + pkg/executor/test/writetest/write_test.go | 9 ++- pkg/planner/core/planbuilder.go | 4 +- pkg/sessionctx/context.go | 9 ++- pkg/sessiontxn/staleread/processor.go | 2 +- pkg/sessiontxn/staleread/util.go | 2 +- pkg/store/copr/BUILD.bazel | 1 + pkg/store/copr/batch_coprocessor.go | 2 +- pkg/store/copr/batch_request_sender.go | 5 +- pkg/store/copr/mpp.go | 2 +- pkg/util/localpool/localpool_race.go | 11 +++ pkg/util/mock/BUILD.bazel | 1 + pkg/util/mock/context.go | 56 +++++++++++--- 23 files changed, 264 insertions(+), 48 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 0ddba0a8564b3..05de734487847 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7041,6 +7041,7 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", +<<<<<<< HEAD sha256 = "587d22d21daa1f44b18b0c2325fcb4233af71a985f397a36aa9db8796777b8f2", strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241212025239-0dc41295f929", urls = [ @@ -7048,6 +7049,15 @@ def go_deps(): "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212025239-0dc41295f929.zip", +======= + sha256 = "844684ee6ae7decc5cadcab3f95c526b66878f8401c71cf82af68ec0cc5257d5", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241209094930-06d7f4b9233b", + urls = [ + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241209094930-06d7f4b9233b.zip", +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) ], ) go_repository( diff --git a/go.mod b/go.mod index 7e213519ed566..57ea7df07f601 100644 --- a/go.mod +++ b/go.mod @@ -104,9 +104,16 @@ require ( github.com/stretchr/testify v1.9.0 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 +<<<<<<< HEAD github.com/tikv/client-go/v2 v2.0.8-0.20241212025239-0dc41295f929 github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 +======= + github.com/tidwall/btree v1.7.0 + github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b + github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 + github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/vbauerster/mpb/v7 v7.5.3 diff --git a/go.sum b/go.sum index b394d17476f08..ba08fb30a7091 100644 --- a/go.sum +++ b/go.sum @@ -996,12 +996,23 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= +<<<<<<< HEAD github.com/tikv/client-go/v2 v2.0.8-0.20241212025239-0dc41295f929 h1:dvY5kl35L+aroDl3HQzOx4J7N6sdd8TiXEV+GhNemtU= github.com/tikv/client-go/v2 v2.0.8-0.20241212025239-0dc41295f929/go.mod h1:37p0ryKaieJbBpVDWnaPi2ZS6UFqkgpsemBLkGX2FvM= github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c h1:oZygf/SCdTUhjoHuZRE85EBgK0oA6LjikpWuJqqjM8U= github.com/tikv/pd/client v0.0.0-20240724132535-fcb34c90790c/go.mod h1:NW6Af689Jw1FDxjq+WL0nqOdmQ1XT0ly2R1SIKfQuUw= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= +======= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b h1:x8E2J8UuUa2ysUkgVfNGgiXxZ9nfqBpQ43PBLwmCitU= +github.com/tikv/client-go/v2 v2.0.8-0.20241209094930-06d7f4b9233b/go.mod h1:NI2GfVlB9n7DsIGCxrKcD4psrcuFNEV8m1BgyzK1Amc= +github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk= +github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U= +github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= +github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a/go.mod h1:mkjARE7Yr8qU23YcGMSALbIxTQ9r9QBVahQOBRfU460= +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= diff --git a/pkg/ddl/column_change_test.go b/pkg/ddl/column_change_test.go index 36462cb063c9a..ffe3864e56ef6 100644 --- a/pkg/ddl/column_change_test.go +++ b/pkg/ddl/column_change_test.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -48,10 +47,14 @@ func TestColumnAdd(t *testing.T) { tk.MustExec("create table t (c1 int, c2 int);") tk.MustExec("insert t values (1, 2);") +<<<<<<< HEAD d := dom.DDL() tc := &callback.TestDDLCallback{Do: dom} ct := testNewContext(store) +======= + ct := testNewContext(t, store) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) // set up hook var ( deleteOnlyTable table.Table @@ -127,8 +130,13 @@ func TestColumnAdd(t *testing.T) { return } first = false +<<<<<<< HEAD sess := testNewContext(store) err := sessiontxn.NewTxn(context.Background(), sess) +======= + sess := testNewContext(t, store) + txn, err := newTxn(sess) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) _, err = writeOnlyTable.AddRecord(sess, types.MakeDatums(10, 10)) require.NoError(t, err) @@ -223,7 +231,15 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) if err != nil { return errors.Trace(err) } @@ -261,7 +277,15 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) if err != nil { return errors.Trace(err) } @@ -278,7 +302,15 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } + _, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) if err != nil { return errors.Trace(err) } @@ -308,7 +340,15 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T if err != nil { return errors.Trace(err) } +<<<<<<< HEAD err = sessiontxn.NewTxn(ctx, sctx) +======= + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } + txn, err = newTxn(sctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) if err != nil { return errors.Trace(err) } @@ -325,7 +365,15 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T if err != nil { return errors.Trace(err) } +<<<<<<< HEAD err = sessiontxn.NewTxn(ctx, sctx) +======= + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } + _, err = newTxn(sctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) if err != nil { return errors.Trace(err) } @@ -431,10 +479,8 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) { } } -func testNewContext(store kv.Storage) sessionctx.Context { - ctx := mock.NewContext() - ctx.Store = store - return ctx +func testNewContext(t *testing.T, store kv.Storage) sessionctx.Context { + return testkit.NewSession(t, store) } func TestIssue40135(t *testing.T) { diff --git a/pkg/ddl/column_test.go b/pkg/ddl/column_test.go index 0f83e803a2995..ad1395959dc2e 100644 --- a/pkg/ddl/column_test.go +++ b/pkg/ddl/column_test.go @@ -167,8 +167,13 @@ func TestColumnBasic(t *testing.T) { tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", i, 10*i, 100*i)) } +<<<<<<< HEAD ctx := testNewContext(store) err := sessiontxn.NewTxn(context.Background(), ctx) +======= + ctx := testNewContext(t, store) + txn, err := newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) var tableID int64 @@ -214,7 +219,13 @@ func TestColumnBasic(t *testing.T) { h, err := tbl.AddRecord(ctx, types.MakeDatums(11, 12, 13, 14)) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + _, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) values, err := tables.RowWithCols(tbl, ctx, h, tbl.Cols()) require.NoError(t, err) @@ -385,7 +396,13 @@ func checkDeleteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newRow := types.MakeDatums(int64(11), int64(22), int64(33)) newHandle, err := tbl.AddRecord(ctx, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) rows := [][]types.Datum{row, newRow} @@ -407,7 +424,13 @@ func checkDeleteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, err = tbl.RemoveRecord(ctx, newHandle, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) i = 0 err = tables.IterRecords(tbl, ctx, tbl.Cols(), func(_ kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) { @@ -447,7 +470,13 @@ func checkWriteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, h newRow := types.MakeDatums(int64(11), int64(22), int64(33)) newHandle, err := tbl.AddRecord(ctx, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) rows := [][]types.Datum{row, newRow} @@ -469,7 +498,14 @@ func checkWriteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, h err = tbl.RemoveRecord(ctx, newHandle, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) i = 0 @@ -507,7 +543,13 @@ func checkReorganizationColumn(t *testing.T, ctx sessionctx.Context, tableID int newRow := types.MakeDatums(int64(11), int64(22), int64(33)) newHandle, err := tbl.AddRecord(ctx, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) rows := [][]types.Datum{row, newRow} @@ -530,7 +572,13 @@ func checkReorganizationColumn(t *testing.T, ctx sessionctx.Context, tableID int err = tbl.RemoveRecord(ctx, newHandle, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) i = 0 @@ -573,7 +621,13 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC } handle, err := tbl.AddRecord(ctx, newRow) require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) rows := [][]types.Datum{updatedRow, newRow} @@ -593,8 +647,14 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC err = tbl.RemoveRecord(ctx, handle, newRow) require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) +<<<<<<< HEAD err = sessiontxn.NewTxn(context.Background(), ctx) +======= + txn, err = newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) i = 0 @@ -610,8 +670,13 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC require.NoError(t, err) } +<<<<<<< HEAD func checkAddColumn(t *testing.T, state model.SchemaState, tableID int64, handle kv.Handle, newCol *table.Column, oldRow []types.Datum, columnValue interface{}, dom *domain.Domain, store kv.Storage, columnCnt int) { ctx := testNewContext(store) +======= +func checkAddColumn(t *testing.T, state model.SchemaState, tableID int64, handle kv.Handle, newCol *table.Column, oldRow []types.Datum, columnValue any, dom *domain.Domain, store kv.Storage, columnCnt int) { + ctx := testNewContext(t, store) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) switch state { case model.StateNone: checkNoneColumn(t, ctx, tableID, handle, newCol, columnValue, dom) @@ -655,8 +720,13 @@ func TestAddColumn(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) +<<<<<<< HEAD ctx := testNewContext(store) err := sessiontxn.NewTxn(context.Background(), ctx) +======= + ctx := testNewContext(t, store) + txn, err := newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) oldRow := types.MakeDatums(int64(1), int64(2), int64(3)) handle, err := tbl.AddRecord(ctx, oldRow) @@ -728,8 +798,13 @@ func TestAddColumns(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) +<<<<<<< HEAD ctx := testNewContext(store) err := sessiontxn.NewTxn(context.Background(), ctx) +======= + ctx := testNewContext(t, store) + txn, err := newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) oldRow := types.MakeDatums(int64(1), int64(2), int64(3)) handle, err := tbl.AddRecord(ctx, oldRow) @@ -791,7 +866,7 @@ func TestDropColumnInColumnTest(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) - ctx := testNewContext(store) + ctx := testNewContext(t, store) colName := "c4" defaultColValue := int64(4) row := types.MakeDatums(int64(1), int64(2), int64(3)) @@ -852,8 +927,13 @@ func TestDropColumns(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) +<<<<<<< HEAD ctx := testNewContext(store) err := sessiontxn.NewTxn(context.Background(), ctx) +======= + ctx := testNewContext(t, store) + txn, err := newTxn(ctx) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) colNames := []string{"c3", "c4"} diff --git a/pkg/ddl/db_integration_test.go b/pkg/ddl/db_integration_test.go index 1900b0b17208e..0a073601b9f84 100644 --- a/pkg/ddl/db_integration_test.go +++ b/pkg/ddl/db_integration_test.go @@ -52,7 +52,11 @@ import ( "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/dbterror" +<<<<<<< HEAD "github.com/pingcap/tidb/pkg/util/mock" +======= + "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) "github.com/stretchr/testify/require" ) @@ -526,11 +530,10 @@ func TestChangingTableCharset(t *testing.T) { tblInfo.Charset = "" tblInfo.Collate = "" updateTableInfo := func(tblInfo *model.TableInfo) { - mockCtx := mock.NewContext() - mockCtx.Store = store - err := sessiontxn.NewTxn(context.Background(), mockCtx) + ctx := testkit.NewSession(t, store) + err := sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err) - txn, err := mockCtx.Txn(true) + txn, err := ctx.Txn(true) require.NoError(t, err) mt := meta.NewMeta(txn) @@ -772,11 +775,10 @@ func TestCaseInsensitiveCharsetAndCollate(t *testing.T) { tblInfo.Charset = "UTF8MB4" updateTableInfo := func(tblInfo *model.TableInfo) { - mockCtx := mock.NewContext() - mockCtx.Store = store - err := sessiontxn.NewTxn(context.Background(), mockCtx) + sctx := testkit.NewSession(t, store) + err := sessiontxn.NewTxn(context.Background(), sctx) require.NoError(t, err) - txn, err := mockCtx.Txn(true) + txn, err := sctx.Txn(true) require.NoError(t, err) mt := meta.NewMeta(txn) require.True(t, ok) @@ -1425,11 +1427,10 @@ func TestTreatOldVersionUTF8AsUTF8MB4(t *testing.T) { tblInfo.Version = model.TableInfoVersion0 tblInfo.Columns[0].Version = model.ColumnInfoVersion0 updateTableInfo := func(tblInfo *model.TableInfo) { - mockCtx := mock.NewContext() - mockCtx.Store = store - err := sessiontxn.NewTxn(context.Background(), mockCtx) + sctx := testkit.NewSession(t, store) + err := sessiontxn.NewTxn(context.Background(), sctx) require.NoError(t, err) - txn, err := mockCtx.Txn(true) + txn, err := sctx.Txn(true) require.NoError(t, err) mt := meta.NewMeta(txn) require.True(t, ok) diff --git a/pkg/ddl/ddl_worker_test.go b/pkg/ddl/ddl_worker_test.go index 9d6abc20dece7..41d3bc8465d65 100644 --- a/pkg/ddl/ddl_worker_test.go +++ b/pkg/ddl/ddl_worker_test.go @@ -51,7 +51,7 @@ func TestInvalidDDLJob(t *testing.T) { BinlogInfo: &model.HistoryInfo{}, Args: []interface{}{}, } - ctx := testNewContext(store) + ctx := testNewContext(t, store) ctx.SetValue(sessionctx.QueryString, "skip") err := dom.DDL().DoDDLJob(ctx, job) require.Equal(t, err.Error(), "[ddl:8204]invalid ddl job type: none") @@ -59,7 +59,7 @@ func TestInvalidDDLJob(t *testing.T) { func TestAddBatchJobError(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease) - ctx := testNewContext(store) + ctx := testNewContext(t, store) require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockAddBatchDDLJobsErr", `return(true)`)) // Test the job runner should not hang forever. diff --git a/pkg/ddl/index_change_test.go b/pkg/ddl/index_change_test.go index 9c65c56b918a1..f5e8151bba127 100644 --- a/pkg/ddl/index_change_test.go +++ b/pkg/ddl/index_change_test.go @@ -59,7 +59,7 @@ func TestIndexChange(t *testing.T) { return } jobID.Store(job.ID) - ctx1 := testNewContext(store) + ctx1 := testNewContext(t, store) prevState = job.SchemaState require.NoError(t, dom.Reload()) tbl, exist := dom.InfoSchema().TableByID(job.TableID) @@ -108,7 +108,7 @@ func TestIndexChange(t *testing.T) { require.NoError(t, dom.Reload()) tbl, exist := dom.InfoSchema().TableByID(job.TableID) require.True(t, exist) - ctx1 := testNewContext(store) + ctx1 := testNewContext(t, store) switch job.SchemaState { case model.StateWriteOnly: writeOnlyTable = tbl diff --git a/pkg/executor/set.go b/pkg/executor/set.go index 1fcce5863c6f6..0bc3d936fce58 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -213,7 +213,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS) + isStaleRead := name == variable.TiDBTxnReadTS + err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS, isStaleRead) if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index 251218e90207f..049f8c434ef59 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -907,8 +907,7 @@ func TestExecutorBit(t *testing.T) { func TestCheckIndex(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - ctx := mock.NewContext() - ctx.Store = store + ctx := testkit.NewSession(t, store) se, err := session.CreateSession4Test(store) require.NoError(t, err) defer se.Close() diff --git a/pkg/executor/test/writetest/BUILD.bazel b/pkg/executor/test/writetest/BUILD.bazel index a1ba7d1035e1d..47b325e2ebb40 100644 --- a/pkg/executor/test/writetest/BUILD.bazel +++ b/pkg/executor/test/writetest/BUILD.bazel @@ -26,8 +26,11 @@ go_test( "//pkg/testkit", "//pkg/types", "//pkg/util", +<<<<<<< HEAD "//pkg/util/mock", "@com_github_pingcap_failpoint//:failpoint", +======= +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", "@io_opencensus_go//stats/view", diff --git a/pkg/executor/test/writetest/write_test.go b/pkg/executor/test/writetest/write_test.go index c2b2cafe29bfb..50ddd917d7bb7 100644 --- a/pkg/executor/test/writetest/write_test.go +++ b/pkg/executor/test/writetest/write_test.go @@ -37,7 +37,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -1468,8 +1467,7 @@ func TestReplaceLog(t *testing.T) { tk.MustExec(`create table testLog (a int not null primary key, b int unique key);`) // Make some dangling index. - ctx := mock.NewContext() - ctx.Store = store + ctx := testkit.NewSession(t, store) is := domain.InfoSchema() dbName := model.NewCIStr("test") tblName := model.NewCIStr("testLog") @@ -1502,9 +1500,14 @@ func TestRebaseIfNeeded(t *testing.T) { tk.MustExec(`create table t (a int not null primary key auto_increment, b int unique key);`) tk.MustExec(`insert into t (b) values (1);`) +<<<<<<< HEAD ctx := mock.NewContext() ctx.Store = store tbl, err := domain.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) +======= + ctx := testkit.NewSession(t, store) + tbl, err := domain.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)) require.NoError(t, err) require.Nil(t, sessiontxn.NewTxn(context.Background(), ctx)) // AddRecord directly here will skip to rebase the auto ID in the insert statement, diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 83bc087ad90f6..7fc58a36301c3 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3696,7 +3696,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3710,7 +3710,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index 5ebb1052cd384..7b9aa6abf3fb4 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -213,9 +213,12 @@ const ( LastExecuteDDL basicCtxType = 3 ) -// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { - return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) +// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp. +// For read requests to the storage, the check can be implicitly performed when sending the RPC request. So this +// function is only needed when it's not proper to delay the check to when RPC requests are being sent (e.g., `BEGIN` +// statements that don't make reading operation immediately). +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64, isStaleRead bool) error { + return store.GetOracle().ValidateReadTS(ctx, readTS, isStaleRead, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } // SysProcTracker is used to track background sys processes diff --git a/pkg/sessiontxn/staleread/processor.go b/pkg/sessiontxn/staleread/processor.go index 393c3e7c378bb..e1a3f547fd11b 100644 --- a/pkg/sessiontxn/staleread/processor.go +++ b/pkg/sessiontxn/staleread/processor.go @@ -285,7 +285,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts, true); err != nil { return 0, err } diff --git a/pkg/sessiontxn/staleread/util.go b/pkg/sessiontxn/staleread/util.go index 09f3edc2dbe0b..01791c6437900 100644 --- a/pkg/sessiontxn/staleread/util.go +++ b/pkg/sessiontxn/staleread/util.go @@ -77,7 +77,7 @@ func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). // So in this case, do an extra check on it. - err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS, true) if err != nil { return 0, err } diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 9bebc01364fbb..e8c17b8094c8d 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -55,6 +55,7 @@ go_library( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//metrics", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//tikvrpc/interceptor", diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index c33e8f9f6e112..19c43a567e9c8 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -1292,7 +1292,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba const TiFlashReadTimeoutUltraLong = 3600 * time.Second func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.store.store.GetOracle(), b.enableCollectExecutionInfo) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) for _, ri := range task.regionInfos { regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo()) diff --git a/pkg/store/copr/batch_request_sender.go b/pkg/store/copr/batch_request_sender.go index ccb138f7753c3..5c6d9a6cbe192 100644 --- a/pkg/store/copr/batch_request_sender.go +++ b/pkg/store/copr/batch_request_sender.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/config" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "google.golang.org/grpc/codes" @@ -56,9 +57,9 @@ type RegionBatchRequestSender struct { } // NewRegionBatchRequestSender creates a RegionBatchRequestSender object. -func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender { +func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, oracle oracle.Oracle, enableCollectExecutionInfo bool) *RegionBatchRequestSender { return &RegionBatchRequestSender{ - RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client), + RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client, oracle), enableCollectExecutionInfo: enableCollectExecutionInfo, } } diff --git a/pkg/store/copr/mpp.go b/pkg/store/copr/mpp.go index cd0695a3e0d9d..cc5d361c6d73e 100644 --- a/pkg/store/copr/mpp.go +++ b/pkg/store/copr/mpp.go @@ -138,7 +138,7 @@ func (c *MPPClient) DispatchMPPTask(param kv.DispatchMPPTaskParam) (resp *mpp.Di // Or else it's the task without region, which always happens in high layer task without table. // In that case if originalTask != nil { - sender := NewRegionBatchRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), param.EnableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), c.store.store.GetOracle(), param.EnableCollectExecutionInfo) rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. diff --git a/pkg/util/localpool/localpool_race.go b/pkg/util/localpool/localpool_race.go index 47d6d0229afae..b235353f1b139 100644 --- a/pkg/util/localpool/localpool_race.go +++ b/pkg/util/localpool/localpool_race.go @@ -16,6 +16,7 @@ package localpool +<<<<<<< HEAD:pkg/util/localpool/localpool_race.go // Get gets an object from the pool. func (p *LocalPool) Get() interface{} { return p.newFn() @@ -24,4 +25,14 @@ func (p *LocalPool) Get() interface{} { // Put puts an object back to the pool. func (p *LocalPool) Put(obj interface{}) bool { return false +======= +package mock + +// NewContext creates a new mocked sessionctx.Context. +// This function should only be used for testing. +// Avoid using this when you are in a context with a `kv.Storage` instance, especially when you are going to access +// the data in it. Consider using testkit.NewSession(t, store) instead when possible. +func NewContext() *Context { + return newContext() +>>>>>>> 0bf3e019002 (*: Update client-go and verify all read ts (#58054)):pkg/util/mock/fortest.go } diff --git a/pkg/util/mock/BUILD.bazel b/pkg/util/mock/BUILD.bazel index 75ac693889df1..88c26bfd03067 100644 --- a/pkg/util/mock/BUILD.bazel +++ b/pkg/util/mock/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//pkg/sessionctx/variable", "//pkg/util", "//pkg/util/disk", + "//pkg/util/logutil", "//pkg/util/memory", "//pkg/util/sli", "//pkg/util/sqlexec", diff --git a/pkg/util/mock/context.go b/pkg/util/mock/context.go index 7c6b52c96bce5..18e02c07d40e4 100644 --- a/pkg/util/mock/context.go +++ b/pkg/util/mock/context.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/disk" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" "github.com/pingcap/tidb/pkg/util/sli" "github.com/pingcap/tidb/pkg/util/sqlexec" @@ -67,7 +68,7 @@ type wrapTxn struct { } func (txn *wrapTxn) validOrPending() bool { - return txn.tsFuture != nil || txn.Transaction.Valid() + return txn.tsFuture != nil || (txn.Transaction != nil && txn.Transaction.Valid()) } func (txn *wrapTxn) pending() bool { @@ -173,7 +174,15 @@ func (c *Context) GetSessionVars() *variable.SessionVars { } // Txn implements sessionctx.Context Txn interface. -func (c *Context) Txn(bool) (kv.Transaction, error) { +func (c *Context) Txn(active bool) (kv.Transaction, error) { + if active { + if !c.txn.validOrPending() { + err := c.newTxn(context.Background()) + if err != nil { + return nil, err + } + } + } return &c.txn, nil } @@ -253,10 +262,12 @@ func (c *Context) GetSessionPlanCache() sessionctx.PlanCache { return c.pcache } -// NewTxn implements the sessionctx.Context interface. -func (c *Context) NewTxn(context.Context) error { +// newTxn Creates new transaction on the session context. +func (c *Context) newTxn(ctx context.Context) error { if c.Store == nil { - return errors.New("store is not set") + logutil.Logger(ctx).Warn("mock.Context: No store is specified when trying to create new transaction. A fake transaction will be created. Note that this is unrecommended usage.") + c.fakeTxn() + return nil } if c.txn.Valid() { err := c.txn.Commit(c.ctx) @@ -273,14 +284,41 @@ func (c *Context) NewTxn(context.Context) error { return nil } -// NewStaleTxnWithStartTS implements the sessionctx.Context interface. -func (c *Context) NewStaleTxnWithStartTS(ctx context.Context, _ uint64) error { - return c.NewTxn(ctx) +// fakeTxn is used to let some tests pass in the context without an available kv.Storage. Once usages to access +// transactions without a kv.Storage are removed, this type should also be removed. +// New code should never use this. +type fakeTxn struct { + // The inner should always be nil. + kv.Transaction + startTS uint64 +} + +func (t *fakeTxn) StartTS() uint64 { + return t.startTS +} + +func (*fakeTxn) SetDiskFullOpt(_ kvrpcpb.DiskFullOpt) {} + +func (*fakeTxn) SetOption(_ int, _ any) {} + +func (*fakeTxn) Get(ctx context.Context, _ kv.Key) ([]byte, error) { + // Check your implementation if you meet this error. It's dangerous if some calculation relies on the data but the + // read result is faked. + logutil.Logger(ctx).Warn("mock.Context: No store is specified but trying to access data from a transaction.") + return nil, nil +} + +func (*fakeTxn) Valid() bool { return true } + +func (c *Context) fakeTxn() { + c.txn.Transaction = &fakeTxn{ + startTS: 1, + } } // RefreshTxnCtx implements the sessionctx.Context interface. func (c *Context) RefreshTxnCtx(ctx context.Context) error { - return errors.Trace(c.NewTxn(ctx)) + return errors.Trace(c.newTxn(ctx)) } // RollbackTxn indicates an expected call of RollbackTxn.