scheduler: fix hot region scheduler select store problem. #898

Merged · 3 commits · Jan 3, 2018
143 changes: 50 additions & 93 deletions server/schedulers/hot_region.go
@@ -226,23 +226,7 @@ func (h *balanceHotRegionsScheduler) calcScore(items []*core.RegionStat, cluster
 }
 
 func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer, *metapb.Peer) {
-	var (
-		maxReadBytes           uint64
-		srcStoreID             uint64
-		maxHotStoreRegionCount int
-	)
-
-	// get the srcStoreId
-	// We choose the store with the maximum hot region first;
-	// inside these stores, we choose the one with maximum written bytes.
-	for storeID, statistics := range storesStat {
-		count, readBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
-		if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && readBytes > maxReadBytes)) {
-			maxHotStoreRegionCount = count
-			maxReadBytes = readBytes
-			srcStoreID = storeID
-		}
-	}
+	srcStoreID := h.selectSrcStore(storesStat)
 	if srcStoreID == 0 {
 		return nil, nil, nil
 	}
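The extracted `selectSrcStore` helper (its full body appears in the last hunk below) keeps the old inline rule: only stores with at least two hot regions are candidates, the highest hot-region count wins, and ties are broken by total flow bytes. A minimal, runnable sketch of that rule on toy data, using a hypothetical `statEntry` stand-in for the values carried by `core.StoreHotRegionsStat`:

```go
package main

import "fmt"

// statEntry is a hypothetical stand-in for the per-store hot-region
// statistics that core.StoreHotRegionsStat carries in PD.
type statEntry struct {
	count     int    // statistics.RegionsStat.Len()
	flowBytes uint64 // statistics.TotalFlowBytes
}

// pickSrcStore mirrors the rule selectSrcStore centralizes: only consider
// stores with at least two hot regions, prefer the highest count, and
// break ties by total flow bytes.
func pickSrcStore(stats map[uint64]statEntry) (srcStoreID uint64) {
	var maxFlowBytes uint64
	var maxCount int
	for storeID, s := range stats {
		if s.count >= 2 && (s.count > maxCount || (s.count == maxCount && s.flowBytes > maxFlowBytes)) {
			maxCount = s.count
			maxFlowBytes = s.flowBytes
			srcStoreID = storeID
		}
	}
	return
}

func main() {
	stats := map[uint64]statEntry{
		1: {count: 4, flowBytes: 512 << 20},
		2: {count: 4, flowBytes: 768 << 20}, // same count as store 1, more flow: wins the tie
		3: {count: 1, flowBytes: 900 << 20}, // fewer than two hot regions: never selected
	}
	fmt.Println(pickSrcStore(stats)) // 2
}
```

Because the tie-break comparison is strict, the winner here does not depend on map iteration order.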
@@ -273,9 +257,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 			destStoreIDs = append(destStoreIDs, store.GetId())
 		}
 
-		destStoreID = h.selectDestStoreByPeer(destStoreIDs, srcRegion, srcStoreID, storesStat)
+		destStoreID = h.selectDestStore(destStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
 		if destStoreID != 0 {
-			srcRegion.ReadBytes = rs.FlowBytes
 			h.adjustBalanceLimit(srcStoreID, storesStat)
 
 			var srcPeer *metapb.Peer
@@ -305,64 +288,8 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
 	return nil, nil, nil
 }
 
-// selectDestStoreByPeer selects a target store to hold the region of the source region.
-// We choose a target store based on the hot region number and written bytes of this store.
-func (h *balanceHotRegionsScheduler) selectDestStoreByPeer(candidateStoreIDs []uint64, srcRegion *core.RegionInfo, srcStoreID uint64, storesStat core.StoreHotRegionsStat) uint64 {
-	sr := storesStat[srcStoreID]
-	srcReadBytes := sr.TotalFlowBytes
-	srcHotRegionsCount := sr.RegionsStat.Len()
-
-	var (
-		destStoreID  uint64
-		minReadBytes uint64 = math.MaxUint64
-	)
-	minRegionsCount := int(math.MaxInt32)
-	for _, storeID := range candidateStoreIDs {
-		if s, ok := storesStat[storeID]; ok {
-			if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
-				destStoreID = storeID
-				minReadBytes = s.TotalFlowBytes
-				minRegionsCount = s.RegionsStat.Len()
-				continue
-			}
-			if minRegionsCount == s.RegionsStat.Len() && minReadBytes > s.TotalFlowBytes &&
-				uint64(float64(srcReadBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*srcRegion.ReadBytes {
-				minReadBytes = s.TotalFlowBytes
-				destStoreID = storeID
-			}
-		} else {
-			destStoreID = storeID
-			break
-		}
-	}
-	return destStoreID
-}
-
 func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, storesStat core.StoreHotRegionsStat) (*core.RegionInfo, *metapb.Peer) {
-	var (
-		maxReadBytes           uint64
-		srcStoreID             uint64
-		maxHotStoreRegionCount int
-	)
-
-	// select srcStoreId by leader
-	for storeID, statistics := range storesStat {
-		if statistics.RegionsStat.Len() < 2 {
-			continue
-		}
-
-		if maxHotStoreRegionCount < statistics.RegionsStat.Len() {
-			maxHotStoreRegionCount = statistics.RegionsStat.Len()
-			maxReadBytes = statistics.TotalFlowBytes
-			srcStoreID = storeID
-			continue
-		}
-
-		if maxHotStoreRegionCount == statistics.RegionsStat.Len() && maxReadBytes < statistics.TotalFlowBytes {
-			maxReadBytes = statistics.TotalFlowBytes
-			srcStoreID = storeID
-		}
-	}
+	srcStoreID := h.selectSrcStore(storesStat)
 	if srcStoreID == 0 {
 		return nil, nil
 	}
@@ -375,7 +302,16 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 			continue
 		}
 
-		destPeer := h.selectDestStoreByLeader(srcRegion, storesStat)
+		candidateStoreIDs := make([]uint64, 0, len(srcRegion.Peers)-1)
+		for id := range srcRegion.GetFollowers() {
+			candidateStoreIDs = append(candidateStoreIDs, id)
+		}
+		destStoreID := h.selectDestStore(candidateStoreIDs, rs.FlowBytes, srcStoreID, storesStat)
+		if destStoreID == 0 {
+			continue
+		}
+
+		destPeer := srcRegion.GetStorePeer(destStoreID)
 		if destPeer != nil {
 			h.adjustBalanceLimit(srcStoreID, storesStat)
 			return srcRegion, destPeer
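In the leader path, the patch reuses the shared `selectDestStore` by first flattening the follower peers into candidate store IDs and then mapping the chosen store ID back to a peer with `GetStorePeer`. A sketch of that round-trip, with hypothetical `peer`/`region` stand-ins for `metapb.Peer` and `core.RegionInfo`:

```go
package main

import "fmt"

// peer and region are hypothetical stand-ins for metapb.Peer and
// core.RegionInfo, exposing just the accessors the patch relies on.
type peer struct {
	id      uint64
	storeID uint64
}

type region struct {
	followers map[uint64]*peer // keyed by store ID, like GetFollowers()
}

// getStorePeer mirrors srcRegion.GetStorePeer: resolve a store ID back to
// the region's peer on that store, or nil if the region has none there.
func (r *region) getStorePeer(storeID uint64) *peer {
	return r.followers[storeID]
}

func main() {
	r := &region{followers: map[uint64]*peer{
		2: {id: 11, storeID: 2},
		3: {id: 12, storeID: 3},
	}}

	// Flatten follower store IDs into candidates, as balanceByLeader now does.
	candidateStoreIDs := make([]uint64, 0, len(r.followers))
	for id := range r.followers {
		candidateStoreIDs = append(candidateStoreIDs, id)
	}

	// Suppose selectDestStore picked store 3; map it back to a peer so the
	// scheduler can build the transfer-leader operator.
	if destPeer := r.getStorePeer(3); destPeer != nil {
		fmt.Printf("transfer leader to peer %d on store %d\n", destPeer.id, destPeer.storeID)
	}
}
```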
@@ -384,35 +320,56 @@ func (h *balanceHotRegionsScheduler) balanceByLeader(cluster schedule.Cluster, s
 	return nil, nil
 }
 
-func (h *balanceHotRegionsScheduler) selectDestStoreByLeader(srcRegion *core.RegionInfo, storesStat core.StoreHotRegionsStat) *metapb.Peer {
-	sr := storesStat[srcRegion.Leader.GetStoreId()]
-	srcReadBytes := sr.TotalFlowBytes
+// Select the store to move hot regions from.
+// We choose the store with the maximum number of hot region first.
+// Inside these stores, we choose the one with maximum flow bytes.
+func (h *balanceHotRegionsScheduler) selectSrcStore(stats core.StoreHotRegionsStat) (srcStoreID uint64) {
+	var (
+		maxFlowBytes           uint64
+		maxHotStoreRegionCount int
+	)
+
+	for storeID, statistics := range stats {
+		count, flowBytes := statistics.RegionsStat.Len(), statistics.TotalFlowBytes
+		if count >= 2 && (count > maxHotStoreRegionCount || (count == maxHotStoreRegionCount && flowBytes > maxFlowBytes)) {
+			maxHotStoreRegionCount = count
+			maxFlowBytes = flowBytes
+			srcStoreID = storeID
+		}
+	}
+	return
+}
+
+// selectDestStore selects a target store to hold the region of the source region.
+// We choose a target store based on the hot region number and flow bytes of this store.
+func (h *balanceHotRegionsScheduler) selectDestStore(candidateStoreIDs []uint64, regionFlowBytes uint64, srcStoreID uint64, storesStat core.StoreHotRegionsStat) (destStoreID uint64) {
+	sr := storesStat[srcStoreID]
+	srcFlowBytes := sr.TotalFlowBytes
 	srcHotRegionsCount := sr.RegionsStat.Len()
 
 	var (
-		destPeer     *metapb.Peer
-		minReadBytes uint64 = math.MaxUint64
+		minFlowBytes    uint64 = math.MaxUint64
+		minRegionsCount        = int(math.MaxInt32)
 	)
-	minRegionsCount := int(math.MaxInt32)
-	for storeID, peer := range srcRegion.GetFollowers() {
+	for _, storeID := range candidateStoreIDs {
 		if s, ok := storesStat[storeID]; ok {
 			if srcHotRegionsCount-s.RegionsStat.Len() > 1 && minRegionsCount > s.RegionsStat.Len() {
-				destPeer = peer
-				minReadBytes = s.TotalFlowBytes
+				destStoreID = storeID
+				minFlowBytes = s.TotalFlowBytes
 				minRegionsCount = s.RegionsStat.Len()
 				continue
 			}
-			if minRegionsCount == s.RegionsStat.Len() && minReadBytes > s.TotalFlowBytes &&
-				uint64(float64(srcReadBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*srcRegion.ReadBytes {
-				minReadBytes = s.TotalFlowBytes
-				destPeer = peer
+			if minRegionsCount == s.RegionsStat.Len() && minFlowBytes > s.TotalFlowBytes &&
+				uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > s.TotalFlowBytes+2*regionFlowBytes {
+				minFlowBytes = s.TotalFlowBytes
+				destStoreID = storeID
 			}
 		} else {
-			destPeer = peer
-			break
+			destStoreID = storeID
+			return
 		}
 	}
-	return destPeer
+	return
 }
 
 func (h *balanceHotRegionsScheduler) adjustBalanceLimit(storeID uint64, storesStat core.StoreHotRegionsStat) {
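The acceptance guard in the new `selectDestStore` can be made concrete with toy numbers. The sketch below assumes `hotRegionScheduleFactor` is 0.9 purely for illustration; the real constant is defined elsewhere in hot_region.go:

```go
package main

import "fmt"

// hotRegionScheduleFactor is assumed to be 0.9 here for illustration; the
// real constant lives elsewhere in hot_region.go.
const hotRegionScheduleFactor = 0.9

// acceptDest mirrors the guard in selectDestStore: a candidate only
// qualifies if the source, discounted by the factor, still carries more
// flow than the destination plus twice the region being moved.
func acceptDest(srcFlowBytes, destFlowBytes, regionFlowBytes uint64) bool {
	return uint64(float64(srcFlowBytes)*hotRegionScheduleFactor) > destFlowBytes+2*regionFlowBytes
}

func main() {
	src := uint64(1024 << 20)   // source store: 1 GiB of hot flow
	region := uint64(100 << 20) // the hot region under consideration: 100 MiB

	fmt.Println(acceptDest(src, 600<<20, region)) // true:  ~921.6 MiB > 800 MiB
	fmt.Println(acceptDest(src, 750<<20, region)) // false: ~921.6 MiB < 950 MiB
}
```

Reading the inequality loosely: whenever it holds with a factor of at most 1, the source minus the moved region's flow still exceeds the destination plus that flow, so a single move cannot invert which store is hotter, which damps ping-pong scheduling between a pair of stores.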