Skip to content

Commit 192058e

Browse files
bysomeone33cn
authored andcommitted
[[FIX]] add fork chain detection(#1236)
1 parent 8aca51e commit 192058e

File tree

3 files changed

+148
-62
lines changed

3 files changed

+148
-62
lines changed

blockchain/blocksyn.go

+137-59
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
var (
2121
BackBlockNum int64 = 128 //节点高度不增加时向后取blocks的个数
2222
BackwardBlockNum int64 = 16 //本节点高度不增加时并且落后peer的高度数
23-
checkHeightNoIncSeconds int64 = 5 * 60 //高度不增长时的检测周期目前暂定5分钟
23+
checkHeightNoIncSeconds int64 = 200 //高度不增长时的检测周期
2424
checkBlockHashSeconds int64 = 1 * 60 //1分钟检测一次tip hash和peer 对应高度的hash是否一致
2525
fetchPeerListSeconds int64 = 5 //5 秒获取一个peerlist
2626
MaxRollBlockNum int64 = 10000 //最大回退block数量
@@ -160,6 +160,13 @@ func (chain *BlockChain) SynRoutine() {
160160
//30s尝试从peer节点请求ChunkRecord
161161
chunkRecordSynTicker := time.NewTicker(30 * time.Second)
162162
defer chunkRecordSynTicker.Stop()
163+
164+
chain.UpdatesynBlkHeight(chain.GetBlockHeight())
165+
mode := chain.GetDownloadSyncStatus()
166+
chain.UpdateDownloadSyncStatus(forkChainDetectMode)
167+
// make sure not on fork chain when node start
168+
go chain.forkChainDectection(mode)
169+
163170
//节点下载模式
164171
go chain.DownLoadBlocks()
165172

@@ -186,8 +193,10 @@ func (chain *BlockChain) SynRoutine() {
186193

187194
case <-checkBlockHashTicker.C:
188195
//synlog.Info("checkBlockHashTicker")
189-
chain.tickerwg.Add(1)
190-
go chain.CheckTipBlockHash()
196+
if chain.GetDownloadSyncStatus() != forkChainDetectMode {
197+
chain.tickerwg.Add(1)
198+
go chain.CheckTipBlockHash()
199+
}
191200

192201
//定时检查系统时间,如果系统时间有问题,那么会有一个报警
193202
case <-checkClockDriftTicker.C:
@@ -433,6 +442,33 @@ func (chain *BlockChain) GetPeerInfo(pid string) *PeerInfo {
433442
return nil
434443
}
435444

445+
// getForkDetectPeer 区块高度次大节点
446+
func (chain *BlockChain) getForkComparePeer() PeerInfo {
447+
448+
chain.peerMaxBlklock.Lock()
449+
defer chain.peerMaxBlklock.Unlock()
450+
451+
if chain.peerList.Len() == 0 {
452+
return PeerInfo{}
453+
} else if chain.peerList.Len() == 1 {
454+
return *chain.peerList[0]
455+
} else {
456+
return *chain.peerList[chain.peerList.Len()-2]
457+
}
458+
}
459+
460+
func (chain *BlockChain) getActivePeersByHeight(height int64) []string {
461+
chain.peerMaxBlklock.Lock()
462+
defer chain.peerMaxBlklock.Unlock()
463+
peers := make([]string, 0, 8)
464+
for _, peer := range chain.peerList {
465+
if peer.Height > height {
466+
peers = append(peers, peer.Name)
467+
}
468+
}
469+
return peers
470+
}
471+
436472
//GetMaxPeerInfo 获取peerlist中最高节点的peerinfo
437473
func (chain *BlockChain) GetMaxPeerInfo() *PeerInfo {
438474
chain.peerMaxBlklock.Lock()
@@ -643,6 +679,77 @@ func (chain *BlockChain) SynBlocksFromPeers() {
643679
}
644680
}
645681

682+
// dectect if run in fork chain, try to correct
683+
func (chain *BlockChain) forkChainDectection(prevMode int) {
684+
685+
// restore download mode
686+
defer chain.UpdateDownloadSyncStatus(prevMode)
687+
688+
cmpPeer := chain.getForkComparePeer()
689+
for cmpPeer.Height == 0 {
690+
time.Sleep(time.Second * 5)
691+
cmpPeer = chain.getForkComparePeer()
692+
}
693+
localHeight := chain.GetBlockHeight()
694+
chainlog.Debug("forkDectectInfo", "height", localHeight, "peerHeight", cmpPeer.Height)
695+
if cmpPeer.Height < localHeight+BackwardBlockNum {
696+
return
697+
}
698+
699+
if err := chain.FetchBlockHeaders(localHeight-BackwardBlockNum, localHeight, cmpPeer.Name); err != nil {
700+
chainlog.Error("forkDectectFetchHeaders", "err", err)
701+
}
702+
703+
// wait for finding fork point
704+
var forkHeight int64
705+
select {
706+
case <-time.After(time.Minute * 2):
707+
chainlog.Error("forkDectect wait fork point timeout")
708+
return
709+
case forkHeight = <-chain.forkPointChan:
710+
}
711+
712+
// no forks
713+
if forkHeight >= localHeight {
714+
chainlog.Debug("forkDectectNoForks")
715+
return
716+
}
717+
718+
activePeer := chain.getActivePeersByHeight(cmpPeer.Height)
719+
chainlog.Debug("forkDectect", "activePeer", len(activePeer))
720+
if len(activePeer) <= 0 {
721+
return
722+
}
723+
724+
readyDownload := make(chan struct{})
725+
726+
go func() {
727+
for chain.downLoadTask.InProgress() {
728+
time.Sleep(time.Second * 5)
729+
}
730+
readyDownload <- struct{}{}
731+
}()
732+
733+
select {
734+
case <-time.After(time.Minute * 5):
735+
chainlog.Error("forkDectect wait download task timeout")
736+
chain.downLoadTask.Cancel()
737+
case <-readyDownload:
738+
}
739+
if chain.syncTask.InProgress() {
740+
chain.syncTask.Cancel()
741+
}
742+
743+
if chain.GetBlockHeight() > localHeight {
744+
chainlog.Info("forkDectectBlkHeightIncreased", "prev", localHeight, "curr", chain.GetBlockHeight())
745+
return
746+
}
747+
748+
chainlog.Info("forkDectectDownBlk", "localHeight", localHeight, "forkHeight", forkHeight, "peers", len(activePeer))
749+
go chain.ProcDownLoadBlocks(forkHeight, localHeight+1, activePeer)
750+
751+
}
752+
646753
//CheckHeightNoIncrease 在规定时间本链的高度没有增长,但peerlist中最新高度远远高于本节点高度,
647754
//可能当前链是在分支链上,需从指定最长链的peer向后请求指定数量的blockheader
648755
//请求bestchain.Height -BackBlockNum -- bestchain.Height的header
@@ -651,48 +758,28 @@ func (chain *BlockChain) CheckHeightNoIncrease() {
651758
defer chain.tickerwg.Done()
652759

653760
//获取当前主链的最新高度
654-
tipheight := chain.bestChain.Height()
655-
laststorheight := chain.blockStore.Height()
656-
657-
if tipheight != laststorheight {
658-
synlog.Error("CheckHeightNoIncrease", "tipheight", tipheight, "laststorheight", laststorheight)
659-
return
660-
}
661-
//获取上个检测周期时的检测高度
662-
checkheight := chain.GetsynBlkHeight()
761+
localHeight := chain.GetBlockHeight()
663762

664763
//bestchain的tip高度在变化,更新最新的检测高度即可,高度可能在增长或者回退
665-
if tipheight != checkheight {
666-
chain.UpdatesynBlkHeight(tipheight)
764+
if localHeight != chain.GetsynBlkHeight() {
765+
chain.UpdatesynBlkHeight(localHeight)
667766
return
668767
}
669-
//一个检测周期发现本节点bestchain的tip高度没有变化。
670-
//远远落后于高度的peer节点并且最高peer节点不是最优链,本节点可能在侧链上,
671-
//需要从最新的peer上向后取BackBlockNum个headers
672-
maxpeer := chain.GetMaxPeerInfo()
673-
if maxpeer == nil {
674-
synlog.Error("CheckHeightNoIncrease GetMaxPeerInfo is nil")
768+
769+
mode := chain.GetDownloadSyncStatus()
770+
chainlog.Debug("CheckHeightNoIncrease", "localHeight", localHeight, "downMode", mode)
771+
if mode == forkChainDetectMode {
675772
return
676773
}
677-
peermaxheight := maxpeer.Height
678-
pid := maxpeer.Name
679-
var err error
680-
if peermaxheight > tipheight && (peermaxheight-tipheight) > BackwardBlockNum && !chain.isBestChainPeer(pid) {
681-
//从指定peer向后请求BackBlockNum个blockheaders
682-
synlog.Debug("CheckHeightNoIncrease", "tipheight", tipheight, "pid", pid)
683-
if tipheight > BackBlockNum {
684-
err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
685-
} else {
686-
err = chain.FetchBlockHeaders(0, tipheight, pid)
687-
}
688-
if err != nil {
689-
synlog.Error("CheckHeightNoIncrease FetchBlockHeaders", "err", err)
690-
}
691-
}
774+
chain.UpdateDownloadSyncStatus(forkChainDetectMode)
775+
chain.forkChainDectection(mode)
692776
}
693777

694778
//FetchBlockHeaders 从指定pid获取start到end之间的headers
695779
func (chain *BlockChain) FetchBlockHeaders(start int64, end int64, pid string) (err error) {
780+
if start < 0 {
781+
start = 0
782+
}
696783
if chain.client == nil {
697784
synlog.Error("FetchBlockHeaders chain client not bind message queue.")
698785
return types.ErrClientNotBindQueue
@@ -753,11 +840,7 @@ func (chain *BlockChain) ProcBlockHeader(headers *types.Headers, peerid string)
753840
if !bytes.Equal(headers.Items[0].Hash, header.Hash) {
754841
synlog.Info("ProcBlockHeader hash no equal", "height", height, "self hash", common.ToHex(header.Hash), "peer hash", common.ToHex(headers.Items[0].Hash))
755842

756-
if height > BackBlockNum {
757-
err = chain.FetchBlockHeaders(height-BackBlockNum, height, peerid)
758-
} else if height != 0 {
759-
err = chain.FetchBlockHeaders(0, height, peerid)
760-
}
843+
err = chain.FetchBlockHeaders(height-BackBlockNum, height, peerid)
761844
if err != nil {
762845
synlog.Info("ProcBlockHeader FetchBlockHeaders", "err", err)
763846
}
@@ -795,18 +878,23 @@ func (chain *BlockChain) ProcBlockHeaders(headers *types.Headers, pid string) er
795878
}
796879
//继续向后取指定数量的headers
797880
height := headers.Items[0].Height
798-
if height > BackBlockNum {
799-
err = chain.FetchBlockHeaders(height-BackBlockNum, height, pid)
800-
} else {
801-
err = chain.FetchBlockHeaders(0, height, pid)
802-
}
881+
err = chain.FetchBlockHeaders(height-BackBlockNum, height, pid)
803882
if err != nil {
804883
synlog.Info("ProcBlockHeaders FetchBlockHeaders", "err", err)
805884
}
806885
return types.ErrContinueBack
807886
}
808887
synlog.Info("ProcBlockHeaders find fork point", "height", ForkHeight, "hash", common.ToHex(forkhash))
809888

889+
if chain.GetDownloadSyncStatus() == forkChainDetectMode {
890+
synlog.Error("ProcBlockHeaders forkDetect")
891+
select {
892+
case chain.forkPointChan <- ForkHeight:
893+
default:
894+
}
895+
return nil
896+
}
897+
810898
//获取此pid对应的peer信息,
811899
peerinfo := chain.GetPeerInfo(pid)
812900
if peerinfo == nil {
@@ -888,13 +976,8 @@ func (chain *BlockChain) CheckTipBlockHash() {
888976
} else if peermaxheight == tipheight {
889977
// 直接tip block hash比较,如果不相等需要从peer向后去指定的headers,尝试寻找分叉点
890978
if !bytes.Equal(tiphash, peerhash) {
891-
if tipheight > BackBlockNum {
892-
synlog.Debug("CheckTipBlockHash ==", "peermaxheight", peermaxheight, "tipheight", tipheight)
893-
Err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
894-
} else {
895-
synlog.Debug("CheckTipBlockHash !=", "peermaxheight", peermaxheight, "tipheight", tipheight)
896-
Err = chain.FetchBlockHeaders(1, tipheight, pid)
897-
}
979+
synlog.Debug("CheckTipBlockHash ==", "peermaxheight", peermaxheight, "tipheight", tipheight)
980+
Err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
898981
}
899982
} else {
900983

@@ -903,13 +986,8 @@ func (chain *BlockChain) CheckTipBlockHash() {
903986
return
904987
}
905988
if !bytes.Equal(header.Hash, peerhash) {
906-
if peermaxheight > BackBlockNum {
907-
synlog.Debug("CheckTipBlockHash<!=", "peermaxheight", peermaxheight, "tipheight", tipheight)
908-
Err = chain.FetchBlockHeaders(peermaxheight-BackBlockNum, peermaxheight, pid)
909-
} else {
910-
synlog.Debug("CheckTipBlockHash<!=", "peermaxheight", peermaxheight, "tipheight", tipheight)
911-
Err = chain.FetchBlockHeaders(1, peermaxheight, pid)
912-
}
989+
synlog.Debug("CheckTipBlockHash<!=", "peermaxheight", peermaxheight, "tipheight", tipheight)
990+
Err = chain.FetchBlockHeaders(peermaxheight-BackBlockNum, peermaxheight, pid)
913991
}
914992
}
915993
if Err != nil {

blockchain/chain.go

+3
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type BlockChain struct {
6464
//记录本节点已经同步的block高度,用于节点追赶active链,处理节点分叉不同步的场景
6565
synBlockHeight int64
6666

67+
forkPointChan chan int64
68+
6769
//记录peer的最新block高度,用于节点追赶active链
6870
peerList PeerInfoList
6971
recvwg *sync.WaitGroup
@@ -194,6 +196,7 @@ func New(cfg *types.Chain33Config) *BlockChain {
194196
downloadMode: fastDownLoadMode,
195197
blockOnChain: &BlockOnChain{},
196198
onChainTimeout: 0,
199+
forkPointChan: make(chan int64, 1),
197200
}
198201
blockchain.initConfig(cfg)
199202
blockchain.blockCache = newBlockCache(cfg, defaultBlockHashCacheSize)

blockchain/download.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ const (
2828
//快速下载时需要的最少peer数量
2929
bestPeerCount = 2
3030

31-
normalDownLoadMode = 0
32-
fastDownLoadMode = 1
33-
chunkDownLoadMode = 2
31+
normalDownLoadMode = 0
32+
fastDownLoadMode = 1
33+
chunkDownLoadMode = 2
34+
forkChainDetectMode = 3
3435
)
3536

3637
//DownLoadInfo blockchain模块下载block处理结构体
@@ -431,6 +432,10 @@ func (chain *BlockChain) DownLoadTimeOutProc(height int64) {
431432

432433
// DownLoadBlocks 下载区块
433434
func (chain *BlockChain) DownLoadBlocks() {
435+
// wait fork chain detection
436+
for chain.GetDownloadSyncStatus() == forkChainDetectMode {
437+
time.Sleep(time.Second)
438+
}
434439
if !chain.cfg.DisableShard && chain.cfg.EnableFetchP2pstore {
435440
// 1.节点开启时候首先尝试进行chunkDownLoad下载
436441
chain.UpdateDownloadSyncStatus(chunkDownLoadMode) // 默认模式是fastDownLoadMode

0 commit comments

Comments
 (0)