Skip to content

Commit

Permalink
*: Use strict validation for stale read ts & flashback ts (#57050) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and ekexium committed Jan 16, 2025
1 parent d97f0ee commit d8655c0
Show file tree
Hide file tree
Showing 16 changed files with 78 additions and 206 deletions.
4 changes: 2 additions & 2 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3603,8 +3603,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:0YcirnuxtXC9eQRb231im1M5w/n7JFuOo0IgE/K9ffM=",
version = "v2.0.4-0.20241125064444-5f59e4e34c62",
sum = "h1:P6bhZG2yFFuKYvOpfltUbt89sbHohq4BAv2P4GB3fL8=",
version = "v2.0.4-0.20250109055446-ccec7efbf0f7",
)
go_repository(
name = "com_github_tikv_pd_client",
Expand Down
110 changes: 0 additions & 110 deletions MODULE.bazel.lock

This file was deleted.

13 changes: 4 additions & 9 deletions ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -108,16 +107,12 @@ func getStoreGlobalMinSafeTS(s kv.Storage) time.Time {

// ValidateFlashbackTS validates that flashBackTS in range [gcSafePoint, currentTS).
func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBackTS uint64) error {
currentTS, err := sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD.
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS = currentVer.Ver
return errors.Errorf("fail to validate flashback timestamp: %v", err)
}
currentTS := currentVer.Ver

oracleFlashbackTS := oracle.GetTimeFromTS(flashBackTS)
if oracleFlashbackTS.After(oracle.GetTimeFromTS(currentTS)) {
return errors.Errorf("cannot set flashback timestamp to future time")
Expand Down
6 changes: 2 additions & 4 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
if newSnapshotIsSet {
if name == variable.TiDBTxnReadTS {
err = sessionctx.ValidateStaleReadTS(ctx, e.ctx, newSnapshotTS)
} else {
err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx, newSnapshotTS)
err = sessionctx.ValidateSnapshotReadTS(ctx, e.ctx.GetStore(), newSnapshotTS)
if name != variable.TiDBTxnReadTS {
// Also check gc safe point for snapshot read.
// We don't check snapshot with gc safe point for read_ts
// Client-go will automatically check the snapshotTS with gc safe point. It's unnecessary to check gc safe point during set executor.
Expand Down
25 changes: 21 additions & 4 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor_test
import (
"context"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -1406,14 +1407,30 @@ func TestStaleTSO(t *testing.T) {
tk.MustExec("create table t (id int)")

tk.MustExec("insert into t values(1)")
ts1, err := strconv.ParseUint(tk.MustQuery("select json_extract(@@tidb_last_txn_info, '$.commit_ts')").Rows()[0][0].(string), 10, 64)
require.NoError(t, err)

// Wait until the physical advances for 1s
var currentTS uint64
for {
tk.MustExec("begin")
currentTS, err = strconv.ParseUint(tk.MustQuery("select @@tidb_current_ts").Rows()[0][0].(string), 10, 64)
require.NoError(t, err)
tk.MustExec("rollback")
if oracle.GetTimeFromTS(currentTS).After(oracle.GetTimeFromTS(ts1).Add(time.Second)) {
break
}
time.Sleep(time.Millisecond * 100)
}

asOfExprs := []string{
"now(3) - interval 1 second",
"current_time() - interval 1 second",
"curtime() - interval 1 second",
"now(3) - interval 10 second",
"current_time() - interval 10 second",
"curtime() - interval 10 second",
}

nextTSO := oracle.GoTimeToTS(time.Now().Add(2 * time.Second))
nextPhysical := oracle.GetPhysical(oracle.GetTimeFromTS(currentTS).Add(10 * time.Second))
nextTSO := oracle.ComposeTS(nextPhysical, oracle.ExtractLogical(currentTS))
require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO", fmt.Sprintf("return(%d)", nextTSO)))
defer failpoint.Disable("github.com/pingcap/tidb/sessiontxn/staleread/mockStaleReadTSO")
for _, expr := range asOfExprs {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tdakkota/asciicheck v0.1.1
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.4-0.20241125064444-5f59e4e34c62
github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
github.com/twmb/murmur3 v1.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -948,8 +948,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.4-0.20241125064444-5f59e4e34c62 h1:0YcirnuxtXC9eQRb231im1M5w/n7JFuOo0IgE/K9ffM=
github.com/tikv/client-go/v2 v2.0.4-0.20241125064444-5f59e4e34c62/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7 h1:P6bhZG2yFFuKYvOpfltUbt89sbHohq4BAv2P4GB3fL8=
github.com/tikv/client-go/v2 v2.0.4-0.20250109055446-ccec7efbf0f7/go.mod h1:mmVCLP2OqWvQJPOIevQPZvGphzh/oq9vv8J5LDfpadQ=
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05 h1:e4hLUKfgfPeJPZwOfU+/I/03G0sn6IZqVcbX/5o+hvM=
github.com/tikv/pd/client v0.0.0-20230904040343-947701a32c05/go.mod h1:MLIl+d2WbOF4A3U88WKtyXrQQW417wZDDvBcq2IW9bQ=
github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
Expand Down
Empty file added pkg/sessionctx/BUILD.bazel
Empty file.
2 changes: 1 addition & 1 deletion planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ type PlanCacheStmt struct {
SQLDigest *parser.Digest
PlanDigest *parser.Digest
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error)
NormalizedSQL4PC string
SQLDigest4PC string

Expand Down
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3399,7 +3399,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand All @@ -3413,7 +3413,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateStaleReadTS(ctx, b.ctx, startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand Down
2 changes: 1 addition & 1 deletion planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ var _ = PreprocessorReturn{}.initedLastSnapshotTS
type PreprocessorReturn struct {
initedLastSnapshotTS bool
IsStaleness bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
SnapshotTSEvaluator func(context.Context, sessionctx.Context) (uint64, error)
// LastSnapshotTS is the last evaluated snapshotTS if any
// otherwise it defaults to zero
LastSnapshotTS uint64
Expand Down
2 changes: 0 additions & 2 deletions sessionctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
deps = [
"//extension",
"//kv",
"//metrics",
"//parser/model",
"//sessionctx/sessionstates",
"//sessionctx/variable",
Expand All @@ -17,7 +16,6 @@ go_library(
"//util/kvcache",
"//util/sli",
"//util/topsql/stmtstats",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_tipb//go-binlog",
"@com_github_tikv_client_go_v2//oracle",
Expand Down
43 changes: 2 additions & 41 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@ package sessionctx
import (
"context"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/extension"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessionctx/variable"
Expand Down Expand Up @@ -223,44 +220,8 @@ const (
)

// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp
func ValidateSnapshotReadTS(ctx context.Context, sctx Context, readTS uint64) error {
latestTS, err := sctx.GetStore().GetOracle().GetLowResolutionTimestamp(ctx, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
// If we fail to get latestTS or the readTS exceeds it, get a timestamp from PD to double check
if err != nil || readTS > latestTS {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
if readTS > currentVer.Ver {
return errors.Errorf("cannot set read timestamp to a future time")
}
}
return nil
}

// How far future from now ValidateStaleReadTS allows at most
const allowedTimeFromNow = 100 * time.Millisecond

// ValidateStaleReadTS validates that readTS does not exceed the current time not strictly.
func ValidateStaleReadTS(ctx context.Context, sctx Context, readTS uint64) error {
currentTS, err := sctx.GetSessionVars().StmtCtx.GetStaleTSO()
if currentTS == 0 || err != nil {
currentTS, err = sctx.GetStore().GetOracle().GetStaleTimestamp(ctx, oracle.GlobalTxnScope, 0)
}
// If we fail to calculate currentTS from local time, fallback to get a timestamp from PD
if err != nil {
metrics.ValidateReadTSFromPDCount.Inc()
currentVer, err := sctx.GetStore().CurrentVersion(oracle.GlobalTxnScope)
if err != nil {
return errors.Errorf("fail to validate read timestamp: %v", err)
}
currentTS = currentVer.Ver
}
if oracle.GetTimeFromTS(readTS).After(oracle.GetTimeFromTS(currentTS).Add(allowedTimeFromNow)) {
return errors.Errorf("cannot set read timestamp to a future time")
}
return nil
func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error {
return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
}

// SysProcTracker is used to track background sys processes
Expand Down
Loading

0 comments on commit d8655c0

Please sign in to comment.