From 4da84ec6417298068ac9e14af7369a00dafb156f Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 15:06:52 +0530
Subject: [PATCH 1/7] chore: rename IReceiver subscribe

---
 packages/interfaces/src/receiver.ts  |  2 +-
 packages/relay/src/index.ts          |  2 +-
 packages/sdk/src/protocols/filter.ts | 33 ++++++++++++++--------------
 3 files changed, 18 insertions(+), 19 deletions(-)

diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts
index f329460b4e..476780b3e6 100644
--- a/packages/interfaces/src/receiver.ts
+++ b/packages/interfaces/src/receiver.ts
@@ -13,7 +13,7 @@ export interface IReceiver {
   toSubscriptionIterator: <T extends IDecodedMessage>(
     decoders: IDecoder<T> | IDecoder<T>[]
   ) => Promise<IAsyncIterator<T>>;
-  subscribe: <T extends IDecodedMessage>(
+  subscribeWithUnsubscribe: <T extends IDecodedMessage>(
     decoders: IDecoder<T> | IDecoder<T>[],
     callback: Callback<T>
   ) => Unsubscribe | Promise<Unsubscribe>;
diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts
index c8c1f39238..d5b0e25300 100644
--- a/packages/relay/src/index.ts
+++ b/packages/relay/src/index.ts
@@ -148,7 +148,7 @@ class Relay implements IRelay {
     };
   }
 
-  public subscribe<T extends IDecodedMessage>(
+  public subscribeWithUnsubscribe<T extends IDecodedMessage>(
     decoders: IDecoder<T> | IDecoder<T>[],
     callback: Callback<T>
   ): () => void {
diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts
index c40661a728..9a5c472d28 100644
--- a/packages/sdk/src/protocols/filter.ts
+++ b/packages/sdk/src/protocols/filter.ts
@@ -448,21 +448,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
     this.activeSubscriptions = new Map();
   }
 
-  //TODO: move to SubscriptionManager
-  private getActiveSubscription(
-    pubsubTopic: PubsubTopic
-  ): SubscriptionManager | undefined {
-    return this.activeSubscriptions.get(pubsubTopic);
-  }
-
-  private setActiveSubscription(
-    pubsubTopic: PubsubTopic,
-    subscription: SubscriptionManager
-  ): SubscriptionManager {
-    this.activeSubscriptions.set(pubsubTopic, subscription);
-    return subscription;
-  }
-
   /**
    * Creates a new subscription to the given pubsub topic.
    * The subscription is made to multiple peers for decentralization.
@@ -516,7 +501,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
     };
   }
 
-  //TODO: remove this dependency on IReceiver
   /**
    * This method is used to satisfy the `IReceiver` interface.
    *
@@ -532,7 +516,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
    * This method should not be used directly.
    * Instead, use `createSubscription` to create a new subscription.
    */
-  public async subscribe<T extends IDecodedMessage>(
+  public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
     decoders: IDecoder<T> | IDecoder<T>[],
     callback: Callback<T>,
     options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
@@ -578,6 +562,21 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
     return toAsyncIterator(this, decoders);
   }
 
+  //TODO: move to SubscriptionManager
+  private getActiveSubscription(
+    pubsubTopic: PubsubTopic
+  ): SubscriptionManager | undefined {
+    return this.activeSubscriptions.get(pubsubTopic);
+  }
+
+  private setActiveSubscription(
+    pubsubTopic: PubsubTopic,
+    subscription: SubscriptionManager
+  ): SubscriptionManager {
+    this.activeSubscriptions.set(pubsubTopic, subscription);
+    return subscription;
+  }
+
   private getUniquePubsubTopics<T extends IDecodedMessage>(
     decoders: IDecoder<T> | IDecoder<T>[]
   ): string[] {

From eacfcae7d953dbddc418c3e642b0900d5fc4305e Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 15:44:47 +0530
Subject: [PATCH 2/7] feat!: new `subscribe() API that only takes in decoders
 and callback

---
 packages/interfaces/src/filter.ts    | 19 +++---
 packages/interfaces/src/protocols.ts |  5 ++
 packages/sdk/src/protocols/filter.ts | 92 ++++++++++++++++++++++++++++
 3 files changed, 109 insertions(+), 7 deletions(-)

diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts
index 2b770b30c1..691470b1ea 100644
--- a/packages/interfaces/src/filter.ts
+++ b/packages/interfaces/src/filter.ts
@@ -1,15 +1,14 @@
 import type { PeerId } from "@libp2p/interface";
 
 import type { IDecodedMessage, IDecoder } from "./message.js";
-import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
+import type { ContentTopic, ThisOrThat } from "./misc.js";
 import type {
   Callback,
   IBaseProtocolCore,
   IBaseProtocolSDK,
   ProtocolError,
   ProtocolUseOptions,
-  SDKProtocolResult,
-  ShardingParams
+  SDKProtocolResult
 } from "./protocols.js";
 import type { IReceiver } from "./receiver.js";
 
@@ -37,12 +36,18 @@ export interface ISubscriptionSDK {
 
 export type IFilterSDK = IReceiver &
   IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
-    createSubscription(
-      pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
-      options?: ProtocolUseOptions
-    ): Promise<CreateSubscriptionResult>;
+    subscribe<T extends IDecodedMessage>(
+      decoders: IDecoder<T> | IDecoder<T>[],
+      callback: Callback<T>,
+      protocolUseOptions?: ProtocolUseOptions,
+      subscribeOptions?: SubscribeOptions
+    ): Promise<SubscribeResult>;
   };
 
+export type SubscribeResult = CreateSubscriptionResult & {
+  results?: SDKProtocolResult;
+};
+
 export type CreateSubscriptionResult = ThisOrThat<
   "subscription",
   ISubscriptionSDK,
diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts
index a9bdeda2e1..6d0a051513 100644
--- a/packages/interfaces/src/protocols.ts
+++ b/packages/interfaces/src/protocols.ts
@@ -173,6 +173,11 @@ export enum ProtocolError {
    * Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
    */
   TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
+  /**
+   * The topics passed in the decoders do not match each other, or don't exist at all.
+   * Ensure that all the pubsub topics used in the decoders are valid and match each other.
+   */
+  INVALID_DECODER_TOPICS = "Invalid decoder topics",
   /**
    * Failure to find a peer with suitable protocols. This may due to a connection issue.
    * Mitigation can be: retrying after a given time period, display connectivity issue
diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts
index 9a5c472d28..d575c3446b 100644
--- a/packages/sdk/src/protocols/filter.ts
+++ b/packages/sdk/src/protocols/filter.ts
@@ -21,6 +21,7 @@ import {
   type SDKProtocolResult,
   type ShardingParams,
   type SubscribeOptions,
+  SubscribeResult,
   type Unsubscribe
 } from "@waku/interfaces";
 import { messageHashStr } from "@waku/message-hash";
@@ -448,6 +449,97 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
     this.activeSubscriptions = new Map();
   }
 
+  /**
+   * Opens a subscription with the Filter protocol using the provided decoders and callback.
+   * This method combines the functionality of creating a subscription and subscribing to it.
+   *
+   * @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
+   * @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
+   * @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
+   * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
+   *
+   * @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
+   *   - subscription: The created subscription object if successful, or null if failed.
+   *   - error: A ProtocolError if the subscription creation failed, or null if successful.
+   *   - results: An object containing arrays of failures and successes from the subscription process.
+   *     Only present if the subscription was created successfully.
+   *
+   * @throws {Error} If there's an unexpected error during the subscription process.
+   *
+   * @remarks
+   * This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
+   * then tries to subscribe using the created subscription. The return value should be interpreted as follows:
+   * - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
+   * - If `subscription` is non-null and `error` is null, the subscription was created successfully.
+   *   In this case, check the `results` field for detailed information about successes and failures during the subscription process.
+   * - Even if the subscription was created successfully, there might be some failures in the results.
+   *
+   * @example
+   * ```typescript
+   * const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
+   * if (!subscription || error) {
+   *   console.error("Failed to create subscription:", error);
+   * }
+   * console.log("Subscription created successfully");
+   * if (results.failures.length > 0) {
+   *   console.warn("Some errors occurred during subscription:", results.failures);
+   * }
+   * console.log("Successful subscriptions:", results.successes);
+   *
+   * ```
+   */
+  public async subscribe<T extends IDecodedMessage>(
+    decoders: IDecoder<T> | IDecoder<T>[],
+    callback: Callback<T>,
+    protocolUseOptions?: ProtocolUseOptions,
+    subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
+  ): Promise<SubscribeResult> {
+    const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
+
+    //TODO: check error codes
+    if (uniquePubsubTopics.length === 0) {
+      return {
+        subscription: null,
+        error: ProtocolError.INVALID_DECODER_TOPICS
+      };
+    }
+
+    if (uniquePubsubTopics.length > 1) {
+      return {
+        subscription: null,
+        error: ProtocolError.INVALID_DECODER_TOPICS
+      };
+    }
+
+    const pubsubTopic = uniquePubsubTopics[0];
+
+    const { subscription, error } = await this.createSubscription(
+      pubsubTopic,
+      protocolUseOptions
+    );
+
+    if (error) {
+      return {
+        subscription: null,
+        error: error
+      };
+    }
+
+    const { failures, successes } = await subscription.subscribe(
+      decoders,
+      callback,
+      subscribeOptions
+    );
+    return {
+      subscription,
+      error: null,
+      results: {
+        failures: failures,
+        successes: successes
+      }
+    };
+  }
+
   /**
    * Creates a new subscription to the given pubsub topic.
    * The subscription is made to multiple peers for decentralization.

From 6b749c91dfdade107f331bf1116bd5a2ca8427d3 Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 15:51:20 +0530
Subject: [PATCH 3/7] chore: `to_async_iterator` uses new function name

---
 packages/utils/src/common/to_async_iterator.ts | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/packages/utils/src/common/to_async_iterator.ts b/packages/utils/src/common/to_async_iterator.ts
index 489ef234ef..a0226369a8 100644
--- a/packages/utils/src/common/to_async_iterator.ts
+++ b/packages/utils/src/common/to_async_iterator.ts
@@ -37,9 +37,12 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
   const messages: T[] = [];
 
   let unsubscribe: undefined | Unsubscribe;
-  unsubscribe = await receiver.subscribe(decoder, (message: T) => {
-    messages.push(message);
-  });
+  unsubscribe = await receiver.subscribeWithUnsubscribe(
+    decoder,
+    (message: T) => {
+      messages.push(message);
+    }
+  );
 
   const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs);
   const timeoutMs = iteratorOptions?.timeoutMs ?? 0;

From c4aa41124ccc84c33f3cd369d082c85a5fc7fc8b Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 15:51:38 +0530
Subject: [PATCH 4/7] chore: make `createSubscription` private, and shorten
 error handling

---
 packages/sdk/src/protocols/filter.ts | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts
index d575c3446b..63fb93163f 100644
--- a/packages/sdk/src/protocols/filter.ts
+++ b/packages/sdk/src/protocols/filter.ts
@@ -496,15 +496,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
   ): Promise<SubscribeResult> {
     const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
 
-    //TODO: check error codes
-    if (uniquePubsubTopics.length === 0) {
-      return {
-        subscription: null,
-        error: ProtocolError.INVALID_DECODER_TOPICS
-      };
-    }
-
-    if (uniquePubsubTopics.length > 1) {
+    if (uniquePubsubTopics.length !== 1) {
       return {
         subscription: null,
         error: ProtocolError.INVALID_DECODER_TOPICS
@@ -546,7 +538,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
    * @param pubsubTopicShardInfo The pubsub topic to subscribe to.
    * @returns The subscription object.
    */
-  public async createSubscription(
+  private async createSubscription(
     pubsubTopicShardInfo: ShardingParams | PubsubTopic,
     options?: ProtocolUseOptions
   ): Promise<CreateSubscriptionResult> {

From b904bed6f714dec9db31fa243b9dc442e727f8fa Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 16:09:00 +0530
Subject: [PATCH 5/7] chore: update subscribe return type

---
 packages/interfaces/src/filter.ts    | 14 ++++++++++++--
 packages/sdk/src/protocols/filter.ts |  6 ++++--
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts
index 691470b1ea..67946a02a9 100644
--- a/packages/interfaces/src/filter.ts
+++ b/packages/interfaces/src/filter.ts
@@ -44,8 +44,18 @@ export type IFilterSDK = IReceiver &
     ): Promise<SubscribeResult>;
   };
 
-export type SubscribeResult = CreateSubscriptionResult & {
-  results?: SDKProtocolResult;
+export type SubscribeResult = SubscriptionSuccess | SubscriptionError;
+
+type SubscriptionSuccess = {
+  subscription: ISubscriptionSDK;
+  error: null;
+  results: SDKProtocolResult;
+};
+
+type SubscriptionError = {
+  subscription: null;
+  error: ProtocolError;
+  results: null;
 };
 
 export type CreateSubscriptionResult = ThisOrThat<
diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts
index 63fb93163f..36a3d6f820 100644
--- a/packages/sdk/src/protocols/filter.ts
+++ b/packages/sdk/src/protocols/filter.ts
@@ -499,7 +499,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
     if (uniquePubsubTopics.length !== 1) {
       return {
         subscription: null,
-        error: ProtocolError.INVALID_DECODER_TOPICS
+        error: ProtocolError.INVALID_DECODER_TOPICS,
+        results: null
       };
     }
 
@@ -513,7 +514,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
     if (error) {
       return {
         subscription: null,
-        error: error
+        error: error,
+        results: null
       };
     }
 

From 2597ba3c8fc0be10d0b3629df8a61973d3495f4e Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 16:38:05 +0530
Subject: [PATCH 6/7] tests: use new API

---
 packages/interfaces/src/receiver.ts           |  10 +-
 packages/relay/src/index.ts                   |   2 +
 packages/tests/tests/ephemeral.node.spec.ts   |  15 +--
 .../tests/filter/peer_management.spec.ts      | 101 ++++++++++------
 packages/tests/tests/filter/ping.node.spec.ts |  36 ++++--
 packages/tests/tests/filter/push.node.spec.ts |  33 ++----
 .../single_node/multiple_pubsub.node.spec.ts  | 109 +++---------------
 .../filter/single_node/ping.node.spec.ts      |  40 +++++--
 .../filter/single_node/push.node.spec.ts      |  36 +++---
 .../filter/single_node/subscribe.node.spec.ts |  91 +++++++++------
 .../single_node/unsubscribe.node.spec.ts      |  54 +++++++--
 .../tests/tests/filter/subscribe.node.spec.ts |  53 ++++-----
 .../tests/filter/unsubscribe.node.spec.ts     |  46 +++++---
 .../tests/tests/health-manager/node.spec.ts   |   9 +-
 .../tests/health-manager/protocols.spec.ts    |   6 +-
 packages/tests/tests/relay/index.node.spec.ts |   6 +-
 .../tests/tests/relay/interop.node.spec.ts    |   7 +-
 .../tests/relay/multiple_pubsub.node.spec.ts  |  84 ++++++++++----
 .../tests/tests/relay/publish.node.spec.ts    |   5 +-
 .../tests/tests/relay/subscribe.node.spec.ts  |  55 +++++++--
 packages/tests/tests/waku.node.spec.ts        |   2 +-
 21 files changed, 448 insertions(+), 352 deletions(-)

diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts
index 476780b3e6..071baed171 100644
--- a/packages/interfaces/src/receiver.ts
+++ b/packages/interfaces/src/receiver.ts
@@ -13,8 +13,10 @@ export interface IReceiver {
   toSubscriptionIterator: <T extends IDecodedMessage>(
     decoders: IDecoder<T> | IDecoder<T>[]
   ) => Promise<IAsyncIterator<T>>;
-  subscribeWithUnsubscribe: <T extends IDecodedMessage>(
-    decoders: IDecoder<T> | IDecoder<T>[],
-    callback: Callback<T>
-  ) => Unsubscribe | Promise<Unsubscribe>;
+  subscribeWithUnsubscribe: SubscribeWithUnsubscribe;
 }
+
+type SubscribeWithUnsubscribe = <T extends IDecodedMessage>(
+  decoders: IDecoder<T> | IDecoder<T>[],
+  callback: Callback<T>
+) => Unsubscribe | Promise<Unsubscribe>;
diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts
index d5b0e25300..6724ae344c 100644
--- a/packages/relay/src/index.ts
+++ b/packages/relay/src/index.ts
@@ -171,6 +171,8 @@ class Relay implements IRelay {
     };
   }
 
+  public subscribe = this.subscribeWithUnsubscribe;
+
   private removeObservers<T extends IDecodedMessage>(
     observers: Array<[PubsubTopic, Observer<T>]>
   ): void {
diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts
index c08a4a49a1..2c11e19c5c 100644
--- a/packages/tests/tests/ephemeral.node.spec.ts
+++ b/packages/tests/tests/ephemeral.node.spec.ts
@@ -4,7 +4,7 @@ import {
   DecodedMessage,
   waitForRemotePeer
 } from "@waku/core";
-import { ISubscriptionSDK, Protocols } from "@waku/interfaces";
+import { Protocols } from "@waku/interfaces";
 import type { LightNode } from "@waku/interfaces";
 import {
   generatePrivateKey,
@@ -83,8 +83,6 @@ describe("Waku Message Ephemeral field", function () {
   let waku: LightNode;
   let nwaku: ServiceNode;
 
-  let subscription: ISubscriptionSDK;
-
   afterEachCustom(this, async () => {
     await tearDownNodes(nwaku, waku);
   });
@@ -122,11 +120,6 @@ describe("Waku Message Ephemeral field", function () {
       Protocols.LightPush,
       Protocols.Store
     ]);
-
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(TestEncoder.pubsubTopic);
-    if (error) throw error;
-    subscription = _subscription;
   });
 
   it("Ephemeral messages are not stored", async function () {
@@ -218,7 +211,7 @@ describe("Waku Message Ephemeral field", function () {
     const callback = (msg: DecodedMessage): void => {
       messages.push(msg);
     };
-    await subscription.subscribe([TestDecoder], callback);
+    await waku.filter.subscribe([TestDecoder], callback);
 
     await delay(200);
     const normalTxt = "Normal message";
@@ -265,7 +258,7 @@ describe("Waku Message Ephemeral field", function () {
     const callback = (msg: DecodedMessage): void => {
       messages.push(msg);
     };
-    await subscription.subscribe([decoder], callback);
+    await waku.filter.subscribe([decoder], callback);
 
     await delay(200);
     const normalTxt = "Normal message";
@@ -316,7 +309,7 @@ describe("Waku Message Ephemeral field", function () {
     const callback = (msg: DecodedMessage): void => {
       messages.push(msg);
     };
-    await subscription.subscribe([decoder], callback);
+    await waku.filter.subscribe([decoder], callback);
 
     await delay(200);
     const normalTxt = "Normal message";
diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts
index f78d83f69e..5b3639b820 100644
--- a/packages/tests/tests/filter/peer_management.spec.ts
+++ b/packages/tests/tests/filter/peer_management.spec.ts
@@ -28,7 +28,6 @@ describe("Waku Filter: Peer Management: E2E", function () {
   this.timeout(15000);
   let waku: LightNode;
   let serviceNodes: ServiceNodesFleet;
-  let subscription: ISubscriptionSDK;
 
   const contentTopic = "/test";
 
@@ -47,13 +46,6 @@ describe("Waku Filter: Peer Management: E2E", function () {
       undefined,
       5
     );
-    const { error, subscription: sub } = await waku.filter.createSubscription(
-      DefaultTestPubsubTopic
-    );
-    if (!sub || error) {
-      throw new Error("Could not create subscription");
-    }
-    subscription = sub;
   });
 
   afterEachCustom(this, async () => {
@@ -62,12 +54,15 @@ describe("Waku Filter: Peer Management: E2E", function () {
 
   it("Number of peers are maintained correctly", async function () {
     const messages: DecodedMessage[] = [];
-    const { failures, successes } = await subscription.subscribe(
-      [decoder],
-      (msg) => {
-        messages.push(msg);
-      }
-    );
+    const { error, results } = await waku.filter.subscribe([decoder], (msg) => {
+      messages.push(msg);
+    });
+
+    if (error) {
+      throw error;
+    }
+
+    const { successes, failures } = results;
 
     await waku.lightPush.send(encoder, {
       payload: utf8ToBytes("Hello_World")
@@ -82,20 +77,41 @@ describe("Waku Filter: Peer Management: E2E", function () {
   });
 
   it("Ping succeeds for all connected peers", async function () {
-    await subscription.subscribe([decoder], () => {});
+    const { error, subscription } = await waku.filter.subscribe(
+      [decoder],
+      () => {}
+    );
+    if (error) {
+      throw error;
+    }
     const pingResult = await subscription.ping();
     expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
     expect(pingResult.failures.length).to.equal(0);
   });
 
   it("Ping fails for unsubscribed peers", async function () {
+    const { error, subscription } = await waku.filter.subscribe(
+      [decoder],
+      () => {}
+    );
+    if (error) {
+      throw error;
+    }
     const pingResult = await subscription.ping();
     expect(pingResult.successes.length).to.equal(0);
     expect(pingResult.failures.length).to.be.greaterThan(0);
   });
 
   it("Keep-alive pings maintain the connection", async function () {
-    await subscription.subscribe([decoder], () => {}, { keepAlive: 100 });
+    const { error, subscription } = await waku.filter.subscribe(
+      [decoder],
+      () => {},
+      undefined,
+      { keepAlive: 100 }
+    );
+    if (error) {
+      throw error;
+    }
 
     await delay(1000);
 
@@ -106,9 +122,17 @@ describe("Waku Filter: Peer Management: E2E", function () {
 
   it("Renews peer on consistent ping failures", async function () {
     const maxPingFailures = 3;
-    await subscription.subscribe([decoder], () => {}, {
-      pingsBeforePeerRenewed: maxPingFailures
-    });
+    const { error, subscription } = await waku.filter.subscribe(
+      [decoder],
+      () => {},
+      undefined,
+      {
+        pingsBeforePeerRenewed: maxPingFailures
+      }
+    );
+    if (error) {
+      throw error;
+    }
 
     const disconnectedNodePeerId = waku.filter.connectedPeers[0].id;
     await waku.connectionManager.dropConnection(disconnectedNodePeerId);
@@ -135,9 +159,17 @@ describe("Waku Filter: Peer Management: E2E", function () {
 
   it("Tracks peer failures correctly", async function () {
     const maxPingFailures = 3;
-    await subscription.subscribe([decoder], () => {}, {
-      pingsBeforePeerRenewed: maxPingFailures
-    });
+    const { error, subscription } = await waku.filter.subscribe(
+      [decoder],
+      () => {},
+      undefined,
+      {
+        pingsBeforePeerRenewed: maxPingFailures
+      }
+    );
+    if (error) {
+      throw error;
+    }
 
     const targetPeer = waku.filter.connectedPeers[0];
     await waku.connectionManager.dropConnection(targetPeer.id);
@@ -163,8 +195,14 @@ describe("Waku Filter: Peer Management: E2E", function () {
   });
 
   it("Maintains correct number of peers after multiple subscribe/unsubscribe cycles", async function () {
+    let subscription: ISubscriptionSDK;
     for (let i = 0; i < 3; i++) {
-      await subscription.subscribe([decoder], () => {});
+      const { error, subscription: _subscription } =
+        await waku.filter.subscribe([decoder], () => {});
+      if (error) {
+        throw error;
+      }
+      subscription = _subscription;
       let pingResult = await subscription.ping();
       expect(pingResult.successes.length).to.equal(waku.filter.numPeersToUse);
 
@@ -173,8 +211,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
       expect(pingResult.failures.length).to.be.greaterThan(0);
     }
 
-    await subscription.subscribe([decoder], () => {});
-    const finalPingResult = await subscription.ping();
+    const finalPingResult = await subscription!.ping();
     expect(finalPingResult.successes.length).to.equal(
       waku.filter.numPeersToUse
     );
@@ -200,17 +237,15 @@ describe("Waku Filter: Peer Management: E2E", function () {
     ).toString();
     await waku.dial(await nodeWithoutDiscovery.getMultiaddrWithId());
 
-    const { error, subscription: sub } = await waku.filter.createSubscription(
-      DefaultTestPubsubTopic
-    );
-    if (!sub || error) {
-      throw new Error("Could not create subscription");
-    }
-
     const messages: DecodedMessage[] = [];
-    const { successes } = await sub.subscribe([decoder], (msg) => {
+    const { error, results } = await waku.filter.subscribe([decoder], (msg) => {
       messages.push(msg);
     });
+    if (error) {
+      throw error;
+    }
+
+    const { successes } = results;
 
     expect(successes.length).to.be.greaterThan(0);
     expect(successes.length).to.be.equal(waku.filter.numPeersToUse);
diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts
index c6e3e11a2b..e661e05f33 100644
--- a/packages/tests/tests/filter/ping.node.spec.ts
+++ b/packages/tests/tests/filter/ping.node.spec.ts
@@ -24,14 +24,9 @@ const runTests = (strictCheckNodes: boolean): void => {
     this.timeout(10000);
     let waku: LightNode;
     let serviceNodes: ServiceNodesFleet;
-    let subscription: ISubscriptionSDK;
 
     beforeEachCustom(this, async () => {
       [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
-      const { error, subscription: _subscription } =
-        await waku.filter.createSubscription(TestShardInfo);
-      if (error) throw error;
-      subscription = _subscription;
     });
 
     afterEachCustom(this, async () => {
@@ -39,10 +34,13 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Ping on subscribed peer", async function () {
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
       expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
         true
@@ -60,14 +58,24 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Ping on peer without subscriptions", async function () {
+      const { subscription, error } = await waku.filter.subscribe(
+        [TestDecoder],
+        serviceNodes.messageCollector.callback
+      );
+      if (error) {
+        throw error;
+      }
       await validatePingError(subscription);
     });
 
     it("Ping on unsubscribed peer", async function () {
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       await subscription.ping();
       await subscription.unsubscribe([TestContentTopic]);
 
@@ -76,11 +84,17 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Reopen subscription with peer with lost subscription", async function () {
+      let subscription: ISubscriptionSDK;
       const openSubscription = async (): Promise<void> => {
-        await subscription.subscribe(
-          [TestDecoder],
-          serviceNodes.messageCollector.callback
-        );
+        const { error, subscription: _subscription } =
+          await waku.filter.subscribe(
+            [TestDecoder],
+            serviceNodes.messageCollector.callback
+          );
+        if (error) {
+          throw error;
+        }
+        subscription = _subscription;
       };
 
       const unsubscribe = async (): Promise<void> => {
diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts
index 0b41585ff9..5d69c73948 100644
--- a/packages/tests/tests/filter/push.node.spec.ts
+++ b/packages/tests/tests/filter/push.node.spec.ts
@@ -1,5 +1,5 @@
 import { waitForRemotePeer } from "@waku/core";
-import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
+import { LightNode, Protocols } from "@waku/interfaces";
 import { utf8ToBytes } from "@waku/sdk";
 import { expect } from "chai";
 
@@ -29,15 +29,9 @@ const runTests = (strictCheckNodes: boolean): void => {
     this.timeout(10000);
     let waku: LightNode;
     let serviceNodes: ServiceNodesFleet;
-    let subscription: ISubscriptionSDK;
 
     beforeEachCustom(this, async () => {
       [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo);
-
-      const { error, subscription: _subscription } =
-        await waku.filter.createSubscription(TestShardInfo);
-      if (error) throw error;
-      subscription = _subscription;
     });
 
     afterEachCustom(this, async () => {
@@ -46,7 +40,7 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     TEST_STRING.forEach((testItem) => {
       it(`Check received message containing ${testItem.description}`, async function () {
-        await subscription.subscribe(
+        await waku.filter.subscribe(
           [TestDecoder],
           serviceNodes.messageCollector.callback
         );
@@ -67,7 +61,7 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     TEST_TIMESTAMPS.forEach((testItem) => {
       it(`Check received message with timestamp: ${testItem} `, async function () {
-        await subscription.subscribe(
+        await waku.filter.subscribe(
           [TestDecoder],
           serviceNodes.messageCollector.callback
         );
@@ -106,7 +100,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Check message with invalid timestamp is not received", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -128,7 +122,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Check message on other pubsub topic is not received", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -151,7 +145,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Check message with no content topic is not received", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -171,7 +165,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Check message with no payload is not received", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -199,7 +193,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Check message with non string payload is not received", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -222,7 +216,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
     it.skip("Check message received after jswaku node is restarted", async function () {
       // Subscribe and send message
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -242,11 +236,8 @@ const runTests = (strictCheckNodes: boolean): void => {
         await waku.dial(await node.getMultiaddrWithId());
         await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
       }
-      const { error, subscription: _subscription } =
-        await waku.filter.createSubscription(TestShardInfo);
-      if (error) throw error;
-      subscription = _subscription;
-      await subscription.subscribe(
+
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -271,7 +262,7 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
     it.skip("Check message received after nwaku node is restarted", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts
index ec8d9515dc..75e12b1919 100644
--- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts
+++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts
@@ -1,7 +1,6 @@
 import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
 import type {
   ContentTopicInfo,
-  ISubscriptionSDK,
   LightNode,
   ShardInfo,
   SingleShardInfo
@@ -32,7 +31,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
   let waku: LightNode;
   let nwaku: ServiceNode;
   let nwaku2: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
 
   const customPubsubTopic1 = singleShardInfoToPubsubTopic({
@@ -61,12 +59,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, shardInfo);
-
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(shardInfo);
-    if (error) throw error;
-    subscription = _subscription;
-
     messageCollector = new MessageCollector();
   });
 
@@ -75,7 +67,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages on custom pubsubtopic", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
     await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
     messageCollector.verifyReceivedMessage(0, {
@@ -86,18 +78,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
-
-    // Subscribe from the same lightnode to the 2nd pubsubtopic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(customPubsubTopic2);
-    if (error) {
-      throw error;
-    }
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
 
     const messageCollector2 = new MessageCollector();
 
-    await subscription2.subscribe([customDecoder2], messageCollector2.callback);
+    await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
 
     await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
     await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
@@ -119,7 +104,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
 
     // Set up and start a new nwaku node with customPubsubTopic1
     nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
@@ -133,19 +118,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
     await waku.dial(await nwaku2.getMultiaddrWithId());
     await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
 
-    // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(customPubsubTopic2);
-
-    if (error) {
-      throw error;
-    }
-
     await nwaku2.ensureSubscriptions([customPubsubTopic2]);
 
     const messageCollector2 = new MessageCollector();
 
-    await subscription2.subscribe([customDecoder2], messageCollector2.callback);
+    await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
 
     // Making sure that messages are send and reveiced for both subscriptions
     // While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@@ -173,17 +150,6 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () {
       expectedMessageText: "M2"
     });
   });
-
-  it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
-    // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
-    try {
-      await subscription.subscribe([customDecoder2], messageCollector.callback);
-    } catch (error) {
-      expect((error as Error).message).to.include(
-        "Pubsub topic not configured"
-      );
-    }
-  });
 });
 
 describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
@@ -193,7 +159,6 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
   let waku: LightNode;
   let nwaku: ServiceNode;
   let nwaku2: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
 
   const customContentTopic1 = "/waku/2/content/utf8";
@@ -235,10 +200,6 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, contentTopicInfo);
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(autoshardingPubsubTopic1);
-    if (error) throw error;
-    subscription = _subscription;
     messageCollector = new MessageCollector();
   });
 
@@ -247,7 +208,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages on autosharded pubsubtopic", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
     await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
     expect(
       await messageCollector.waitForMessagesAutosharding(1, {
@@ -262,19 +223,10 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
-
-    // Subscribe from the same lightnode to the 2nd pubsubtopic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(autoshardingPubsubTopic2);
-
-    if (error) {
-      throw error;
-    }
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
 
     const messageCollector2 = new MessageCollector();
-
-    await subscription2.subscribe([customDecoder2], messageCollector2.callback);
+    await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
 
     await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
     await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
@@ -304,7 +256,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
 
     // Set up and start a new nwaku node with customPubsubTopic1
     nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
@@ -319,19 +271,11 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
     await waku.dial(await nwaku2.getMultiaddrWithId());
     await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
 
-    // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(autoshardingPubsubTopic2);
-
-    if (error) {
-      throw error;
-    }
-
     await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
 
     const messageCollector2 = new MessageCollector();
 
-    await subscription2.subscribe([customDecoder2], messageCollector2.callback);
+    await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
 
     // Making sure that messages are send and reveiced for both subscriptions
     // While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@@ -363,7 +307,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () {
   it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
     // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
     try {
-      await subscription.subscribe([customDecoder2], messageCollector.callback);
+      await waku.filter.subscribe([customDecoder2], messageCollector.callback);
     } catch (error) {
       expect((error as Error).message).to.include(
         "Pubsub topic not configured"
@@ -378,7 +322,6 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
   let waku: LightNode;
   let nwaku: ServiceNode;
   let nwaku2: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
 
   const customPubsubTopic1 = singleShardInfoToPubsubTopic({
@@ -408,11 +351,6 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, shardInfo);
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(customPubsubTopic1);
-    if (error) throw error;
-    subscription = _subscription;
-
     messageCollector = new MessageCollector();
   });
 
@@ -421,7 +359,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages on custom pubsubtopic", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
     await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
     messageCollector.verifyReceivedMessage(0, {
@@ -432,18 +370,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
-
-    // Subscribe from the same lightnode to the 2nd pubsubtopic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(customPubsubTopic2);
-    if (error) {
-      throw error;
-    }
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
 
     const messageCollector2 = new MessageCollector();
 
-    await subscription2.subscribe([customDecoder2], messageCollector2.callback);
+    await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
 
     await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") });
     await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") });
@@ -465,7 +396,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
   });
 
   it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
-    await subscription.subscribe([customDecoder1], messageCollector.callback);
+    await waku.filter.subscribe([customDecoder1], messageCollector.callback);
 
     // Set up and start a new nwaku node with customPubsubTopic1
     nwaku2 = new ServiceNode(makeLogFileName(this) + "2");
@@ -479,17 +410,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
     await waku.dial(await nwaku2.getMultiaddrWithId());
     await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
 
-    // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(customPubsubTopic2);
-    if (error) {
-      throw error;
-    }
     await nwaku2.ensureSubscriptions([customPubsubTopic2]);
 
     const messageCollector2 = new MessageCollector();
 
-    await subscription2.subscribe([customDecoder2], messageCollector2.callback);
+    await waku.filter.subscribe([customDecoder2], messageCollector2.callback);
 
     // Making sure that messages are send and reveiced for both subscriptions
     // While loop is done because of https://github.com/waku-org/js-waku/issues/1606
@@ -521,7 +446,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {
   it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () {
     // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2`
     try {
-      await subscription.subscribe([customDecoder2], messageCollector.callback);
+      await waku.filter.subscribe([customDecoder2], messageCollector.callback);
     } catch (error) {
       expect((error as Error).message).to.include(
         "Pubsub topic not configured"
diff --git a/packages/tests/tests/filter/single_node/ping.node.spec.ts b/packages/tests/tests/filter/single_node/ping.node.spec.ts
index 71040dfaee..914cce8150 100644
--- a/packages/tests/tests/filter/single_node/ping.node.spec.ts
+++ b/packages/tests/tests/filter/single_node/ping.node.spec.ts
@@ -24,16 +24,10 @@ describe("Waku Filter V2: Ping", function () {
   this.timeout(10000);
   let waku: LightNode;
   let nwaku: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
-
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(TestShardInfo);
-    if (error) throw error;
-    subscription = _subscription;
     messageCollector = new MessageCollector();
   });
 
@@ -42,7 +36,13 @@ describe("Waku Filter V2: Ping", function () {
   });
 
   it("Ping on subscribed peer", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { subscription, error } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
 
@@ -56,11 +56,25 @@ describe("Waku Filter V2: Ping", function () {
   });
 
   it("Ping on peer without subscriptions", async function () {
+    const { subscription, error } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     await validatePingError(subscription);
   });
 
   it("Ping on unsubscribed peer", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
+
     await subscription.ping();
     await subscription.unsubscribe([TestContentTopic]);
 
@@ -69,8 +83,16 @@ describe("Waku Filter V2: Ping", function () {
   });
 
   it("Reopen subscription with peer with lost subscription", async function () {
+    let subscription: ISubscriptionSDK;
     const openSubscription = async (): Promise<void> => {
-      await subscription.subscribe([TestDecoder], messageCollector.callback);
+      const result = await waku.filter.subscribe(
+        [TestDecoder],
+        messageCollector.callback
+      );
+      if (result.error) {
+        throw result.error;
+      }
+      subscription = result.subscription;
     };
 
     const unsubscribe = async (): Promise<void> => {
diff --git a/packages/tests/tests/filter/single_node/push.node.spec.ts b/packages/tests/tests/filter/single_node/push.node.spec.ts
index fdb6a75760..6e47e586ae 100644
--- a/packages/tests/tests/filter/single_node/push.node.spec.ts
+++ b/packages/tests/tests/filter/single_node/push.node.spec.ts
@@ -1,5 +1,5 @@
 import { waitForRemotePeer } from "@waku/core";
-import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
+import { LightNode, Protocols } from "@waku/interfaces";
 import { utf8ToBytes } from "@waku/sdk";
 import { expect } from "chai";
 
@@ -28,17 +28,10 @@ describe("Waku Filter V2: FilterPush", function () {
   this.timeout(10000);
   let waku: LightNode;
   let nwaku: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
-
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(TestShardInfo);
-    if (error) throw error;
-    subscription = _subscription;
-
     messageCollector = new MessageCollector(nwaku);
   });
 
@@ -48,7 +41,7 @@ describe("Waku Filter V2: FilterPush", function () {
 
   TEST_STRING.forEach((testItem) => {
     it(`Check received message containing ${testItem.description}`, async function () {
-      await subscription.subscribe([TestDecoder], messageCollector.callback);
+      await waku.filter.subscribe([TestDecoder], messageCollector.callback);
       await waku.lightPush.send(TestEncoder, {
         payload: utf8ToBytes(testItem.value)
       });
@@ -63,7 +56,7 @@ describe("Waku Filter V2: FilterPush", function () {
 
   TEST_TIMESTAMPS.forEach((testItem) => {
     it(`Check received message with timestamp: ${testItem} `, async function () {
-      await subscription.subscribe([TestDecoder], messageCollector.callback);
+      await waku.filter.subscribe([TestDecoder], messageCollector.callback);
       await delay(400);
 
       await nwaku.restCall<boolean>(
@@ -97,7 +90,7 @@ describe("Waku Filter V2: FilterPush", function () {
   });
 
   it("Check message with invalid timestamp is not received", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await delay(400);
 
     await nwaku.restCall<boolean>(
@@ -116,7 +109,7 @@ describe("Waku Filter V2: FilterPush", function () {
   });
 
   it("Check message on other pubsub topic is not received", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await delay(400);
 
     await nwaku.restCall<boolean>(
@@ -134,7 +127,7 @@ describe("Waku Filter V2: FilterPush", function () {
   });
 
   it("Check message with no pubsub topic is not received", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await delay(400);
 
     await nwaku.restCall<boolean>(
@@ -152,7 +145,7 @@ describe("Waku Filter V2: FilterPush", function () {
   });
 
   it("Check message with no content topic is not received", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await delay(400);
 
     await nwaku.restCall<boolean>(
@@ -169,7 +162,7 @@ describe("Waku Filter V2: FilterPush", function () {
   });
 
   it("Check message with no payload is not received", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await delay(400);
 
     await nwaku.restCall<boolean>(
@@ -191,7 +184,7 @@ describe("Waku Filter V2: FilterPush", function () {
   });
 
   it("Check message with non string payload is not received", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await delay(400);
 
     await nwaku.restCall<boolean>(
@@ -211,7 +204,7 @@ describe("Waku Filter V2: FilterPush", function () {
   // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
   it.skip("Check message received after jswaku node is restarted", async function () {
     // Subscribe and send message
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
 
@@ -224,11 +217,8 @@ describe("Waku Filter V2: FilterPush", function () {
     // Redo the connection and create a new subscription
     await waku.dial(await nwaku.getMultiaddrWithId());
     await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription();
-    if (error) throw error;
-    subscription = _subscription;
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
 
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
 
@@ -246,7 +236,7 @@ describe("Waku Filter V2: FilterPush", function () {
 
   // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
   it.skip("Check message received after nwaku node is restarted", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
 
diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts
index 6ef09e3182..928903fac4 100644
--- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts
+++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts
@@ -1,5 +1,5 @@
 import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
-import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces";
+import { LightNode, Protocols } from "@waku/interfaces";
 import {
   ecies,
   generatePrivateKey,
@@ -40,18 +40,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   let waku2: LightNode;
   let nwaku: ServiceNode;
   let nwaku2: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
   let ctx: Context;
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
-
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(TestShardInfo);
-    if (error) throw error;
-    subscription = _subscription;
-
     messageCollector = new MessageCollector();
     await nwaku.ensureSubscriptions([TestPubsubTopic]);
   });
@@ -61,7 +54,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   });
 
   it("Subscribe and receive messages via lightPush", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
 
     await waku.lightPush.send(TestEncoder, messagePayload);
 
@@ -88,7 +87,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
       TestPubsubTopic
     );
 
-    await subscription.subscribe([decoder], messageCollector.callback);
+    const { error } = await waku.filter.subscribe(
+      [decoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
 
     await waku.lightPush.send(encoder, messagePayload);
 
@@ -115,7 +120,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
       TestPubsubTopic
     );
 
-    await subscription.subscribe([decoder], messageCollector.callback);
+    const { error } = await waku.filter.subscribe(
+      [decoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
 
     await waku.lightPush.send(encoder, messagePayload);
 
@@ -130,7 +141,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   });
 
   it("Subscribe and receive messages via waku relay post", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
 
     await delay(400);
 
@@ -152,7 +169,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   });
 
   it("Subscribe and receive 2 messages on the same topic", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
 
     await waku.lightPush.send(TestEncoder, messagePayload);
 
@@ -181,7 +198,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
 
   it("Subscribe and receive messages on 2 different content topics", async function () {
     // Subscribe to the first content topic and send a message.
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     await waku.lightPush.send(TestEncoder, messagePayload);
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
     messageCollector.verifyReceivedMessage(0, {
@@ -227,7 +250,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
 
     // Subscribe to all 20 topics.
     for (let i = 0; i < topicCount; i++) {
-      await subscription.subscribe([td.decoders[i]], messageCollector.callback);
+      await waku.filter.subscribe([td.decoders[i]], messageCollector.callback);
     }
 
     // Send a unique message on each topic.
@@ -253,7 +276,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
     const topicCount = 100;
     const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
 
-    await subscription.subscribe(td.decoders, messageCollector.callback);
+    await waku.filter.subscribe(td.decoders, messageCollector.callback);
 
     // Send a unique message on each topic.
     for (let i = 0; i < topicCount; i++) {
@@ -278,10 +301,14 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
     const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
 
     try {
-      const { failures, successes } = await subscription.subscribe(
+      const { error, results } = await waku.filter.subscribe(
         td.decoders,
         messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
+      const { failures, successes } = results;
       if (failures.length === 0 || successes.length > 0) {
         throw new Error(
           `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
@@ -309,10 +336,10 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
     const td2 = generateTestData(topicCount2, { pubsubTopic: TestPubsubTopic });
 
     // Subscribe to the first set of topics.
-    await subscription.subscribe(td1.decoders, messageCollector.callback);
+    await waku.filter.subscribe(td1.decoders, messageCollector.callback);
 
     // Subscribe to the second set of topics which has overlapping topics with the first set.
-    await subscription.subscribe(td2.decoders, messageCollector.callback);
+    await waku.filter.subscribe(td2.decoders, messageCollector.callback);
 
     // Send messages to the first set of topics.
     for (let i = 0; i < topicCount1; i++) {
@@ -339,11 +366,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   });
 
   it("Refresh subscription", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
 
     // Resubscribe (refresh) to the same topic and send another message.
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
 
     // Confirm both messages were received.
@@ -371,7 +398,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
       });
       const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
 
-      await subscription.subscribe([newDecoder], messageCollector.callback);
+      await waku.filter.subscribe([newDecoder], messageCollector.callback);
       await waku.lightPush.send(newEncoder, messagePayload);
 
       expect(await messageCollector.waitForMessages(1)).to.eq(true);
@@ -384,22 +411,16 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   });
 
   it("Add multiple subscription objects on single nwaku node", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
 
-    // Create a second subscription on a different topic
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(TestShardInfo);
-    if (error) {
-      throw error;
-    }
     const newContentTopic = "/test/2/waku-filter/default";
     const newEncoder = createEncoder({
       contentTopic: newContentTopic,
       pubsubTopic: TestPubsubTopic
     });
     const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
-    await subscription2.subscribe([newDecoder], messageCollector.callback);
+    await waku.filter.subscribe([newDecoder], messageCollector.callback);
 
     await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
 
@@ -418,17 +439,13 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
   });
 
   it("Subscribe and receive messages from multiple nwaku nodes", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    await waku.filter.subscribe([TestDecoder], messageCollector.callback);
 
     // Set up and start a new nwaku node
     [nwaku2, waku2] = await runNodes(ctx, TestShardInfo);
     await waku.dial(await nwaku2.getMultiaddrWithId());
     await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
-    const { error, subscription: subscription2 } =
-      await waku.filter.createSubscription(TestShardInfo);
-    if (error) {
-      throw error;
-    }
+
     await nwaku2.ensureSubscriptions([TestPubsubTopic]);
     // Send a message using the new subscription
     const newContentTopic = "/test/2/waku-filter/default";
@@ -437,7 +454,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
       pubsubTopic: TestPubsubTopic
     });
     const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
-    await subscription2.subscribe([newDecoder], messageCollector.callback);
+    await waku.filter.subscribe([newDecoder], messageCollector.callback);
 
     // Making sure that messages are send and reveiced for both subscriptions
     while (!(await messageCollector.waitForMessages(2))) {
diff --git a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts
index 1f7a6258a4..a2af7c4b4b 100644
--- a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts
+++ b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts
@@ -1,5 +1,4 @@
 import { createDecoder, createEncoder } from "@waku/core";
-import { ISubscriptionSDK } from "@waku/interfaces";
 import { LightNode } from "@waku/interfaces";
 import { utf8ToBytes } from "@waku/sdk";
 import { expect } from "chai";
@@ -28,16 +27,11 @@ describe("Waku Filter V2: Unsubscribe", function () {
   this.timeout(10000);
   let waku: LightNode;
   let nwaku: ServiceNode;
-  let subscription: ISubscriptionSDK;
   let messageCollector: MessageCollector;
 
   beforeEachCustom(this, async () => {
     [nwaku, waku] = await runNodes(this.ctx, TestShardInfo);
 
-    const { error, subscription: _subscription } =
-      await waku.filter.createSubscription(TestShardInfo);
-    if (error) throw error;
-    subscription = _subscription;
     messageCollector = new MessageCollector();
     await nwaku.ensureSubscriptions([TestPubsubTopic]);
   });
@@ -47,7 +41,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
   });
 
   it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { subscription, error } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     await waku.lightPush.send(TestEncoder, messagePayload);
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
 
@@ -68,7 +68,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
 
   it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
     // Subscribe to 2 topics and send messages
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     const newContentTopic = "/test/2/waku-filter";
     const newEncoder = createEncoder({
       contentTopic: newContentTopic,
@@ -93,7 +99,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
 
   it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
     // Subscribe to 2 topics and send messages
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     const newContentTopic = "/test/2/waku-filter/default";
     const newEncoder = createEncoder({
       contentTopic: newContentTopic,
@@ -118,7 +130,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
 
   it("Unsubscribe topics the node is not subscribed to", async function () {
     // Subscribe to 1 topic and send message
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
 
@@ -136,7 +154,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
   });
 
   it("Unsubscribes all - node subscribed to 1 topic", async function () {
-    await subscription.subscribe([TestDecoder], messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
     expect(await messageCollector.waitForMessages(1)).to.eq(true);
     expect(messageCollector.count).to.eq(1);
@@ -155,7 +179,13 @@ describe("Waku Filter V2: Unsubscribe", function () {
     // Subscribe to 10 topics and send message
     const topicCount = 10;
     const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
-    await subscription.subscribe(td.decoders, messageCollector.callback);
+    const { error, subscription } = await waku.filter.subscribe(
+      td.decoders,
+      messageCollector.callback
+    );
+    if (error) {
+      throw error;
+    }
     for (let i = 0; i < topicCount; i++) {
       await waku.lightPush.send(td.encoders[i], {
         payload: utf8ToBytes(`M${i + 1}`)
diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts
index b350643670..46a0cf9989 100644
--- a/packages/tests/tests/filter/subscribe.node.spec.ts
+++ b/packages/tests/tests/filter/subscribe.node.spec.ts
@@ -1,5 +1,5 @@
 import { createDecoder, createEncoder } from "@waku/core";
-import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
+import { LightNode } from "@waku/interfaces";
 import {
   ecies,
   generatePrivateKey,
@@ -36,7 +36,6 @@ const runTests = (strictCheckNodes: boolean): void => {
     this.timeout(100000);
     let waku: LightNode;
     let serviceNodes: ServiceNodesFleet;
-    let subscription: ISubscriptionSDK;
 
     beforeEachCustom(this, async () => {
       [serviceNodes, waku] = await runMultipleNodes(
@@ -45,12 +44,6 @@ const runTests = (strictCheckNodes: boolean): void => {
         undefined,
         strictCheckNodes
       );
-      const { error, subscription: _subscription } =
-        await waku.filter.createSubscription(TestShardInfo);
-
-      if (!error) {
-        subscription = _subscription;
-      }
     });
 
     afterEachCustom(this, async () => {
@@ -60,7 +53,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     it("Subscribe and receive messages via lightPush", async function () {
       expect(waku.libp2p.getConnections()).has.length(3);
 
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -92,7 +85,7 @@ const runTests = (strictCheckNodes: boolean): void => {
         TestPubsubTopic
       );
 
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [decoder],
         serviceNodes.messageCollector.callback
       );
@@ -125,7 +118,7 @@ const runTests = (strictCheckNodes: boolean): void => {
         TestPubsubTopic
       );
 
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [decoder],
         serviceNodes.messageCollector.callback
       );
@@ -146,7 +139,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Subscribe and receive messages via waku relay post", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -173,7 +166,7 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Subscribe and receive 2 messages on the same topic", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -208,7 +201,7 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     it("Subscribe and receive messages on 2 different content topics", async function () {
       // Subscribe to the first content topic and send a message.
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -231,7 +224,7 @@ const runTests = (strictCheckNodes: boolean): void => {
         pubsubTopic: TestPubsubTopic
       });
       const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [newDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -267,7 +260,7 @@ const runTests = (strictCheckNodes: boolean): void => {
 
       // Subscribe to all 20 topics.
       for (let i = 0; i < topicCount; i++) {
-        await subscription.subscribe(
+        await waku.filter.subscribe(
           [td.decoders[i]],
           serviceNodes.messageCollector.callback
         );
@@ -298,7 +291,7 @@ const runTests = (strictCheckNodes: boolean): void => {
       const topicCount = 100;
       const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
 
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         td.decoders,
         serviceNodes.messageCollector.callback
       );
@@ -328,10 +321,14 @@ const runTests = (strictCheckNodes: boolean): void => {
       const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
 
       try {
-        const { failures, successes } = await subscription.subscribe(
+        const { error, results } = await waku.filter.subscribe(
           td.decoders,
           serviceNodes.messageCollector.callback
         );
+        if (error) {
+          throw error;
+        }
+        const { failures, successes } = results;
         if (failures.length === 0 || successes.length > 0) {
           throw new Error(
             `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.`
@@ -363,13 +360,13 @@ const runTests = (strictCheckNodes: boolean): void => {
       });
 
       // Subscribe to the first set of topics.
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         td1.decoders,
         serviceNodes.messageCollector.callback
       );
 
       // Subscribe to the second set of topics which has overlapping topics with the first set.
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         td2.decoders,
         serviceNodes.messageCollector.callback
       );
@@ -398,14 +395,14 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Refresh subscription", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
       await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
 
       // Resubscribe (refresh) to the same topic and send another message.
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -436,7 +433,7 @@ const runTests = (strictCheckNodes: boolean): void => {
         });
         const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
 
-        await subscription.subscribe(
+        await waku.filter.subscribe(
           [newDecoder],
           serviceNodes.messageCollector.callback
         );
@@ -454,25 +451,19 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Add multiple subscription objects on single nwaku node", async function () {
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
       await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
 
-      // Create a second subscription on a different topic
-      const { error, subscription: subscription2 } =
-        await waku.filter.createSubscription(TestShardInfo);
-      if (error) {
-        throw error;
-      }
       const newContentTopic = "/test/2/waku-filter/default";
       const newEncoder = createEncoder({
         contentTopic: newContentTopic,
         pubsubTopic: TestPubsubTopic
       });
       const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
-      await subscription2.subscribe(
+      await waku.filter.subscribe(
         [newDecoder],
         serviceNodes.messageCollector.callback
       );
diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts
index 7f3a74bf11..de644d9cc6 100644
--- a/packages/tests/tests/filter/unsubscribe.node.spec.ts
+++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts
@@ -1,5 +1,5 @@
 import { createDecoder, createEncoder } from "@waku/core";
-import { ISubscriptionSDK, LightNode } from "@waku/interfaces";
+import { type LightNode } from "@waku/interfaces";
 import { utf8ToBytes } from "@waku/sdk";
 import { expect } from "chai";
 
@@ -28,22 +28,12 @@ const runTests = (strictCheckNodes: boolean): void => {
     this.timeout(10000);
     let waku: LightNode;
     let serviceNodes: ServiceNodesFleet;
-    let subscription: ISubscriptionSDK;
 
     beforeEachCustom(this, async () => {
       [serviceNodes, waku] = await runMultipleNodes(this.ctx, {
         contentTopics: [TestContentTopic],
         clusterId: ClusterId
       });
-      const { error, subscription: _subscription } =
-        await waku.filter.createSubscription({
-          contentTopics: [TestContentTopic],
-          clusterId: ClusterId
-        });
-
-      if (!error) {
-        subscription = _subscription;
-      }
     });
 
     afterEachCustom(this, async () => {
@@ -51,10 +41,13 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () {
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       await waku.lightPush.send(TestEncoder, messagePayload);
       expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
         true
@@ -79,17 +72,20 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () {
       // Subscribe to 2 topics and send messages
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       const newContentTopic = "/test/2/waku-filter";
       const newEncoder = createEncoder({
         contentTopic: newContentTopic,
         pubsubTopic: TestPubsubTopic
       });
       const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [newDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -114,7 +110,7 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () {
       // Subscribe to 2 topics and send messages
-      await subscription.subscribe(
+      await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
@@ -124,10 +120,13 @@ const runTests = (strictCheckNodes: boolean): void => {
         pubsubTopic: TestPubsubTopic
       });
       const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [newDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
       await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
       expect(await serviceNodes.messageCollector.waitForMessages(2)).to.eq(
@@ -149,10 +148,13 @@ const runTests = (strictCheckNodes: boolean): void => {
 
     it("Unsubscribe topics the node is not subscribed to", async function () {
       // Subscribe to 1 topic and send message
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
       expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
         true
@@ -174,10 +176,13 @@ const runTests = (strictCheckNodes: boolean): void => {
     });
 
     it("Unsubscribes all - node subscribed to 1 topic", async function () {
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         [TestDecoder],
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
       expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq(
         true
@@ -200,10 +205,13 @@ const runTests = (strictCheckNodes: boolean): void => {
       // Subscribe to 10 topics and send message
       const topicCount = 10;
       const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });
-      await subscription.subscribe(
+      const { error, subscription } = await waku.filter.subscribe(
         td.decoders,
         serviceNodes.messageCollector.callback
       );
+      if (error) {
+        throw error;
+      }
       for (let i = 0; i < topicCount; i++) {
         await waku.lightPush.send(td.encoders[i], {
           payload: utf8ToBytes(`M${i + 1}`)
diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts
index 85ef6e3e67..a4356b8fc3 100644
--- a/packages/tests/tests/health-manager/node.spec.ts
+++ b/packages/tests/tests/health-manager/node.spec.ts
@@ -9,7 +9,12 @@ import {
   ServiceNodesFleet
 } from "../../src";
 
-import { messagePayload, TestEncoder, TestShardInfo } from "./utils";
+import {
+  messagePayload,
+  TestDecoder,
+  TestEncoder,
+  TestShardInfo
+} from "./utils";
 
 describe("Node Health Status Matrix Tests", function () {
   let waku: LightNode;
@@ -45,7 +50,7 @@ describe("Node Health Status Matrix Tests", function () {
         }
 
         if (filterPeers > 0) {
-          await waku.filter.createSubscription(TestShardInfo);
+          await waku.filter.subscribe([TestDecoder], () => {});
         }
 
         const lightPushHealth = waku.health.getProtocolStatus(
diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts
index 965753f420..a0db807616 100644
--- a/packages/tests/tests/health-manager/protocols.spec.ts
+++ b/packages/tests/tests/health-manager/protocols.spec.ts
@@ -67,14 +67,12 @@ describe("Health Manager", function () {
           num
         );
 
-        const { error, subscription } =
-          await waku.filter.createSubscription(TestShardInfo);
+        const { error } = await waku.filter.subscribe([TestDecoder], () => {});
+
         if (error) {
           expect(error).to.not.equal(undefined);
         }
 
-        await subscription?.subscribe([TestDecoder], () => {});
-
         const health = waku.health.getProtocolStatus(Protocols.Filter);
         if (!health) {
           expect(health).to.not.equal(undefined);
diff --git a/packages/tests/tests/relay/index.node.spec.ts b/packages/tests/tests/relay/index.node.spec.ts
index f9f4175f21..d9d0860427 100644
--- a/packages/tests/tests/relay/index.node.spec.ts
+++ b/packages/tests/tests/relay/index.node.spec.ts
@@ -67,10 +67,10 @@ describe("Waku Relay", function () {
     const symDecoder = createSymDecoder(symTopic, symKey, TestPubsubTopic);
 
     const msgs: DecodedMessage[] = [];
-    void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => {
+    void waku2.relay.subscribeWithUnsubscribe([eciesDecoder], (wakuMsg) => {
       msgs.push(wakuMsg);
     });
-    void waku2.relay.subscribe([symDecoder], (wakuMsg) => {
+    void waku2.relay.subscribeWithUnsubscribe([symDecoder], (wakuMsg) => {
       msgs.push(wakuMsg);
     });
 
@@ -97,7 +97,7 @@ describe("Waku Relay", function () {
     // The promise **fails** if we receive a message on this observer.
     const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve, reject) => {
-        const deleteObserver = waku2.relay.subscribe(
+        const deleteObserver = waku2.relay.subscribeWithUnsubscribe(
           [createDecoder(contentTopic)],
           reject
         ) as () => void;
diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts
index 09909bb673..ed6128bf52 100644
--- a/packages/tests/tests/relay/interop.node.spec.ts
+++ b/packages/tests/tests/relay/interop.node.spec.ts
@@ -76,8 +76,9 @@ describe("Waku Relay, Interop", function () {
 
     const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve) => {
-        void waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) =>
-          resolve(msg)
+        void waku.relay.subscribeWithUnsubscribe<DecodedMessage>(
+          TestDecoder,
+          (msg) => resolve(msg)
         );
       }
     );
@@ -119,7 +120,7 @@ describe("Waku Relay, Interop", function () {
 
     const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve) => {
-        void waku2.relay.subscribe(TestDecoder, resolve);
+        void waku2.relay.subscribeWithUnsubscribe(TestDecoder, resolve);
       }
     );
 
diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts
index b6b4d8a4a3..1ca29a2411 100644
--- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts
+++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts
@@ -124,9 +124,18 @@ describe("Waku Relay, multiple pubsub topics", function () {
         waitForRemotePeer(waku3, [Protocols.Relay])
       ]);
 
-      await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
-      await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
-      await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
+      await waku1.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector1.callback
+      );
+      await waku2.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector2.callback
+      );
+      await waku3.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector3.callback
+      );
 
       // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
       const relayResponse1 = await waku1.relay.send(testItem.encoder, {
@@ -222,15 +231,18 @@ describe("Waku Relay, multiple pubsub topics", function () {
       waitForRemotePeer(waku3, [Protocols.Relay])
     ]);
 
-    await waku1.relay.subscribe(
+    await waku1.relay.subscribeWithUnsubscribe(
       [customDecoder1, customDecoder2],
       msgCollector1.callback
     );
-    await waku2.relay.subscribe(
+    await waku2.relay.subscribeWithUnsubscribe(
       [customDecoder1, customDecoder2],
       msgCollector2.callback
     );
-    await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
+    await waku3.relay.subscribeWithUnsubscribe(
+      [customDecoder1],
+      msgCollector3.callback
+    );
 
     // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
     // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
@@ -290,7 +302,7 @@ describe("Waku Relay, multiple pubsub topics", function () {
 
     const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve) => {
-        void waku2.relay.subscribe([customDecoder1], resolve);
+        void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
       }
     );
 
@@ -298,7 +310,7 @@ describe("Waku Relay, multiple pubsub topics", function () {
     // pubsub topic.
     const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve, reject) => {
-        void waku3.relay.subscribe([TestDecoder], reject);
+        void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
         setTimeout(resolve, 1000);
       }
     );
@@ -417,9 +429,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
         waitForRemotePeer(waku3, [Protocols.Relay])
       ]);
 
-      await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
-      await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
-      await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
+      await waku1.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector1.callback
+      );
+      await waku2.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector2.callback
+      );
+      await waku3.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector3.callback
+      );
 
       // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
       const relayResponse1 = await waku1.relay.send(testItem.encoder, {
@@ -524,15 +545,18 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
       waitForRemotePeer(waku3, [Protocols.Relay])
     ]);
 
-    await waku1.relay.subscribe(
+    await waku1.relay.subscribeWithUnsubscribe(
       [customDecoder1, customDecoder2],
       msgCollector1.callback
     );
-    await waku2.relay.subscribe(
+    await waku2.relay.subscribeWithUnsubscribe(
       [customDecoder1, customDecoder2],
       msgCollector2.callback
     );
-    await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
+    await waku3.relay.subscribeWithUnsubscribe(
+      [customDecoder1],
+      msgCollector3.callback
+    );
 
     // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
     // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
@@ -619,7 +643,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
 
     const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve) => {
-        void waku2.relay.subscribe([customDecoder1], resolve);
+        void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
       }
     );
 
@@ -627,7 +651,7 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () {
     // pubsub topic.
     const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve, reject) => {
-        void waku3.relay.subscribe([TestDecoder], reject);
+        void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
         setTimeout(resolve, 1000);
       }
     );
@@ -725,9 +749,18 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
         waitForRemotePeer(waku3, [Protocols.Relay])
       ]);
 
-      await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
-      await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
-      await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
+      await waku1.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector1.callback
+      );
+      await waku2.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector2.callback
+      );
+      await waku3.relay.subscribeWithUnsubscribe(
+        [testItem.decoder],
+        msgCollector3.callback
+      );
 
       // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
       const relayResponse1 = await waku1.relay.send(testItem.encoder, {
@@ -823,15 +856,18 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
       waitForRemotePeer(waku3, [Protocols.Relay])
     ]);
 
-    await waku1.relay.subscribe(
+    await waku1.relay.subscribeWithUnsubscribe(
       [customDecoder1, customDecoder2],
       msgCollector1.callback
     );
-    await waku2.relay.subscribe(
+    await waku2.relay.subscribeWithUnsubscribe(
       [customDecoder1, customDecoder2],
       msgCollector2.callback
     );
-    await waku3.relay.subscribe([customDecoder1], msgCollector3.callback);
+    await waku3.relay.subscribeWithUnsubscribe(
+      [customDecoder1],
+      msgCollector3.callback
+    );
 
     // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
     // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
@@ -891,7 +927,7 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
 
     const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve) => {
-        void waku2.relay.subscribe([customDecoder1], resolve);
+        void waku2.relay.subscribeWithUnsubscribe([customDecoder1], resolve);
       }
     );
 
@@ -899,7 +935,7 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () {
     // pubsub topic.
     const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve, reject) => {
-        void waku3.relay.subscribe([TestDecoder], reject);
+        void waku3.relay.subscribeWithUnsubscribe([TestDecoder], reject);
         setTimeout(resolve, 1000);
       }
     );
diff --git a/packages/tests/tests/relay/publish.node.spec.ts b/packages/tests/tests/relay/publish.node.spec.ts
index f93c4ea609..f7fc8be951 100644
--- a/packages/tests/tests/relay/publish.node.spec.ts
+++ b/packages/tests/tests/relay/publish.node.spec.ts
@@ -35,7 +35,10 @@ describe("Waku Relay, Publish", function () {
   beforeEachCustom(this, async () => {
     [waku1, waku2] = await runJSNodes();
     messageCollector = new MessageCollector();
-    await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
   });
 
   afterEachCustom(this, async () => {
diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts
index b234acd53b..a147c4ad68 100644
--- a/packages/tests/tests/relay/subscribe.node.spec.ts
+++ b/packages/tests/tests/relay/subscribe.node.spec.ts
@@ -85,7 +85,10 @@ describe("Waku Relay, Subscribe", function () {
   });
 
   it("Subscribe and publish message", async function () {
-    await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
     await waku1.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) });
     expect(
       await messageCollector.waitForMessages(1, TestWaitMessageOptions)
@@ -98,7 +101,10 @@ describe("Waku Relay, Subscribe", function () {
 
   it("Subscribe and publish 10000 messages on the same topic", async function () {
     const messageCount = 10000;
-    await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
     // Send a unique message on each topic.
     for (let i = 0; i < messageCount; i++) {
       await waku1.relay.send(TestEncoder, {
@@ -131,8 +137,14 @@ describe("Waku Relay, Subscribe", function () {
     });
     const secondDecoder = createDecoder(secondContentTopic, TestPubsubTopic);
 
-    await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
-    await waku2.relay.subscribe([secondDecoder], messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    await waku2.relay.subscribeWithUnsubscribe(
+      [secondDecoder],
+      messageCollector.callback
+    );
     await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
     await waku1.relay.send(secondEncoder, { payload: utf8ToBytes("M2") });
     expect(
@@ -158,7 +170,10 @@ describe("Waku Relay, Subscribe", function () {
 
     // Subscribe to topics one by one
     for (let i = 0; i < topicCount; i++) {
-      await waku2.relay.subscribe([td.decoders[i]], messageCollector.callback);
+      await waku2.relay.subscribeWithUnsubscribe(
+        [td.decoders[i]],
+        messageCollector.callback
+      );
     }
 
     // Send a unique message on each topic.
@@ -189,7 +204,10 @@ describe("Waku Relay, Subscribe", function () {
     const td = generateTestData(topicCount, TestWaitMessageOptions);
 
     // Subscribe to all topics at once
-    await waku2.relay.subscribe(td.decoders, messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      td.decoders,
+      messageCollector.callback
+    );
 
     // Send a unique message on each topic.
     for (let i = 0; i < topicCount; i++) {
@@ -217,8 +235,14 @@ describe("Waku Relay, Subscribe", function () {
 
   // Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed
   it.skip("Refresh subscription", async function () {
-    await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
-    await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
+    await waku2.relay.subscribeWithUnsubscribe(
+      [TestDecoder],
+      messageCollector.callback
+    );
 
     await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
 
@@ -239,9 +263,15 @@ describe("Waku Relay, Subscribe", function () {
     const td2 = generateTestData(topicCount2, TestWaitMessageOptions);
 
     // Subscribe to the first set of topics.
-    await waku2.relay.subscribe(td1.decoders, messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      td1.decoders,
+      messageCollector.callback
+    );
     // Subscribe to the second set of topics which has overlapping topics with the first set.
-    await waku2.relay.subscribe(td2.decoders, messageCollector.callback);
+    await waku2.relay.subscribeWithUnsubscribe(
+      td2.decoders,
+      messageCollector.callback
+    );
 
     // Send messages to the first set of topics.
     for (let i = 0; i < topicCount1; i++) {
@@ -278,7 +308,10 @@ describe("Waku Relay, Subscribe", function () {
       });
       const newDecoder = createDecoder(newContentTopic, TestPubsubTopic);
 
-      await waku2.relay.subscribe([newDecoder], messageCollector.callback);
+      await waku2.relay.subscribeWithUnsubscribe(
+        [newDecoder],
+        messageCollector.callback
+      );
       await waku1.relay.send(newEncoder, {
         payload: utf8ToBytes(messageText)
       });
diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts
index 85d56a6258..e2c43c33e9 100644
--- a/packages/tests/tests/waku.node.spec.ts
+++ b/packages/tests/tests/waku.node.spec.ts
@@ -223,7 +223,7 @@ describe("Decryption Keys", function () {
 
     const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
       (resolve) => {
-        void waku2.relay.subscribe([decoder], resolve);
+        void waku2.relay.subscribeWithUnsubscribe([decoder], resolve);
       }
     );
 

From 6e735815368fe631d035a45a8176821a6571db77 Mon Sep 17 00:00:00 2001
From: danisharora099 <danisharora099@gmail.com>
Date: Tue, 30 Jul 2024 17:09:25 +0530
Subject: [PATCH 7/7] fix: tests

---
 packages/tests/tests/filter/peer_management.spec.ts       | 2 ++
 packages/tests/tests/filter/ping.node.spec.ts             | 1 +
 packages/tests/tests/filter/single_node/ping.node.spec.ts | 1 +
 3 files changed, 4 insertions(+)

diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts
index 5b3639b820..dee9aa33d9 100644
--- a/packages/tests/tests/filter/peer_management.spec.ts
+++ b/packages/tests/tests/filter/peer_management.spec.ts
@@ -97,6 +97,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
     if (error) {
       throw error;
     }
+    await subscription.unsubscribe([contentTopic]);
     const pingResult = await subscription.ping();
     expect(pingResult.successes.length).to.equal(0);
     expect(pingResult.failures.length).to.be.greaterThan(0);
@@ -209,6 +210,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
       await subscription.unsubscribe([contentTopic]);
       pingResult = await subscription.ping();
       expect(pingResult.failures.length).to.be.greaterThan(0);
+      await subscription.subscribe([decoder], () => {});
     }
 
     const finalPingResult = await subscription!.ping();
diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts
index e661e05f33..dcc4c2740b 100644
--- a/packages/tests/tests/filter/ping.node.spec.ts
+++ b/packages/tests/tests/filter/ping.node.spec.ts
@@ -65,6 +65,7 @@ const runTests = (strictCheckNodes: boolean): void => {
       if (error) {
         throw error;
       }
+      await subscription.unsubscribe([TestContentTopic]);
       await validatePingError(subscription);
     });
 
diff --git a/packages/tests/tests/filter/single_node/ping.node.spec.ts b/packages/tests/tests/filter/single_node/ping.node.spec.ts
index 914cce8150..f26395c221 100644
--- a/packages/tests/tests/filter/single_node/ping.node.spec.ts
+++ b/packages/tests/tests/filter/single_node/ping.node.spec.ts
@@ -63,6 +63,7 @@ describe("Waku Filter V2: Ping", function () {
     if (error) {
       throw error;
     }
+    await subscription.unsubscribe([TestContentTopic]);
     await validatePingError(subscription);
   });