Skip to content

Commit b10c46b

Browse files
fix(store)!: use pubSubTopic from DecodedMessage for createCursor (#1640)
* fix!(store): Cursor: use pubsubtopic from Message * add control to check cursor topic should match decoder * fix
1 parent 0bc4a96 commit b10c46b

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

packages/core/src/lib/store/index.ts

+12-5
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,16 @@ class Store extends BaseProtocol implements IStore {
246246

247247
ensurePubsubTopicIsConfigured(pubSubTopicForQuery, this.pubSubTopics);
248248

249+
// check that the pubSubTopic from the Cursor and Decoder match
250+
if (
251+
options?.cursor?.pubsubTopic &&
252+
options.cursor.pubsubTopic !== pubSubTopicForQuery
253+
) {
254+
throw new Error(
255+
`Cursor pubsub topic (${options?.cursor?.pubsubTopic}) does not match decoder pubsub topic (${pubSubTopicForQuery})`
256+
);
257+
}
258+
249259
const decodersAsMap = new Map();
250260
decoders.forEach((dec) => {
251261
if (decodersAsMap.has(dec.contentTopic)) {
@@ -397,10 +407,7 @@ async function* paginate<T extends IDecodedMessage>(
397407
}
398408
}
399409

400-
export async function createCursor(
401-
message: IDecodedMessage,
402-
pubsubTopic: string = DefaultPubSubTopic
403-
): Promise<Cursor> {
410+
export async function createCursor(message: IDecodedMessage): Promise<Cursor> {
404411
if (
405412
!message ||
406413
!message.timestamp ||
@@ -418,7 +425,7 @@ export async function createCursor(
418425

419426
return {
420427
digest,
421-
pubsubTopic,
428+
pubsubTopic: message.pubSubTopic,
422429
senderTime: messageTime,
423430
receiverTime: messageTime
424431
};

0 commit comments

Comments
 (0)