Skip to content

Commit 72f97d4

Browse files
authored
feat: add 1MB restriction to LightPush and Relay (#1351)
* add 1MB restriction to LightPush and Relay * fix condition * improve lightPush test * update test * add isomorphic-webcrypto * import module * add errors to SendResult and tests * fix lint
1 parent ef87af6 commit 72f97d4

File tree

10 files changed

+379
-219
lines changed

10 files changed

+379
-219
lines changed

package-lock.json

+182-180
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/src/lib/light_push/index.ts

+26-9
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
import type { Libp2p } from "@libp2p/interface-libp2p";
22
import type { PeerId } from "@libp2p/interface-peer-id";
3-
import type {
3+
import {
44
IEncoder,
55
ILightPush,
66
IMessage,
77
ProtocolCreateOptions,
88
ProtocolOptions,
9+
SendError,
910
SendResult,
1011
} from "@waku/interfaces";
1112
import { PushResponse } from "@waku/proto";
13+
import { isSizeValid } from "@waku/utils";
1214
import debug from "debug";
1315
import all from "it-all";
1416
import * as lp from "it-length-prefixed";
@@ -47,12 +49,24 @@ class LightPush extends BaseProtocol implements ILightPush {
4749
const stream = await this.newStream(peer);
4850

4951
const recipients: PeerId[] = [];
52+
let error: undefined | SendError = undefined;
5053

5154
try {
55+
if (!isSizeValid(message.payload)) {
56+
log("Failed to send waku light push: message is bigger that 1MB");
57+
return {
58+
recipients,
59+
error: SendError.SIZE_TOO_BIG,
60+
};
61+
}
62+
5263
const protoMessage = await encoder.toProtoObj(message);
5364
if (!protoMessage) {
5465
log("Failed to encode to protoMessage, aborting push");
55-
return { recipients };
66+
return {
67+
recipients,
68+
error: SendError.ENCODE_FAILED,
69+
};
5670
}
5771
const query = PushRpc.createRequest(protoMessage, pubSubTopic);
5872
const res = await pipe(
@@ -70,21 +84,24 @@ class LightPush extends BaseProtocol implements ILightPush {
7084

7185
const response = PushRpc.decode(bytes).response;
7286

73-
if (!response) {
74-
log("No response in PushRPC");
75-
return { recipients };
76-
}
77-
78-
if (response.isSuccess) {
87+
if (response?.isSuccess) {
7988
recipients.push(peer.id);
89+
} else {
90+
log("No response in PushRPC");
91+
error = SendError.NO_RPC_RESPONSE;
8092
}
8193
} catch (err) {
8294
log("Failed to decode push reply", err);
95+
error = SendError.DECODE_FAILED;
8396
}
8497
} catch (err) {
8598
log("Failed to send waku light push request", err);
99+
error = SendError.GENERIC_FAIL;
86100
}
87-
return { recipients };
101+
return {
102+
error,
103+
recipients,
104+
};
88105
}
89106
}
90107

packages/interfaces/src/protocols.ts

+9
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ export type Callback<T extends IDecodedMessage> = (
6262
msg: T
6363
) => void | Promise<void>;
6464

65+
export enum SendError {
66+
GENERIC_FAIL = "Generic error",
67+
ENCODE_FAILED = "Failed to encode",
68+
DECODE_FAILED = "Failed to decode",
69+
SIZE_TOO_BIG = "Size is too big",
70+
NO_RPC_RESPONSE = "No RPC response",
71+
}
72+
6573
export interface SendResult {
74+
error?: SendError;
6675
recipients: PeerId[];
6776
}

packages/relay/src/index.ts

+15-3
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { Libp2p } from "@libp2p/interface-libp2p";
1010
import type { PubSub } from "@libp2p/interface-pubsub";
1111
import { sha256 } from "@noble/hashes/sha256";
1212
import { DefaultPubSubTopic } from "@waku/core";
13-
import type {
13+
import {
1414
ActiveSubscriptions,
1515
Callback,
1616
IAsyncIterator,
@@ -21,9 +21,10 @@ import type {
2121
IRelay,
2222
ProtocolCreateOptions,
2323
ProtocolOptions,
24+
SendError,
2425
SendResult,
2526
} from "@waku/interfaces";
26-
import { groupByContentTopic, toAsyncIterator } from "@waku/utils";
27+
import { groupByContentTopic, isSizeValid, toAsyncIterator } from "@waku/utils";
2728
import debug from "debug";
2829

2930
import { RelayCodecs } from "./constants.js";
@@ -97,10 +98,21 @@ class Relay implements IRelay {
9798
* Send Waku message.
9899
*/
99100
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
101+
if (!isSizeValid(message.payload)) {
102+
log("Failed to send waku relay: message is bigger that 1MB");
103+
return {
104+
recipients: [],
105+
error: SendError.SIZE_TOO_BIG,
106+
};
107+
}
108+
100109
const msg = await encoder.toWire(message);
101110
if (!msg) {
102111
log("Failed to encode message, aborting publish");
103-
return { recipients: [] };
112+
return {
113+
recipients: [],
114+
error: SendError.ENCODE_FAILED,
115+
};
104116
}
105117

106118
return this.gossipSub.publish(this.pubSubTopic, msg);

packages/tests/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ export * from "./constants.js";
1010
export * from "./delay.js";
1111
export * from "./log_file.js";
1212
export * from "./nwaku.js";
13+
export * from "./random_array.js";

packages/tests/src/random_array.ts

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import crypto from "crypto";
2+
3+
export function generateRandomUint8Array(sizeInBytes: number): Uint8Array {
4+
const chunkSize = 65536; // Maximum entropy available
5+
const chunks = Math.ceil(sizeInBytes / chunkSize);
6+
const buffer = new Uint8Array(sizeInBytes);
7+
8+
for (let i = 0; i < chunks; i++) {
9+
const chunk = new Uint8Array(chunkSize);
10+
crypto.getRandomValues(chunk);
11+
buffer.set(chunk, i * chunkSize);
12+
}
13+
14+
return buffer;
15+
}

packages/tests/tests/light_push.node.spec.ts

+57-26
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { createEncoder, waitForRemotePeer } from "@waku/core";
22
import { createLightNode } from "@waku/create";
3-
import type { LightNode } from "@waku/interfaces";
3+
import { LightNode, SendError } from "@waku/interfaces";
44
import { Protocols } from "@waku/interfaces";
55
import { utf8ToBytes } from "@waku/utils/bytes";
66
import { expect } from "chai";
@@ -9,6 +9,7 @@ import debug from "debug";
99
import {
1010
base64ToUtf8,
1111
delay,
12+
generateRandomUint8Array,
1213
makeLogFileName,
1314
MessageRpcResponse,
1415
NOISE_KEY_1,
@@ -26,24 +27,43 @@ describe("Waku Light Push [node only]", () => {
2627
let waku: LightNode;
2728
let nwaku: Nwaku;
2829

29-
afterEach(async function () {
30-
!!nwaku &&
31-
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
32-
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
33-
});
34-
35-
it("Push successfully", async function () {
36-
this.timeout(15_000);
37-
38-
nwaku = new Nwaku(makeLogFileName(this));
39-
await nwaku.start({ lightpush: true, relay: true });
30+
const runNodes = async (
31+
context: Mocha.Context,
32+
pubSubTopic?: string
33+
): Promise<void> => {
34+
const nwakuOptional = pubSubTopic ? { topics: pubSubTopic } : {};
35+
nwaku = new Nwaku(makeLogFileName(context));
36+
await nwaku.start({
37+
lightpush: true,
38+
relay: true,
39+
...nwakuOptional,
40+
});
4041

4142
waku = await createLightNode({
43+
pubSubTopic,
4244
staticNoiseKey: NOISE_KEY_1,
4345
});
4446
await waku.start();
4547
await waku.dial(await nwaku.getMultiaddrWithId());
4648
await waitForRemotePeer(waku, [Protocols.LightPush]);
49+
};
50+
51+
beforeEach(async function () {
52+
this.timeout(15_000);
53+
await runNodes(this);
54+
});
55+
56+
afterEach(async function () {
57+
try {
58+
nwaku?.stop();
59+
waku?.stop();
60+
} catch (e) {
61+
console.error("Failed to stop nodes: ", e);
62+
}
63+
});
64+
65+
it("Push successfully", async function () {
66+
this.timeout(15_000);
4767

4868
const messageText = "Light Push works!";
4969

@@ -63,28 +83,39 @@ describe("Waku Light Push [node only]", () => {
6383
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
6484
});
6585

66-
it("Push on custom pubsub topic", async function () {
86+
it("Pushes messages equal or less that 1MB", async function () {
6787
this.timeout(15_000);
88+
const MB = 1024 ** 2;
6889

69-
const customPubSubTopic = "/waku/2/custom-dapp/proto";
90+
let pushResponse = await waku.lightPush.send(TestEncoder, {
91+
payload: generateRandomUint8Array(MB),
92+
});
93+
expect(pushResponse.recipients.length).to.greaterThan(0);
7094

71-
nwaku = new Nwaku(makeLogFileName(this));
72-
await nwaku.start({
73-
lightpush: true,
74-
topics: customPubSubTopic,
75-
relay: true,
95+
pushResponse = await waku.lightPush.send(TestEncoder, {
96+
payload: generateRandomUint8Array(65536),
7697
});
98+
expect(pushResponse.recipients.length).to.greaterThan(0);
99+
});
77100

78-
waku = await createLightNode({
79-
pubSubTopic: customPubSubTopic,
80-
staticNoiseKey: NOISE_KEY_1,
101+
it("Fails to push message bigger that 1MB", async function () {
102+
this.timeout(15_000);
103+
const MB = 1024 ** 2;
104+
105+
const pushResponse = await waku.lightPush.send(TestEncoder, {
106+
payload: generateRandomUint8Array(MB + 65536),
81107
});
82-
await waku.start();
83-
await waku.dial(await nwaku.getMultiaddrWithId());
84-
await waitForRemotePeer(waku, [Protocols.LightPush]);
108+
expect(pushResponse.recipients.length).to.eq(0);
109+
expect(pushResponse.error).to.eq(SendError.SIZE_TOO_BIG);
110+
});
85111

86-
const nimPeerId = await nwaku.getPeerId();
112+
it("Push on custom pubsub topic", async function () {
113+
this.timeout(15_000);
87114

115+
const customPubSubTopic = "/waku/2/custom-dapp/proto";
116+
await runNodes(this, customPubSubTopic);
117+
118+
const nimPeerId = await nwaku.getPeerId();
88119
const messageText = "Light Push works!";
89120

90121
log("Send message via lightpush");

packages/tests/tests/relay.node.spec.ts

+63-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
waitForRemotePeer,
88
} from "@waku/core";
99
import { createRelayNode } from "@waku/create";
10-
import type { RelayNode } from "@waku/interfaces";
10+
import { RelayNode, SendError } from "@waku/interfaces";
1111
import { Protocols } from "@waku/interfaces";
1212
import {
1313
createDecoder as createEciesDecoder,
@@ -27,6 +27,7 @@ import debug from "debug";
2727
import {
2828
base64ToUtf8,
2929
delay,
30+
generateRandomUint8Array,
3031
makeLogFileName,
3132
MessageRpcResponse,
3233
NOISE_KEY_1,
@@ -336,6 +337,67 @@ describe("Waku Relay [node only]", () => {
336337
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
337338
expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic);
338339
});
340+
341+
it("Publishes <= 1 MB and rejects others", async function () {
342+
this.timeout(10000);
343+
const MB = 1024 ** 2;
344+
345+
const pubSubTopic = "/some/pubsub/topic";
346+
347+
// 1 and 2 uses a custom pubsub
348+
[waku1, waku2] = await Promise.all([
349+
createRelayNode({
350+
pubSubTopic: pubSubTopic,
351+
staticNoiseKey: NOISE_KEY_1,
352+
}).then((waku) => waku.start().then(() => waku)),
353+
createRelayNode({
354+
pubSubTopic: pubSubTopic,
355+
staticNoiseKey: NOISE_KEY_2,
356+
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
357+
}).then((waku) => waku.start().then(() => waku)),
358+
]);
359+
360+
await waku1.libp2p.peerStore.addressBook.set(
361+
waku2.libp2p.peerId,
362+
waku2.libp2p.getMultiaddrs()
363+
);
364+
await Promise.all([waku1.dial(waku2.libp2p.peerId)]);
365+
366+
await Promise.all([
367+
waitForRemotePeer(waku1, [Protocols.Relay]),
368+
waitForRemotePeer(waku2, [Protocols.Relay]),
369+
]);
370+
371+
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
372+
(resolve) => {
373+
waku2.relay.subscribe([TestDecoder], () =>
374+
resolve({
375+
payload: new Uint8Array([]),
376+
} as DecodedMessage)
377+
);
378+
}
379+
);
380+
381+
let sendResult = await waku1.relay.send(TestEncoder, {
382+
payload: generateRandomUint8Array(1 * MB),
383+
});
384+
expect(sendResult.recipients.length).to.eq(1);
385+
386+
sendResult = await waku1.relay.send(TestEncoder, {
387+
payload: generateRandomUint8Array(1 * MB + 65536),
388+
});
389+
expect(sendResult.recipients.length).to.eq(0);
390+
expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG);
391+
392+
sendResult = await waku1.relay.send(TestEncoder, {
393+
payload: generateRandomUint8Array(2 * MB),
394+
});
395+
expect(sendResult.recipients.length).to.eq(0);
396+
expect(sendResult.error).to.eq(SendError.SIZE_TOO_BIG);
397+
398+
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
399+
expect(waku2ReceivedMsg?.payload?.length).to.eq(0);
400+
});
339401
});
340402

341403
describe("Interop: nwaku", function () {

packages/utils/src/common/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ export * from "./is_defined.js";
22
export * from "./random_subset.js";
33
export * from "./group_by.js";
44
export * from "./to_async_iterator.js";
5+
export * from "./is_size_valid.js";
+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
const MB = 1024 ** 2;
2+
const SIZE_CAP = 1; // 1 MB
3+
4+
export const isSizeValid = (payload: Uint8Array): boolean => {
5+
if (payload.length / MB > SIZE_CAP) {
6+
return false;
7+
}
8+
9+
return true;
10+
};

0 commit comments

Comments
 (0)