Skip to content

Commit

Permalink
Let consumer crash when no brokers are available
Browse files Browse the repository at this point in the history
The fetch manager rebalancing mechanism caused an infinite loop when
there were no brokers available, causing the consumer to never become
aware of any connection issues.

Fixes #1384
  • Loading branch information
Nevon committed Jun 27, 2022
1 parent a733eaa commit a4b190a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
2 changes: 1 addition & 1 deletion src/consumer/fetchManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const createFetchManager = ({
const current = getNodeIds()
const hasChanged =
nodeIds.length !== current.length || nodeIds.some(nodeId => !current.includes(nodeId))
if (hasChanged) {
if (hasChanged && current.length !== 0) {
throw new KafkaJSFetcherRebalanceError()
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/consumer/fetchManager.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const createFetchManager = require('./fetchManager')
const Batch = require('./batch')
const { newLogger } = require('testHelpers')
const waitFor = require('../utils/waitFor')
const { KafkaJSNonRetriableError } = require('../errors')

describe('FetchManager', () => {
let fetchManager, fetch, handler, getNodeIds, concurrency, batchSize
Expand Down Expand Up @@ -72,4 +73,25 @@ describe('FetchManager', () => {
fetchers = fetchManager.getFetchers()
expect(fetchers).toHaveLength(3)
})

describe('when all brokers have become unavailable', () => {
it('should not rebalance and let the error bubble up', async () => {
const fetchMock = jest.fn().mockImplementation(async nodeId => {
if (!getNodeIds().includes(nodeId)) {
throw new KafkaJSNonRetriableError('Node not found')
}

return fetch(nodeId)
})
getNodeIds.mockImplementation(() => seq(1))

fetchManager = createTestFetchManager({ concurrency: 1, fetch: fetchMock })
const fetchManagerPromise = fetchManager.start()

expect(fetchManager.getFetchers()).toHaveLength(1)

getNodeIds.mockImplementation(() => seq(0))
await expect(fetchManagerPromise).rejects.toThrow('Node not found')
})
})
})

0 comments on commit a4b190a

Please sign in to comment.