Skip to content

Commit

Permalink
schedulers: fix hot region distinct score filter (#934) (#939)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored and disksing committed Jan 29, 2018
1 parent 3b3d1b4 commit 137fa73
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 14 deletions.
45 changes: 32 additions & 13 deletions server/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,22 +895,26 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
tc := newTestClusterInfo(cluster)

_, opt := newTestScheduleConfig()
opt.rep = newTestReplication(3, "zone", "host")
hb, err := schedule.CreateScheduler("hot-write-region", opt)
c.Assert(err, IsNil)

// Add stores 1, 2, 3, 4, 5 with region counts 3, 2, 2, 2, 0.
tc.addRegionStore(1, 3)
tc.addRegionStore(2, 2)
tc.addRegionStore(3, 2)
tc.addRegionStore(4, 2)
tc.addRegionStore(5, 0)
// Add stores 1, 2, 3, 4, 5, 6 with region counts 3, 2, 2, 2, 0, 0.

tc.addLabelsStore(1, 3, map[string]string{"zone": "z1", "host": "h1"})
tc.addLabelsStore(2, 2, map[string]string{"zone": "z2", "host": "h2"})
tc.addLabelsStore(3, 2, map[string]string{"zone": "z3", "host": "h3"})
tc.addLabelsStore(4, 2, map[string]string{"zone": "z4", "host": "h4"})
tc.addLabelsStore(5, 0, map[string]string{"zone": "z2", "host": "h5"})
tc.addLabelsStore(6, 0, map[string]string{"zone": "z5", "host": "h6"})

// Report store written bytes.
tc.updateStorageWrittenBytes(1, 75*1024*1024)
tc.updateStorageWrittenBytes(2, 45*1024*1024)
tc.updateStorageWrittenBytes(3, 45*1024*1024)
tc.updateStorageWrittenBytes(4, 60*1024*1024)
tc.updateStorageWrittenBytes(5, 0)
tc.updateStorageWrittenBytes(6, 0)

// Region 1, 2 and 3 are hot regions.
//| region_id | leader_sotre | follower_store | follower_store | written_bytes |
Expand All @@ -923,25 +927,35 @@ func (s *testBalanceHotWriteRegionSchedulerSuite) TestBalance(c *C) {
tc.addLeaderRegionWithWriteInfo(3, 1, 512*1024*regionHeartBeatReportInterval, 2, 4)
hotRegionLowThreshold = 0

// Will transfer a hot region from store 1 to store 5, because the total count of peers
// Will transfer a hot region from store 1 to store 6, because the total count of peers
// which is hot for store 1 is more larger than other stores.
checkTransferPeerWithLeaderTransfer(c, hb.Schedule(cluster), 1, 5)
op := hb.Schedule(tc)
c.Assert(op, NotNil)
if op.RegionID() == 2 {
checkTransferPeerWithLeaderTransferFrom(c, op, 1)
} else {
checkTransferPeerWithLeaderTransfer(c, op, 1, 6)
}

// After transfer a hot region from store 1 to store 5
//| region_id | leader_sotre | follower_store | follower_store | written_bytes |
//|-----------|--------------|----------------|----------------|---------------|
//| 1 | 1 | 2 | 3 | 512KB |
//| 2 | 1 | 3 | 4 | 512KB |
//| 3 | 5 | 2 | 4 | 512KB |
//| 3 | 6 | 2 | 4 | 512KB |
//| 4 | 5 | 6 | 1 | 512KB |
//| 5 | 3 | 4 | 5 | 512KB |
tc.updateStorageWrittenBytes(1, 60*1024*1024)
tc.updateStorageWrittenBytes(2, 30*1024*1024)
tc.updateStorageWrittenBytes(3, 60*1024*1024)
tc.updateStorageWrittenBytes(4, 30*1024*1024)
tc.updateStorageWrittenBytes(5, 30*1024*1024)
tc.updateStorageWrittenBytes(5, 0*1024*1024)
tc.updateStorageWrittenBytes(6, 30*1024*1024)
tc.addLeaderRegionWithWriteInfo(1, 1, 512*1024*regionHeartBeatReportInterval, 2, 3)
tc.addLeaderRegionWithWriteInfo(2, 1, 512*1024*regionHeartBeatReportInterval, 3, 4)
tc.addLeaderRegionWithWriteInfo(3, 5, 512*1024*regionHeartBeatReportInterval, 2, 4)

tc.addLeaderRegionWithWriteInfo(2, 1, 512*1024*regionHeartBeatReportInterval, 2, 3)
tc.addLeaderRegionWithWriteInfo(3, 6, 512*1024*regionHeartBeatReportInterval, 1, 4)
tc.addLeaderRegionWithWriteInfo(4, 5, 512*1024*regionHeartBeatReportInterval, 6, 4)
tc.addLeaderRegionWithWriteInfo(5, 3, 512*1024*regionHeartBeatReportInterval, 4, 5)
// We can find that the leader of all hot regions are on store 1,
// so one of the leader will transfer to another store.
checkTransferLeaderFrom(c, hb.Schedule(cluster), 1)
Expand Down Expand Up @@ -1031,3 +1045,8 @@ func (s *testBalanceHotReadRegionSchedulerSuite) TestBalance(c *C) {
// We will Transfer peer from 1 to 5
checkTransferPeerWithLeaderTransfer(c, hb.Schedule(cluster), 1, 5)
}

func checkTransferPeerWithLeaderTransferFrom(c *C, op *schedule.Operator, sourceID uint64) {
c.Assert(op.Len(), Equals, 3)
c.Assert(op.Step(2).(schedule.RemovePeer).FromStore, Equals, sourceID)
}
3 changes: 2 additions & 1 deletion server/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,11 @@ func (h *balanceHotRegionsScheduler) balanceByPeer(cluster schedule.Cluster, sto
continue
}

srcStore := cluster.GetStore(srcStoreID)
filters := []schedule.Filter{
schedule.NewExcludedFilter(srcRegion.GetStoreIds(), srcRegion.GetStoreIds()),
schedule.NewDistinctScoreFilter(h.opt.GetLocationLabels(), stores, cluster.GetLeaderStore(srcRegion)),
schedule.NewStateFilter(h.opt),
schedule.NewDistinctScoreFilter(h.opt.GetLocationLabels(), cluster.GetRegionStores(srcRegion), srcStore),
schedule.NewStorageThresholdFilter(h.opt),
}
destStoreIDs := make([]uint64, 0, len(stores))
Expand Down

0 comments on commit 137fa73

Please sign in to comment.