Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: support for cursors on store API #1024

Merged
merged 15 commits into from
Nov 17, 2022
Merged
12 changes: 12 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
"node": ">=16"
},
"dependencies": {
"@waku/byte-utils": "*",
"@chainsafe/libp2p-gossipsub": "^4.1.1",
"@libp2p/interface-connection": "^3.0.3",
"@libp2p/interface-peer-discovery": "^1.0.0",
Expand All @@ -97,8 +96,10 @@
"@libp2p/interfaces": "^3.0.2",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6",
"@waku/byte-utils": "*",
"@waku/interfaces": "*",
"debug": "^4.3.4",
"fast-sha256": "^1.3.0",
"it-all": "^1.0.6",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.4",
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ export * as waku_relay from "./lib/waku_relay";
export { WakuRelay } from "./lib/waku_relay";

export * as waku_store from "./lib/waku_store";
export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store";
export {
PageDirection,
WakuStore,
StoreCodec,
createCursor,
} from "./lib/waku_store";
30 changes: 27 additions & 3 deletions packages/core/src/lib/waku_store/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import type { Connection } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import { utf8ToBytes } from "@waku/byte-utils";
import { DecodedMessage, Decoder } from "@waku/interfaces";
import debug from "debug";
import sha256 from "fast-sha256";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
Expand Down Expand Up @@ -75,6 +77,10 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: proto.Index;
}

/**
Expand Down Expand Up @@ -251,7 +257,8 @@ export class WakuStore {
connection,
protocol,
queryOpts,
decodersAsMap
decodersAsMap,
options?.cursor
)) {
yield messages;
}
Expand All @@ -270,7 +277,8 @@ async function* paginate<T extends DecodedMessage>(
connection: Connection,
protocol: string,
queryOpts: Params,
decoders: Map<string, Decoder<T>>
decoders: Map<string, Decoder<T>>,
cursor?: proto.Index
): AsyncGenerator<Promise<T | undefined>[]> {
if (
queryOpts.contentTopics.toString() !==
Expand All @@ -281,7 +289,6 @@ async function* paginate<T extends DecodedMessage>(
);
}

let cursor = undefined;
while (true) {
queryOpts = Object.assign(queryOpts, { cursor });

Expand Down Expand Up @@ -370,3 +377,20 @@ async function* paginate<T extends DecodedMessage>(
export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg;
}

export async function createCursor(
message: string,
messageTimestamp: bigint,
contentTopic: string,
pubsubTopic: string = DefaultPubSubTopic
): Promise<proto.Index> {
const contentTopicBytes = utf8ToBytes(contentTopic);
const messageBytes = utf8ToBytes(message);
const digest = sha256(Buffer.concat([contentTopicBytes, messageBytes]));

return {
digest,
pubsubTopic,
senderTime: messageTimestamp,
};
}
10 changes: 10 additions & 0 deletions packages/interfaces/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ export interface PointToPointProtocol {
libp2p: Libp2p;
peers: () => Promise<Peer[]>;
}
export interface Index {
digest?: Uint8Array;
receivedTime?: bigint;
senderTime?: bigint;
pubsubTopic?: string;
}

export type ProtocolOptions = {
pubSubTopic?: string;
Expand Down Expand Up @@ -73,6 +79,10 @@ export type StoreQueryOptions = {
* Retrieve messages with a timestamp within the provided values.
*/
timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
} & ProtocolOptions;

export interface Store extends PointToPointProtocol {
Expand Down
2 changes: 2 additions & 0 deletions packages/message-encryption/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ export class MessageV1 extends MessageV0 implements DecodedMessage {
}
}

export { sha256 } from "./crypto";

export class AsymEncoder implements Encoder {
constructor(
public contentTopic: string,
Expand Down
Loading