Skip to content

Commit 0d72a12

Browse files
authored
Merge pull request #1242 from qtomlinson/qt/definition_upgrade
Add the ability to queue definition recomputation
2 parents a569bab + fbfd034 commit 0d72a12

16 files changed

+916
-69
lines changed

app.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ function createApp(config) {
5959
const harvestQueue = config.harvest.queue()
6060
initializers.push(async () => harvestQueue.initialize())
6161

62+
const upgradeHandler = config.upgrade.service({ queue: config.upgrade.queue })
63+
initializers.push(async () => upgradeHandler.initialize())
64+
6265
const definitionService = require('./business/definitionService')(
6366
harvestStore,
6467
harvestService,
@@ -67,7 +70,8 @@ function createApp(config) {
6770
curationService,
6871
definitionStore,
6972
searchService,
70-
cachingService
73+
cachingService,
74+
upgradeHandler
7175
)
7276
// Circular dependency. Reach in and set the curationService's definitionService. Sigh.
7377
curationService.definitionService = definitionService
@@ -234,6 +238,7 @@ function createApp(config) {
234238
// kick off the queue processors
235239
require('./providers/curation/process')(curationQueue, curationService, logger)
236240
require('./providers/harvest/process')(harvestQueue, definitionService, logger)
241+
upgradeHandler.setupProcessing(definitionService, logger)
237242

238243
// Signal system is up and ok (no error)
239244
callback()

bin/config.js

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ module.exports = {
5757
definition: {
5858
store: loadFactory(config.get('DEFINITION_STORE_PROVIDER') || 'file', 'definition')
5959
},
60+
upgrade: {
61+
queue: loadFactory(config.get('DEFINITION_UPGRADE_QUEUE_PROVIDER') || 'memory', 'upgrade.queue'),
62+
service: loadFactory(config.get('DEFINITION_UPGRADE_PROVIDER') || 'versionCheck', 'upgrade.service')
63+
},
6064
attachment: {
6165
store: loadFactory(config.get('ATTACHMENT_STORE_PROVIDER') || 'file', 'attachment')
6266
},

business/definitionService.js

+21-6
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const currentSchema = '1.7.0'
3939
const weights = { declared: 30, discovered: 25, consistency: 15, spdx: 15, texts: 15, date: 30, source: 70 }
4040

4141
class DefinitionService {
42-
constructor(harvestStore, harvestService, summary, aggregator, curation, store, search, cache) {
42+
constructor(harvestStore, harvestService, summary, aggregator, curation, store, search, cache, upgradeHandler) {
4343
this.harvestStore = harvestStore
4444
this.harvestService = harvestService
4545
this.summaryService = summary
@@ -48,9 +48,15 @@ class DefinitionService {
4848
this.definitionStore = store
4949
this.search = search
5050
this.cache = cache
51+
this.upgradeHandler = upgradeHandler
52+
if (this.upgradeHandler) this.upgradeHandler.currentSchema = currentSchema
5153
this.logger = logger()
5254
}
5355

56+
get currentSchema() {
57+
return currentSchema
58+
}
59+
5460
/**
5561
* Get the final representation of the specified definition and optionally apply the indicated
5662
* curation.
@@ -68,11 +74,10 @@ class DefinitionService {
6874
return this.compute(coordinates, curation)
6975
}
7076
const existing = await this._cacheExistingAside(coordinates, force)
71-
let result
72-
if (get(existing, '_meta.schemaVersion') === currentSchema) {
77+
let result = await this.upgradeHandler.validate(existing)
78+
if (result) {
7379
// Log line used for /status page insights
7480
this.logger.info('computed definition available', { coordinates: coordinates.toString() })
75-
result = existing
7681
} else result = force ? await this.computeAndStore(coordinates) : await this.computeStoreAndCurate(coordinates)
7782
return this._trimDefinition(this._cast(result), expand)
7883
}
@@ -598,5 +603,15 @@ class DefinitionService {
598603
}
599604
}
600605

601-
module.exports = (harvestStore, harvestService, summary, aggregator, curation, store, search, cache) =>
602-
new DefinitionService(harvestStore, harvestService, summary, aggregator, curation, store, search, cache)
606+
module.exports = (harvestStore, harvestService, summary, aggregator, curation, store, search, cache, versionHandler) =>
607+
new DefinitionService(
608+
harvestStore,
609+
harvestService,
610+
summary,
611+
aggregator,
612+
curation,
613+
store,
614+
search,
615+
cache,
616+
versionHandler
617+
)

providers/index.js

+10
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ module.exports = {
2727
auth: {
2828
github: require('../middleware/githubConfig')
2929
},
30+
upgrade: {
31+
queue: {
32+
azure: require('../providers/upgrade/azureQueueConfig'),
33+
memory: require('../providers/upgrade/memoryQueueConfig')
34+
},
35+
service: {
36+
versionCheck: require('../providers/upgrade/defVersionCheck').factory,
37+
upgradeQueue: require('../providers/upgrade/defUpgradeQueueConfig')
38+
}
39+
},
3040
curation: {
3141
queue: {
3242
azure: require('../providers/curation/azureQueueConfig'),

providers/queueing/memoryQueue.js

+17-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ class MemoryQueue {
99
this.logger = logger()
1010
this.data = []
1111
this.messageId = 0
12+
this.decoder = options.decoder
1213
}
1314

1415
async initialize() {}
@@ -33,14 +34,19 @@ class MemoryQueue {
3334
const message = this.data[0]
3435
if (!message) return null
3536
this.data[0].dequeueCount++
36-
if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: JSON.parse(message.messageText) })
37+
if (message.dequeueCount <= 5) return Promise.resolve({ original: message, data: this._parseData(message) })
3738
await this.delete({ original: message })
3839
return this.dequeue()
3940
}
4041

42+
_parseData({ messageText }) {
43+
return JSON.parse(this.decoder(messageText))
44+
}
45+
4146
/** Similar to dequeue() but returns an array instead. See AzureStorageQueue.dequeueMultiple() */
4247
async dequeueMultiple() {
43-
return [await this.dequeue()]
48+
const message = await this.dequeue()
49+
return message ? [message] : []
4450
}
4551

4652
/**
@@ -58,4 +64,12 @@ class MemoryQueue {
5864
}
5965
}
6066

61-
module.exports = () => new MemoryQueue()
67+
/**
 * Build a MemoryQueue. Unless the caller supplies its own `decoder`, message text is handed to
 * JSON.parse unchanged.
 *
 * @param {object} [opts] - queue options; any key given here overrides the default of the same name
 * @returns {MemoryQueue}
 */
const factory = (opts = {}) => {
  // Identity decoder: the stored message text is already plain JSON.
  const queueOptions = Object.assign({ decoder: text => text }, opts)
  return new MemoryQueue(queueOptions)
}
74+
75+
module.exports = factory

providers/upgrade/azureQueueConfig.js

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const config = require('painless-config')
5+
const AzureStorageQueue = require('../queueing/azureStorageQueue')
6+
7+
// Options applied when azure() is called without any. The connection string and queue name are read
// from configuration, falling back to HARVEST_AZBLOB_CONNECTION_STRING and 'definitions-upgrade'.
const defaultOptions = {
  connectionString:
    config.get('DEFINITION_UPGRADE_QUEUE_CONNECTION_STRING') || config.get('HARVEST_AZBLOB_CONNECTION_STRING'),
  queueName: config.get('DEFINITION_UPGRADE_QUEUE_NAME') || 'definitions-upgrade',
  dequeueOptions: {
    numOfMessages: 32,
    visibilityTimeout: 10 * 60 // 10 min. The default value is 30 seconds.
  }
}

/**
 * Create the Azure storage queue used for definition upgrade requests.
 *
 * @param {object} [options] - when truthy, used as-is in place of (not merged with) the defaults
 * @returns {AzureStorageQueue}
 */
function azure(options) {
  if (options) return new AzureStorageQueue(options)
  return new AzureStorageQueue(defaultOptions)
}
21+
22+
module.exports = azure

providers/upgrade/defUpgradeQueue.js

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const { DefinitionVersionChecker } = require('./defVersionCheck')
5+
const { setup } = require('./process')
6+
7+
/**
 * Version checker that queues an upgrade request for a definition the base check rejects and still
 * hands the existing definition back to the caller.
 */
class DefinitionQueueUpgrader extends DefinitionVersionChecker {
  /**
   * @param {object} [definition] - stored definition to check
   * @returns {Promise<object|undefined>} undefined when no definition is given; otherwise the
   * definition, after an upgrade request has been queued if the base check did not accept it
   */
  async validate(definition) {
    if (!definition) return undefined
    const upToDate = await super.validate(definition)
    if (!upToDate) {
      await this._queueUpgrade(definition)
      return definition
    }
    return upToDate
  }

  /**
   * Put an upgrade request for the definition on the queue.
   *
   * @param {object} definition
   * @throws {Error} when initialize() has not created the queue yet
   */
  async _queueUpgrade(definition) {
    if (!this._upgrade) throw new Error('Upgrade queue is not set')
    try {
      await this._upgrade.queue(this._constructMessage(definition))
      const coordinates = DefinitionVersionChecker.getCoordinates(definition)
      this.logger.debug('Queued for definition upgrade ', { coordinates })
    } catch (error) {
      // Best effort: a failed enqueue is only logged, and the next request for this definition
      // will attempt to queue it again.
      const reason = `Error queuing for definition upgrade ${error.message}`
      const coordinates = DefinitionVersionChecker.getCoordinates(definition)
      this.logger.error(reason, { error, coordinates })
    }
  }

  /**
   * Serialize the definition's coordinates and _meta as base64-encoded JSON.
   *
   * @param {object} definition
   * @returns {string}
   */
  _constructMessage(definition) {
    const payload = JSON.stringify({ coordinates: definition.coordinates, _meta: definition._meta })
    return Buffer.from(payload).toString('base64')
  }

  /** Create the upgrade queue from the `queue` factory in options and initialize it. */
  async initialize() {
    const upgradeQueue = this.options.queue()
    this._upgrade = upgradeQueue
    return upgradeQueue.initialize()
  }

  /**
   * Start processing queued upgrade requests.
   *
   * @param {object} definitionService
   * @param {object} logger
   * @param {boolean} [once] - forwarded to setup()
   */
  setupProcessing(definitionService, logger, once) {
    const upgradeQueue = this._upgrade
    return setup(upgradeQueue, definitionService, logger, once)
  }
}
49+
50+
module.exports = DefinitionQueueUpgrader
+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const DefinitionQueueUpgrader = require('./defUpgradeQueue')
5+
const memory = require('../queueing/memoryQueue')
6+
7+
/**
 * Create the queue-backed upgrade service.
 *
 * @param {object} [options] - service options; `queue` defaults to the in-memory queue factory
 * @returns {DefinitionQueueUpgrader}
 */
function serviceFactory(options = {}) {
  // Caller-supplied options are spread last so they win over the default queue.
  return new DefinitionQueueUpgrader({ queue: memory, ...options })
}
11+
12+
module.exports = serviceFactory

providers/upgrade/defVersionCheck.js

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const logger = require('../logging/logger')
5+
const { gte } = require('semver')
6+
const { get } = require('lodash')
7+
const EntityCoordinates = require('../../lib/entityCoordinates')
8+
9+
/**
 * Decides whether a stored definition is current by comparing its `_meta.schemaVersion` with the
 * schema version assigned through `currentSchema`.
 */
class DefinitionVersionChecker {
  /**
   * @param {object} [options]
   * @param {object} [options.logger] - logger to use; the shared logger is created when omitted
   */
  constructor(options = {}) {
    this.options = options
    this.logger = this.options.logger || logger()
  }

  set currentSchema(schemaVersion) {
    this._currentSchema = schemaVersion
  }

  get currentSchema() {
    return this._currentSchema
  }

  /**
   * Check a definition against the current schema version.
   *
   * @param {object} [definition] - stored definition, possibly missing
   * @returns {Promise<object|undefined>} the definition when its schema version is greater than or
   * equal to the current one; undefined when it is missing, older, or cannot be compared
   * @throws {Error} when the current schema version has not been set
   */
  async validate(definition) {
    if (!this._currentSchema) throw new Error('Current schema version is not set')
    const defSchemaVersion = get(definition, '_meta.schemaVersion')
    this.logger.debug(`Definition version: %s, Current schema version: %s `, defSchemaVersion, this._currentSchema, {
      coordinates: DefinitionVersionChecker.getCoordinates(definition)
    })
    if (!defSchemaVersion) return undefined

    let isCurrent
    try {
      isCurrent = gte(defSchemaVersion, this._currentSchema)
    } catch (error) {
      // semver throws on a malformed or non-string version. Treat such a definition as out of date
      // so it is recomputed, rather than failing every request for it.
      this.logger.error(`Unable to compare definition schema version ${error.message}`, {
        error,
        coordinates: DefinitionVersionChecker.getCoordinates(definition)
      })
      return undefined
    }
    return isCurrent ? definition : undefined
  }

  async initialize() {
    //do nothing for initialization
  }

  setupProcessing() {
    //do nothing for set up processing
  }

  /**
   * @param {object} [definition]
   * @returns {string|undefined} string form of the definition's coordinates, or a falsy value when
   * the definition or its coordinates are missing
   */
  static getCoordinates(definition) {
    return definition?.coordinates && EntityCoordinates.fromObject(definition.coordinates).toString()
  }
}

/** Create a DefinitionVersionChecker with the given options. */
const factory = options => new DefinitionVersionChecker(options)
46+
47+
module.exports = { DefinitionVersionChecker, factory }
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const MemoryQueue = require('../queueing/memoryQueue')
5+
6+
/**
 * Build an in-memory queue whose messages are base64-encoded JSON text.
 *
 * @param {object} [opts] - queue options; any key given here overrides the default of the same name
 */
const encodedMessageQueueFactory = opts => {
  // Default decoder: turn the base64 message text back into a UTF-8 string.
  const decodeBase64 = text => Buffer.from(text, 'base64').toString('utf8')
  return MemoryQueue({ decoder: decodeBase64, ...opts })
}
13+
14+
module.exports = encodedMessageQueueFactory

providers/upgrade/process.js

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// (c) Copyright 2024, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const { get } = require('lodash')
5+
const EntityCoordinates = require('../../lib/entityCoordinates')
6+
const { factory } = require('./defVersionCheck')
7+
const Cache = require('../caching/memory')
8+
9+
/**
 * Polls a queue and hands each dequeued message to a message handler, deleting the message from
 * the queue once the handler has completed successfully.
 */
class QueueHandler {
  /**
   * @param {object} queue - must provide dequeueMultiple() and delete(message)
   * @param {object} logger - must provide error()
   * @param {object} [messageHandler] - must provide processMessage(message); defaults to a no-op
   */
  constructor(queue, logger, messageHandler = { processMessage: async () => {} }) {
    this._queue = queue
    this.logger = logger
    this._messageHandler = messageHandler
  }

  /**
   * Process one batch of messages. Unless `once` is truthy, another run is scheduled afterwards:
   * immediately when this batch had messages, after 10 seconds when the queue was empty or the
   * dequeue failed.
   *
   * @param {boolean} [once] - when truthy, do not schedule a further run
   */
  async work(once) {
    let isQueueEmpty = true
    try {
      // A queue may report "nothing available" as null/undefined rather than an empty array;
      // treat that as an empty batch instead of letting messages.map throw.
      const messages = (await this._queue.dequeueMultiple()) ?? []
      isQueueEmpty = messages.length === 0
      const results = await Promise.allSettled(
        messages.map(async message => {
          // Delete only after successful processing, so a failed message stays on the queue.
          await this._messageHandler.processMessage(message)
          await this._queue.delete(message)
        })
      )
      for (const result of results) {
        if (result.status === 'rejected') this.logger.error(result.reason)
      }
    } catch (error) {
      this.logger.error(error)
    } finally {
      if (!once) setTimeout(this.work.bind(this), isQueueEmpty ? 10000 : 0)
    }
  }
}
35+
36+
/**
 * Message handler that recomputes the definition named in an upgrade message when the stored copy
 * does not pass the version check.
 */
class DefinitionUpgrader {
  static defaultTtlSeconds = 5 * 60 /* 5 mins */
  static delayInMSeconds = 500

  /**
   * @param {object} definitionService - provides currentSchema, getStored() and computeStoreAndCurate()
   * @param {object} logger - provides info() and debug()
   * @param {object} defVersionChecker - provides validate(); its currentSchema is set from the service
   * @param {object} [cache] - get/set/delete store holding the per-coordinates lock; defaults to a
   * memory cache created with defaultTtlSeconds
   */
  constructor(
    definitionService,
    logger,
    defVersionChecker,
    cache = Cache({ defaultTtlSeconds: DefinitionUpgrader.defaultTtlSeconds })
  ) {
    this.logger = logger
    this._definitionService = definitionService
    this._defVersionChecker = defVersionChecker
    this._defVersionChecker.currentSchema = definitionService.currentSchema
    this._upgradeLock = cache
  }

  /**
   * Handle one upgrade message. Messages without `data.coordinates` are ignored.
   *
   * @param {object} message - dequeued message; coordinates are read from `data.coordinates`
   */
  async processMessage(message) {
    const rawCoordinates = get(message, 'data.coordinates')
    if (!rawCoordinates) return
    const coordinates = EntityCoordinates.fromObject(rawCoordinates)
    const lockKey = coordinates.toString()
    const pause = () => new Promise(resolve => setTimeout(resolve, DefinitionUpgrader.delayInMSeconds))

    // Wait while another message for the same coordinates holds the lock. There is deliberately no
    // await between the final check and the set below, so the check-and-set cannot be interleaved.
    while (this._upgradeLock.get(lockKey)) {
      await pause()
    }
    try {
      this._upgradeLock.set(lockKey, true)
      await this._upgradeIfNecessary(coordinates)
    } finally {
      this._upgradeLock.delete(lockKey)
    }
  }

  /**
   * Recompute, store and curate the definition unless the stored copy passes the version check.
   *
   * @param {object} coordinates - coordinates of the definition to check
   */
  async _upgradeIfNecessary(coordinates) {
    const stored = await this._definitionService.getStored(coordinates)
    const upToDate = await this._defVersionChecker.validate(stored)
    if (upToDate) {
      this.logger.debug(`Skipped definition update for ${coordinates.toString()}`)
      return
    }
    await this._definitionService.computeStoreAndCurate(coordinates)
    this.logger.info(`Handled definition update for ${coordinates.toString()}`)
  }
}
80+
81+
/**
 * Wire a DefinitionUpgrader to the given queue and start processing messages.
 *
 * @param {object} _queue - queue to read upgrade messages from
 * @param {object} _definitionService - service used to check and recompute definitions
 * @param {object} _logger
 * @param {boolean} [once=false] - when true, process a single batch instead of polling
 * @param {object} [_defVersionChecker] - version checker; defaults to one built with `_logger`
 * @returns {Promise} the promise returned by the first QueueHandler.work() run
 */
function setup(_queue, _definitionService, _logger, once = false, _defVersionChecker = factory({ logger: _logger })) {
  const messageHandler = new DefinitionUpgrader(_definitionService, _logger, _defVersionChecker)
  return new QueueHandler(_queue, _logger, messageHandler).work(once)
}
86+
87+
module.exports = { DefinitionUpgrader, QueueHandler, setup }

0 commit comments

Comments
 (0)