Skip to content

Commit 94c4141

Browse files
committed
[[Feat]] support multiple address format(33cn#1181)
2 parents 03b290b + 841cee2 commit 94c4141

File tree

4 files changed

+51
-13
lines changed

4 files changed

+51
-13
lines changed

system/p2p/dht/protocol/broadcast/broadcast.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@ func (p *broadcastProtocol) handleBroadcastReceive(msg subscribeMsg) {
206206
} else if topic == psBlockTopic {
207207
block := msg.value.(*types.Block)
208208
hash = hex.EncodeToString(block.Hash(p.ChainCfg))
209-
log.Debug("recvBlkPs", "height", block.GetHeight(), "hash", hash)
209+
log.Debug("recvBlk", "height", block.GetHeight(), "hash", hash,
210+
"size(KB)", float32(block.Size())/1024, "from", msg.publisher.String())
210211
err = p.postBlockChain(hash, msg.receiveFrom.String(), block, msg.publisher)
211212

212213
} else if topic == psLtBlockTopic {
@@ -216,14 +217,15 @@ func (p *broadcastProtocol) handleBroadcastReceive(msg subscribeMsg) {
216217
return
217218
}
218219
p.ltB.addLtBlock(lb, msg.receiveFrom, msg.publisher)
219-
log.Debug("recvLtBlk", "height", lb.GetHeader().GetHeight(), "hash", hash)
220+
log.Debug("recvLtBlk", "height", lb.GetHeader().GetHeight(), "hash", hash,
221+
"size(KB)", float32(lb.GetSize())/1024, "from", msg.publisher.String())
220222

221223
} else if strings.HasPrefix(topic, psPeerMsgTopicPrefix) {
222224
err = p.handlePeerMsg(msg.value.(*types.PeerPubSubMsg), msg.receiveFrom, msg.publisher)
223225
}
224226

225227
if err != nil {
226-
log.Error("receivePs", "topic", topic, "hash", hash, "post msg err", err)
228+
log.Error("handleBroadcastReceive", "topic", topic, "hash", hash, "post msg err", err)
227229
}
228230
}
229231

system/p2p/dht/protocol/broadcast/pubsub.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (p *pubSub) init() {
6262
if !p.cfg.DisableValidation {
6363
p.val = initValidator(p)
6464
p.Pubsub.RegisterTopicValidator(psBlockTopic, p.val.validateBlock, pubsub.WithValidatorInline(true))
65-
p.Pubsub.RegisterTopicValidator(psTxTopic, p.val.validatePeer, pubsub.WithValidatorInline(true))
65+
p.Pubsub.RegisterTopicValidator(psTxTopic, p.val.validateTx, pubsub.WithValidatorInline(true))
6666
p.Pubsub.RegisterTopicValidator(psLtBlockTopic, p.val.validatePeer, pubsub.WithValidatorInline(true))
6767
}
6868

@@ -117,6 +117,10 @@ func (p *pubSub) handleSubMsg(in chan net.SubMsg) {
117117
return
118118
}
119119
topic := *data.Topic
120+
// 交易在pubsub内部验证时已经发送至mempool, 此处直接忽略
121+
if topic == psTxTopic {
122+
break
123+
}
120124
msg = p.newMsg(topic)
121125
err = p.decodeMsg(data.Data, &buf, msg)
122126
if err != nil {

system/p2p/dht/protocol/broadcast/validate.go

+32-2
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,9 @@ func (v *validator) addBlockHeader(header *types.Header) {
177177
}
178178
}
179179

180-
func (v *validator) validateBlock(ctx context.Context, id peer.ID, msg *ps.Message) ps.ValidationResult {
180+
func (v *validator) validateBlock(ctx context.Context, _ peer.ID, msg *ps.Message) ps.ValidationResult {
181181

182+
id := msg.GetFrom()
182183
if id == v.Host.ID() {
183184
return ps.ValidationAccept
184185
}
@@ -249,10 +250,39 @@ func (v *validator) validateBlock(ctx context.Context, id peer.ID, msg *ps.Messa
249250
}
250251

251252
//
252-
func (v *validator) validatePeer(ctx context.Context, id peer.ID, msg *ps.Message) ps.ValidationResult {
253+
func (v *validator) validatePeer(ctx context.Context, _ peer.ID, msg *ps.Message) ps.ValidationResult {
254+
id := msg.GetFrom()
253255
if v.isDeniedPeer(id) {
254256
log.Debug("validatePeer", "topic", *msg.Topic, "denied peer", id.Pretty())
255257
return ps.ValidationReject
256258
}
257259
return ps.ValidationAccept
258260
}
261+
262+
func (v *validator) validateTx(ctx context.Context, _ peer.ID, msg *ps.Message) ps.ValidationResult {
263+
264+
from := msg.GetFrom()
265+
if from == v.Host.ID() {
266+
return ps.ValidationAccept
267+
}
268+
269+
tx := &types.Transaction{}
270+
err := v.decodeMsg(msg.Data, nil, tx)
271+
if err != nil {
272+
log.Error("validateTx", "decodeMsg err", err)
273+
return ps.ValidationReject
274+
}
275+
276+
//重复检测
277+
if v.txFilter.AddWithCheckAtomic(hex.EncodeToString(tx.Hash()), struct{}{}) {
278+
return ps.ValidationIgnore
279+
}
280+
281+
_, err = v.API.SendTx(tx)
282+
if err != nil {
283+
log.Debug("validateTx", "hash", hex.EncodeToString(tx.Hash()), "err", err)
284+
return ps.ValidationIgnore
285+
}
286+
287+
return ps.ValidationAccept
288+
}

system/p2p/dht/protocol/broadcast/validate_test.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,12 @@ func Test_validateBlock(t *testing.T) {
9292
proto, cancel := newTestProtocol()
9393
defer cancel()
9494
val.broadcastProtocol = proto
95-
96-
require.Equal(t, ps.ValidationAccept, val.validateBlock(val.Ctx, val.Host.ID(), nil))
95+
msg := &ps.Message{Message: &pubsub_pb.Message{From: []byte(val.Host.ID())}}
96+
require.Equal(t, ps.ValidationAccept, val.validateBlock(val.Ctx, val.Host.ID(), msg))
9797
val.addDeniedPeer("errpid", 10)
98-
require.Equal(t, ps.ValidationReject, val.validateBlock(val.Ctx, "errpid", nil))
99-
msg := &ps.Message{Message: &pubsub_pb.Message{Data: []byte("errmsg")}}
98+
msg = &ps.Message{Message: &pubsub_pb.Message{From: []byte("errpid")}}
99+
require.Equal(t, ps.ValidationReject, val.validateBlock(val.Ctx, "errpid", msg))
100+
msg = &ps.Message{Message: &pubsub_pb.Message{Data: []byte("errmsg")}}
100101
require.Equal(t, ps.ValidationReject, val.validateBlock(val.Ctx, "testpid", msg))
101102

102103
testBlock := &types.Block{Height: 1}
@@ -113,7 +114,8 @@ func Test_validatePeer(t *testing.T) {
113114
val := newValidator(newTestPubSub())
114115
val.addDeniedPeer("errpid", 10)
115116
topic := "tx"
116-
msg := &ps.Message{Message: &pubsub_pb.Message{Topic: &topic}}
117-
require.Equal(t, ps.ValidationReject, val.validatePeer(val.Ctx, "errpid", msg))
118-
require.Equal(t, ps.ValidationAccept, val.validatePeer(val.Ctx, "normalpid", msg))
117+
msg := &ps.Message{Message: &pubsub_pb.Message{Topic: &topic, From: []byte("errpid")}}
118+
require.Equal(t, ps.ValidationReject, val.validatePeer(val.Ctx, "", msg))
119+
msg.From = []byte("normalPid")
120+
require.Equal(t, ps.ValidationAccept, val.validatePeer(val.Ctx, "", msg))
119121
}

0 commit comments

Comments
 (0)