|
4 | 4 | const { get } = require('lodash')
|
5 | 5 | const EntityCoordinates = require('../../lib/entityCoordinates')
|
6 | 6 | const { factory } = require('./defVersionCheck')
|
| 7 | +const Cache = require('../caching/memory') |
7 | 8 |
|
8 | 9 | class QueueHandler {
|
9 | 10 | constructor(queue, logger, messageHandler = { processMessage: async () => {} }) {
|
@@ -33,18 +34,39 @@ class QueueHandler {
|
33 | 34 | }
|
34 | 35 |
|
35 | 36 | class DefinitionUpgrader {
|
36 |
| - constructor(definitionService, logger, defVersionChecker) { |
| 37 | + static defaultTtlSeconds = 60 * 5 /* 5 mins */ |
| 38 | + static delayInMSeconds = 500 |
| 39 | + |
| 40 | + constructor( |
| 41 | + definitionService, |
| 42 | + logger, |
| 43 | + defVersionChecker, |
| 44 | + cache = Cache({ defaultTtlSeconds: DefinitionUpgrader.defaultTtlSeconds }) |
| 45 | + ) { |
37 | 46 | this.logger = logger
|
38 | 47 | this._definitionService = definitionService
|
39 | 48 | this._defVersionChecker = defVersionChecker
|
40 | 49 | this._defVersionChecker.currentSchema = definitionService.currentSchema
|
| 50 | + this._upgradeLock = cache |
41 | 51 | }
|
42 | 52 |
|
43 | 53 | async processMessage(message) {
|
44 | 54 | let coordinates = get(message, 'data.coordinates')
|
45 | 55 | if (!coordinates) return
|
46 |
| - |
47 | 56 | coordinates = EntityCoordinates.fromObject(coordinates)
|
| 57 | + |
| 58 | + while (this._upgradeLock.get(coordinates.toString())) { |
| 59 | + await new Promise(resolve => setTimeout(resolve, DefinitionUpgrader.delayInMSeconds)) |
| 60 | + } |
| 61 | + try { |
| 62 | + this._upgradeLock.set(coordinates.toString(), true) |
| 63 | + await this._upgradeIfNecessary(coordinates) |
| 64 | + } finally { |
| 65 | + this._upgradeLock.delete(coordinates.toString()) |
| 66 | + } |
| 67 | + } |
| 68 | + |
| 69 | + async _upgradeIfNecessary(coordinates) { |
48 | 70 | const existing = await this._definitionService.getStored(coordinates)
|
49 | 71 | let result = await this._defVersionChecker.validate(existing)
|
50 | 72 | if (!result) {
|
|
0 commit comments