Skip to content

Commit 0efd310

Browse files
committed
Address review comments
-add logging to handle queuing failure -remove redundant JSON.stringify when queuing message (there is no impact to message decoding) -move queueHandler and defUpgrader to local scope in setup -minor refactoring to improve code reuse and readability -add more tests
1 parent 96938b4 commit 0efd310

File tree

4 files changed

+127
-68
lines changed

4 files changed

+127
-68
lines changed

providers/upgrade/defUpgradeQueue.js

+13-6
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
// SPDX-License-Identifier: MIT
33

44
const { DefinitionVersionChecker } = require('./defVersionCheck')
5-
const EntityCoordinates = require('../../lib/entityCoordinates')
65
const { setup } = require('./process')
76

87
class DefinitionQueueUpgrader extends DefinitionVersionChecker {
@@ -17,11 +16,19 @@ class DefinitionQueueUpgrader extends DefinitionVersionChecker {
1716

1817
async _queueUpgrade(definition) {
1918
if (!this._upgrade) throw new Error('Upgrade queue is not set')
20-
const message = this._constructMessage(definition)
21-
await this._upgrade.queue(JSON.stringify(message))
22-
this.logger.debug('Queued for definition upgrade ', {
23-
coordinates: EntityCoordinates.fromObject(definition.coordinates).toString()
24-
})
19+
try {
20+
const message = this._constructMessage(definition)
21+
await this._upgrade.queue(message)
22+
this.logger.debug('Queued for definition upgrade ', {
23+
coordinates: DefinitionVersionChecker.getCoordinates(definition)
24+
})
25+
} catch (error) {
26+
//continue if queuing fails and requeue at the next request.
27+
this.logger.error(`Error queuing for definition upgrade ${error.message}`, {
28+
error,
29+
coordinates: DefinitionVersionChecker.getCoordinates(definition)
30+
})
31+
}
2532
}
2633

2734
_constructMessage(definition) {

providers/upgrade/defVersionCheck.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class DefinitionVersionChecker {
2424
if (!this._currentSchema) throw new Error('Current schema version is not set')
2525
const defSchemaVersion = get(definition, '_meta.schemaVersion')
2626
this.logger.debug(`Definition version: %s, Current schema version: %s `, defSchemaVersion, this._currentSchema, {
27-
coordinates: definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString()
27+
coordinates: DefinitionVersionChecker.getCoordinates(definition)
2828
})
2929
if (defSchemaVersion && gte(defSchemaVersion, this._currentSchema)) return definition
3030
}
@@ -36,6 +36,10 @@ class DefinitionVersionChecker {
3636
setupProcessing() {
3737
//do nothing for set up processing
3838
}
39+
40+
static getCoordinates(definition) {
41+
return definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString()
42+
}
3943
}
4044

4145
const factory = options => new DefinitionVersionChecker(options)

providers/upgrade/process.js

+2-5
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,9 @@ class DefinitionUpgrader {
7878
}
7979
}
8080

81-
let queueHandler
82-
let defUpgrader
83-
8481
function setup(_queue, _definitionService, _logger, once = false, _defVersionChecker = factory({ logger: _logger })) {
85-
defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker)
86-
queueHandler = new QueueHandler(_queue, _logger, defUpgrader)
82+
const defUpgrader = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker)
83+
const queueHandler = new QueueHandler(_queue, _logger, defUpgrader)
8784
return queueHandler.work(once)
8885
}
8986

test/providers/upgrade/defUpgradeQueue.js

+107-56
Original file line numberDiff line numberDiff line change
@@ -7,80 +7,131 @@ chai.use(chaiAsPromised)
77
const { expect } = require('chai')
88
const sinon = require('sinon')
99
const DefinitionQueueUpgrader = require('../../../providers/upgrade/defUpgradeQueue')
10+
const MemoryQueue = require('../../../providers/upgrade/memoryQueueConfig')
1011

1112
describe('DefinitionQueueUpgrader', () => {
12-
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
13-
let queue, upgrader
14-
15-
beforeEach(async () => {
16-
const logger = { debug: sinon.stub() }
17-
queue = {
18-
queue: sinon.stub().resolves(),
19-
initialize: sinon.stub().resolves()
20-
}
21-
const queueFactory = sinon.stub().returns(queue)
22-
upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory })
23-
})
13+
const logger = { debug: sinon.stub(), error: sinon.stub() }
2414

25-
it('returns an instance of DefinitionQueueUpgrader', () => {
26-
expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader)
27-
})
15+
describe('Unit tests', () => {
16+
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
17+
let queue, upgrader
2818

29-
it('sets and gets current schema version', () => {
30-
upgrader.currentSchema = '1.0.0'
31-
expect(upgrader.currentSchema).to.equal('1.0.0')
32-
})
19+
beforeEach(async () => {
20+
queue = {
21+
queue: sinon.stub().resolves(),
22+
initialize: sinon.stub().resolves()
23+
}
24+
const queueFactory = sinon.stub().returns(queue)
25+
upgrader = new DefinitionQueueUpgrader({ logger, queue: queueFactory })
26+
})
3327

34-
it('initializes', async () => {
35-
await upgrader.initialize()
36-
expect(queue.initialize.calledOnce).to.be.true
37-
})
28+
it('returns an instance of DefinitionQueueUpgrader', () => {
29+
expect(upgrader).to.be.an.instanceOf(DefinitionQueueUpgrader)
30+
})
3831

39-
it('connects to queue after setupProcessing', async () => {
40-
await upgrader.initialize()
41-
const definitionService = { currentSchema: '1.0.0' }
42-
const logger = { debug: sinon.stub() }
43-
queue.dequeueMultiple = sinon.stub().resolves([])
44-
upgrader.setupProcessing(definitionService, logger, true)
45-
expect(queue.dequeueMultiple.calledOnce).to.be.true
46-
})
32+
it('sets and gets current schema version', () => {
33+
upgrader.currentSchema = '1.0.0'
34+
expect(upgrader.currentSchema).to.equal('1.0.0')
35+
})
4736

48-
context('validate', () => {
49-
it('fails if current schema version is not set', async () => {
50-
await expect(upgrader.validate(definition)).to.be.rejectedWith(Error)
37+
it('initializes', async () => {
38+
await upgrader.initialize()
39+
expect(queue.initialize.calledOnce).to.be.true
5140
})
5241

53-
it('fails if it is not initialized', async () => {
54-
upgrader.currentSchema = '1.0.0'
55-
const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
56-
await expect(upgrader.validate(stale)).to.be.rejectedWith(Error)
42+
it('connects to queue after setupProcessing', async () => {
43+
await upgrader.initialize()
44+
const definitionService = { currentSchema: '1.0.0' }
45+
queue.dequeueMultiple = sinon.stub().resolves([])
46+
upgrader.setupProcessing(definitionService, logger, true)
47+
expect(queue.dequeueMultiple.calledOnce).to.be.true
48+
})
49+
50+
context('validate', () => {
51+
it('fails if current schema version is not set', async () => {
52+
await expect(upgrader.validate(definition)).to.be.rejectedWith(Error)
53+
})
54+
55+
it('fails if it is not initialized', async () => {
56+
upgrader.currentSchema = '1.0.0'
57+
const stale = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
58+
await expect(upgrader.validate(stale)).to.be.rejectedWith(Error)
59+
})
60+
})
61+
62+
context('validate after set up', () => {
63+
beforeEach(async () => {
64+
await upgrader.initialize()
65+
upgrader.currentSchema = '1.0.0'
66+
})
67+
68+
it('does not queue null definition', async () => {
69+
const result = await upgrader.validate(null)
70+
expect(result).to.be.not.ok
71+
expect(queue.queue.called).to.be.false
72+
})
73+
74+
it('does not queue an up-to-date definition', async () => {
75+
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
76+
const result = await upgrader.validate(definition)
77+
expect(result).to.deep.equal(definition)
78+
expect(queue.queue.called).to.be.false
79+
})
80+
81+
it('queues and returns a stale definition', async () => {
82+
const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
83+
const result = await upgrader.validate(definition)
84+
expect(result).to.deep.equal(definition)
85+
expect(queue.queue.calledOnce).to.be.true
86+
})
87+
88+
it('logs erorr when queueing throws', async () => {
89+
const staleDef = {
90+
coordinates: {
91+
type: 'npm',
92+
provider: 'npmjs',
93+
name: 'lodash',
94+
revision: '4.17.11'
95+
},
96+
_meta: { schemaVersion: '0.0.1' }
97+
}
98+
queue.queue.rejects(new Error('test'))
99+
const result = await upgrader.validate(staleDef)
100+
expect(result).to.deep.equal(staleDef)
101+
expect(logger.error.calledOnce).to.be.true
102+
const { coordinates } = logger.error.args[0][1]
103+
expect(coordinates).to.eq('npm/npmjs/-/lodash/4.17.11')
104+
})
57105
})
58106
})
59107

60-
context('validate after set up', () => {
108+
describe('Integration tests', () => {
109+
let queue, upgrader
110+
61111
beforeEach(async () => {
112+
queue = MemoryQueue()
113+
upgrader = new DefinitionQueueUpgrader({ logger, queue: sinon.stub().returns(queue) })
62114
await upgrader.initialize()
63115
upgrader.currentSchema = '1.0.0'
64116
})
65117

66-
it('does not queue null definition', async () => {
67-
const result = await upgrader.validate(null)
68-
expect(result).to.be.not.ok
69-
expect(queue.queue.called).to.be.false
70-
})
71-
72-
it('does not queue an up-to-date definition', async () => {
73-
const definition = { coordinates: 'test', _meta: { schemaVersion: '1.0.0' } }
74-
const result = await upgrader.validate(definition)
75-
expect(result).to.deep.equal(definition)
76-
expect(queue.queue.called).to.be.false
77-
})
118+
it('queues the correct message that can be decoded correctly', async () => {
119+
const staleDef = {
120+
coordinates: {
121+
type: 'npm',
122+
provider: 'npmjs',
123+
name: 'lodash',
124+
revision: '4.17.11'
125+
},
126+
_meta: { schemaVersion: '0.0.1' }
127+
}
128+
const result = await upgrader.validate(staleDef)
129+
expect(result).to.deep.equal(staleDef)
130+
expect(queue.data.length).to.equal(1)
78131

79-
it('queues and returns a stale definition', async () => {
80-
const definition = { coordinates: 'test', _meta: { schemaVersion: '0.0.1' } }
81-
const result = await upgrader.validate(definition)
82-
expect(result).to.deep.equal(definition)
83-
expect(queue.queue.calledOnce).to.be.true
132+
const message = await queue.dequeue()
133+
const coordinates = message.data.coordinates
134+
expect(coordinates).to.deep.equal(staleDef.coordinates)
84135
})
85136
})
86137
})

0 commit comments

Comments
 (0)