@@ -6,20 +6,17 @@ import { Logger } from "@waku/utils";
6
6
7
7
type ReceivedMessageHashes = {
8
8
all : Set < string > ;
9
- nodes : {
10
- [ peerId : PeerIdStr ] : Set < string > ;
11
- } ;
9
+ nodes : Record < PeerIdStr , Set < string > > ;
12
10
} ;
13
11
14
12
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3 ;
15
13
16
14
const log = new Logger ( "sdk:filter:reliability_monitor" ) ;
17
15
18
16
export class ReliabilityMonitor {
19
- public receivedMessagesHashStr : string [ ] = [ ] ;
20
- public receivedMessagesHashes : ReceivedMessageHashes ;
21
- public missedMessagesByPeer : Map < string , number > = new Map ( ) ;
22
- public maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD ;
17
+ private receivedMessagesHashes : ReceivedMessageHashes ;
18
+ private missedMessagesByPeer : Map < PeerIdStr , number > = new Map ( ) ;
19
+ private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD ;
23
20
24
21
public constructor (
25
22
private getPeers : ( ) => Peer [ ] ,
@@ -29,46 +26,37 @@ export class ReliabilityMonitor {
29
26
30
27
this . receivedMessagesHashes = {
31
28
all : new Set ( ) ,
32
- nodes : {
33
- ... Object . fromEntries ( allPeerIdStr . map ( ( peerId ) => [ peerId , new Set ( ) ] ) )
34
- }
29
+ nodes : Object . fromEntries (
30
+ allPeerIdStr . map ( ( peerId ) => [ peerId , new Set ( ) ] )
31
+ )
35
32
} ;
36
33
allPeerIdStr . forEach ( ( peerId ) => this . missedMessagesByPeer . set ( peerId , 0 ) ) ;
37
34
}
38
35
39
36
public setMaxMissedMessagesThreshold ( value : number | undefined ) : void {
40
- if ( value = == undefined ) {
41
- return ;
37
+ if ( value ! == undefined ) {
38
+ this . maxMissedMessagesThreshold = value ;
42
39
}
43
- this . maxMissedMessagesThreshold = value ;
44
- }
45
-
46
- public get messageHashes ( ) : string [ ] {
47
- return [ ...this . receivedMessagesHashes . all ] ;
48
40
}
49
41
50
42
public addMessage (
51
43
message : WakuMessage ,
52
44
pubsubTopic : PubsubTopic ,
53
- peerIdStr ?: string
45
+ peerIdStr ?: PeerIdStr
54
46
) : boolean {
55
47
const hashedMessageStr = messageHashStr (
56
48
pubsubTopic ,
57
49
message as IProtoMessage
58
50
) ;
59
51
52
+ const isNewMessage = ! this . receivedMessagesHashes . all . has ( hashedMessageStr ) ;
60
53
this . receivedMessagesHashes . all . add ( hashedMessageStr ) ;
61
54
62
55
if ( peerIdStr ) {
63
56
this . receivedMessagesHashes . nodes [ peerIdStr ] . add ( hashedMessageStr ) ;
64
57
}
65
58
66
- if ( this . receivedMessagesHashStr . includes ( hashedMessageStr ) ) {
67
- return true ;
68
- } else {
69
- this . receivedMessagesHashStr . push ( hashedMessageStr ) ;
70
- return false ;
71
- }
59
+ return ! isNewMessage ;
72
60
}
73
61
74
62
public async validateMessage ( ) : Promise < void > {
@@ -79,36 +67,38 @@ export class ReliabilityMonitor {
79
67
if ( ! hashes . has ( hash ) ) {
80
68
this . incrementMissedMessageCount ( peerIdStr ) ;
81
69
if ( this . shouldRenewPeer ( peerIdStr ) ) {
82
- log . info (
83
- `Peer ${ peerIdStr } has missed too many messages, renewing.`
84
- ) ;
85
- const peerId = this . getPeers ( ) . find (
86
- ( p ) => p . id . toString ( ) === peerIdStr
87
- ) ?. id ;
88
- if ( ! peerId ) {
89
- log . error (
90
- `Unexpected Error: Peer ${ peerIdStr } not found in connected peers.`
91
- ) ;
92
- continue ;
93
- }
94
- try {
95
- await this . renewAndSubscribePeer ( peerId ) ;
96
- } catch ( error ) {
97
- log . error ( `Failed to renew peer ${ peerIdStr } : ${ error } ` ) ;
98
- }
70
+ await this . renewPeer ( peerIdStr ) ;
99
71
}
100
72
}
101
73
}
102
74
}
103
75
}
104
76
105
- private incrementMissedMessageCount ( peerIdStr : string ) : void {
77
+ private incrementMissedMessageCount ( peerIdStr : PeerIdStr ) : void {
106
78
const currentCount = this . missedMessagesByPeer . get ( peerIdStr ) || 0 ;
107
79
this . missedMessagesByPeer . set ( peerIdStr , currentCount + 1 ) ;
108
80
}
109
81
110
- private shouldRenewPeer ( peerIdStr : string ) : boolean {
82
+ private shouldRenewPeer ( peerIdStr : PeerIdStr ) : boolean {
111
83
const missedMessages = this . missedMessagesByPeer . get ( peerIdStr ) || 0 ;
112
84
return missedMessages > this . maxMissedMessagesThreshold ;
113
85
}
86
+
87
+ private async renewPeer ( peerIdStr : PeerIdStr ) : Promise < void > {
88
+ log . info ( `Peer ${ peerIdStr } has missed too many messages, renewing.` ) ;
89
+ const peerId = this . getPeers ( ) . find (
90
+ ( p ) => p . id . toString ( ) === peerIdStr
91
+ ) ?. id ;
92
+ if ( ! peerId ) {
93
+ log . error (
94
+ `Unexpected Error: Peer ${ peerIdStr } not found in connected peers.`
95
+ ) ;
96
+ return ;
97
+ }
98
+ try {
99
+ await this . renewAndSubscribePeer ( peerId ) ;
100
+ } catch ( error ) {
101
+ log . error ( `Failed to renew peer ${ peerIdStr } : ${ error } ` ) ;
102
+ }
103
+ }
114
104
}
0 commit comments