Skip to content

Commit a5217ae

Browse files
committed
merge with master
2 parents 4c446d5 + 9b0f1e8 commit a5217ae

File tree

5 files changed

+185
-23
lines changed

5 files changed

+185
-23
lines changed

packages/core/src/lib/filter/index.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
3535
constructor(
3636
private handleIncomingMessage: (
3737
pubsubTopic: PubsubTopic,
38-
wakuMessage: WakuMessage
38+
wakuMessage: WakuMessage,
39+
peerIdStr: string
3940
) => Promise<void>,
4041
libp2p: Libp2p,
4142
options?: ProtocolCreateOptions
@@ -78,7 +79,11 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
7879
return;
7980
}
8081

81-
await this.handleIncomingMessage(pubsubTopic, wakuMessage);
82+
await this.handleIncomingMessage(
83+
pubsubTopic,
84+
wakuMessage,
85+
connection.remotePeer.toString()
86+
);
8287
}
8388
}).then(
8489
() => {

packages/interfaces/src/filter.ts

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import type { IReceiver } from "./receiver.js";
1616
export type SubscribeOptions = {
1717
keepAlive?: number;
1818
pingsBeforePeerRenewed?: number;
19+
maxMissedMessagesThreshold?: number;
1920
};
2021

2122
export type IFilter = IReceiver & IBaseProtocolCore;

packages/sdk/src/protocols/base_protocol.ts

+4-3
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,15 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
5151
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer> {
5252
this.log.info(`Renewing peer ${peerToDisconnect}`);
5353

54+
await this.connectionManager.dropConnection(peerToDisconnect);
55+
5456
const peer = (await this.findAndAddPeers(1))[0];
5557
if (!peer) {
56-
throw new Error(
57-
"Failed to find a new peer to replace the disconnected one"
58+
this.log.error(
59+
"Failed to find a new peer to replace the disconnected one."
5860
);
5961
}
6062

61-
await this.connectionManager.dropConnection(peerToDisconnect);
6263
this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
6364
this.log.info(
6465
`Peer ${peerToDisconnect} disconnected and removed from the peer list`

packages/sdk/src/protocols/filter.ts

+112-17
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import { ConnectionManager, FilterCore } from "@waku/core";
44
import {
55
type Callback,
66
type ContentTopic,
7-
CoreProtocolResult,
8-
CreateSubscriptionResult,
7+
type CoreProtocolResult,
8+
type CreateSubscriptionResult,
99
EConnectionStateEvents,
1010
type IAsyncIterator,
1111
type IDecodedMessage,
@@ -14,13 +14,14 @@ import {
1414
type IProtoMessage,
1515
type ISubscriptionSDK,
1616
type Libp2p,
17+
type PeerIdStr,
1718
type ProtocolCreateOptions,
1819
ProtocolError,
19-
ProtocolUseOptions,
20+
type ProtocolUseOptions,
2021
type PubsubTopic,
21-
SDKProtocolResult,
22+
type SDKProtocolResult,
2223
type ShardingParams,
23-
SubscribeOptions,
24+
type SubscribeOptions,
2425
type Unsubscribe
2526
} from "@waku/interfaces";
2627
import { messageHashStr } from "@waku/message-hash";
@@ -40,9 +41,17 @@ type SubscriptionCallback<T extends IDecodedMessage> = {
4041
callback: Callback<T>;
4142
};
4243

44+
type ReceivedMessageHashes = {
45+
all: Set<string>;
46+
nodes: {
47+
[peerId: PeerIdStr]: Set<string>;
48+
};
49+
};
50+
4351
const log = new Logger("sdk:filter");
4452

4553
const DEFAULT_MAX_PINGS = 3;
54+
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
4655
const DEFAULT_KEEP_ALIVE = 30 * 1000;
4756

4857
const DEFAULT_SUBSCRIBE_OPTIONS = {
@@ -52,9 +61,12 @@ const DEFAULT_SUBSCRIBE_OPTIONS = {
5261
export class SubscriptionManager implements ISubscriptionSDK {
5362
readonly receivedMessagesHashStr: string[] = [];
5463
private keepAliveTimer: number | null = null;
64+
private readonly receivedMessagesHashes: ReceivedMessageHashes;
5565
private peerFailures: Map<string, number> = new Map();
66+
private missedMessagesByPeer: Map<string, number> = new Map();
5667
private maxPingFailures: number = DEFAULT_MAX_PINGS;
5768
private subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS;
69+
private maxMissedMessagesThreshold = DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
5870

5971
private contentTopics: ContentTopic[] = [];
6072
private subscriptionCallbacks: Map<
@@ -68,14 +80,35 @@ export class SubscriptionManager implements ISubscriptionSDK {
6880
private readonly connectionManager: ConnectionManager,
6981
private readonly getPeers: () => Peer[],
7082
private readonly renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>
71-
) {}
83+
) {
84+
const allPeerIdStr = this.getPeers().map((p) => p.id.toString());
85+
this.receivedMessagesHashes = {
86+
all: new Set(),
87+
nodes: {
88+
...Object.fromEntries(allPeerIdStr.map((peerId) => [peerId, new Set()]))
89+
}
90+
};
91+
allPeerIdStr.forEach((peerId) => this.missedMessagesByPeer.set(peerId, 0));
92+
}
93+
94+
private addHash(hash: string, peerIdStr?: string): void {
95+
this.receivedMessagesHashes.all.add(hash);
96+
97+
if (peerIdStr) {
98+
this.receivedMessagesHashes.nodes[peerIdStr].add(hash);
99+
}
100+
}
72101

73102
public async subscribe<T extends IDecodedMessage>(
74103
decoders: IDecoder<T> | IDecoder<T>[],
75104
callback: Callback<T>,
76105
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
77106
): Promise<SDKProtocolResult> {
107+
this.keepAliveTimer = options.keepAlive || DEFAULT_KEEP_ALIVE;
78108
this.maxPingFailures = options.pingsBeforePeerRenewed || DEFAULT_MAX_PINGS;
109+
this.maxMissedMessagesThreshold =
110+
options.maxMissedMessagesThreshold ||
111+
DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD;
79112

80113
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];
81114

@@ -179,11 +212,49 @@ export class SubscriptionManager implements ISubscriptionSDK {
179212
return finalResult;
180213
}
181214

182-
async processIncomingMessage(message: WakuMessage): Promise<void> {
215+
private async validateMessage(): Promise<void> {
216+
for (const hash of this.receivedMessagesHashes.all) {
217+
for (const [peerIdStr, hashes] of Object.entries(
218+
this.receivedMessagesHashes.nodes
219+
)) {
220+
if (!hashes.has(hash)) {
221+
this.incrementMissedMessageCount(peerIdStr);
222+
if (this.shouldRenewPeer(peerIdStr)) {
223+
log.info(
224+
`Peer ${peerIdStr} has missed too many messages, renewing.`
225+
);
226+
const peerId = this.getPeers().find(
227+
(p) => p.id.toString() === peerIdStr
228+
)?.id;
229+
if (!peerId) {
230+
log.error(
231+
`Unexpected Error: Peer ${peerIdStr} not found in connected peers.`
232+
);
233+
continue;
234+
}
235+
try {
236+
await this.renewAndSubscribePeer(peerId);
237+
} catch (error) {
238+
log.error(`Failed to renew peer ${peerIdStr}: ${error}`);
239+
}
240+
}
241+
}
242+
}
243+
}
244+
}
245+
246+
async processIncomingMessage(
247+
message: WakuMessage,
248+
peerIdStr: string
249+
): Promise<void> {
183250
const hashedMessageStr = messageHashStr(
184251
this.pubsubTopic,
185252
message as IProtoMessage
186253
);
254+
255+
this.addHash(hashedMessageStr, peerIdStr);
256+
void this.validateMessage();
257+
187258
if (this.receivedMessagesHashStr.includes(hashedMessageStr)) {
188259
log.info("Message already received, skipping");
189260
return;
@@ -276,15 +347,29 @@ export class SubscriptionManager implements ISubscriptionSDK {
276347
}
277348
}
278349

279-
private async renewAndSubscribePeer(peerId: PeerId): Promise<Peer> {
280-
const newPeer = await this.renewPeer(peerId);
281-
await this.protocol.subscribe(
282-
this.pubsubTopic,
283-
newPeer,
284-
Array.from(this.subscriptionCallbacks.keys())
285-
);
350+
private async renewAndSubscribePeer(
351+
peerId: PeerId
352+
): Promise<Peer | undefined> {
353+
try {
354+
const newPeer = await this.renewPeer(peerId);
355+
await this.protocol.subscribe(
356+
this.pubsubTopic,
357+
newPeer,
358+
Array.from(this.subscriptionCallbacks.keys())
359+
);
286360

287-
return newPeer;
361+
this.receivedMessagesHashes.nodes[newPeer.id.toString()] = new Set();
362+
this.missedMessagesByPeer.set(newPeer.id.toString(), 0);
363+
364+
return newPeer;
365+
} catch (error) {
366+
log.warn(`Failed to renew peer ${peerId.toString()}: ${error}.`);
367+
return;
368+
} finally {
369+
this.peerFailures.delete(peerId.toString());
370+
this.missedMessagesByPeer.delete(peerId.toString());
371+
delete this.receivedMessagesHashes.nodes[peerId.toString()];
372+
}
288373
}
289374

290375
private startBackgroundProcess(options: SubscribeOptions): void {
@@ -370,6 +455,16 @@ export class SubscriptionManager implements ISubscriptionSDK {
370455
this.subscribeOptions?.keepAlive || DEFAULT_SUBSCRIBE_OPTIONS.keepAlive
371456
);
372457
}
458+
459+
private incrementMissedMessageCount(peerIdStr: string): void {
460+
const currentCount = this.missedMessagesByPeer.get(peerIdStr) || 0;
461+
this.missedMessagesByPeer.set(peerIdStr, currentCount + 1);
462+
}
463+
464+
private shouldRenewPeer(peerIdStr: string): boolean {
465+
const missedMessages = this.missedMessagesByPeer.get(peerIdStr) || 0;
466+
return missedMessages > this.maxMissedMessagesThreshold;
467+
}
373468
}
374469

375470
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
@@ -385,7 +480,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
385480
) {
386481
super(
387482
new FilterCore(
388-
async (pubsubTopic: PubsubTopic, wakuMessage: WakuMessage) => {
483+
async (pubsubTopic, wakuMessage, peerIdStr) => {
389484
const subscription = this.getActiveSubscription(pubsubTopic);
390485
if (!subscription) {
391486
log.error(
@@ -394,7 +489,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
394489
return;
395490
}
396491

397-
await subscription.processIncomingMessage(wakuMessage);
492+
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
398493
},
399494
libp2p,
400495
options

packages/tests/tests/filter/peer_management.spec.ts

+61-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import {
22
DefaultPubsubTopic,
33
ISubscriptionSDK,
4-
LightNode
4+
LightNode,
5+
SDKProtocolResult
56
} from "@waku/interfaces";
67
import {
78
createDecoder,
@@ -16,6 +17,7 @@ import { describe } from "mocha";
1617
import {
1718
afterEachCustom,
1819
beforeEachCustom,
20+
ServiceNode,
1921
ServiceNodesFleet
2022
} from "../../src/index.js";
2123
import {
@@ -177,4 +179,62 @@ describe("Waku Filter: Peer Management: E2E", function () {
177179
waku.filter.numPeersToUse
178180
);
179181
});
182+
183+
it("Renews peer on consistent missed messages", async function () {
184+
const [serviceNodes, waku] = await runMultipleNodes(
185+
this.ctx,
186+
undefined,
187+
undefined,
188+
2
189+
);
190+
const serviceNodesPeerIdStr = await Promise.all(
191+
serviceNodes.nodes.map(async (node) =>
192+
(await node.getPeerId()).toString()
193+
)
194+
);
195+
const nodeWithoutDiscovery = new ServiceNode("WithoutDiscovery");
196+
await nodeWithoutDiscovery.start({ lightpush: true, filter: true });
197+
const nodeWithouDiscoveryPeerIdStr = (
198+
await nodeWithoutDiscovery.getPeerId()
199+
).toString();
200+
await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId());
201+
202+
const { error, subscription: sub } =
203+
await waku.filter.createSubscription(pubsubTopic);
204+
if (!sub || error) {
205+
throw new Error("Could not create subscription");
206+
}
207+
208+
const messages: DecodedMessage[] = [];
209+
const { successes } = await sub.subscribe([decoder], (msg) => {
210+
messages.push(msg);
211+
});
212+
213+
expect(successes.length).to.be.greaterThan(0);
214+
expect(successes.length).to.be.equal(waku.filter.numPeersToUse);
215+
216+
const sendMessage: () => Promise<SDKProtocolResult> = async () =>
217+
waku.lightPush.send(encoder, {
218+
payload: utf8ToBytes("Hello_World")
219+
});
220+
221+
await sendMessage();
222+
223+
successes
224+
.map((peerId) =>
225+
[nodeWithouDiscoveryPeerIdStr, ...serviceNodesPeerIdStr].includes(
226+
peerId.toString()
227+
)
228+
)
229+
.forEach((isConnected) => expect(isConnected).to.eq(true));
230+
231+
// send 2 more messages
232+
await sendMessage();
233+
await sendMessage();
234+
235+
expect(waku.filter.connectedPeers.length).to.equal(2);
236+
expect(
237+
waku.filter.connectedPeers.map((p) => p.id.toString())
238+
).to.not.include(nodeWithouDiscoveryPeerIdStr);
239+
});
180240
});

0 commit comments

Comments
 (0)