Skip to content

Commit 2389977

Browse files
chore!: refactor store protocol for readability (#1456)
* refactor store protocol for readability * update interface * fix: test * rm: comments * Update packages/core/src/lib/store/index.ts Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> * fix commit --------- Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com>
1 parent 45baa76 commit 2389977

File tree

3 files changed

+66
-45
lines changed

3 files changed

+66
-45
lines changed

packages/core/src/lib/store/index.ts

+57-36
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,54 @@ class Store extends BaseProtocol implements IStore {
8686
this.options = options ?? {};
8787
}
8888

89+
/**
90+
* Processes messages based on the provided callback and options.
91+
* @private
92+
*/
93+
private async processMessages<T extends IDecodedMessage>(
94+
messages: Promise<T | undefined>[],
95+
callback: (message: T) => Promise<void | boolean> | boolean | void,
96+
options?: QueryOptions
97+
): Promise<boolean> {
98+
let abort = false;
99+
const messagesOrUndef: Array<T | undefined> = await Promise.all(messages);
100+
let processedMessages: Array<T> = messagesOrUndef.filter(isDefined);
101+
102+
if (this.shouldReverseOrder(options)) {
103+
processedMessages = processedMessages.reverse();
104+
}
105+
106+
await Promise.all(
107+
processedMessages.map(async (msg) => {
108+
if (msg && !abort) {
109+
abort = Boolean(await callback(msg));
110+
}
111+
})
112+
);
113+
114+
return abort;
115+
}
116+
117+
/**
118+
* Determines whether to reverse the order of messages based on the provided options.
119+
*
120+
* Messages in pages are ordered from oldest (first) to most recent (last).
121+
* https://github.com/vacp2p/rfc/issues/533
122+
*
123+
* @private
124+
*/
125+
private shouldReverseOrder(options?: QueryOptions): boolean {
126+
return (
127+
typeof options?.pageDirection === "undefined" ||
128+
options?.pageDirection === PageDirection.BACKWARD
129+
);
130+
}
131+
132+
/**
133+
* @deprecated Use `queryWithOrderedCallback` instead
134+
**/
135+
queryOrderedCallback = this.queryWithOrderedCallback;
136+
89137
/**
90138
* Do a query to a Waku Store to retrieve historical/missed messages.
91139
*
@@ -103,42 +151,20 @@ class Store extends BaseProtocol implements IStore {
103151
* or if an error is encountered when processing the reply,
104152
* or if two decoders with the same content topic are passed.
105153
*/
106-
async queryOrderedCallback<T extends IDecodedMessage>(
154+
async queryWithOrderedCallback<T extends IDecodedMessage>(
107155
decoders: IDecoder<T>[],
108156
callback: (message: T) => Promise<void | boolean> | boolean | void,
109157
options?: QueryOptions
110158
): Promise<void> {
111-
let abort = false;
112159
for await (const promises of this.queryGenerator(decoders, options)) {
113-
if (abort) break;
114-
const messagesOrUndef: Array<T | undefined> = await Promise.all(promises);
115-
116-
let messages: Array<T> = messagesOrUndef.filter(isDefined);
117-
118-
// Messages in pages are ordered from oldest (first) to most recent (last).
119-
// https://github.com/vacp2p/rfc/issues/533
120-
if (
121-
typeof options?.pageDirection === "undefined" ||
122-
options?.pageDirection === PageDirection.BACKWARD
123-
) {
124-
messages = messages.reverse();
125-
}
126-
127-
await Promise.all(
128-
messages.map(async (msg) => {
129-
if (msg && !abort) {
130-
abort = Boolean(await callback(msg));
131-
}
132-
})
133-
);
160+
if (await this.processMessages(promises, callback, options)) break;
134161
}
135162
}
136163

137164
/**
138165
* Do a query to a Waku Store to retrieve historical/missed messages.
139-
*
140166
* The callback function takes a `Promise<WakuMessage>` in input,
141-
* useful if messages needs to be decrypted and performance matters.
167+
* useful if messages need to be decrypted and performance matters.
142168
*
143169
* The order of the messages passed to the callback is as follows:
144170
* - within a page, messages are expected to be ordered from oldest to most recent
@@ -152,25 +178,23 @@ class Store extends BaseProtocol implements IStore {
152178
* or if an error is encountered when processing the reply,
153179
* or if two decoders with the same content topic are passed.
154180
*/
155-
async queryCallbackOnPromise<T extends IDecodedMessage>(
181+
async queryWithPromiseCallback<T extends IDecodedMessage>(
156182
decoders: IDecoder<T>[],
157183
callback: (
158184
message: Promise<T | undefined>
159185
) => Promise<void | boolean> | boolean | void,
160186
options?: QueryOptions
161187
): Promise<void> {
162188
let abort = false;
163-
let promises: Promise<void>[] = [];
164189
for await (const page of this.queryGenerator(decoders, options)) {
165-
const _promises = page.map(async (msg) => {
166-
if (!abort) {
167-
abort = Boolean(await callback(msg));
168-
}
190+
const _promises = page.map(async (msgPromise) => {
191+
if (abort) return;
192+
abort = Boolean(await callback(msgPromise));
169193
});
170194

171-
promises = promises.concat(_promises);
195+
await Promise.all(_promises);
196+
if (abort) break;
172197
}
173-
await Promise.all(promises);
174198
}
175199

176200
/**
@@ -183,9 +207,6 @@ class Store extends BaseProtocol implements IStore {
183207
* as follows:
184208
* - within a page, messages SHOULD be ordered from oldest to most recent
185209
* - pages direction depends on { @link QueryOptions.pageDirection }
186-
*
187-
* However, there is no way to guarantee the behavior of the remote node.
188-
*
189210
* @throws If not able to reach a Waku Store peer to query,
190211
* or if an error is encountered when processing the reply,
191212
* or if two decoders with the same content topic are passed.

packages/interfaces/src/store.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ export type StoreQueryOptions = {
4646
} & ProtocolOptions;
4747

4848
export interface IStore extends IBaseProtocol {
49-
queryOrderedCallback: <T extends IDecodedMessage>(
49+
queryWithOrderedCallback: <T extends IDecodedMessage>(
5050
decoders: IDecoder<T>[],
5151
callback: (message: T) => Promise<void | boolean> | boolean | void,
5252
options?: StoreQueryOptions
5353
) => Promise<void>;
54-
queryCallbackOnPromise: <T extends IDecodedMessage>(
54+
queryWithPromiseCallback: <T extends IDecodedMessage>(
5555
decoders: IDecoder<T>[],
5656
callback: (
5757
message: Promise<T | undefined>

packages/tests/tests/store.node.spec.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ describe("Waku Store", () => {
204204
await waitForRemotePeer(waku, [Protocols.Store]);
205205

206206
const messages: IMessage[] = [];
207-
await waku.store.queryCallbackOnPromise(
207+
await waku.store.queryWithPromiseCallback(
208208
[TestDecoder],
209209
async (msgPromise) => {
210210
const msg = await msgPromise;
@@ -246,7 +246,7 @@ describe("Waku Store", () => {
246246

247247
const desiredMsgs = 14;
248248
const messages: IMessage[] = [];
249-
await waku.store.queryCallbackOnPromise(
249+
await waku.store.queryWithPromiseCallback(
250250
[TestDecoder],
251251
async (msgPromise) => {
252252
const msg = await msgPromise;
@@ -285,7 +285,7 @@ describe("Waku Store", () => {
285285
await waitForRemotePeer(waku, [Protocols.Store]);
286286

287287
const messages: IMessage[] = [];
288-
await waku.store.queryOrderedCallback(
288+
await waku.store.queryWithOrderedCallback(
289289
[TestDecoder],
290290
async (msg) => {
291291
messages.push(msg);
@@ -324,7 +324,7 @@ describe("Waku Store", () => {
324324
await waitForRemotePeer(waku, [Protocols.Store]);
325325

326326
let messages: IMessage[] = [];
327-
await waku.store.queryOrderedCallback(
327+
await waku.store.queryWithOrderedCallback(
328328
[TestDecoder],
329329
async (msg) => {
330330
messages.push(msg);
@@ -491,7 +491,7 @@ describe("Waku Store", () => {
491491
const nwakuPeerId = await nwaku.getPeerId();
492492

493493
const firstMessages: IMessage[] = [];
494-
await waku.store.queryOrderedCallback(
494+
await waku.store.queryWithOrderedCallback(
495495
[TestDecoder],
496496
(msg) => {
497497
if (msg) {
@@ -505,7 +505,7 @@ describe("Waku Store", () => {
505505
);
506506

507507
const bothMessages: IMessage[] = [];
508-
await waku.store.queryOrderedCallback(
508+
await waku.store.queryWithOrderedCallback(
509509
[TestDecoder],
510510
async (msg) => {
511511
bothMessages.push(msg);
@@ -552,7 +552,7 @@ describe("Waku Store", () => {
552552

553553
const desiredMsgs = 14;
554554
const messages: IMessage[] = [];
555-
await waku.store.queryOrderedCallback(
555+
await waku.store.queryWithOrderedCallback(
556556
[TestDecoder],
557557
async (msg) => {
558558
messages.push(msg);

0 commit comments

Comments
 (0)