Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(static-sharding)!: allow multiple pubSubTopics #1586

Merged
merged 34 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
90e5b77
`ProtocolCreateOptions` now has `pubSubTopic` as `pubSubTopic[]`
danisharora099 Sep 20, 2023
7217497
chore: update encoder & decoder to support `PubSubTopic`
danisharora099 Sep 20, 2023
c232e9f
feat(protocols): allow multiple `PubSubTopic[]`
danisharora099 Sep 20, 2023
0e1e1a1
feat(relay): allow multiple `PubSubTopic[]`
danisharora099 Sep 20, 2023
74c8800
chore(tests): update for new API
danisharora099 Sep 20, 2023
bad5d5e
chore: minor fixes
danisharora099 Sep 20, 2023
24c9ad0
chore: make store more robust
danisharora099 Sep 20, 2023
1bac330
fix(relay): correctly set types
danisharora099 Sep 20, 2023
cbc514a
chore(address comments): update terminology around configured pubsub …
danisharora099 Sep 21, 2023
697072c
chore(address comments): minor refactoring
danisharora099 Sep 21, 2023
3285849
chore(relay): split `subscribe` into smaller functions for readabilit…
danisharora099 Sep 21, 2023
5392b31
chore(address comments): refactor `waitForGossipSubPeerInMesh`
danisharora099 Sep 21, 2023
7f517ad
chore(store): only allow to query one `pubSubTopic`
danisharora099 Sep 21, 2023
76208e2
fix: `store` bug
danisharora099 Sep 21, 2023
2036d1f
Merge branch 'master' of github.com:waku-org/js-waku into feat/static…
danisharora099 Sep 21, 2023
356548e
feat(tests): add some basic tests
danisharora099 Sep 21, 2023
fd85ba0
sharding utils
danisharora099 Sep 26, 2023
8e8da03
address comments
danisharora099 Sep 26, 2023
ea7eef7
Merge branch 'master' of github.com:waku-org/js-waku into feat/static…
danisharora099 Sep 26, 2023
a84726b
feat(relay): re-add API for `getMeshPeers`
danisharora099 Sep 26, 2023
a626ceb
update error message
danisharora099 Sep 26, 2023
8054eb4
refactor for new API
danisharora099 Sep 26, 2023
8b9f558
feat: simplify handling of observers (#1614)
fryorcraken Sep 26, 2023
5a01805
Merge branch 'feat/staticsharding' of github.com:waku-org/js-waku int…
danisharora099 Sep 26, 2023
8c01b74
use `??` instead of `||`
danisharora099 Sep 26, 2023
08b05ea
update `pubsubTopic` to `pubSubTopic`
danisharora099 Sep 26, 2023
d49a298
update `interval` typo
danisharora099 Sep 26, 2023
11e8094
change occurence of `pubsubTopic` to `pubSubTopic`
danisharora099 Sep 26, 2023
1977bd7
relay: rm `getAllMeshPeers` and make `pubSubTopics` public
danisharora099 Sep 27, 2023
3fd68e5
relay: use `push_or_init_map` and move to `utils`
danisharora099 Sep 27, 2023
cbf588e
Merge branch 'master' of github.com:waku-org/js-waku into feat/static…
danisharora099 Sep 27, 2023
03ae72a
fix: update API for tests
danisharora099 Sep 27, 2023
6f0cbb7
fix: relay waitForRemotePeer
danisharora099 Sep 27, 2023
7739014
Merge branch 'master' into feat/staticsharding
danisharora099 Sep 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"exponentiate",
"extip",
"fanout",
"sharded",
"floodsub",
"fontsource",
"globby",
Expand Down
26 changes: 8 additions & 18 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 17 additions & 10 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import type {
Unsubscribe
} from "@waku/interfaces";
import { WakuMessage } from "@waku/proto";
import { groupByContentTopic, toAsyncIterator } from "@waku/utils";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
toAsyncIterator
} from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand Down Expand Up @@ -230,7 +234,7 @@ class Subscription {
}

class Filter extends BaseProtocol implements IReceiver {
private readonly options: ProtocolCreateOptions;
private readonly pubSubTopics: PubSubTopic[] = [];
private activeSubscriptions = new Map<string, Subscription>();
private readonly NUM_PEERS_PROTOCOL = 1;

Expand All @@ -253,19 +257,22 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubSubTopics = options?.pubSubTopics || [DefaultPubSubTopic];

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
});

this.activeSubscriptions = new Map();

this.options = options ?? {};
}

async createSubscription(pubSubTopic?: string): Promise<Subscription> {
const _pubSubTopic =
pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic;
async createSubscription(
pubSubTopic: string = DefaultPubSubTopic
): Promise<Subscription> {
ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics);

//TODO: get a relevant peer for the topic/shard
// https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230
const peer = (
await this.getPeers({
maxBootstrapPeers: 1,
Expand All @@ -274,11 +281,11 @@ class Filter extends BaseProtocol implements IReceiver {
)[0];

const subscription =
this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ??
this.getActiveSubscription(pubSubTopic, peer.id.toString()) ??
this.setActiveSubscription(
_pubSubTopic,
pubSubTopic,
peer.id.toString(),
new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer))
new Subscription(pubSubTopic, peer, this.getStream.bind(this, peer))
);

return subscription;
Expand Down
51 changes: 37 additions & 14 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { IRelay } from "@waku/interfaces";
import type { IRelay, PeerIdStr } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
Expand All @@ -13,7 +13,7 @@ const log = debug("waku:keep-alive");

export class KeepAliveManager {
private pingKeepAliveTimers: Map<string, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>>;
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]>;
private options: KeepAliveOptions;
private relay?: IRelay;

Expand Down Expand Up @@ -66,17 +66,12 @@ export class KeepAliveManager {

const relay = this.relay;
if (relay && relayPeriodSecs !== 0) {
const encoder = createEncoder({
contentTopic: RelayPingContentTopic,
ephemeral: true
});
const interval = setInterval(() => {
log("Sending Waku Relay ping message");
relay
.send(encoder, { payload: new Uint8Array([1]) })
.catch((e) => log("Failed to send relay ping", e));
}, relayPeriodSecs * 1000);
this.relayKeepAliveTimers.set(peerId, interval);
const intervals = this.scheduleRelayPings(
relay,
relayPeriodSecs,
peerId.toString()
);
this.relayKeepAliveTimers.set(peerId, intervals);
}
}

Expand All @@ -89,7 +84,7 @@ export class KeepAliveManager {
}

if (this.relayKeepAliveTimers.has(peerId)) {
clearInterval(this.relayKeepAliveTimers.get(peerId));
this.relayKeepAliveTimers.get(peerId)?.map(clearInterval);
this.relayKeepAliveTimers.delete(peerId);
}
}
Expand All @@ -105,4 +100,32 @@ export class KeepAliveManager {
this.pingKeepAliveTimers.clear();
this.relayKeepAliveTimers.clear();
}

private scheduleRelayPings(
relay: IRelay,
relayPeriodSecs: number,
peerIdStr: PeerIdStr
): NodeJS.Timeout[] {
// send a ping message to each PubSubTopic the peer is part of
const intervals: NodeJS.Timeout[] = [];
for (const topic of relay.pubSubTopics) {
const meshPeers = relay.getMeshPeers(topic);
if (!meshPeers.includes(peerIdStr)) continue;

const encoder = createEncoder({
pubSubTopic: topic,
contentTopic: RelayPingContentTopic,
ephemeral: true
});
const interval = setInterval(() => {
log("Sending Waku Relay ping message");
relay
.send(encoder, { payload: new Uint8Array([1]) })
.catch((e) => log("Failed to send relay ping", e));
}, relayPeriodSecs * 1000);
intervals.push(interval);
}

return intervals;
}
}
12 changes: 8 additions & 4 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import {
IMessage,
Libp2p,
ProtocolCreateOptions,
PubSubTopic,
SendError,
SendResult
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import { isSizeValid } from "@waku/utils";
import { ensurePubsubTopicIsConfigured, isSizeValid } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand Down Expand Up @@ -41,12 +42,12 @@ type PreparePushMessageResult =
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;
private readonly pubSubTopics: PubSubTopic[];
private readonly NUM_PEERS_PROTOCOL = 1;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.options = options || {};
this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic];
}

private async preparePushMessage(
Expand Down Expand Up @@ -82,7 +83,9 @@ class LightPush extends BaseProtocol implements ILightPush {
}

async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const { pubSubTopic } = encoder;
ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics);

const recipients: PeerId[] = [];

const { query, error: preparationError } = await this.preparePushMessage(
Expand All @@ -98,6 +101,7 @@ class LightPush extends BaseProtocol implements ILightPush {
};
}

//TODO: get a relevant peer for the topic/shard
const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.NUM_PEERS_PROTOCOL
Expand Down
21 changes: 16 additions & 5 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import type {
IMessage,
IMetaSetter,
IProtoMessage,
IRateLimitProof
IRateLimitProof,
PubSubTopic
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import debug from "debug";

import { DefaultPubSubTopic } from "../constants.js";

const log = debug("waku:message:version-0");
const OneMillion = BigInt(1_000_000);

Expand Down Expand Up @@ -73,6 +76,7 @@ export class Encoder implements IEncoder {
constructor(
public contentTopic: string,
public ephemeral: boolean = false,
public pubSubTopic: PubSubTopic,
public metaSetter?: IMetaSetter
) {
if (!contentTopic || contentTopic === "") {
Expand Down Expand Up @@ -115,15 +119,19 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder({
pubSubTopic = DefaultPubSubTopic,
contentTopic,
ephemeral,
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(contentTopic, ephemeral, metaSetter);
return new Encoder(contentTopic, ephemeral, pubSubTopic, metaSetter);
}

export class Decoder implements IDecoder<DecodedMessage> {
constructor(public contentTopic: string) {
constructor(
public pubSubTopic: PubSubTopic,
public contentTopic: string
) {
if (!contentTopic || contentTopic === "") {
throw new Error("Content topic must be specified");
}
Expand Down Expand Up @@ -173,6 +181,9 @@ export class Decoder implements IDecoder<DecodedMessage> {
*
* @param contentTopic The resulting decoder will only decode messages with this content topic.
*/
export function createDecoder(contentTopic: string): Decoder {
return new Decoder(contentTopic);
export function createDecoder(
contentTopic: string,
pubsubTopic: PubSubTopic = DefaultPubSubTopic
): Decoder {
return new Decoder(pubsubTopic, contentTopic);
}
Loading