@@ -20,7 +20,7 @@ import (
20
20
var (
21
21
BackBlockNum int64 = 128 //节点高度不增加时向后取blocks的个数
22
22
BackwardBlockNum int64 = 16 //本节点高度不增加时并且落后peer的高度数
23
- checkHeightNoIncSeconds int64 = 5 * 60 //高度不增长时的检测周期目前暂定5分钟
23
+ checkHeightNoIncSeconds int64 = 200 //高度不增长时的检测周期
24
24
checkBlockHashSeconds int64 = 1 * 60 //1分钟检测一次tip hash和peer 对应高度的hash是否一致
25
25
fetchPeerListSeconds int64 = 5 //5 秒获取一个peerlist
26
26
MaxRollBlockNum int64 = 10000 //最大回退block数量
@@ -160,6 +160,13 @@ func (chain *BlockChain) SynRoutine() {
160
160
//30s尝试从peer节点请求ChunkRecord
161
161
chunkRecordSynTicker := time .NewTicker (30 * time .Second )
162
162
defer chunkRecordSynTicker .Stop ()
163
+
164
+ chain .UpdatesynBlkHeight (chain .GetBlockHeight ())
165
+ mode := chain .GetDownloadSyncStatus ()
166
+ chain .UpdateDownloadSyncStatus (forkChainDetectMode )
167
+ // make sure not on fork chain when node start
168
+ go chain .forkChainDectection (mode )
169
+
163
170
//节点下载模式
164
171
go chain .DownLoadBlocks ()
165
172
@@ -186,8 +193,10 @@ func (chain *BlockChain) SynRoutine() {
186
193
187
194
case <- checkBlockHashTicker .C :
188
195
//synlog.Info("checkBlockHashTicker")
189
- chain .tickerwg .Add (1 )
190
- go chain .CheckTipBlockHash ()
196
+ if chain .GetDownloadSyncStatus () != forkChainDetectMode {
197
+ chain .tickerwg .Add (1 )
198
+ go chain .CheckTipBlockHash ()
199
+ }
191
200
192
201
//定时检查系统时间,如果系统时间有问题,那么会有一个报警
193
202
case <- checkClockDriftTicker .C :
@@ -433,6 +442,33 @@ func (chain *BlockChain) GetPeerInfo(pid string) *PeerInfo {
433
442
return nil
434
443
}
435
444
445
+ // getForkDetectPeer 区块高度次大节点
446
+ func (chain * BlockChain ) getForkComparePeer () PeerInfo {
447
+
448
+ chain .peerMaxBlklock .Lock ()
449
+ defer chain .peerMaxBlklock .Unlock ()
450
+
451
+ if chain .peerList .Len () == 0 {
452
+ return PeerInfo {}
453
+ } else if chain .peerList .Len () == 1 {
454
+ return * chain .peerList [0 ]
455
+ } else {
456
+ return * chain .peerList [chain .peerList .Len ()- 2 ]
457
+ }
458
+ }
459
+
460
+ func (chain * BlockChain ) getActivePeersByHeight (height int64 ) []string {
461
+ chain .peerMaxBlklock .Lock ()
462
+ defer chain .peerMaxBlklock .Unlock ()
463
+ peers := make ([]string , 0 , 8 )
464
+ for _ , peer := range chain .peerList {
465
+ if peer .Height > height {
466
+ peers = append (peers , peer .Name )
467
+ }
468
+ }
469
+ return peers
470
+ }
471
+
436
472
//GetMaxPeerInfo 获取peerlist中最高节点的peerinfo
437
473
func (chain * BlockChain ) GetMaxPeerInfo () * PeerInfo {
438
474
chain .peerMaxBlklock .Lock ()
@@ -643,6 +679,77 @@ func (chain *BlockChain) SynBlocksFromPeers() {
643
679
}
644
680
}
645
681
682
+ // dectect if run in fork chain, try to correct
683
+ func (chain * BlockChain ) forkChainDectection (prevMode int ) {
684
+
685
+ // restore download mode
686
+ defer chain .UpdateDownloadSyncStatus (prevMode )
687
+
688
+ cmpPeer := chain .getForkComparePeer ()
689
+ for cmpPeer .Height == 0 {
690
+ time .Sleep (time .Second * 5 )
691
+ cmpPeer = chain .getForkComparePeer ()
692
+ }
693
+ localHeight := chain .GetBlockHeight ()
694
+ chainlog .Debug ("forkDectectInfo" , "height" , localHeight , "peerHeight" , cmpPeer .Height )
695
+ if cmpPeer .Height < localHeight + BackwardBlockNum {
696
+ return
697
+ }
698
+
699
+ if err := chain .FetchBlockHeaders (localHeight - BackwardBlockNum , localHeight , cmpPeer .Name ); err != nil {
700
+ chainlog .Error ("forkDectectFetchHeaders" , "err" , err )
701
+ }
702
+
703
+ // wait for finding fork point
704
+ var forkHeight int64
705
+ select {
706
+ case <- time .After (time .Minute * 2 ):
707
+ chainlog .Error ("forkDectect wait fork point timeout" )
708
+ return
709
+ case forkHeight = <- chain .forkPointChan :
710
+ }
711
+
712
+ // no forks
713
+ if forkHeight >= localHeight {
714
+ chainlog .Debug ("forkDectectNoForks" )
715
+ return
716
+ }
717
+
718
+ activePeer := chain .getActivePeersByHeight (cmpPeer .Height )
719
+ chainlog .Debug ("forkDectect" , "activePeer" , len (activePeer ))
720
+ if len (activePeer ) <= 0 {
721
+ return
722
+ }
723
+
724
+ readyDownload := make (chan struct {})
725
+
726
+ go func () {
727
+ for chain .downLoadTask .InProgress () {
728
+ time .Sleep (time .Second * 5 )
729
+ }
730
+ readyDownload <- struct {}{}
731
+ }()
732
+
733
+ select {
734
+ case <- time .After (time .Minute * 5 ):
735
+ chainlog .Error ("forkDectect wait download task timeout" )
736
+ chain .downLoadTask .Cancel ()
737
+ case <- readyDownload :
738
+ }
739
+ if chain .syncTask .InProgress () {
740
+ chain .syncTask .Cancel ()
741
+ }
742
+
743
+ if chain .GetBlockHeight () > localHeight {
744
+ chainlog .Info ("forkDectectBlkHeightIncreased" , "prev" , localHeight , "curr" , chain .GetBlockHeight ())
745
+ return
746
+ }
747
+
748
+ chainlog .Info ("forkDectectDownBlk" , "localHeight" , localHeight , "forkHeight" , forkHeight , "peers" , len (activePeer ))
749
+ go chain .ProcDownLoadBlocks (forkHeight , localHeight + 1 , activePeer )
750
+
751
+ }
752
+
646
753
//CheckHeightNoIncrease 在规定时间本链的高度没有增长,但peerlist中最新高度远远高于本节点高度,
647
754
//可能当前链是在分支链上,需从指定最长链的peer向后请求指定数量的blockheader
648
755
//请求bestchain.Height -BackBlockNum -- bestchain.Height的header
@@ -651,48 +758,28 @@ func (chain *BlockChain) CheckHeightNoIncrease() {
651
758
defer chain .tickerwg .Done ()
652
759
653
760
//获取当前主链的最新高度
654
- tipheight := chain .bestChain .Height ()
655
- laststorheight := chain .blockStore .Height ()
656
-
657
- if tipheight != laststorheight {
658
- synlog .Error ("CheckHeightNoIncrease" , "tipheight" , tipheight , "laststorheight" , laststorheight )
659
- return
660
- }
661
- //获取上个检测周期时的检测高度
662
- checkheight := chain .GetsynBlkHeight ()
761
+ localHeight := chain .GetBlockHeight ()
663
762
664
763
//bestchain的tip高度在变化,更新最新的检测高度即可,高度可能在增长或者回退
665
- if tipheight != checkheight {
666
- chain .UpdatesynBlkHeight (tipheight )
764
+ if localHeight != chain . GetsynBlkHeight () {
765
+ chain .UpdatesynBlkHeight (localHeight )
667
766
return
668
767
}
669
- //一个检测周期发现本节点bestchain的tip高度没有变化。
670
- //远远落后于高度的peer节点并且最高peer节点不是最优链,本节点可能在侧链上,
671
- //需要从最新的peer上向后取BackBlockNum个headers
672
- maxpeer := chain .GetMaxPeerInfo ()
673
- if maxpeer == nil {
674
- synlog .Error ("CheckHeightNoIncrease GetMaxPeerInfo is nil" )
768
+
769
+ mode := chain .GetDownloadSyncStatus ()
770
+ chainlog .Debug ("CheckHeightNoIncrease" , "localHeight" , localHeight , "downMode" , mode )
771
+ if mode == forkChainDetectMode {
675
772
return
676
773
}
677
- peermaxheight := maxpeer .Height
678
- pid := maxpeer .Name
679
- var err error
680
- if peermaxheight > tipheight && (peermaxheight - tipheight ) > BackwardBlockNum && ! chain .isBestChainPeer (pid ) {
681
- //从指定peer向后请求BackBlockNum个blockheaders
682
- synlog .Debug ("CheckHeightNoIncrease" , "tipheight" , tipheight , "pid" , pid )
683
- if tipheight > BackBlockNum {
684
- err = chain .FetchBlockHeaders (tipheight - BackBlockNum , tipheight , pid )
685
- } else {
686
- err = chain .FetchBlockHeaders (0 , tipheight , pid )
687
- }
688
- if err != nil {
689
- synlog .Error ("CheckHeightNoIncrease FetchBlockHeaders" , "err" , err )
690
- }
691
- }
774
+ chain .UpdateDownloadSyncStatus (forkChainDetectMode )
775
+ chain .forkChainDectection (mode )
692
776
}
693
777
694
778
//FetchBlockHeaders 从指定pid获取start到end之间的headers
695
779
func (chain * BlockChain ) FetchBlockHeaders (start int64 , end int64 , pid string ) (err error ) {
780
+ if start < 0 {
781
+ start = 0
782
+ }
696
783
if chain .client == nil {
697
784
synlog .Error ("FetchBlockHeaders chain client not bind message queue." )
698
785
return types .ErrClientNotBindQueue
@@ -753,11 +840,7 @@ func (chain *BlockChain) ProcBlockHeader(headers *types.Headers, peerid string)
753
840
if ! bytes .Equal (headers .Items [0 ].Hash , header .Hash ) {
754
841
synlog .Info ("ProcBlockHeader hash no equal" , "height" , height , "self hash" , common .ToHex (header .Hash ), "peer hash" , common .ToHex (headers .Items [0 ].Hash ))
755
842
756
- if height > BackBlockNum {
757
- err = chain .FetchBlockHeaders (height - BackBlockNum , height , peerid )
758
- } else if height != 0 {
759
- err = chain .FetchBlockHeaders (0 , height , peerid )
760
- }
843
+ err = chain .FetchBlockHeaders (height - BackBlockNum , height , peerid )
761
844
if err != nil {
762
845
synlog .Info ("ProcBlockHeader FetchBlockHeaders" , "err" , err )
763
846
}
@@ -795,18 +878,23 @@ func (chain *BlockChain) ProcBlockHeaders(headers *types.Headers, pid string) er
795
878
}
796
879
//继续向后取指定数量的headers
797
880
height := headers .Items [0 ].Height
798
- if height > BackBlockNum {
799
- err = chain .FetchBlockHeaders (height - BackBlockNum , height , pid )
800
- } else {
801
- err = chain .FetchBlockHeaders (0 , height , pid )
802
- }
881
+ err = chain .FetchBlockHeaders (height - BackBlockNum , height , pid )
803
882
if err != nil {
804
883
synlog .Info ("ProcBlockHeaders FetchBlockHeaders" , "err" , err )
805
884
}
806
885
return types .ErrContinueBack
807
886
}
808
887
synlog .Info ("ProcBlockHeaders find fork point" , "height" , ForkHeight , "hash" , common .ToHex (forkhash ))
809
888
889
+ if chain .GetDownloadSyncStatus () == forkChainDetectMode {
890
+ synlog .Error ("ProcBlockHeaders forkDetect" )
891
+ select {
892
+ case chain .forkPointChan <- ForkHeight :
893
+ default :
894
+ }
895
+ return nil
896
+ }
897
+
810
898
//获取此pid对应的peer信息,
811
899
peerinfo := chain .GetPeerInfo (pid )
812
900
if peerinfo == nil {
@@ -888,13 +976,8 @@ func (chain *BlockChain) CheckTipBlockHash() {
888
976
} else if peermaxheight == tipheight {
889
977
// 直接tip block hash比较,如果不相等需要从peer向后去指定的headers,尝试寻找分叉点
890
978
if ! bytes .Equal (tiphash , peerhash ) {
891
- if tipheight > BackBlockNum {
892
- synlog .Debug ("CheckTipBlockHash ==" , "peermaxheight" , peermaxheight , "tipheight" , tipheight )
893
- Err = chain .FetchBlockHeaders (tipheight - BackBlockNum , tipheight , pid )
894
- } else {
895
- synlog .Debug ("CheckTipBlockHash !=" , "peermaxheight" , peermaxheight , "tipheight" , tipheight )
896
- Err = chain .FetchBlockHeaders (1 , tipheight , pid )
897
- }
979
+ synlog .Debug ("CheckTipBlockHash ==" , "peermaxheight" , peermaxheight , "tipheight" , tipheight )
980
+ Err = chain .FetchBlockHeaders (tipheight - BackBlockNum , tipheight , pid )
898
981
}
899
982
} else {
900
983
@@ -903,13 +986,8 @@ func (chain *BlockChain) CheckTipBlockHash() {
903
986
return
904
987
}
905
988
if ! bytes .Equal (header .Hash , peerhash ) {
906
- if peermaxheight > BackBlockNum {
907
- synlog .Debug ("CheckTipBlockHash<!=" , "peermaxheight" , peermaxheight , "tipheight" , tipheight )
908
- Err = chain .FetchBlockHeaders (peermaxheight - BackBlockNum , peermaxheight , pid )
909
- } else {
910
- synlog .Debug ("CheckTipBlockHash<!=" , "peermaxheight" , peermaxheight , "tipheight" , tipheight )
911
- Err = chain .FetchBlockHeaders (1 , peermaxheight , pid )
912
- }
989
+ synlog .Debug ("CheckTipBlockHash<!=" , "peermaxheight" , peermaxheight , "tipheight" , tipheight )
990
+ Err = chain .FetchBlockHeaders (peermaxheight - BackBlockNum , peermaxheight , pid )
913
991
}
914
992
}
915
993
if Err != nil {
0 commit comments