@@ -50,7 +50,7 @@ export const FilterCodecs = {
50
50
51
51
class Subscription {
52
52
private readonly peer : Peer ;
53
- private readonly pubSubTopic : PubSubTopic ;
53
+ private readonly pubsubTopic : PubSubTopic ;
54
54
private newStream : ( peer : Peer ) => Promise < Stream > ;
55
55
56
56
private subscriptionCallbacks : Map <
@@ -59,12 +59,12 @@ class Subscription {
59
59
> ;
60
60
61
61
constructor (
62
- pubSubTopic : PubSubTopic ,
62
+ pubsubTopic : PubSubTopic ,
63
63
remotePeer : Peer ,
64
64
newStream : ( peer : Peer ) => Promise < Stream >
65
65
) {
66
66
this . peer = remotePeer ;
67
- this . pubSubTopic = pubSubTopic ;
67
+ this . pubsubTopic = pubsubTopic ;
68
68
this . newStream = newStream ;
69
69
this . subscriptionCallbacks = new Map ( ) ;
70
70
}
@@ -80,7 +80,7 @@ class Subscription {
80
80
const stream = await this . newStream ( this . peer ) ;
81
81
82
82
const request = FilterSubscribeRpc . createSubscribeRequest (
83
- this . pubSubTopic ,
83
+ this . pubsubTopic ,
84
84
contentTopics
85
85
) ;
86
86
@@ -145,7 +145,7 @@ class Subscription {
145
145
async unsubscribe ( contentTopics : ContentTopic [ ] ) : Promise < void > {
146
146
const stream = await this . newStream ( this . peer ) ;
147
147
const unsubscribeRequest = FilterSubscribeRpc . createUnsubscribeRequest (
148
- this . pubSubTopic ,
148
+ this . pubsubTopic ,
149
149
contentTopics
150
150
) ;
151
151
@@ -194,7 +194,7 @@ class Subscription {
194
194
const stream = await this . newStream ( this . peer ) ;
195
195
196
196
const request = FilterSubscribeRpc . createUnsubscribeAllRequest (
197
- this . pubSubTopic
197
+ this . pubsubTopic
198
198
) ;
199
199
200
200
try {
@@ -229,35 +229,35 @@ class Subscription {
229
229
log ( "No subscription callback available for " , contentTopic ) ;
230
230
return ;
231
231
}
232
- await pushMessage ( subscriptionCallback , this . pubSubTopic , message ) ;
232
+ await pushMessage ( subscriptionCallback , this . pubsubTopic , message ) ;
233
233
}
234
234
}
235
235
236
236
class Filter extends BaseProtocol implements IReceiver {
237
- private readonly pubSubTopics : PubSubTopic [ ] = [ ] ;
237
+ private readonly pubsubTopics : PubSubTopic [ ] = [ ] ;
238
238
private activeSubscriptions = new Map < string , Subscription > ( ) ;
239
239
private readonly NUM_PEERS_PROTOCOL = 1 ;
240
240
241
241
private getActiveSubscription (
242
- pubSubTopic : PubSubTopic ,
242
+ pubsubTopic : PubSubTopic ,
243
243
peerIdStr : PeerIdStr
244
244
) : Subscription | undefined {
245
- return this . activeSubscriptions . get ( `${ pubSubTopic } _${ peerIdStr } ` ) ;
245
+ return this . activeSubscriptions . get ( `${ pubsubTopic } _${ peerIdStr } ` ) ;
246
246
}
247
247
248
248
private setActiveSubscription (
249
- pubSubTopic : PubSubTopic ,
249
+ pubsubTopic : PubSubTopic ,
250
250
peerIdStr : PeerIdStr ,
251
251
subscription : Subscription
252
252
) : Subscription {
253
- this . activeSubscriptions . set ( `${ pubSubTopic } _${ peerIdStr } ` , subscription ) ;
253
+ this . activeSubscriptions . set ( `${ pubsubTopic } _${ peerIdStr } ` , subscription ) ;
254
254
return subscription ;
255
255
}
256
256
257
257
constructor ( libp2p : Libp2p , options ?: ProtocolCreateOptions ) {
258
258
super ( FilterCodecs . SUBSCRIBE , libp2p . components ) ;
259
259
260
- this . pubSubTopics = options ?. pubSubTopics || [ DefaultPubSubTopic ] ;
260
+ this . pubsubTopics = options ?. pubsubTopics || [ DefaultPubSubTopic ] ;
261
261
262
262
libp2p . handle ( FilterCodecs . PUSH , this . onRequest . bind ( this ) ) . catch ( ( e ) => {
263
263
log ( "Failed to register " , FilterCodecs . PUSH , e ) ;
@@ -267,9 +267,9 @@ class Filter extends BaseProtocol implements IReceiver {
267
267
}
268
268
269
269
async createSubscription (
270
- pubSubTopic : string = DefaultPubSubTopic
270
+ pubsubTopic : string = DefaultPubSubTopic
271
271
) : Promise < Subscription > {
272
- ensurePubsubTopicIsConfigured ( pubSubTopic , this . pubSubTopics ) ;
272
+ ensurePubsubTopicIsConfigured ( pubsubTopic , this . pubsubTopics ) ;
273
273
274
274
//TODO: get a relevant peer for the topic/shard
275
275
// https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230
@@ -281,11 +281,11 @@ class Filter extends BaseProtocol implements IReceiver {
281
281
) [ 0 ] ;
282
282
283
283
const subscription =
284
- this . getActiveSubscription ( pubSubTopic , peer . id . toString ( ) ) ??
284
+ this . getActiveSubscription ( pubsubTopic , peer . id . toString ( ) ) ??
285
285
this . setActiveSubscription (
286
- pubSubTopic ,
286
+ pubsubTopic ,
287
287
peer . id . toString ( ) ,
288
- new Subscription ( pubSubTopic , peer , this . getStream . bind ( this , peer ) )
288
+ new Subscription ( pubsubTopic , peer , this . getStream . bind ( this , peer ) )
289
289
) ;
290
290
291
291
return subscription ;
@@ -385,7 +385,7 @@ export function wakuFilter(
385
385
386
386
async function pushMessage < T extends IDecodedMessage > (
387
387
subscriptionCallback : SubscriptionCallback < T > ,
388
- pubSubTopic : PubSubTopic ,
388
+ pubsubTopic : PubSubTopic ,
389
389
message : WakuMessage
390
390
) : Promise < void > {
391
391
const { decoders, callback } = subscriptionCallback ;
@@ -399,7 +399,7 @@ async function pushMessage<T extends IDecodedMessage>(
399
399
try {
400
400
const decodePromises = decoders . map ( ( dec ) =>
401
401
dec
402
- . fromProtoObj ( pubSubTopic , message as IProtoMessage )
402
+ . fromProtoObj ( pubsubTopic , message as IProtoMessage )
403
403
. then ( ( decoded ) => decoded || Promise . reject ( "Decoding failed" ) )
404
404
) ;
405
405
0 commit comments