Skip to content

Commit ee2d417

Browse files
committed
feat: create node and subscription by content topic
1 parent 382af33 commit ee2d417

File tree

11 files changed

+329
-32
lines changed

11 files changed

+329
-32
lines changed

.size-limit.cjs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module.exports = [
22
{
3-
name: "Waku core",
4-
path: "packages/core/bundle/index.js",
3+
name: "Waku node",
4+
path: "packages/sdk/bundle/index.js",
55
import: "{ WakuNode }",
66
},
77
{

packages/core/src/index.ts

-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
export { DefaultUserAgent } from "./lib/waku.js";
21
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
32
export type {
43
Encoder,
@@ -7,9 +6,6 @@ export type {
76
} from "./lib/message/version_0.js";
87
export * as message from "./lib/message/index.js";
98

10-
export * as waku from "./lib/waku.js";
11-
export { WakuNode, WakuOptions } from "./lib/waku.js";
12-
139
export * as waku_filter from "./lib/filter/index.js";
1410
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";
1511

packages/core/src/lib/wait_for_remote_peer.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const log = new Logger("wait-for-remote-peer");
88
/**
99
* Wait for a remote peer to be ready given the passed protocols.
1010
* Must be used after attempting to connect to nodes, using
11-
* {@link @waku/core!WakuNode.dial} or a bootstrap method with
11+
* {@link @waku/sdk!WakuNode.dial} or a bootstrap method with
1212
* {@link @waku/sdk!createLightNode}.
1313
*
1414
* If the passed protocols is a GossipSub protocol, then it resolves only once

packages/interfaces/src/protocols.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ export type ProtocolCreateOptions = {
6262
*/
6363
shardInfo?: Partial<ShardingParams>;
6464
/**
65-
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
65+
* You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property.
6666
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
6767
* apart that we made the `modules` property optional and partial,
6868
* allowing its omission and letting Waku set good defaults.
69-
* Notes that some values are overridden by {@link @waku/core!WakuNode} to ensure it implements the Waku protocol.
69+
* Notes that some values are overridden by {@link @waku/sdk!WakuNode} to ensure it implements the Waku protocol.
7070
*/
7171
libp2p?: Partial<CreateLibp2pOptions>;
7272
/**

packages/sdk/src/content_topic.ts

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import type { Multiaddr } from "@multiformats/multiaddr";
2+
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
3+
import {
4+
Callback,
5+
IDecoder,
6+
IFilterSubscription,
7+
LightNode,
8+
Protocols
9+
} from "@waku/interfaces";
10+
import {
11+
contentTopicToPubsubTopic,
12+
shardInfoToPubsubTopics
13+
} from "@waku/utils";
14+
15+
import { createLightNode } from "./create.js";
16+
17+
interface CreateTopicOptions {
18+
waku?: LightNode;
19+
peer: Multiaddr;
20+
}
21+
22+
// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and
23+
// subscription for that content topic.
24+
async function prepareSubscription(
25+
waku: LightNode,
26+
contentTopic: string,
27+
peer: Multiaddr
28+
): Promise<{
29+
decoder: IDecoder<DecodedMessage>;
30+
subscription: IFilterSubscription;
31+
}> {
32+
// Validate that the Waku node matches assumptions
33+
if (!waku.filter) {
34+
throw new Error("Filter protocol missing from Waku node");
35+
}
36+
const { shardInfo } = waku.libp2p.components.metadata;
37+
if (!shardInfo) {
38+
throw new Error("Shard info missing from Waku node.");
39+
}
40+
41+
// Validate content topic and ensure node is configured for its corresponding pubsub topic
42+
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
43+
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
44+
if (!pubsubTopics.includes(pubsubTopic))
45+
throw new Error(
46+
"Content topic does not match any pubsub topic in shard info."
47+
);
48+
49+
await waku.dial(peer);
50+
await waitForRemotePeer(waku, [Protocols.Filter]);
51+
52+
// Create decoder and subscription
53+
let decoder = createDecoder(contentTopic, pubsubTopic);
54+
if (decoder) decoder = decoder ?? decoder;
55+
const subscription = await waku.filter.createSubscription(pubsubTopic);
56+
57+
return { decoder, subscription };
58+
}
59+
60+
/**
61+
* Creates a subscription and streams all new messages for a content topic.
62+
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
63+
* Assumes node is using autosharding.
64+
* @param contentTopic
65+
* @param opts
66+
*/
67+
export async function streamContentTopic(
68+
contentTopic: string,
69+
opts: CreateTopicOptions
70+
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
71+
opts.waku =
72+
opts.waku ??
73+
(await createLightNode({
74+
shardInfo: { contentTopics: [contentTopic] }
75+
}));
76+
const { decoder, subscription } = await prepareSubscription(
77+
opts.waku,
78+
contentTopic,
79+
opts.peer
80+
);
81+
82+
// Create a ReadableStream that receives any messages for the content topic
83+
const messageStream = new ReadableStream<DecodedMessage>({
84+
async start(controller) {
85+
await subscription.subscribe(decoder, (message) => {
86+
controller.enqueue(message);
87+
});
88+
},
89+
cancel() {
90+
return subscription.unsubscribe([contentTopic]);
91+
}
92+
});
93+
return [messageStream, opts.waku];
94+
}
95+
96+
/**
97+
* Subscribes to new messages for a content topic via callback function.
98+
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
99+
* Assumes node is using autosharding.
100+
* @param contentTopic
101+
* @param callback Called every time a new message is received on the content topic
102+
* @param opts
103+
*/
104+
export async function subscribeToContentTopic(
105+
contentTopic: string,
106+
callback: Callback<DecodedMessage>,
107+
opts: CreateTopicOptions
108+
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> {
109+
opts.waku =
110+
opts.waku ??
111+
(await createLightNode({
112+
shardInfo: { contentTopics: [contentTopic] }
113+
}));
114+
const { decoder, subscription } = await prepareSubscription(
115+
opts.waku,
116+
contentTopic,
117+
opts.peer
118+
);
119+
await subscription.subscribe(decoder, callback);
120+
return { subscription, waku: opts.waku };
121+
}

packages/sdk/src/create.ts

+3-9
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,7 @@ import { mplex } from "@libp2p/mplex";
66
import { ping } from "@libp2p/ping";
77
import { webSockets } from "@libp2p/websockets";
88
import { all as filterAll } from "@libp2p/websockets/filters";
9-
import {
10-
DefaultUserAgent,
11-
wakuFilter,
12-
wakuLightPush,
13-
wakuMetadata,
14-
WakuNode,
15-
WakuOptions,
16-
wakuStore
17-
} from "@waku/core";
9+
import { wakuFilter, wakuLightPush, wakuMetadata, wakuStore } from "@waku/core";
1810
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
1911
import {
2012
type CreateLibp2pOptions,
@@ -34,6 +26,8 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
3426
import { ensureShardingConfigured } from "@waku/utils";
3527
import { createLibp2p } from "libp2p";
3628

29+
import { DefaultUserAgent, WakuNode, WakuOptions } from "./waku.js";
30+
3731
const DEFAULT_NODE_REQUIREMENTS = {
3832
lightPush: 1,
3933
filter: 1,

packages/sdk/src/index.ts

+3-6
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,4 @@
1-
export {
2-
waitForRemotePeer,
3-
createEncoder,
4-
createDecoder,
5-
WakuNode
6-
} from "@waku/core";
1+
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
72
export {
83
DecodedMessage,
94
Decoder,
@@ -12,6 +7,8 @@ export {
127

138
export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";
149

10+
export * from "./content_topic.js";
11+
export * from "./waku.js";
1512
export * from "./create.js";
1613
export * as waku from "@waku/core";
1714
export * as utils from "@waku/utils";

packages/sdk/src/relay/index.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { WakuNode, WakuOptions } from "@waku/core";
21
import {
32
DefaultPubsubTopic,
43
type ProtocolCreateOptions,
@@ -8,6 +7,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
87
import { ensureShardingConfigured } from "@waku/utils";
98

109
import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js";
10+
import { WakuNode, WakuOptions } from "../waku.js";
1111

1212
/**
1313
* Create a Waku node that uses Waku Relay to send and receive messages,

packages/core/src/lib/waku.ts packages/sdk/src/waku.ts

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
import type { Stream } from "@libp2p/interface";
22
import { isPeerId, PeerId } from "@libp2p/interface";
33
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
4+
import { ConnectionManager, DecodedMessage } from "@waku/core";
45
import type {
6+
Callback,
57
IFilter,
8+
IFilterSubscription,
69
ILightPush,
710
IRelay,
811
IStore,
912
Libp2p,
13+
LightNode,
1014
PubsubTopic,
1115
Waku
1216
} from "@waku/interfaces";
1317
import { Protocols } from "@waku/interfaces";
1418
import { Logger } from "@waku/utils";
1519

16-
import { ConnectionManager } from "./connection_manager.js";
20+
import { subscribeToContentTopic } from "./content_topic.js";
1721

1822
export const DefaultPingKeepAliveValueSecs = 5 * 60;
1923
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@@ -180,6 +184,19 @@ export class WakuNode implements Waku {
180184
await this.libp2p.stop();
181185
}
182186

187+
async subscribeToContentTopic(
188+
contentTopic: string,
189+
peer: Multiaddr,
190+
callback: Callback<DecodedMessage>
191+
): Promise<IFilterSubscription> {
192+
return (
193+
await subscribeToContentTopic(contentTopic, callback, {
194+
waku: this as LightNode,
195+
peer
196+
})
197+
).subscription;
198+
}
199+
183200
isStarted(): boolean {
184201
return this.libp2p.status == "started";
185202
}

0 commit comments

Comments
 (0)