Skip to content

Commit 809681a

Browse files
authored
Merge pull request #1231 from waku-org/feat/pubsub-topic-message
2 parents 0c63b29 + 628ac50 commit 809681a

15 files changed

+68
-22
lines changed

packages/core/src/lib/filter/index.ts

+8-3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export type RequestID = string;
3434
type Subscription<T extends IDecodedMessage> = {
3535
decoders: IDecoder<T>[];
3636
callback: Callback<T>;
37+
pubSubTopic: string;
3738
};
3839

3940
/**
@@ -108,7 +109,8 @@ class Filter extends BaseProtocol implements IFilter {
108109
throw e;
109110
}
110111

111-
this.subscriptions.set(requestId, { callback, decoders });
112+
const subscription: Subscription<T> = { callback, decoders, pubSubTopic };
113+
this.subscriptions.set(requestId, subscription);
112114

113115
return async () => {
114116
await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
@@ -150,7 +152,7 @@ class Filter extends BaseProtocol implements IFilter {
150152
log(`No subscription locally registered for request ID ${requestId}`);
151153
return;
152154
}
153-
const { decoders, callback } = subscription;
155+
const { decoders, callback, pubSubTopic } = subscription;
154156

155157
if (!decoders || !decoders.length) {
156158
log(`No decoder registered for request ID ${requestId}`);
@@ -170,7 +172,10 @@ class Filter extends BaseProtocol implements IFilter {
170172
// noinspection ES6MissingAwait
171173
decoders.forEach(async (dec: IDecoder<T>) => {
172174
if (didDecodeMsg) return;
173-
const decoded = await dec.fromProtoObj(toProtoMessage(protoMessage));
175+
const decoded = await dec.fromProtoObj(
176+
pubSubTopic,
177+
toProtoMessage(protoMessage)
178+
);
174179
if (!decoded) {
175180
log("Not able to decode message");
176181
return;

packages/core/src/lib/message/topic_only_message.ts

+6-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ export class TopicOnlyMessage implements IDecodedMessage {
1414
public timestamp: undefined;
1515
public ephemeral: undefined;
1616

17-
constructor(private proto: ProtoTopicOnlyMessage) {}
17+
constructor(
18+
public pubSubTopic: string,
19+
private proto: ProtoTopicOnlyMessage
20+
) {}
1821

1922
get contentTopic(): string {
2023
return this.proto.contentTopic;
@@ -38,8 +41,9 @@ export class TopicOnlyDecoder implements IDecoder<TopicOnlyMessage> {
3841
}
3942

4043
async fromProtoObj(
44+
pubSubTopic: string,
4145
proto: IProtoMessage
4246
): Promise<TopicOnlyMessage | undefined> {
43-
return new TopicOnlyMessage(proto);
47+
return new TopicOnlyMessage(pubSubTopic, proto);
4448
}
4549
}

packages/core/src/lib/message/version_0.spec.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import fc from "fast-check";
44
import { createDecoder, createEncoder, DecodedMessage } from "./version_0.js";
55

66
const TestContentTopic = "/test/1/waku-message/utf8";
7+
const TestPubSubTopic = "/test/pubsub/topic";
78

89
describe("Waku Message version 0", function () {
910
it("Round trip binary serialization", async function () {
@@ -16,10 +17,12 @@ describe("Waku Message version 0", function () {
1617
const decoder = createDecoder(TestContentTopic);
1718
const protoResult = await decoder.fromWireToProtoObj(bytes);
1819
const result = (await decoder.fromProtoObj(
20+
TestPubSubTopic,
1921
protoResult!
2022
)) as DecodedMessage;
2123

2224
expect(result.contentTopic).to.eq(TestContentTopic);
25+
expect(result.pubSubTopic).to.eq(TestPubSubTopic);
2326
expect(result.version).to.eq(0);
2427
expect(result.ephemeral).to.be.false;
2528
expect(result.payload).to.deep.eq(payload);
@@ -39,14 +42,11 @@ describe("Waku Message version 0", function () {
3942
const decoder = createDecoder(TestContentTopic);
4043
const protoResult = await decoder.fromWireToProtoObj(bytes);
4144
const result = (await decoder.fromProtoObj(
45+
TestPubSubTopic,
4246
protoResult!
4347
)) as DecodedMessage;
4448

45-
expect(result.contentTopic).to.eq(TestContentTopic);
46-
expect(result.version).to.eq(0);
4749
expect(result.ephemeral).to.be.true;
48-
expect(result.payload).to.deep.eq(payload);
49-
expect(result.timestamp).to.not.be.undefined;
5050
})
5151
);
5252
});

packages/core/src/lib/message/version_0.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ export const Version = 0;
1717
export { proto };
1818

1919
export class DecodedMessage implements IDecodedMessage {
20-
constructor(protected proto: proto.WakuMessage) {}
20+
constructor(public pubSubTopic: string, protected proto: proto.WakuMessage) {}
2121

2222
get ephemeral(): boolean {
2323
return Boolean(this.proto.ephemeral);
@@ -115,6 +115,7 @@ export class Decoder implements IDecoder<DecodedMessage> {
115115
}
116116

117117
async fromProtoObj(
118+
pubSubTopic: string,
118119
proto: IProtoMessage
119120
): Promise<DecodedMessage | undefined> {
120121
// https://rfc.vac.dev/spec/14/
@@ -129,7 +130,7 @@ export class Decoder implements IDecoder<DecodedMessage> {
129130
return Promise.resolve(undefined);
130131
}
131132

132-
return new DecodedMessage(proto);
133+
return new DecodedMessage(pubSubTopic, proto);
133134
}
134135
}
135136

packages/core/src/lib/relay/index.ts

+6-4
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ class Relay extends GossipSub implements IRelay {
122122
}
123123

124124
private async processIncomingMessage<T extends IDecodedMessage>(
125+
pubSubTopic: string,
125126
bytes: Uint8Array
126127
): Promise<void> {
127128
const topicOnlyMsg = await this.defaultDecoder.fromWireToProtoObj(bytes);
@@ -143,7 +144,7 @@ class Relay extends GossipSub implements IRelay {
143144
log("Internal error: message previously decoded failed on 2nd pass.");
144145
return;
145146
}
146-
const msg = await decoder.fromProtoObj(protoMsg);
147+
const msg = await decoder.fromProtoObj(pubSubTopic, protoMsg);
147148
if (msg) {
148149
callback(msg);
149150
} else {
@@ -165,9 +166,10 @@ class Relay extends GossipSub implements IRelay {
165166
if (event.detail.msg.topic !== pubSubTopic) return;
166167
log(`Message received on ${pubSubTopic}`);
167168

168-
this.processIncomingMessage(event.detail.msg.data).catch((e) =>
169-
log("Failed to process incoming message", e)
170-
);
169+
this.processIncomingMessage(
170+
event.detail.msg.topic,
171+
event.detail.msg.data
172+
).catch((e) => log("Failed to process incoming message", e));
171173
}
172174
);
173175

packages/core/src/lib/store/index.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,10 @@ async function* paginate<T extends IDecodedMessage>(
311311
if (typeof contentTopic !== "undefined") {
312312
const decoder = decoders.get(contentTopic);
313313
if (decoder) {
314-
return decoder.fromProtoObj(toProtoMessage(protoMsg));
314+
return decoder.fromProtoObj(
315+
queryOpts.pubSubTopic,
316+
toProtoMessage(protoMsg)
317+
);
315318
}
316319
}
317320
return Promise.resolve(undefined);

packages/interfaces/src/message.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ export interface IEncoder {
5050
export interface IDecodedMessage {
5151
payload: Uint8Array;
5252
contentTopic: string;
53+
pubSubTopic: string;
5354
timestamp: Date | undefined;
5455
rateLimitProof: IRateLimitProof | undefined;
5556
ephemeral: boolean | undefined;
@@ -58,5 +59,8 @@ export interface IDecodedMessage {
5859
export interface IDecoder<T extends IDecodedMessage> {
5960
contentTopic: string;
6061
fromWireToProtoObj: (bytes: Uint8Array) => Promise<IProtoMessage | undefined>;
61-
fromProtoObj: (proto: IProtoMessage) => Promise<T | undefined>;
62+
fromProtoObj: (
63+
pubSubTopic: string,
64+
proto: IProtoMessage
65+
) => Promise<T | undefined>;
6266
}

packages/message-encryption/src/decoded_message.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@ export class DecodedMessage
1111
private readonly _decodedPayload: Uint8Array;
1212

1313
constructor(
14+
pubSubTopic: string,
1415
proto: proto.WakuMessage,
1516
decodedPayload: Uint8Array,
1617
public signature?: Uint8Array,
1718
public signaturePublicKey?: Uint8Array
1819
) {
19-
super(proto);
20+
super(pubSubTopic, proto);
2021
this._decodedPayload = decodedPayload;
2122
}
2223

packages/message-encryption/src/ecies.spec.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getPublicKey } from "./crypto/index.js";
55
import { createDecoder, createEncoder } from "./ecies.js";
66

77
const TestContentTopic = "/test/1/waku-message/utf8";
8+
const TestPubSubTopic = "/test/pubsub/topic";
89

910
describe("Ecies Encryption", function () {
1011
it("Round trip binary encryption [ecies, no signature]", async function () {
@@ -24,10 +25,14 @@ describe("Ecies Encryption", function () {
2425
const decoder = createDecoder(TestContentTopic, privateKey);
2526
const protoResult = await decoder.fromWireToProtoObj(bytes!);
2627
if (!protoResult) throw "Failed to proto decode";
27-
const result = await decoder.fromProtoObj(protoResult);
28+
const result = await decoder.fromProtoObj(
29+
TestPubSubTopic,
30+
protoResult
31+
);
2832
if (!result) throw "Failed to decode";
2933

3034
expect(result.contentTopic).to.equal(TestContentTopic);
35+
expect(result.pubSubTopic).to.equal(TestPubSubTopic);
3136
expect(result.version).to.equal(1);
3237
expect(result?.payload).to.deep.equal(payload);
3338
expect(result.signature).to.be.undefined;
@@ -59,10 +64,14 @@ describe("Ecies Encryption", function () {
5964
const decoder = createDecoder(TestContentTopic, bobPrivateKey);
6065
const protoResult = await decoder.fromWireToProtoObj(bytes!);
6166
if (!protoResult) throw "Failed to proto decode";
62-
const result = await decoder.fromProtoObj(protoResult);
67+
const result = await decoder.fromProtoObj(
68+
TestPubSubTopic,
69+
protoResult
70+
);
6371
if (!result) throw "Failed to decode";
6472

6573
expect(result.contentTopic).to.equal(TestContentTopic);
74+
expect(result.pubSubTopic).to.equal(TestPubSubTopic);
6675
expect(result.version).to.equal(1);
6776
expect(result?.payload).to.deep.equal(payload);
6877
expect(result.signature).to.not.be.undefined;

packages/message-encryption/src/ecies.ts

+2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
9595
}
9696

9797
async fromProtoObj(
98+
pubSubTopic: string,
9899
protoMessage: IProtoMessage
99100
): Promise<DecodedMessage | undefined> {
100101
const cipherPayload = protoMessage.payload;
@@ -135,6 +136,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
135136

136137
log("Message decrypted", protoMessage);
137138
return new DecodedMessage(
139+
pubSubTopic,
138140
protoMessage,
139141
res.payload,
140142
res.sig?.signature,

packages/message-encryption/src/symmetric.spec.ts

+11-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getPublicKey } from "./crypto/index.js";
55
import { createDecoder, createEncoder } from "./symmetric.js";
66

77
const TestContentTopic = "/test/1/waku-message/utf8";
8+
const TestPubSubTopic = "/test/pubsub/topic";
89

910
describe("Symmetric Encryption", function () {
1011
it("Round trip binary encryption [symmetric, no signature]", async function () {
@@ -22,10 +23,14 @@ describe("Symmetric Encryption", function () {
2223
const decoder = createDecoder(TestContentTopic, symKey);
2324
const protoResult = await decoder.fromWireToProtoObj(bytes!);
2425
if (!protoResult) throw "Failed to proto decode";
25-
const result = await decoder.fromProtoObj(protoResult);
26+
const result = await decoder.fromProtoObj(
27+
TestPubSubTopic,
28+
protoResult
29+
);
2630
if (!result) throw "Failed to decode";
2731

2832
expect(result.contentTopic).to.equal(TestContentTopic);
33+
expect(result.pubSubTopic).to.equal(TestPubSubTopic);
2934
expect(result.version).to.equal(1);
3035
expect(result?.payload).to.deep.equal(payload);
3136
expect(result.signature).to.be.undefined;
@@ -54,10 +59,14 @@ describe("Symmetric Encryption", function () {
5459
const decoder = createDecoder(TestContentTopic, symKey);
5560
const protoResult = await decoder.fromWireToProtoObj(bytes!);
5661
if (!protoResult) throw "Failed to proto decode";
57-
const result = await decoder.fromProtoObj(protoResult);
62+
const result = await decoder.fromProtoObj(
63+
TestPubSubTopic,
64+
protoResult
65+
);
5866
if (!result) throw "Failed to decode";
5967

6068
expect(result.contentTopic).to.equal(TestContentTopic);
69+
expect(result.pubSubTopic).to.equal(TestPubSubTopic);
6170
expect(result.version).to.equal(1);
6271
expect(result?.payload).to.deep.equal(payload);
6372
expect(result.signature).to.not.be.undefined;

packages/message-encryption/src/symmetric.ts

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
9090
}
9191

9292
async fromProtoObj(
93+
pubSubTopic: string,
9394
protoMessage: IProtoMessage
9495
): Promise<DecodedMessage | undefined> {
9596
const cipherPayload = protoMessage.payload;
@@ -130,6 +131,7 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
130131

131132
log("Message decrypted", protoMessage);
132133
return new DecodedMessage(
134+
pubSubTopic,
133135
protoMessage,
134136
res.payload,
135137
res.sig?.signature,

packages/tests/tests/filter.node.spec.ts

+2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
createDecoder,
33
createEncoder,
44
DecodedMessage,
5+
DefaultPubSubTopic,
56
waitForRemotePeer,
67
} from "@waku/core";
78
import { createLightNode } from "@waku/create";
@@ -52,6 +53,7 @@ describe("Waku Filter", () => {
5253
log("Got a message");
5354
messageCount++;
5455
expect(msg.contentTopic).to.eq(TestContentTopic);
56+
expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic);
5557
expect(bytesToUtf8(msg.payload)).to.eq(messageText);
5658
};
5759

packages/tests/tests/relay.node.spec.ts

+1
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ describe("Waku Relay [node only]", () => {
334334
await waku3NoMsgPromise;
335335

336336
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
337+
expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic);
337338
});
338339
});
339340

packages/tests/tests/store.node.spec.ts

+1
Original file line numberDiff line numberDiff line change
@@ -615,6 +615,7 @@ describe("Waku Store, custom pubsub topic", () => {
615615
const msg = await promise;
616616
if (msg) {
617617
messages.push(msg);
618+
expect(msg.pubSubTopic).to.eq(customPubSubTopic);
618619
}
619620
});
620621

0 commit comments

Comments
 (0)