Skip to content

Commit 0e787c2

Browse files
authored
Merge pull request #412 from cloudant/408-split-changes
Split changes spooling
2 parents 8125569 + fe18c76 commit 0e787c2

File tree

3 files changed

+140
-25
lines changed

3 files changed

+140
-25
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
- [IMPROVED] Added quiet option to backup and restore to suppress batch messages.
55
- [IMPROVED] Added a preflight check for restore function to make sure that a target database is new and empty.
66
- [IMPROVED] Added handling for errors reading log file.
7+
- [IMPROVED] Split changes spooling to improve reliability on databases with
8+
millions of documents.
79

810
# 2.7.0 (2021-09-14)
911
- [UPGRADED] Cloudant client dependency from `@cloudant/cloudant` to `@ibm-cloud/cloudant`.

includes/spoolchanges.js

+44-25
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ module.exports = function(db, log, bufferSize, ee, callback) {
3434
let batch = 0;
3535
let lastSeq = null;
3636
const logStream = fs.createWriteStream(log);
37+
let pending = 0;
38+
// The number of changes to fetch per request
39+
const limit = 100000;
3740

3841
// send documents ids to the queue in batches of bufferSize + the last batch
3942
const processBuffer = function(lastOne) {
@@ -57,36 +60,52 @@ module.exports = function(db, log, bufferSize, ee, callback) {
5760
processBuffer(false);
5861
} else if (c.last_seq) {
5962
lastSeq = c.last_seq;
63+
pending = c.pending;
6064
}
6165
}
6266
};
6367

64-
// stream the changes feed to disk
65-
db.service.postChangesAsStream({ db: db.db, seq_interval: 10000 }).then(response => {
66-
response.result.pipe(liner())
67-
.on('error', function(err) {
68-
callback(err);
68+
function getChanges(since = 0) {
69+
debug('making changes request since ' + since);
70+
return db.service.postChangesAsStream({ db: db.db, since: since, limit: limit, seq_interval: limit })
71+
.then(response => {
72+
response.result.pipe(liner())
73+
.on('error', function(err) {
74+
logStream.end();
75+
callback(err);
76+
})
77+
.pipe(change(onChange))
78+
.on('error', function(err) {
79+
logStream.end();
80+
callback(err);
81+
})
82+
.on('finish', function() {
83+
processBuffer(true);
84+
if (!lastSeq) {
85+
logStream.end();
86+
debug('changes request terminated before last_seq was sent');
87+
callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
88+
} else {
89+
debug(`changes request completed with last_seq: ${lastSeq} and ${pending} changes pending.`);
90+
if (pending > 0) {
91+
// Return the next promise
92+
return getChanges(lastSeq);
93+
} else {
94+
debug('finished streaming database changes');
95+
logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
96+
}
97+
}
98+
});
6999
})
70-
.pipe(change(onChange))
71-
.on('error', function(err) {
72-
callback(err);
73-
})
74-
.on('finish', function() {
75-
processBuffer(true);
76-
if (!lastSeq) {
77-
logStream.end();
78-
debug('changes request terminated before last_seq was sent');
79-
callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
80-
} else {
81-
debug('finished streaming database changes');
82-
logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
100+
.catch(err => {
101+
logStream.end();
102+
if (err.status && err.status >= 400) {
103+
callback(error.convertResponseError(err));
104+
} else if (err.name !== 'SpoolChangesError') {
105+
callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
83106
}
84107
});
85-
}).catch(err => {
86-
if (err.status && err.status >= 400) {
87-
callback(error.convertResponseError(err));
88-
} else {
89-
callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
90-
}
91-
});
108+
}
109+
110+
getChanges();
92111
};

test/spoolchanges.js

+94
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,38 @@ const longTestTimeout = 3000;
2626

2727
const db = request.client(`${url}/${dbName}`, { parallelism: 1 });
2828

29+
const seqSuffix = Buffer.alloc(124, 'abc123').toString('base64');
30+
function provideChanges(batchSize, totalChanges, fullResponse = false) {
31+
let pending = totalChanges;
32+
const sparseResultsArray = Array(batchSize).fill({
33+
seq: null,
34+
id: 'doc',
35+
changes: [{ rev: '1-abcdef0123456789abcdef0123456789' }]
36+
});
37+
nock(url)
38+
.post(`/${dbName}/_changes`)
39+
.query(true)
40+
.times(totalChanges / batchSize + (totalChanges % batchSize > 0 ? 1 : 0))
41+
.reply(200, (uri, requestBody) => {
42+
pending -= batchSize;
43+
const lastSeq = (totalChanges - pending);
44+
const seq = lastSeq - batchSize;
45+
return {
46+
results: fullResponse
47+
? Array.from(Array(batchSize), (_, i) => {
48+
return {
49+
seq: `${seq + i}-${seqSuffix}`,
50+
id: `doc${seq + i}`,
51+
changes: [{ rev: '1-abcdef0123456789abcdef0123456789' }]
52+
};
53+
})
54+
: sparseResultsArray,
55+
pending: pending,
56+
last_seq: `${lastSeq}-abc`
57+
};
58+
});
59+
}
60+
2961
describe('#unit Check spool changes', function() {
3062
it('should terminate on request error', function(done) {
3163
nock(url)
@@ -59,4 +91,66 @@ describe('#unit Check spool changes', function() {
5991
done();
6092
});
6193
}).timeout(longTestTimeout);
94+
95+
it('should keep collecting changes', function(done) {
96+
// This test validates that spooling will correctly
97+
// continue across multiple requests
98+
// (4 batches of 100000 to be precise).
99+
// This test might take up to 10 seconds
100+
this.timeout(10 * 1000);
101+
102+
// Use full changes for this test
103+
provideChanges(100000, 400000, true);
104+
changes(db, '/dev/null', 500, null, function(err) {
105+
assert.ok(!err);
106+
assert.ok(nock.isDone());
107+
done();
108+
});
109+
});
110+
111+
it('should keep collecting sparse changes', function(done) {
112+
// This test checks that making thousands of requests doesn't
113+
// make anything bad happen.
114+
// This test might take up to 25 seconds
115+
this.timeout(25 * 1000);
116+
// Use sparse changes for this test and a batch size of 1
117+
provideChanges(1, 2500);
118+
changes(db, '/dev/null', 500, null, function(err) {
119+
assert.ok(!err);
120+
assert.ok(nock.isDone());
121+
done();
122+
});
123+
});
124+
});
125+
126+
describe('Longer spool changes checks', function() {
127+
it('#slow should keep collecting changes (25M)', function(done) {
128+
// This test might take up to 2 minutes
129+
this.timeout(2 * 60 * 1000);
130+
// Note changes spooling uses a constant batch size, we are setting
131+
// a test value here and setting the buffer to match
132+
const batch = 100000;
133+
// Use sparse changes for this test
134+
provideChanges(batch, 25000000);
135+
changes(db, '/dev/null', batch, null, function(err) {
136+
assert.ok(!err);
137+
assert.ok(nock.isDone());
138+
done();
139+
});
140+
});
141+
142+
it('#slower should keep collecting changes (500M)', function(done) {
143+
// This test might take up to 90 minutes
144+
this.timeout(90 * 60 * 1000);
145+
// Note changes spooling uses a constant batch size, we are setting
146+
// a test value here and setting the buffer to match
147+
const batch = 1000000;
148+
// Use full changes for this test to exercise load
149+
provideChanges(batch, 500000000, true);
150+
changes(db, '/dev/null', batch, null, function(err) {
151+
assert.ok(!err);
152+
assert.ok(nock.isDone());
153+
done();
154+
});
155+
});
62156
});

0 commit comments

Comments (0)