Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: support for cursors on store API #1024

Merged
merged 15 commits into from
Nov 17, 2022
Merged
18 changes: 18 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
"node": ">=16"
},
"dependencies": {
"@waku/byte-utils": "*",
"@chainsafe/libp2p-gossipsub": "^4.1.1",
"@libp2p/interface-connection": "^3.0.3",
"@libp2p/interface-peer-discovery": "^1.0.0",
Expand All @@ -97,6 +96,8 @@
"@libp2p/interfaces": "^3.0.2",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6",
"@noble/hashes": "^1.1.3",
"@waku/byte-utils": "*",
"@waku/interfaces": "*",
"debug": "^4.3.4",
"it-all": "^1.0.6",
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ export * as waku_relay from "./lib/waku_relay";
export { WakuRelay } from "./lib/waku_relay";

export * as waku_store from "./lib/waku_store";
export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store";
export {
PageDirection,
WakuStore,
StoreCodec,
createCursor,
} from "./lib/waku_store";
30 changes: 27 additions & 3 deletions packages/core/src/lib/waku_store/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { Connection } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import { sha256 } from "@noble/hashes/sha256";
import { concat, utf8ToBytes } from "@waku/byte-utils";
import { DecodedMessage, Decoder } from "@waku/interfaces";
import debug from "debug";
import all from "it-all";
Expand Down Expand Up @@ -75,6 +77,10 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: proto.Index;
}

/**
Expand Down Expand Up @@ -251,7 +257,8 @@ export class WakuStore {
connection,
protocol,
queryOpts,
decodersAsMap
decodersAsMap,
options?.cursor
)) {
yield messages;
}
Expand All @@ -270,7 +277,8 @@ async function* paginate<T extends DecodedMessage>(
connection: Connection,
protocol: string,
queryOpts: Params,
decoders: Map<string, Decoder<T>>
decoders: Map<string, Decoder<T>>,
cursor?: proto.Index
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Expand All @@ -281,7 +289,6 @@ async function* paginate<T extends DecodedMessage>(
);
}

let cursor = undefined;
while (true) {
queryOpts = Object.assign(queryOpts, { cursor });

Expand Down Expand Up @@ -370,3 +377,20 @@ async function* paginate<T extends DecodedMessage>(
export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg;
}

export async function createCursor(
message: string,
messageTimestamp: bigint,
contentTopic: string,
pubsubTopic: string = DefaultPubSubTopic
): Promise<proto.Index> {
const contentTopicBytes = utf8ToBytes(contentTopic);
const messageBytes = utf8ToBytes(message);
const digest = sha256(concat([contentTopicBytes, messageBytes]));

return {
digest,
pubsubTopic,
senderTime: messageTimestamp,
};
}
10 changes: 10 additions & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export interface PointToPointProtocol {
libp2p: Libp2p;
peers: () => Promise<Peer[]>;
}
export interface Index {
digest?: Uint8Array;
receivedTime?: bigint;
senderTime?: bigint;
pubsubTopic?: string;
}

export type ProtocolOptions = {
pubSubTopic?: string;
Expand Down Expand Up @@ -73,6 +79,10 @@ export type StoreQueryOptions = {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
} & ProtocolOptions;

export interface Store extends PointToPointProtocol {
Expand Down
2 changes: 2 additions & 0 deletions packages/message-encryption/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export class MessageV1 extends MessageV0 implements DecodedMessage {
}
}

export { sha256 } from "./crypto";

export class AsymEncoder implements Encoder {
constructor(
public contentTopic: string,
Expand Down
63 changes: 62 additions & 1 deletion packages/tests/tests/store.node.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
import { PageDirection } from "@waku/core";
import { createCursor, PageDirection } from "@waku/core";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
import { createFullNode } from "@waku/create";
Expand Down Expand Up @@ -110,6 +110,67 @@ describe("Waku Store", () => {
expect(messages?.length).eq(0);
});

it("Passing a cursor", async function () {
this.timeout(4_000);
const totalMsgs = 20;

for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
Nwaku.toMessageRpcQuery({
payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic,
})
)
).to.be.true;
}

waku = await createFullNode({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);

const query = waku.store.queryGenerator([TestDecoder]);

// messages in reversed order (first message at last index)
const messages: Message[] = [];
for await (const page of query) {
for await (const msg of page.reverse()) {
messages.push(msg as Message);
}
}

// index 2 would mean the third last message sent
const cursorIndex = 2;
const cursorMessage = messages[cursorIndex];

// create cursor to extract messages after the 3rd index
const cursor = await createCursor(
bytesToUtf8(cursorMessage.payload!),
BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000),
TestContentTopic
);

const messagesAfterCursor: Message[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor,
})) {
for await (const msg of page.reverse()) {
messagesAfterCursor.push(msg as Message);
}
}

const testMessage = messagesAfterCursor[0];

expect(messages.length).be.eq(totalMsgs);

expect(bytesToUtf8(testMessage.payload!)).to.be.eq(
bytesToUtf8(messages[cursorIndex + 1].payload!)
);
});

it("Callback on promise", async function () {
this.timeout(15_000);

Expand Down