MinSafeTS support TiFlash #642

Merged · 8 commits · Dec 19, 2022
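This PR makes the cluster-wide minimum safe TS account for TiFlash: the safe-TS updater now polls TiFlash stores in addition to TiKV stores. Because a TiFlash store's StoreSafeTS endpoint is served by the TiFlash Proxy on the store's peer address rather than its service address, the region cache gains a peerAddr field (with a GetPeerAddr accessor), and updateSafeTS dials the peer address whenever the store is a TiFlash store.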
Changes from 6 commits
28 changes: 23 additions & 5 deletions internal/locate/region_cache.go
@@ -491,14 +491,16 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
}

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
c.storeMu.Lock()
defer c.storeMu.Unlock()
c.storeMu.stores[id] = &Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
addr: addr,
peerAddr: peerAddr,
}
}

@@ -1385,15 +1387,23 @@ func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store {
})
}
stores = append(stores, &Store{
addr: store.addr,
storeID: store.storeID,
labels: storeLabel,
addr: store.addr,
peerAddr: store.peerAddr,
storeID: store.storeID,
labels: storeLabel,
storeType: typ,
})
}
}
return stores
}

func (c *RegionCache) GetAllStores() []*Store {
stores := c.GetStoresByType(tikvrpc.TiKV)
tiflashStores := c.GetStoresByType(tikvrpc.TiFlash)
return append(stores, tiflashStores...)
}

func filterUnavailablePeers(region *pd.Region) {
if len(region.DownPeers) == 0 {
return
@@ -1834,6 +1844,7 @@ func (c *RegionCache) reloadTiFlashComputeStores(bo *retry.Backoffer) (res []*St
res = append(res, &Store{
storeID: s.GetId(),
addr: s.GetAddress(),
peerAddr: s.GetPeerAddress(),
saddr: s.GetStatusAddress(),
storeType: tikvrpc.GetStoreTypeByMeta(s),
labels: s.GetLabels(),
@@ -2152,6 +2163,7 @@ func (r *Region) ContainsByEnd(key []byte) bool {
// Store contains a kv process's address.
type Store struct {
addr string // loaded store address
peerAddr string // the TiFlash Proxy uses peerAddr
saddr string // loaded store status address
storeID uint64 // store's id
state uint64 // unsafe store storeState
@@ -2239,6 +2251,7 @@ func (s *Store) initResolve(bo *retry.Backoffer, c *RegionCache) (addr string, e
return "", errors.Errorf("empty store(%d) address", s.storeID)
}
s.addr = addr
s.peerAddr = store.GetPeerAddress()
s.saddr = store.GetStatusAddress()
s.storeType = tikvrpc.GetStoreTypeByMeta(store)
s.labels = store.GetLabels()
@@ -2285,7 +2298,7 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
storeType := tikvrpc.GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
c.storeMu.Lock()
c.storeMu.stores[newStore.storeID] = newStore
c.storeMu.Unlock()
@@ -2507,6 +2520,11 @@ func (s *Store) GetAddr() string {
return s.addr
}

// GetPeerAddr returns the peer address of the store
func (s *Store) GetPeerAddr() string {
return s.peerAddr
}

func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) {
start := time.Now()
defer func() {
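Taken together, the new peerAddr field and its accessor let a caller pick the right endpoint per store. A minimal caller-side sketch (dialAddr is a hypothetical helper, not part of this diff; it assumes only the accessors shown above):

// dialAddr picks the address for a store-level RPC: requests that must go
// through the TiFlash Proxy (such as StoreSafeTS) use the peer address,
// while TiKV stores keep using the ordinary service address.
func dialAddr(s *Store) string {
	if s.IsTiFlash() {
		return s.GetPeerAddr()
	}
	return s.GetAddr()
}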
37 changes: 23 additions & 14 deletions internal/mockstore/mocktikv/cluster.go
@@ -53,14 +53,14 @@ var _ cluster.Cluster = &Cluster{}

// Cluster simulates a TiKV cluster. It focuses on management and the change of
// meta data. A Cluster mainly includes following 3 kinds of meta data:
// 1) Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2) Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3) Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
// 1. Region: A Region is a fragment of TiKV's data whose range is [start, end).
// The data of a Region is duplicated to multiple Peers and distributed in
// multiple Stores.
// 2. Peer: A Peer is a replica of a Region's data. All peers of a Region form
// a group, each group elects a Leader to provide services.
// 3. Store: A Store is a storage/service node. Try to think it as a TiKV server
// process. Only the store with request's Region's leader Peer could respond
// to client's request.
type Cluster struct {
sync.RWMutex
id uint64
@@ -224,7 +224,7 @@ func (c *Cluster) AddStore(storeID uint64, addr string, labels ...*metapb.StoreL
c.Lock()
defer c.Unlock()

c.stores[storeID] = newStore(storeID, addr, labels...)
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
}

// RemoveStore removes a Store from the cluster.
@@ -248,7 +248,15 @@ func (c *Cluster) MarkTombstone(storeID uint64) {
func (c *Cluster) UpdateStoreAddr(storeID uint64, addr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
c.stores[storeID] = newStore(storeID, addr, labels...)
c.stores[storeID] = newStore(storeID, addr, addr, labels...)
}

// UpdateStorePeerAddr updates a store's peer address in the cluster.
func (c *Cluster) UpdateStorePeerAddr(storeID uint64, peerAddr string, labels ...*metapb.StoreLabel) {
c.Lock()
defer c.Unlock()
addr := c.stores[storeID].meta.Address
c.stores[storeID] = newStore(storeID, addr, peerAddr, labels...)
}

// GetRegion returns a Region's meta and leader ID.
@@ -691,12 +699,13 @@ type Store struct {
cancel bool // return context.Cancelled error when cancel is true.
}

func newStore(storeID uint64, addr string, labels ...*metapb.StoreLabel) *Store {
func newStore(storeID uint64, addr string, peerAddr string, labels ...*metapb.StoreLabel) *Store {
return &Store{
meta: &metapb.Store{
Id: storeID,
Address: addr,
Labels: labels,
Id: storeID,
Address: addr,
PeerAddress: peerAddr,
Labels: labels,
},
}
}
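Existing callers of the mock cluster are unaffected: AddStore and UpdateStoreAddr reuse the service address as the peer address, so the two only diverge when a test calls the new UpdateStorePeerAddr. A usage sketch with hypothetical store ID and addresses:

c.AddStore(5, "store5")  // addr and peerAddr both start as "store5"
c.UpdateStorePeerAddr(5, "store5-proxy", &metapb.StoreLabel{Key: "engine", Value: "tiflash"})  // peerAddr now differs from addr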
5 changes: 4 additions & 1 deletion tikv/kv.go
@@ -521,13 +521,16 @@ func (s *KVStore) safeTSUpdater() {
}

func (s *KVStore) updateSafeTS(ctx context.Context) {
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
stores := s.regionCache.GetAllStores()
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
if store.IsTiFlash() {
storeAddr = store.GetPeerAddr()
}
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()
resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{
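updateSafeTS only collects the per-store values; GetMinSafeTS (exercised by the new test below) reports their minimum. A condensed sketch of that reduction, not the library's actual implementation (which also tracks values per transaction scope):

// minSafeTS reduces per-store safe timestamps to the cluster-wide minimum,
// the value GetMinSafeTS ultimately reports. Requires the "math" import.
func minSafeTS(storeSafeTS map[uint64]uint64) uint64 {
	min := uint64(math.MaxUint64)
	for _, ts := range storeSafeTS {
		if ts < min {
			min = ts
		}
	}
	return min
}

With the mock client in the test below, the TiFlash peer answers 80 and the TiKV store 100, so the expected minimum is 80.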
123 changes: 123 additions & 0 deletions tikv/kv_test.go
@@ -0,0 +1,123 @@
// Copyright 2022 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"
"fmt"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikvrpc"
)

func TestKV(t *testing.T) {
suite.Run(t, new(testKVSuite))
}

type testKVSuite struct {
suite.Suite
store *KVStore
cluster *mocktikv.Cluster
tikvStoreId uint64
tiflashStoreId uint64
tiflashPeerStoreId uint64
}

func (s *testKVSuite) SetupTest() {
client, cluster, pdClient, err := testutils.NewMockTiKV("", nil)
s.Require().Nil(err)
testutils.BootstrapWithSingleStore(cluster)
store, err := NewTestTiKVStore(client, pdClient, nil, nil, 0)
s.Require().Nil(err)

s.store = store
s.cluster = cluster

storeIDs, _, _, _ := mocktikv.BootstrapWithMultiStores(s.cluster, 2)
s.tikvStoreId = storeIDs[0]
s.tiflashStoreId = storeIDs[1]
tiflashPeerAddrId := cluster.AllocIDs(1)
s.tiflashPeerStoreId = tiflashPeerAddrId[0]

s.cluster.UpdateStorePeerAddr(s.tiflashStoreId, s.storeAddr(s.tiflashPeerStoreId), &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.store.regionCache.SetRegionCacheStore(s.tikvStoreId, s.storeAddr(s.tikvStoreId), s.storeAddr(s.tikvStoreId), tikvrpc.TiKV, 1, nil)
var labels []*metapb.StoreLabel
labels = append(labels, &metapb.StoreLabel{Key: "engine", Value: "tiflash"})
s.store.regionCache.SetRegionCacheStore(s.tiflashStoreId, s.storeAddr(s.tiflashStoreId), s.storeAddr(s.tiflashPeerStoreId), tikvrpc.TiFlash, 1, labels)
}

func (s *testKVSuite) TearDownTest() {
s.Require().Nil(s.store.Close())
}

func (s *testKVSuite) storeAddr(id uint64) string {
return fmt.Sprintf("store%d", id)
}

type storeSafeTsMockClient struct {
Client
requestCount int
testSuite *testKVSuite
}

func (c *storeSafeTsMockClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) {
if req.Type != tikvrpc.CmdStoreSafeTS {
return c.Client.SendRequest(ctx, addr, req, timeout)
}
c.requestCount++
resp := &tikvrpc.Response{}
if addr == c.testSuite.storeAddr(c.testSuite.tiflashPeerStoreId) {
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 80}
} else {
resp.Resp = &kvrpcpb.StoreSafeTSResponse{SafeTs: 100}
}
return resp, nil
}

func (c *storeSafeTsMockClient) Close() error {
return c.Client.Close()
}

func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *testKVSuite) TestMinSafeTs() {
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
testSuite: s,
}
s.store.SetTiKVClient(&mockClient)

// wait for the background updateSafeTS to refresh MinSafeTS
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 80 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
s.Require().GreaterOrEqual(mockClient.requestCount, 2)
s.Require().Equal(uint64(80), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}
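The polling loop gives the background safe-TS updater several 2-second intervals to observe the mocked responses. requestCount is expected to reach at least 2 because updateSafeTS now fans out to both the TiKV store and the TiFlash store (the latter via its peer address), and the asserted minimum of 80 is the TiFlash value.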
2 changes: 1 addition & 1 deletion tikv/test_probe.go
@@ -101,7 +101,7 @@ func (s StoreProbe) SaveSafePoint(v uint64) error {

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (s StoreProbe) SetRegionCacheStore(id uint64, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
s.regionCache.SetRegionCacheStore(id, storeType, state, labels)
s.regionCache.SetRegionCacheStore(id, "", "", storeType, state, labels)
}

// SetSafeTS is used to set safeTS for the store with `storeID`