From 319ba7795d4521a248537f2ac7ce29b73ba5bdf1 Mon Sep 17 00:00:00 2001 From: Julien Vincent Date: Fri, 17 Dec 2021 12:02:01 +0100 Subject: [PATCH 1/4] feature: expose heartbeat function in eachMessage handler --- src/consumer/runner.js | 9 ++++++++- types/index.d.ts | 8 ++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/consumer/runner.js b/src/consumer/runner.js index c57d8709f..112984128 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -148,7 +148,14 @@ module.exports = class Runner extends EventEmitter { } try { - await this.eachMessage({ topic, partition, message }) + await this.eachMessage({ + topic, + partition, + message, + heartbeat: async () => { + await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval }) + }, + }) } catch (e) { if (!isKafkaJSError(e)) { this.logger.error(`Error when calling eachMessage`, { diff --git a/types/index.d.ts b/types/index.d.ts index 92d45d9d3..0218ec122 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -858,6 +858,7 @@ export interface EachMessagePayload { topic: string partition: number message: KafkaMessage + heartbeat(): Promise } export interface EachBatchPayload { @@ -882,14 +883,17 @@ export type ConsumerEachMessagePayload = EachMessagePayload */ export type ConsumerEachBatchPayload = EachBatchPayload +export type EachBatchHandler = (payload: EachBatchPayload) => Promise +export type EachMessageHandler = (payload: EachMessagePayload) => Promise + export type ConsumerRunConfig = { autoCommit?: boolean autoCommitInterval?: number | null autoCommitThreshold?: number | null eachBatchAutoResolve?: boolean partitionsConsumedConcurrently?: number - eachBatch?: (payload: EachBatchPayload) => Promise - eachMessage?: (payload: EachMessagePayload) => Promise + eachBatch?: EachBatchHandler + eachMessage?: EachMessageHandler } export type ConsumerSubscribeTopic = { topic: string | RegExp; fromBeginning?: boolean } From 75b247084ff7d10773139c9a8556411603946bdd Mon Sep 17 00:00:00 2001 From: Julien Vincent Date: Fri, 17 Dec 2021 14:30:54 +0100 Subject: [PATCH 2/4] fix: got existing consumer tests to pass --- .../__tests__/consumeMessages.spec.js | 224 ++++++++++-------- src/consumer/__tests__/errorRecovery.spec.js | 4 +- src/consumer/__tests__/pause.spec.js | 68 +++--- src/consumer/__tests__/seek.spec.js | 28 +-- 4 files changed, 174 insertions(+), 150 deletions(-) diff --git a/src/consumer/__tests__/consumeMessages.spec.js b/src/consumer/__tests__/consumeMessages.spec.js index f72e13fbd..942d6918d 100644 --- a/src/consumer/__tests__/consumeMessages.spec.js +++ b/src/consumer/__tests__/consumeMessages.spec.js @@ -79,25 +79,29 @@ describe('Consumer', () => { expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled() - expect(messagesConsumed[0]).toEqual({ - topic: topicName, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(messages[0].key), - value: Buffer.from(messages[0].value), - offset: '0', - }), - }) + expect(messagesConsumed[0]).toEqual( + expect.objectContaining({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages[0].key), + value: Buffer.from(messages[0].value), + offset: '0', + }), + }) + ) - expect(messagesConsumed[messagesConsumed.length - 1]).toEqual({ - topic: topicName, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(messages[messages.length - 1].key), - value: Buffer.from(messages[messages.length - 1].value), - offset: '99', - }), - }) + expect(messagesConsumed[messagesConsumed.length - 1]).toEqual( + expect.objectContaining({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages[messages.length - 1].key), + value: Buffer.from(messages[messages.length - 1].value), + offset: '99', + }), + }) + ) // check if all offsets are present expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`)) @@ -227,7 +231,7 @@ describe('Consumer', () => { }) await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ @@ -235,8 +239,8 @@ describe('Consumer', () => { value: Buffer.from(message1.value), offset: '0', }), - }, - { + }), + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ @@ -244,7 +248,7 @@ describe('Consumer', () => { value: Buffer.from(message2.value), offset: '1', }), - }, + }), ]) }) @@ -335,25 +339,29 @@ describe('Consumer', () => { expect(cluster.refreshMetadataIfNecessary).toHaveBeenCalled() - expect(messagesConsumed[0]).toEqual({ - topic: topicName, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(messages[0].key), - value: Buffer.from(messages[0].value), - offset: '0', - }), - }) + expect(messagesConsumed[0]).toEqual( + expect.objectContaining({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages[0].key), + value: Buffer.from(messages[0].value), + offset: '0', + }), + }) + ) - expect(messagesConsumed[messagesConsumed.length - 1]).toEqual({ - topic: topicName, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(messages[messages.length - 1].key), - value: Buffer.from(messages[messages.length - 1].value), - offset: '99', - }), - }) + expect(messagesConsumed[messagesConsumed.length - 1]).toEqual( + expect.objectContaining({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages[messages.length - 1].key), + value: Buffer.from(messages[messages.length - 1].value), + offset: '99', + }), + }) + ) // check if all offsets are present expect(messagesConsumed.map(m => m.message.offset)).toEqual(messages.map((_, i) => `${i}`)) @@ -418,71 +426,79 @@ describe('Consumer', () => { const messagesFromTopic1 = messagesConsumed.filter(m => m.topic === topicName) const messagesFromTopic2 = messagesConsumed.filter(m => m.topic === topicName2) - expect(messagesFromTopic1[0]).toEqual({ - topic: topicName, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(messages1[0].key), - value: Buffer.from(messages1[0].value), - headers: { - 'header-keyA': Buffer.from(messages1[0].headers['header-keyA']), - 'header-keyB': Buffer.from(messages1[0].headers['header-keyB']), - 'header-keyC': Buffer.from(messages1[0].headers['header-keyC']), - }, - magicByte: 2, - offset: '0', - }), - }) + expect(messagesFromTopic1[0]).toEqual( + expect.objectContaining({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages1[0].key), + value: Buffer.from(messages1[0].value), + headers: { + 'header-keyA': Buffer.from(messages1[0].headers['header-keyA']), + 'header-keyB': Buffer.from(messages1[0].headers['header-keyB']), + 'header-keyC': Buffer.from(messages1[0].headers['header-keyC']), + }, + magicByte: 2, + offset: '0', + }), + }) + ) const lastMessage1 = messages1[messages1.length - 1] - expect(messagesFromTopic1[messagesFromTopic1.length - 1]).toEqual({ - topic: topicName, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(lastMessage1.key), - value: Buffer.from(lastMessage1.value), - headers: { - 'header-keyA': Buffer.from(lastMessage1.headers['header-keyA']), - 'header-keyB': Buffer.from(lastMessage1.headers['header-keyB']), - 'header-keyC': Buffer.from(lastMessage1.headers['header-keyC']), - }, - magicByte: 2, - offset: '102', - }), - }) + expect(messagesFromTopic1[messagesFromTopic1.length - 1]).toEqual( + expect.objectContaining({ + topic: topicName, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(lastMessage1.key), + value: Buffer.from(lastMessage1.value), + headers: { + 'header-keyA': Buffer.from(lastMessage1.headers['header-keyA']), + 'header-keyB': Buffer.from(lastMessage1.headers['header-keyB']), + 'header-keyC': Buffer.from(lastMessage1.headers['header-keyC']), + }, + magicByte: 2, + offset: '102', + }), + }) + ) - expect(messagesFromTopic2[0]).toEqual({ - topic: topicName2, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(messages2[0].key), - value: Buffer.from(messages2[0].value), - headers: { - 'header-keyA': Buffer.from(messages2[0].headers['header-keyA']), - 'header-keyB': Buffer.from(messages2[0].headers['header-keyB']), - 'header-keyC': Buffer.from(messages2[0].headers['header-keyC']), - }, - magicByte: 2, - offset: '0', - }), - }) + expect(messagesFromTopic2[0]).toEqual( + expect.objectContaining({ + topic: topicName2, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(messages2[0].key), + value: Buffer.from(messages2[0].value), + headers: { + 'header-keyA': Buffer.from(messages2[0].headers['header-keyA']), + 'header-keyB': Buffer.from(messages2[0].headers['header-keyB']), + 'header-keyC': Buffer.from(messages2[0].headers['header-keyC']), + }, + magicByte: 2, + offset: '0', + }), + }) + ) const lastMessage2 = messages2[messages2.length - 1] - expect(messagesFromTopic2[messagesFromTopic2.length - 1]).toEqual({ - topic: topicName2, - partition: 0, - message: expect.objectContaining({ - key: Buffer.from(lastMessage2.key), - value: Buffer.from(lastMessage2.value), - headers: { - 'header-keyA': Buffer.from(lastMessage2.headers['header-keyA']), - 'header-keyB': Buffer.from(lastMessage2.headers['header-keyB']), - 'header-keyC': Buffer.from(lastMessage2.headers['header-keyC']), - }, - magicByte: 2, - offset: '102', - }), - }) + expect(messagesFromTopic2[messagesFromTopic2.length - 1]).toEqual( + expect.objectContaining({ + topic: topicName2, + partition: 0, + message: expect.objectContaining({ + key: Buffer.from(lastMessage2.key), + value: Buffer.from(lastMessage2.value), + headers: { + 'header-keyA': Buffer.from(lastMessage2.headers['header-keyA']), + 'header-keyB': Buffer.from(lastMessage2.headers['header-keyB']), + 'header-keyC': Buffer.from(lastMessage2.headers['header-keyC']), + }, + magicByte: 2, + offset: '102', + }), + }) + ) // check if all offsets are present expect(messagesFromTopic1.map(m => m.message.offset)).toEqual(messages1.map((_, i) => `${i}`)) @@ -533,7 +549,7 @@ describe('Consumer', () => { }) await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ @@ -545,8 +561,8 @@ describe('Consumer', () => { magicByte: 2, offset: '0', }), - }, - { + }), + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ @@ -558,7 +574,7 @@ describe('Consumer', () => { magicByte: 2, offset: '1', }), - }, + }), ]) }) diff --git a/src/consumer/__tests__/errorRecovery.spec.js b/src/consumer/__tests__/errorRecovery.spec.js index 52a58f01c..ba3db717a 100644 --- a/src/consumer/__tests__/errorRecovery.spec.js +++ b/src/consumer/__tests__/errorRecovery.spec.js @@ -102,7 +102,7 @@ describe('Consumer', () => { await waitForConsumerToJoinGroup(consumer) await expect(waitForMessages(messagesConsumed)).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ @@ -110,7 +110,7 @@ describe('Consumer', () => { value: Buffer.from(message1.value), offset: '0', }), - }, + }), ]) }) diff --git a/src/consumer/__tests__/pause.spec.js b/src/consumer/__tests__/pause.spec.js index 94d235869..f2b114f70 100644 --- a/src/consumer/__tests__/pause.spec.js +++ b/src/consumer/__tests__/pause.spec.js @@ -95,27 +95,27 @@ describe('Consumer', () => { const consumedMessages = await waitForMessages(messagesConsumed, { number: 3 }) expect(consumedMessages.filter(({ topic }) => topic === pausedTopic)).toEqual([ - { + expect.objectContaining({ topic: pausedTopic, partition: expect.any(Number), message: expect.objectContaining({ offset: '0' }), - }, + }), ]) const byPartition = (a, b) => a.partition - b.partition expect( consumedMessages.filter(({ topic }) => topic === activeTopic).sort(byPartition) ).toEqual([ - { + expect.objectContaining({ topic: activeTopic, partition: 0, message: expect.objectContaining({ offset: '0' }), - }, - { + }), + expect.objectContaining({ topic: activeTopic, partition: 1, message: expect.objectContaining({ offset: '0' }), - }, + }), ]) expect(consumer.paused()).toEqual([ @@ -165,19 +165,23 @@ describe('Consumer', () => { }) expect(consumedMessages.filter(({ partition }) => partition === pausedPartition)).toEqual( - messages.map((message, i) => ({ - topic, - partition: pausedPartition, - message: expect.objectContaining({ offset: `${i}` }), - })) + messages.map((message, i) => + expect.objectContaining({ + topic, + partition: pausedPartition, + message: expect.objectContaining({ offset: `${i}` }), + }) + ) ) expect(consumedMessages.filter(({ partition }) => partition !== pausedPartition)).toEqual( - messages.concat(messages).map((message, i) => ({ - topic, - partition: activePartition, - message: expect.objectContaining({ offset: `${i}` }), - })) + messages.concat(messages).map((message, i) => + expect.objectContaining({ + topic, + partition: activePartition, + message: expect.objectContaining({ offset: `${i}` }), + }) + ) ) expect(consumer.paused()).toEqual([ @@ -266,16 +270,16 @@ describe('Consumer', () => { consumer.resume([{ topic: pausedTopic }]) await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ - { + expect.objectContaining({ topic: activeTopic, partition: 0, message: expect.objectContaining({ offset: '0' }), - }, - { + }), + expect.objectContaining({ topic: pausedTopic, partition: 0, message: expect.objectContaining({ offset: '0' }), - }, + }), ]) expect(consumer.paused()).toEqual([]) @@ -325,19 +329,23 @@ describe('Consumer', () => { }) expect(consumedMessages.filter(({ partition }) => partition === pausedPartition)).toEqual( - messages.concat(messages).map((message, i) => ({ - topic, - partition: pausedPartition, - message: expect.objectContaining({ offset: `${i}` }), - })) + messages.concat(messages).map((message, i) => + expect.objectContaining({ + topic, + partition: pausedPartition, + message: expect.objectContaining({ offset: `${i}` }), + }) + ) ) expect(consumedMessages.filter(({ partition }) => partition !== pausedPartition)).toEqual( - messages.concat(messages).map((message, i) => ({ - topic, - partition: activePartition, - message: expect.objectContaining({ offset: `${i}` }), - })) + messages.concat(messages).map((message, i) => + expect.objectContaining({ + topic, + partition: activePartition, + message: expect.objectContaining({ offset: `${i}` }), + }) + ) ) expect(consumer.paused()).toEqual([]) diff --git a/src/consumer/__tests__/seek.spec.js b/src/consumer/__tests__/seek.spec.js index 25c619f9d..5fb067e86 100644 --- a/src/consumer/__tests__/seek.spec.js +++ b/src/consumer/__tests__/seek.spec.js @@ -98,16 +98,16 @@ describe('Consumer', () => { await waitForConsumerToJoinGroup(consumer) await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '1' }), - }, - { + }), + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '2' }), - }, + }), ]) }) @@ -133,11 +133,11 @@ describe('Consumer', () => { await waitForConsumerToJoinGroup(consumer) await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '2' }), - }, + }), ]) }) @@ -157,11 +157,11 @@ describe('Consumer', () => { await waitForConsumerToJoinGroup(consumer) await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '0' }), - }, + }), ]) }) @@ -195,11 +195,11 @@ describe('Consumer', () => { await waitForConsumerToJoinGroup(consumer) await expect(waitForMessages(messagesConsumed, { number: 1 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '2' }), - }, + }), ]) await expect(admin.fetchOffsets({ groupId, topic: topicName })).resolves.toEqual([ @@ -213,16 +213,16 @@ describe('Consumer', () => { consumer.seek({ topic: topicName, partition: 0, offset: 1 }) await expect(waitForMessages(messagesConsumed, { number: 2 })).resolves.toEqual([ - { + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '1' }), - }, - { + }), + expect.objectContaining({ topic: topicName, partition: 0, message: expect.objectContaining({ offset: '2' }), - }, + }), ]) }) }) From ed77244850b36082d8b79518afa807a7fb7c66f8 Mon Sep 17 00:00:00 2001 From: Julien Vincent Date: Fri, 17 Dec 2021 14:57:11 +0100 Subject: [PATCH 3/4] chore: test heartbeat exposed in eachMessage handler --- .../__tests__/consumeMessages.spec.js | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/src/consumer/__tests__/consumeMessages.spec.js b/src/consumer/__tests__/consumeMessages.spec.js index 942d6918d..ceb26cf71 100644 --- a/src/consumer/__tests__/consumeMessages.spec.js +++ b/src/consumer/__tests__/consumeMessages.spec.js @@ -209,6 +209,54 @@ describe('Consumer', () => { } }) + it('heartbeats are exposed in the eachMessage handler', async () => { + consumer = createConsumer({ + cluster, + groupId, + heartbeatInterval: 50, + logger: newLogger(), + }) + + topicName = `test-topic-${secureRandom()}` + await createTopic({ + topic: topicName, + partitions: 1, + }) + + await consumer.connect() + await producer.connect() + await consumer.subscribe({ topic: topicName, fromBeginning: true }) + + const messagesConsumed = [] + + let heartbeats = 0 + consumer.on(consumer.events.HEARTBEAT, () => { + heartbeats++ + }) + + consumer.run({ + eachMessage: async payload => { + await new Promise(resolve => { + setTimeout(resolve, 100) + }) + + await payload.heartbeat() + messagesConsumed.push(payload.message) + + await new Promise(resolve => { + setTimeout(resolve, 100) + }) + }, + }) + + await waitForConsumerToJoinGroup(consumer) + + await producer.send({ acks: 1, topic: topicName, messages: [{ key: 'value', value: 'value' }] }) + await waitForMessages(messagesConsumed, { number: 1 }) + + expect(heartbeats).toBe(1) + }) + it('consume GZIP messages', async () => { await consumer.connect() await producer.connect() From a4840e9aca520ce5c16db2ceaa85d843696c02ac Mon Sep 17 00:00:00 2001 From: Julien Vincent Date: Fri, 17 Dec 2021 15:14:26 +0100 Subject: [PATCH 4/4] chore: document usage of heartbeat from the eachMessage handler --- docs/Consuming.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/Consuming.md b/docs/Consuming.md index 0a1a13292..d13b84a93 100644 --- a/docs/Consuming.md +++ b/docs/Consuming.md @@ -43,7 +43,7 @@ The `eachMessage` handler provides a convenient and easy to use API, feeding you ```javascript await consumer.run({ - eachMessage: async ({ topic, partition, message }) => { + eachMessage: async ({ topic, partition, message, heartbeat }) => { console.log({ key: message.key.toString(), value: message.value.toString(), @@ -53,6 +53,8 @@ await consumer.run({ }) ``` +Be aware that the `eachMessage` handler should not block for longer than the configured [session timeout](#options) or else the consumer will be removed from the group. If your workload involves very slow processing times for individual messages then you should either increase the session timeout or make periodic use of the `heartbeat` function exposed in the handler payload. + ## eachBatch Some use cases require dealing with batches directly. This handler will feed your function batches and provide some utility functions to give your code more flexibility: `resolveOffset`, `heartbeat`, `commitOffsetsIfNecessary`, `uncommittedOffsets`, `isRunning`, and `isStale`. All resolved offsets will be automatically committed after the function is executed.