diff --git a/DEPS.bzl b/DEPS.bzl index a50b3d1a4a604..603e167c6dedb 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -4179,8 +4179,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:E+JsLmxpa4rsT0rS9m52+FhiiRbQxZYn079T4JD12jU=", - version = "v2.0.8-0.20241204085508-80a6b021f0f6", + sum = "h1:zrgHZRnd7Veio1Q+jV9LVa/JAfBi2Y7fGx91s7Uw+fo=", + version = "v2.0.8-0.20250108151910-7b4409c11946", ) go_repository( name = "com_github_tikv_pd", diff --git a/executor/executor_test.go b/executor/executor_test.go index 9b9c4b841b46d..e4dff3bd1220c 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4827,7 +4827,7 @@ func TestStaleReadAtFutureTime(t *testing.T) { tk := testkit.NewTestKit(t, store) // Setting tx_read_ts to a time in the future will fail. (One day before the 2038 problem) - tk.MustGetErrMsg("set @@tx_read_ts = '2038-01-18 03:14:07'", "cannot set read timestamp to a future time") + tk.MustContainErrMsg("set @@tx_read_ts = '2038-01-18 03:14:07'", "cannot set read timestamp to a future time") // TxnReadTS Is not updated if check failed. require.Zero(t, tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS()) } diff --git a/executor/set.go b/executor/set.go index a0b208e81be08..d8910da50a5dc 100644 --- a/executor/set.go +++ b/executor/set.go @@ -198,7 +198,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS) + isStaleRead := name == variable.TiDBTxnReadTS + err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS, isStaleRead) if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts diff --git a/go.mod b/go.mod index 09cf4365e3f43..580c06463e06f 100644 --- a/go.mod +++ b/go.mod @@ -94,7 +94,7 @@ require ( github.com/stretchr/testify v1.8.2 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20241204085508-80a6b021f0f6 + github.com/tikv/client-go/v2 v2.0.8-0.20250108151910-7b4409c11946 github.com/tikv/pd/client v0.0.0-20240725070735-fb162bf0aa3f github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e github.com/twmb/murmur3 v1.1.6 diff --git a/go.sum b/go.sum index a581bb88152cb..7e6f8763f43b4 100644 --- a/go.sum +++ b/go.sum @@ -968,8 +968,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.8-0.20241204085508-80a6b021f0f6 h1:E+JsLmxpa4rsT0rS9m52+FhiiRbQxZYn079T4JD12jU= -github.com/tikv/client-go/v2 v2.0.8-0.20241204085508-80a6b021f0f6/go.mod h1:45NuHB8x+VAoztMIjF6hEgXvPQXhXWPfMxDg0N8CoRY= +github.com/tikv/client-go/v2 v2.0.8-0.20250108151910-7b4409c11946 h1:zrgHZRnd7Veio1Q+jV9LVa/JAfBi2Y7fGx91s7Uw+fo= +github.com/tikv/client-go/v2 v2.0.8-0.20250108151910-7b4409c11946/go.mod h1:45NuHB8x+VAoztMIjF6hEgXvPQXhXWPfMxDg0N8CoRY= github.com/tikv/pd/client v0.0.0-20240725070735-fb162bf0aa3f h1:Szw9YxqGGEneSniBd4ep09jgB77cKUy+AuhKOmdGPdE= github.com/tikv/pd/client v0.0.0-20240725070735-fb162bf0aa3f/go.mod h1:QCBn54O5lhfkYfxj8Tyiqaxue/mthHEMyi7AqJP/+n4= github.com/timakin/bodyclose v0.0.0-20221125081123-e39cf3fc478e h1:MV6KaVu/hzByHP0UvJ4HcMGE/8a6A4Rggc/0wx2AvJo= diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 869a087f70e3f..9cc1490036bc1 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3568,7 +3568,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3582,7 +3582,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/sessionctx/context.go b/sessionctx/context.go index 6b59ddcce758a..e6856b0a8d410 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -219,9 +219,12 @@ const ( LastExecuteDDL basicCtxType = 3 ) -// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { - return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) +// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp. +// For read requests to the storage, the check can be implicitly performed when sending the RPC request. So this +// function is only needed when it's not proper to delay the check to when RPC requests are being sent (e.g., `BEGIN` +// statements that don't make reading operation immediately). +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64, isStaleRead bool) error { + return store.GetOracle().ValidateReadTS(ctx, readTS, isStaleRead, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } // SysProcTracker is used to track background sys processes diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 278d1b158a599..3423978972369 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -285,7 +285,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts, true); err != nil { return 0, err } diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go index 30a446bbb1817..d9d8ebef61be3 100644 --- a/sessiontxn/staleread/util.go +++ b/sessiontxn/staleread/util.go @@ -84,7 +84,7 @@ func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). // So in this case, do an extra check on it. - err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS, true) if err != nil { return 0, err } diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index d699288845a38..5182bcc381d55 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//metrics", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//txnkv/txnlock", diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 1ea0920987fa2..76294a4e73049 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -1150,7 +1150,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba const readTimeoutUltraLong = 3600 * time.Second // For requests that may scan many regions for tiflash. func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.store.store.GetOracle(), b.enableCollectExecutionInfo) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) for _, ri := range task.regionInfos { regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo()) diff --git a/store/copr/batch_request_sender.go b/store/copr/batch_request_sender.go index 2e2df9bd10076..80c2b0a43ce0f 100644 --- a/store/copr/batch_request_sender.go +++ b/store/copr/batch_request_sender.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/config" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "google.golang.org/grpc/codes" @@ -56,9 +57,9 @@ type RegionBatchRequestSender struct { } // NewRegionBatchRequestSender creates a RegionBatchRequestSender object. -func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender { +func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, oracle oracle.Oracle, enableCollectExecutionInfo bool) *RegionBatchRequestSender { return &RegionBatchRequestSender{ - RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client), + RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client, oracle), enableCollectExecutionInfo: enableCollectExecutionInfo, } } diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 5bd374651c122..8239d59e405b1 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -275,7 +275,7 @@ func (m *mppIterator) handleDispatchReq(ctx context.Context, bo *Backoffer, req // Or else it's the task without region, which always happens in high layer task without table. // In that case if originalTask != nil { - sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.enableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(m.store.GetRegionCache(), m.store.GetTiKVClient(), m.store.store.GetOracle(), m.enableCollectExecutionInfo) rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.