Skip to content

Commit

Permalink
*: Update client-go and verify all read ts (pingcap#58054)
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored and ekexium committed Jan 15, 2025
1 parent 50a5149 commit 2c7d4d9
Show file tree
Hide file tree
Showing 22 changed files with 100 additions and 82 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6807,13 +6807,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "4d03d9794b514c22355693f7ed5ae3a806d04aef704c57e7791cbc972275d72b",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241212055527-4d50d6744f0c",
sha256 = "a3d9e3e951fb4574736c5fde28f8a2a7b6f23dbc29b0164eeb4e9a70fcef8989",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20250113074634-8691f3dca42d",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212055527-4d50d6744f0c.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212055527-4d50d6744f0c.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212055527-4d50d6744f0c.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241212055527-4d50d6744f0c.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250113074634-8691f3dca42d.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250113074634-8691f3dca42d.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250113074634-8691f3dca42d.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20250113074634-8691f3dca42d.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tidwall/btree v1.7.0
github.com/tikv/client-go/v2 v2.0.8-0.20241212055527-4d50d6744f0c
github.com/tikv/client-go/v2 v2.0.8-0.20250113074634-8691f3dca42d
github.com/tikv/pd/client v0.0.0-20240806105739-10ecdbe92b55
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -783,8 +783,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.8-0.20241212055527-4d50d6744f0c h1:55hXQ+8kmgvUv7W+sSnCyp5BpmeFE9yfXFfYKUokMLw=
github.com/tikv/client-go/v2 v2.0.8-0.20241212055527-4d50d6744f0c/go.mod h1:+vXk4Aex17GnI8gfSMPxrL0SQLbBYgP3Db4FvHiImwM=
github.com/tikv/client-go/v2 v2.0.8-0.20250113074634-8691f3dca42d h1:dukrwQApf0Zc6EjfXh/tK5q5P9bFjxvFJx/hKb32tqs=
github.com/tikv/client-go/v2 v2.0.8-0.20250113074634-8691f3dca42d/go.mod h1:+vXk4Aex17GnI8gfSMPxrL0SQLbBYgP3Db4FvHiImwM=
github.com/tikv/pd/client v0.0.0-20240806105739-10ecdbe92b55 h1:+1unfy0TcJJtud3d7BuYsvNG6tPVuXIH+WiIFhOx1Kc=
github.com/tikv/pd/client v0.0.0-20240806105739-10ecdbe92b55/go.mod h1:1zqLOMhnkZIpBLj2oXOO2bWvtXhb12OmYr+cPkjQ6tI=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo=
Expand Down
11 changes: 4 additions & 7 deletions pkg/ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

Expand All @@ -51,7 +50,7 @@ func TestColumnAdd(t *testing.T) {
d := dom.DDL()
tc := &callback.TestDDLCallback{Do: dom}

ct := testNewContext(store)
ct := testNewContext(t, store)
// set up hook
var (
deleteOnlyTable table.Table
Expand Down Expand Up @@ -127,7 +126,7 @@ func TestColumnAdd(t *testing.T) {
return
}
first = false
sess := testNewContext(store)
sess := testNewContext(t, store)
err := sessiontxn.NewTxn(context.Background(), sess)
require.NoError(t, err)
_, err = writeOnlyTable.AddRecord(sess.GetTableCtx(), types.MakeDatums(10, 10))
Expand Down Expand Up @@ -431,10 +430,8 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) {
}
}

func testNewContext(store kv.Storage) sessionctx.Context {
ctx := mock.NewContext()
ctx.Store = store
return ctx
func testNewContext(t *testing.T, store kv.Storage) sessionctx.Context {
return testkit.NewSession(t, store)
}

func TestIssue40135(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestColumnBasic(t *testing.T) {
tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", i, 10*i, 100*i))
}

ctx := testNewContext(store)
ctx := testNewContext(t, store)
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

Expand Down Expand Up @@ -611,7 +611,7 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC
}

func checkAddColumn(t *testing.T, state model.SchemaState, tableID int64, handle kv.Handle, newCol *table.Column, oldRow []types.Datum, columnValue any, dom *domain.Domain, store kv.Storage, columnCnt int) {
ctx := testNewContext(store)
ctx := testNewContext(t, store)
switch state {
case model.StateNone:
checkNoneColumn(t, ctx, tableID, handle, newCol, columnValue, dom)
Expand Down Expand Up @@ -655,7 +655,7 @@ func TestAddColumn(t *testing.T) {
tableID = int64(tableIDi)
tbl := testGetTable(t, dom, tableID)

ctx := testNewContext(store)
ctx := testNewContext(t, store)
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
oldRow := types.MakeDatums(int64(1), int64(2), int64(3))
Expand Down Expand Up @@ -728,7 +728,7 @@ func TestAddColumns(t *testing.T) {
tableID = int64(tableIDi)
tbl := testGetTable(t, dom, tableID)

ctx := testNewContext(store)
ctx := testNewContext(t, store)
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
oldRow := types.MakeDatums(int64(1), int64(2), int64(3))
Expand Down Expand Up @@ -791,7 +791,7 @@ func TestDropColumnInColumnTest(t *testing.T) {
tableID = int64(tableIDi)
tbl := testGetTable(t, dom, tableID)

ctx := testNewContext(store)
ctx := testNewContext(t, store)
colName := "c4"
defaultColValue := int64(4)
row := types.MakeDatums(int64(1), int64(2), int64(3))
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestDropColumns(t *testing.T) {
tableID = int64(tableIDi)
tbl := testGetTable(t, dom, tableID)

ctx := testNewContext(store)
ctx := testNewContext(t, store)
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)

Expand Down
22 changes: 9 additions & 13 deletions pkg/ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import (
"github.com/pingcap/tidb/pkg/util/collate"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -526,11 +525,10 @@ func TestChangingTableCharset(t *testing.T) {
tblInfo.Charset = ""
tblInfo.Collate = ""
updateTableInfo := func(tblInfo *model.TableInfo) {
mockCtx := mock.NewContext()
mockCtx.Store = store
err := sessiontxn.NewTxn(context.Background(), mockCtx)
ctx := testkit.NewSession(t, store)
err := sessiontxn.NewTxn(context.Background(), ctx)
require.NoError(t, err)
txn, err := mockCtx.Txn(true)
txn, err := ctx.Txn(true)
require.NoError(t, err)
mt := meta.NewMeta(txn)

Expand Down Expand Up @@ -772,11 +770,10 @@ func TestCaseInsensitiveCharsetAndCollate(t *testing.T) {
tblInfo.Charset = "UTF8MB4"

updateTableInfo := func(tblInfo *model.TableInfo) {
mockCtx := mock.NewContext()
mockCtx.Store = store
err := sessiontxn.NewTxn(context.Background(), mockCtx)
sctx := testkit.NewSession(t, store)
err := sessiontxn.NewTxn(context.Background(), sctx)
require.NoError(t, err)
txn, err := mockCtx.Txn(true)
txn, err := sctx.Txn(true)
require.NoError(t, err)
mt := meta.NewMeta(txn)
require.True(t, ok)
Expand Down Expand Up @@ -1425,11 +1422,10 @@ func TestTreatOldVersionUTF8AsUTF8MB4(t *testing.T) {
tblInfo.Version = model.TableInfoVersion0
tblInfo.Columns[0].Version = model.ColumnInfoVersion0
updateTableInfo := func(tblInfo *model.TableInfo) {
mockCtx := mock.NewContext()
mockCtx.Store = store
err := sessiontxn.NewTxn(context.Background(), mockCtx)
sctx := testkit.NewSession(t, store)
err := sessiontxn.NewTxn(context.Background(), sctx)
require.NoError(t, err)
txn, err := mockCtx.Txn(true)
txn, err := sctx.Txn(true)
require.NoError(t, err)
mt := meta.NewMeta(txn)
require.True(t, ok)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,15 @@ func TestInvalidDDLJob(t *testing.T) {
BinlogInfo: &model.HistoryInfo{},
Args: []any{},
}
ctx := testNewContext(store)
ctx := testNewContext(t, store)
ctx.SetValue(sessionctx.QueryString, "skip")
err := dom.DDL().DoDDLJob(ctx, job)
require.Equal(t, err.Error(), "[ddl:8204]invalid ddl job type: none")
}

func TestAddBatchJobError(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
ctx := testNewContext(store)
ctx := testNewContext(t, store)

require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockAddBatchDDLJobsErr", `return(true)`))
// Test the job runner should not hang forever.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/index_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestIndexChange(t *testing.T) {
return
}
jobID.Store(job.ID)
ctx1 := testNewContext(store)
ctx1 := testNewContext(t, store)
prevState = job.SchemaState
require.NoError(t, dom.Reload())
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
Expand Down Expand Up @@ -111,7 +111,7 @@ func TestIndexChange(t *testing.T) {
require.NoError(t, dom.Reload())
tbl, exist := dom.InfoSchema().TableByID(job.TableID)
require.True(t, exist)
ctx1 := testNewContext(store)
ctx1 := testNewContext(t, store)
switch job.SchemaState {
case model.StateWriteOnly:
writeOnlyTable = tbl
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres
newSnapshotTS := getSnapshotTSByName()
newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS
if newSnapshotIsSet {
err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS)
isStaleRead := name == variable.TiDBTxnReadTS
err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS, isStaleRead)
if name != variable.TiDBTxnReadTS {
// Also check gc safe point for snapshot read.
// We don't check snapshot with gc safe point for read_ts
Expand Down
3 changes: 1 addition & 2 deletions pkg/executor/test/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,7 @@ func TestNotFillCacheFlag(t *testing.T) {
func TestCheckIndex(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

ctx := mock.NewContext()
ctx.Store = store
ctx := testkit.NewSession(t, store)
se, err := session.CreateSession4Test(store)
require.NoError(t, err)
defer se.Close()
Expand Down
1 change: 0 additions & 1 deletion pkg/executor/test/writetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ go_test(
"//pkg/testkit",
"//pkg/types",
"//pkg/util",
"//pkg/util/mock",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@io_opencensus_go//stats/view",
Expand Down
3 changes: 1 addition & 2 deletions pkg/executor/test/writetest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,7 @@ func TestReplaceLog(t *testing.T) {
tk.MustExec(`create table testLog (a int not null primary key, b int unique key);`)

// Make some dangling index.
ctx := mock.NewContext()
ctx.Store = store
ctx := testkit.NewSession(t, store)
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("testLog")
Expand Down
4 changes: 2 additions & 2 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3387,7 +3387,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand All @@ -3401,7 +3401,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan,
if err != nil {
return nil, err
}
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil {
if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil {
return nil, err
}
p.StaleTxnStartTS = startTS
Expand Down
9 changes: 6 additions & 3 deletions pkg/sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ const (
LastExecuteDDL basicCtxType = 3
)

// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp
func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error {
return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp.
// For read requests to the storage, the check can be implicitly performed when sending the RPC request. So this
// function is only needed when it's not proper to delay the check to when RPC requests are being sent (e.g., `BEGIN`
// statements that don't make reading operation immediately).
func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64, isStaleRead bool) error {
return store.GetOracle().ValidateReadTS(ctx, readTS, isStaleRead, &oracle.Option{TxnScope: oracle.GlobalTxnScope})
}
2 changes: 1 addition & 1 deletion pkg/sessiontxn/staleread/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as
return 0, err
}

if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil {
if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts, true); err != nil {
return 0, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sessiontxn/staleread/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context,
// If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that
// reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability).
// So in this case, do an extra check on it.
err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS)
err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS, true)
if err != nil {
return 0, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ go_library(
"@com_github_tikv_client_go_v2//config",
"@com_github_tikv_client_go_v2//error",
"@com_github_tikv_client_go_v2//metrics",
"@com_github_tikv_client_go_v2//oracle",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//tikvrpc/interceptor",
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba
const TiFlashReadTimeoutUltraLong = 3600 * time.Second

func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) {
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo)
sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.store.store.GetOracle(), b.enableCollectExecutionInfo)
var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos))
for _, ri := range task.regionInfos {
regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo())
Expand Down
5 changes: 3 additions & 2 deletions pkg/store/copr/batch_request_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/tidb/pkg/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -56,9 +57,9 @@ type RegionBatchRequestSender struct {
}

// NewRegionBatchRequestSender creates a RegionBatchRequestSender object.
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender {
func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, oracle oracle.Oracle, enableCollectExecutionInfo bool) *RegionBatchRequestSender {
return &RegionBatchRequestSender{
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client),
RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client, oracle),
enableCollectExecutionInfo: enableCollectExecutionInfo,
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (c *MPPClient) DispatchMPPTask(param kv.DispatchMPPTaskParam) (resp *mpp.Di
// Or else it's the task without region, which always happens in high layer task without table.
// In that case
if originalTask != nil {
sender := NewRegionBatchRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), param.EnableCollectExecutionInfo)
sender := NewRegionBatchRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), c.store.store.GetOracle(), param.EnableCollectExecutionInfo)
rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium)
// No matter what the rpc error is, we won't retry the mpp dispatch tasks.
// TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling.
Expand Down
1 change: 1 addition & 0 deletions pkg/util/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/disk",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/sli",
"//pkg/util/sqlexec",
Expand Down
Loading

0 comments on commit 2c7d4d9

Please sign in to comment.