From 4c52a0525f7ea6c3834b2fd7fbd478e95e4e38dd Mon Sep 17 00:00:00 2001 From: balag3 Date: Sat, 1 Jul 2023 21:51:20 +0200 Subject: [PATCH 1/3] Refactors message decoding to abort as soon as a suitable decoder found. #1369 --- packages/core/src/lib/filter/v1/index.ts | 37 +++++++++++------------ packages/core/src/lib/filter/v2/index.ts | 38 +++++++++++------------- 2 files changed, 34 insertions(+), 41 deletions(-) diff --git a/packages/core/src/lib/filter/v1/index.ts b/packages/core/src/lib/filter/v1/index.ts index 6afaebe4f2..ac0ef03670 100644 --- a/packages/core/src/lib/filter/v1/index.ts +++ b/packages/core/src/lib/filter/v1/index.ts @@ -195,26 +195,23 @@ class Filter extends BaseProtocol implements IFilter { log("Message has no content topic, skipping"); return; } - - let didDecodeMsg = false; - // We don't want to wait for decoding failure, just attempt to decode - // all messages and do the call back on the one that works - // noinspection ES6MissingAwait - decoders.forEach(async (dec: IDecoder) => { - if (didDecodeMsg) return; - const decoded = await dec.fromProtoObj( - pubSubTopic, - toProtoMessage(protoMessage) - ); - if (!decoded) { - log("Not able to decode message"); - return; - } - // This is just to prevent more decoding attempt - // TODO: Could be better if we were to abort promises - didDecodeMsg = Boolean(decoded); - await callback(decoded); - }); + Promise.any( + decoders.map(async (dec) => { + dec + .fromProtoObj(pubSubTopic, toProtoMessage(protoMessage)) + .then((decoded) => + decoded + ? Promise.resolve(decoded) + : Promise.reject(new Error("Decoding failed")) + ); + }) + ) + .then(async (decodedMessage) => { + await callback(decodedMessage); + }) + .catch((e) => { + log("Error decoding message", e); + }); } } diff --git a/packages/core/src/lib/filter/v2/index.ts b/packages/core/src/lib/filter/v2/index.ts index 7fa1862255..35e17d62a6 100644 --- a/packages/core/src/lib/filter/v2/index.ts +++ b/packages/core/src/lib/filter/v2/index.ts @@ -389,25 +389,21 @@ async function pushMessage( log("Message has no content topic, skipping"); return; } - - let didDecodeMsg = false; - // We don't want to wait for decoding failure, just attempt to decode - // all messages and do the call back on the one that works - // noinspection ES6MissingAwait - decoders.forEach(async (dec: IDecoder) => { - if (didDecodeMsg) return; - const decoded = await dec.fromProtoObj( - pubSubTopic, - message as IProtoMessage - ); - // const decoded = await dec.fromProtoObj(pubSubTopic, message); - if (!decoded) { - log("Not able to decode message"); - return; - } - // This is just to prevent more decoding attempt - // TODO: Could be better if we were to abort promises - didDecodeMsg = Boolean(decoded); - await callback(decoded); - }); + Promise.any( + decoders.map(async (dec) => { + dec + .fromProtoObj(pubSubTopic, message as IProtoMessage) + .then((decoded) => + decoded + ? Promise.resolve(decoded) + : Promise.reject(new Error("Decoding failed")) + ); + }) + ) + .then(async (decodedMessage) => { + await callback(decodedMessage); + }) + .catch((e) => { + log("Error decoding message", e); + }); } From 79eaf3ea9a932afa012c7faedb38843d4346e4f4 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 28 Jul 2023 01:02:44 +0530 Subject: [PATCH 2/3] fix: return from the function --- packages/core/src/lib/filter/index.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 293f99ab56..d1ae8f4f8c 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -385,7 +385,7 @@ async function pushMessage( } try { const decodedMessage = await Promise.any( - decoders.map(async (dec) => { + decoders.map((dec) => dec .fromProtoObj(pubSubTopic, message as IProtoMessage) .then((decoded) => @@ -393,8 +393,7 @@ async function pushMessage( ? Promise.resolve(decoded) : Promise.reject(new Error("Decoding failed")) ) - .catch((e) => log("Decoding failed", e)); - }) + ) ); await callback(decodedMessage); } catch (e) { From 05f1731e6c530852b04d5dfa0763b2414258e5a6 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 28 Jul 2023 01:11:10 +0530 Subject: [PATCH 3/3] improve readability --- packages/core/src/lib/filter/index.ts | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index d1ae8f4f8c..02f44952fe 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -383,18 +383,16 @@ async function pushMessage( log("Message has no content topic, skipping"); return; } + try { - const decodedMessage = await Promise.any( - decoders.map((dec) => - dec - .fromProtoObj(pubSubTopic, message as IProtoMessage) - .then((decoded) => - decoded - ? Promise.resolve(decoded) - : Promise.reject(new Error("Decoding failed")) - ) - ) + const decodePromises = decoders.map((dec) => + dec + .fromProtoObj(pubSubTopic, message as IProtoMessage) + .then((decoded) => decoded || Promise.reject("Decoding failed")) ); + + const decodedMessage = await Promise.any(decodePromises); + await callback(decodedMessage); } catch (e) { log("Error decoding message", e);