Skip to content

Commit 0f8d4b2

Browse files
author
Aitor Magán García
committed
Merge pull request #210 from telefonicaid/retryBucketsLUA
Retry buckets using LUA
2 parents 1f3f080 + 2402a7f commit 0f8d4b2

File tree

3 files changed

+61
-88
lines changed

3 files changed

+61
-88
lines changed

lib/configBase.js

+9-9
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,15 @@ exports.queue = {redisHost:'localhost', redisPort:6379};
7171
*/
7272
exports.expireTime = 60 * 60;
7373

74+
/*
75+
Retry buckets
76+
When a petition fails it will be inserted on a bucket (every time on one different: from the first one to the last one)
77+
Petitions inserted on a bucket will be retried sometime in the future.
78+
This property defines when petitions should be retried.
79+
The number of elements in this array will establish the number of buckets
80+
*/
81+
exports.retryBuckets = [1, 10, 60];
82+
7483

7584
/*
7685
HIGH AVAILABILITY CONFIGURATION
@@ -106,15 +115,6 @@ exports.longFailingTimeout = 10000;
106115
*/
107116
exports.minQuorum = 2;
108117

109-
/*
110-
Retry buckets
111-
When a petition fails it will be inserted on a bucket (every time on one different: from the first one to the last one)
112-
Petitions inserted on a bucket will be retried sometime in the future.
113-
This property defines when petitions should be retried.
114-
The number of elements in this array will establish the number of buckets
115-
*/
116-
exports.retryBuckets = [1, 10, 60];
117-
118118

119119
/* LISTENER AND CONSUMER CONFIGURATION
120120
======================================

lib/listener.js

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ function assignRequest(request, data, callback) {
7373
function processTask(err, routeObj) {
7474
if (!err) {
7575

76+
routeObj.task.service = routeObj.service;
7677
store.put(routeObj.service, routeObj.task, function onWrittenReq(err) {
7778
var st;
7879
if (err) {

lib/retryBuckets.js

+51-79
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,32 @@ logger.prefix = path.basename(module.filename, '.js');
2828

2929
var RETRY_BUCKETS = config.retryBuckets;
3030
var BUCKET_PREFIX = 'bucket';
31-
var FIRST_CHARCODE = "A".charCodeAt(0); //Needed to auto-create bucket names (bucketA, bucketB,...)
31+
var FIRST_CHARCODE = 'A'.charCodeAt(0); //Needed to auto-create bucket names (bucketA, bucketB,...)
3232

3333
var stopped;
3434

35-
var lrangeAndDelete = "\
35+
var getTasksAndRepsuh = "\
3636
local bucketName = KEYS[1]\n\
37+
local serviceQueue = KEYS[2]\n\
3738
local tasks = redis.call('lrange', bucketName, 0, -1)\n\
38-
redis.call('del', bucketName)\n\
39+
local size = table.getn(tasks)\n\
40+
if size > 0 then\n\
41+
redis.call('lpush', serviceQueue, unpack(tasks))\n\
42+
redis.call('del', bucketName)\n\
43+
end\n\
3944
return tasks\n\
4045
";
4146

47+
/**
48+
* Get bucket name
49+
* @param index Position on buckets array
50+
* @param service Service name
51+
* @returns {string} The name of the bucket
52+
*/
53+
function getBucketName(index, service) {
54+
return BUCKET_PREFIX + String.fromCharCode(FIRST_CHARCODE + index) + ':' + service;
55+
}
56+
4257
/**
4358
*
4459
* @param task The task to be pushed on the bucket
@@ -62,7 +77,7 @@ exports.insertOnBucket = function (task, error, maxRetry, callback) {
6277

6378
} else {
6479

65-
var bucketName = BUCKET_PREFIX + String.fromCharCode(FIRST_CHARCODE + retried);
80+
var bucketName = getBucketName(retried, task.service);
6681
var bucketTime = RETRY_BUCKETS[retried];
6782
var db = dbCluster.getDb(bucketName);
6883

@@ -98,105 +113,62 @@ exports.insertOnBucket = function (task, error, maxRetry, callback) {
98113

99114
exports.initBucketTimers = function() {
100115

101-
function repushBucketTasks(bucketName, cb) {
116+
function repushBucketTasks(bucketName, serviceQueue, cb) {
102117

103118
var db = dbCluster.getDb(bucketName);
104119

105120
//Get all tasks from the bucket
106-
db.eval(lrangeAndDelete, 1, bucketName, function (errEval, data) {
107-
121+
db.eval(getTasksAndRepsuh, 2, bucketName, serviceQueue, function (errEval, data) {
108122
if (!errEval && data.length > 0) {
109-
110-
//Insert on the queue again (using the standard method: as listener)
111-
async.map(data,
112-
113-
//Iterate function
114-
function(task, callback) {
115-
router.route(JSON.parse(task), callback);
116-
},
117-
118-
//Function to be called when all items of the array has been processed
119-
function (err, results) {
120-
121-
if (err) {
122-
123-
logger.error('Error routing tasks to be respushed', { op: 'REPUSH BUCKET TASKS', error: err });
124-
cb();
125-
126-
} else {
127-
128-
//Get tasks for each service
129-
var putArguments = {};
130-
for (var i = 0; i < results.length; i++) {
131-
var service = results[i].service;
132-
var task = results[i].task;
133-
if (!putArguments[service]) {
134-
putArguments[service] = [];
135-
}
136-
putArguments[service].push(task);
137-
}
138-
139-
//For each service, repush all the tasks at the same time
140-
//In order to improve efficiency, only up to 100 tasks will be repushed at the same time
141-
var putFunctions = [];
142-
for (service in putArguments) {
143-
if (putArguments.hasOwnProperty(service)) {
144-
var concurrentPushes = MG.CONCURRENT_PUSHES;
145-
for (i=0; i < putArguments[service].length; i += concurrentPushes){
146-
putFunctions.push(store.put.bind(this, service,
147-
putArguments[service].slice(i, i + concurrentPushes)));
148-
}
149-
}
150-
}
151-
152-
async.series(putFunctions, cb);
153-
}
154-
}
155-
);
156-
123+
logger.info('Tasks repushed', { op: 'REPUSH BUCKET TASKS', tasks: data });
124+
cb();
157125
} else if (errEval) {
158-
159126
logger.error('Error getting bucket elements', { op: 'RESPUSH BUCKET TASKS', error: errEval });
160127
cb();
161-
162128
} else if (data.length === 0) {
163-
164129
logger.debug('Empty bucket', { op: 'REPUSH BUCKET TASKS'});
165130
cb();
166-
167131
}
168132
});
169133
}
170134

135+
//Stop condition
171136
stopped = false;
172137

173-
RETRY_BUCKETS.forEach(function(bucket, index) {
138+
var services = router.getQueues();
174139

175-
var bucketName = BUCKET_PREFIX + String.fromCharCode(FIRST_CHARCODE + index);
176-
var intervalTime = bucket * 2 * 1000;
140+
for (var service in services) {
141+
if (services.hasOwnProperty(service)) {
177142

178-
async.whilst(
143+
RETRY_BUCKETS.forEach(function(bucketTimeout, index) {
179144

180-
//Stop condition
181-
function() {
182-
return stopped === false;
183-
},
145+
var serviceName = services[service];
146+
var bucketName = getBucketName(index, serviceName);
147+
var intervalTime = bucketTimeout * 2 * 1000;
184148

185-
//Function to be executed forever
186-
function (callback) {
187-
repushBucketTasks(bucketName, function() {
188-
setTimeout(callback, intervalTime);
189-
})
190-
},
149+
async.whilst(
191150

192-
//Function to be called on error
193-
function(err) {
194-
logger.error('Error processing respush', { op: 'BUCKET TIMER', error: err });
195-
}
196-
);
151+
//Stop condition
152+
function() {
153+
return stopped === false;
154+
},
197155

198-
});
156+
//Function to be executed forever
157+
function (callback) {
158+
repushBucketTasks(bucketName, serviceName, function() {
159+
setTimeout(callback, intervalTime);
160+
})
161+
},
199162

163+
//Function to be called on error
164+
function(err) {
165+
logger.error('Error processing respush', { op: 'BUCKET TIMER', error: err });
166+
}
167+
);
168+
169+
});
170+
}
171+
}
200172
};
201173

202174
exports.stopTimers = function() {

0 commit comments

Comments
 (0)