5
5
type ContentTopic ,
6
6
CoreProtocolResult ,
7
7
CreateSubscriptionResult ,
8
+ EConnectionStateEvents ,
8
9
type IAsyncIterator ,
9
10
type IDecodedMessage ,
10
11
type IDecoder ,
@@ -48,6 +49,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
48
49
readonly peers : Peer [ ] ;
49
50
readonly receivedMessagesHashStr : string [ ] = [ ] ;
50
51
52
+ private subscribeOptions : SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ;
53
+
51
54
private keepAliveTimer : number | null = null ;
52
55
53
56
private subscriptionCallbacks : Map <
@@ -70,6 +73,8 @@ export class SubscriptionManager implements ISubscriptionSDK {
70
73
callback : Callback < T > ,
71
74
options : SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
72
75
) : Promise < SDKProtocolResult > {
76
+ this . subscribeOptions = options ;
77
+
73
78
const decodersArray = Array . isArray ( decoders ) ? decoders : [ decoders ] ;
74
79
75
80
// check that all decoders are configured for the same pubsub topic as this subscription
@@ -236,11 +241,40 @@ export class SubscriptionManager implements ISubscriptionSDK {
236
241
}
237
242
238
243
private startNetworkMonitoring ( ) : void {
239
- // this.protocol.addLibp2pEventListener("waku:connection", (evt) => console.log(evt));
244
+ // @ts -expect-error: tmp change while PR in draft
245
+ this . protocol . addLibp2pEventListener (
246
+ EConnectionStateEvents . CONNECTION_STATUS ,
247
+ this . networkStateListener as ( ) => void
248
+ ) ;
240
249
}
241
250
242
251
private stopNetworkMonitoring ( ) : void {
243
- // this.protocol.removeLibp2pEventListener("waku:connection", (evt) => console.log(evt));
252
+ // @ts -expect-error: tmp change while PR in draft
253
+ this . protocol . removeLibp2pEventListener (
254
+ EConnectionStateEvents . CONNECTION_STATUS ,
255
+ this . networkStateListener as ( ) => void
256
+ ) ;
257
+ }
258
+
259
+ private async networkStateListener ( isConnected : boolean ) : Promise < void > {
260
+ if ( ! isConnected ) {
261
+ this . stopKeepAlivePings ( ) ;
262
+ return ;
263
+ }
264
+
265
+ const result = await this . ping ( ) ;
266
+ const renewPeerPromises = result . failures . map ( ( v ) => {
267
+ if ( v . peerId ) {
268
+ // @ts -expect-error: tmp change while PR in draft
269
+ return this . protocol . renewPeer ( v . peerId ) ;
270
+ }
271
+ } ) ;
272
+
273
+ await Promise . all ( renewPeerPromises ) ;
274
+
275
+ this . startKeepAlivePings (
276
+ this . subscribeOptions ?. keepAlive || DEFAULT_SUBSCRIBE_OPTIONS . keepAlive
277
+ ) ;
244
278
}
245
279
246
280
private startKeepAlivePings ( interval : number ) : void {
0 commit comments