Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: support for cursors on store API #1024

Merged
merged 15 commits into from
Nov 17, 2022
Merged
18 changes: 18 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
"@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6",
"@noble/hashes": "^1.1.3",
"@waku/byte-utils": "*",
"@waku/interfaces": "*",
"debug": "^4.3.4",
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,9 @@ export * as waku_relay from "./lib/waku_relay";
export { wakuRelay } from "./lib/waku_relay";

export * as waku_store from "./lib/waku_store";
export { PageDirection, wakuStore, StoreCodec } from "./lib/waku_store";
export {
PageDirection,
wakuStore,
StoreCodec,
createCursor,
} from "./lib/waku_store";
42 changes: 38 additions & 4 deletions packages/core/src/lib/waku_store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import type { Connection } from "@libp2p/interface-connection";
import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import { DecodedMessage, Decoder, Store } from "@waku/interfaces";
import { sha256 } from "@noble/hashes/sha256";
import { concat, utf8ToBytes } from "@waku/byte-utils";
import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand Down Expand Up @@ -80,6 +82,10 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking: did you consider not allowing startTime and endTime if cursor set? Could be worth ensuring with a type.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea 👍 @danisharora099

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mean accepting either the timeFilter or the cursor arg here?

cc @felicio

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We mean accepting either the timeFilter or the cursor arg here?

Yeah, but I saw the @LNSD's comment about it being allowed since then, which made me reconsider.

@LNSD thanks for the great summary at waku-org/nwaku#1400 (comment). Please,

  1. What is meant by "If the cursor timestamps are not included in the time range"? Which cursor timestamps? Is it only senderTime for clients?
  2. If so, shouldn't the senderTime, digest and pubsubTopic be actually required, and not optional, fields for clients?
  3. And in that case, would there really be use for queries combining "time range + cursor"? Or is it then rather "end time + cursor"?
  4. Would/should these combined queries be covered by the planned prepared statements and bindings too?

}

/**
Expand Down Expand Up @@ -258,7 +264,8 @@ class WakuStore implements Store {
connection,
protocol,
queryOpts,
decodersAsMap
decodersAsMap,
options?.cursor
)) {
yield messages;
}
Expand All @@ -281,7 +288,8 @@ async function* paginate<T extends DecodedMessage>(
connection: Connection,
protocol: string,
queryOpts: Params,
decoders: Map<string, Decoder<T>>
decoders: Map<string, Decoder<T>>,
cursor?: Index
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Expand All @@ -292,7 +300,6 @@ async function* paginate<T extends DecodedMessage>(
);
}

let cursor = undefined;
while (true) {
queryOpts = Object.assign(queryOpts, { cursor });

Expand Down Expand Up @@ -382,6 +389,33 @@ export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg;
}

export async function createCursor(
message: DecodedMessage,
pubsubTopic: string = DefaultPubSubTopic
): Promise<Index> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-blocking: Did you find the answer to why the cursor was implemented as an object and not a string/hash?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does contain a digest. At the end of the day, it matches the protobuf interface. I think the inner work of the Index type should be a black box for the API consumer:

  1. Call createCursor and get something
  2. Pass something to the store query

If you have an idea how to improve the API, please do.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had discussed this with @jm-clius earlier and there is indeed an ideal way possible to have the entire arg as a digest of all the 4 fields.
This is simply a design that was implemented earlier and could be changed in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cc @LNSD

Copy link

@LNSD LNSD Nov 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: I plan to simplify the current protocol as part of the nwaku v0.14.0 release. And streamline the cursor handling is one of the reasons behind this protocol overhaul.

if (
!message ||
!message.timestamp ||
!message.payload ||
!message.contentTopic
) {
throw new Error("Message is missing required fields");
}

const contentTopicBytes = utf8ToBytes(message.contentTopic);

const digest = sha256(concat([contentTopicBytes, message.payload]));

const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000);

return {
digest,
pubsubTopic,
senderTime: messageTime,
receivedTime: messageTime,
};
}

export function wakuStore(
init: Partial<CreateOptions> = {}
): (components: StoreComponents) => Store {
Expand Down
10 changes: 10 additions & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ export interface PointToPointProtocol {
peerStore: PeerStore;
peers: () => Promise<Peer[]>;
}
export interface Index {
digest?: Uint8Array;
receivedTime?: bigint;
senderTime?: bigint;
pubsubTopic?: string;
}

export type ProtocolOptions = {
pubSubTopic?: string;
Expand Down Expand Up @@ -74,6 +80,10 @@ export type StoreQueryOptions = {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
} & ProtocolOptions;

export interface Store extends PointToPointProtocol {
Expand Down
58 changes: 57 additions & 1 deletion packages/tests/tests/store.node.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
import { PageDirection } from "@waku/core";
import { createCursor, PageDirection } from "@waku/core";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
import { createFullNode } from "@waku/create";
Expand Down Expand Up @@ -111,6 +111,62 @@ describe("Waku Store", () => {
expect(messages?.length).eq(0);
});

it("Passing a cursor", async function () {
this.timeout(4_000);
const totalMsgs = 20;

for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
Nwaku.toMessageRpcQuery({
payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic,
})
)
).to.be.true;
}

waku = await createFullNode({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);

const query = waku.store.queryGenerator([TestDecoder]);

// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of query) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
}
}

// index 2 would mean the third last message sent
const cursorIndex = 2;

// create cursor to extract messages after the 3rd index
const cursor = await createCursor(messages[cursorIndex]);

const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor,
})) {
for await (const msg of page.reverse()) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}

const testMessage = messagesAfterCursor[0];

expect(messages.length).be.eq(totalMsgs);

expect(bytesToUtf8(testMessage.payload!)).to.be.eq(
bytesToUtf8(messages[cursorIndex + 1].payload!)
);
});

it("Callback on promise", async function () {
this.timeout(15_000);

Expand Down