Skip to content

Commit 96938b4

Browse files
committed
Avoid recomputing definition when the coordinates are queued multiple times within one batch
1 parent b31e6e1 commit 96938b4

File tree

2 files changed

+39
-3
lines changed

2 files changed

+39
-3
lines changed

providers/upgrade/process.js

+24-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
const { get } = require('lodash')
55
const EntityCoordinates = require('../../lib/entityCoordinates')
66
const { factory } = require('./defVersionCheck')
7+
const Cache = require('../caching/memory')
78

89
class QueueHandler {
910
constructor(queue, logger, messageHandler = { processMessage: async () => {} }) {
@@ -33,18 +34,39 @@ class QueueHandler {
3334
}
3435

3536
class DefinitionUpgrader {
36-
constructor(definitionService, logger, defVersionChecker) {
37+
static defaultTtlSeconds = 60 * 5 /* 5 mins */
38+
static delayInMSeconds = 500
39+
40+
constructor(
41+
definitionService,
42+
logger,
43+
defVersionChecker,
44+
cache = Cache({ defaultTtlSeconds: DefinitionUpgrader.defaultTtlSeconds })
45+
) {
3746
this.logger = logger
3847
this._definitionService = definitionService
3948
this._defVersionChecker = defVersionChecker
4049
this._defVersionChecker.currentSchema = definitionService.currentSchema
50+
this._upgradeLock = cache
4151
}
4252

4353
async processMessage(message) {
4454
let coordinates = get(message, 'data.coordinates')
4555
if (!coordinates) return
46-
4756
coordinates = EntityCoordinates.fromObject(coordinates)
57+
58+
while (this._upgradeLock.get(coordinates.toString())) {
59+
await new Promise(resolve => setTimeout(resolve, DefinitionUpgrader.delayInMSeconds))
60+
}
61+
try {
62+
this._upgradeLock.set(coordinates.toString(), true)
63+
await this._upgradeIfNecessary(coordinates)
64+
} finally {
65+
this._upgradeLock.delete(coordinates.toString())
66+
}
67+
}
68+
69+
async _upgradeIfNecessary(coordinates) {
4870
const existing = await this._definitionService.getStored(coordinates)
4971
let result = await this._defVersionChecker.validate(existing)
5072
if (!result) {

test/business/definitionServiceTest.js

+15-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ const DefinitionQueueUpgrader = require('../../providers/upgrade/defUpgradeQueue
1919
const memoryQueue = require('../../providers/upgrade/memoryQueueConfig')
2020
const { DefinitionVersionChecker } = require('../../providers/upgrade/defVersionCheck')
2121
const util = require('util')
22-
const { fail } = require('assert')
2322

2423
describe('Definition Service', () => {
2524
it('invalidates single coordinate', async () => {
@@ -443,6 +442,21 @@ describe('Integration test', () => {
443442
expect(store.store.calledOnce).to.be.true
444443
expect(queue.data.length).to.eq(0)
445444
})
445+
446+
it('computes once when the same coordinates is queued twice within one dequeue batch ', async () => {
447+
const { service, store } = setupServiceForUpgrade(staleDef, upgradeHandler)
448+
await service.get(coordinates)
449+
await service.get(coordinates)
450+
queue.dequeueMultiple = sinon.stub().callsFake(async () => {
451+
const message1 = await queue.dequeue()
452+
const message2 = await queue.dequeue()
453+
return Promise.resolve([message1, message2])
454+
})
455+
await upgradeHandler.setupProcessing(service, logger, true)
456+
const newResult = await service.get(coordinates)
457+
expect(newResult._meta.schemaVersion).to.eq('1.7.0')
458+
expect(store.store.calledOnce).to.be.true
459+
})
446460
})
447461
})
448462
})

0 commit comments

Comments
 (0)