diff --git a/.travis.yml b/.travis.yml index c042821..d72655c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,5 @@ language: node_js node_js: - - '0.10' - - '0.12' - - '4.0' - - '5.0' + - 8 + - 10 + - 12 diff --git a/client.js b/client.js index dd49b33..8db3488 100644 --- a/client.js +++ b/client.js @@ -1,35 +1,32 @@ var levelup = require('levelup') -var duplexify = require('duplexify') +var encode = require('encoding-down') var leveldown = require('./leveldown') module.exports = function (opts) { if (!opts) opts = {} - var down - opts.db = createLeveldown - opts.onflush = onflush - var db = levelup('multileveldown', opts) + var down = leveldown(Object.assign({}, opts, { onflush: onflush })) + var db = levelup(encode(down, opts), opts) + db.createRpcStream = db.connect = connect + db.isFlushed = isFlushed + db.forward = forward return db - function createLeveldown (path) { - down = leveldown(path, opts) - return down - } - function onflush () { db.emit('flush') } function connect (opts) { - if (down) return down.createRpcStream(opts, null) + return down.createRpcStream(opts, null) + } - var proxy = duplexify() - db.open(function () { - down.createRpcStream(opts, proxy) - }) + function isFlushed () { + return down.isFlushed() + } - return proxy + function forward (db) { + down.forward(db) } } diff --git a/leveldown.js b/leveldown.js index 209e6f6..e65ce71 100644 --- a/leveldown.js +++ b/leveldown.js @@ -4,7 +4,9 @@ var util = require('util') var eos = require('end-of-stream') var ids = require('numeric-id-map') var lpstream = require('length-prefixed-stream') +var reachdown = require('reachdown') var messages = require('./messages') +var matchdown = require('./matchdown') var ENCODERS = [ messages.Get, @@ -21,9 +23,9 @@ var DECODERS = [ module.exports = Multilevel -function Multilevel (path, opts) { - if (!(this instanceof Multilevel)) return new Multilevel(path, opts) - abstract.AbstractLevelDOWN.call(this, path) +function Multilevel (opts) { + if (!(this instanceof Multilevel)) return new Multilevel(opts) + abstract.AbstractLevelDOWN.call(this) if (!opts) opts = {} this._iterators = ids() @@ -38,6 +40,8 @@ function Multilevel (path, opts) { util.inherits(Multilevel, abstract.AbstractLevelDOWN) +Multilevel.prototype.type = 'multileveldown' + Multilevel.prototype.createRpcStream = function (opts, proxy) { if (this._streaming) throw new Error('Only one rpc stream can be active') if (!opts) opts = {} @@ -112,12 +116,12 @@ Multilevel.prototype.createRpcStream = function (opts, proxy) { function oncallback (res) { var req = self._requests.remove(res.id) - if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueEncoding)) + if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueAsBuffer)) } } Multilevel.prototype.forward = function (down) { - this._db = down + this._db = reachdown(down, matchdown, false) } Multilevel.prototype.isFlushed = function () { @@ -145,6 +149,14 @@ Multilevel.prototype._clearRequests = function (closing) { } } +Multilevel.prototype._serializeKey = function (key) { + return Buffer.isBuffer(key) ? key : Buffer.from(String(key)) +} + +Multilevel.prototype._serializeValue = function (value) { + return Buffer.isBuffer(value) ? value : Buffer.from(String(value)) +} + Multilevel.prototype._get = function (key, opts, cb) { if (this._db) return this._db._get(key, opts, cb) @@ -152,7 +164,7 @@ Multilevel.prototype._get = function (key, opts, cb) { tag: 0, id: 0, key: key, - valueEncoding: opts.valueEncoding || (opts.asBuffer === false ? 'utf-8' : 'binary'), + valueAsBuffer: opts.asBuffer, callback: cb || noop } @@ -206,7 +218,7 @@ Multilevel.prototype._batch = function (batch, opts, cb) { Multilevel.prototype._write = function (req) { if (this._requests.length + this._iterators.length === 1) ref(this._ref) var enc = ENCODERS[req.tag] - var buf = new Buffer(enc.encodingLength(req) + 1) + var buf = Buffer.allocUnsafe(enc.encodingLength(req) + 1) buf[0] = req.tag enc.encode(req, buf, 1) this._encode.write(buf) @@ -231,10 +243,11 @@ Multilevel.prototype._iterator = function (opts) { function noop () {} +// TODO: extend AbstractIterator, passing db to ctor function Iterator (parent, opts) { this._parent = parent - this._keyEncoding = opts.keyEncoding - this._valueEncoding = opts.valueEncoding + this._keyAsBuffer = opts.keyAsBuffer + this._valueAsBuffer = opts.valueAsBuffer this._options = opts var req = { @@ -274,8 +287,8 @@ Iterator.prototype.next = function (cb) { this._options.gt = next.key if (this._options.limit > 0) this._options.limit-- - var key = decodeValue(next.key, this._keyEncoding) - var val = decodeValue(next.value, this._valueEncoding) + var key = decodeValue(next.key, this._keyAsBuffer) + var val = decodeValue(next.value, this._valueAsBuffer) return cb(undefined, key, val) } @@ -294,9 +307,9 @@ function decodeError (err) { return err ? new Error(err) : null } -function decodeValue (val, enc) { +function decodeValue (val, asBuffer) { if (!val) return undefined - return (enc === 'utf8' || enc === 'utf-8') ? val.toString() : val + return asBuffer ? val : val.toString() } function ref (r) { diff --git a/matchdown.js b/matchdown.js new file mode 100644 index 0000000..089ce39 --- /dev/null +++ b/matchdown.js @@ -0,0 +1,8 @@ +module.exports = function matchdown (db, type) { + // Skip layers that we handle ourselves + if (type === 'levelup') return false + if (type === 'encoding-down') return false + if (type === 'deferred-leveldown') return false + + return true +} diff --git a/package.json b/package.json index c276257..09cba8d 100644 --- a/package.json +++ b/package.json @@ -4,20 +4,25 @@ "description": "multilevel implemented using leveldowns with reconnect support", "main": "index.js", "dependencies": { - "abstract-leveldown": "^2.4.1", - "duplexify": "^3.4.2", + "abstract-leveldown": "^6.1.1", + "duplexify": "^4.1.1", + "encoding-down": "^6.2.0", "end-of-stream": "^1.1.0", - "length-prefixed-stream": "^1.4.0", - "levelup": "^1.3.1", + "length-prefixed-stream": "^2.0.0", + "levelup": "^4.1.0", "numeric-id-map": "^1.1.0", - "protocol-buffers-encodings": "^1.1.0" + "protocol-buffers-encodings": "^1.1.0", + "reachdown": "^1.0.0" }, "devDependencies": { - "concat-stream": "^1.5.1", - "memdown": "^1.1.0", + "@adorsys/encrypt-down": "^2.0.1", + "concat-stream": "^2.0.0", + "level-compose": "^0.0.2", + "memdown": "^5.0.0", "protocol-buffers": "^4.0.2", - "standard": "^5.4.1", - "tape": "^4.4.0" + "standard": "^14.1.0", + "subleveldown": "^4.1.1", + "tape": "^4.11.0" }, "scripts": { "test": "standard && tape test/*.js", diff --git a/server.js b/server.js index f8775d3..ec74bc5 100644 --- a/server.js +++ b/server.js @@ -1,7 +1,10 @@ var lpstream = require('length-prefixed-stream') var eos = require('end-of-stream') var duplexify = require('duplexify') +var reachdown = require('reachdown') var messages = require('./messages') +var rangeOptions = 'gt gte lt lte'.split(' ') +var matchdown = require('./matchdown') var DECODERS = [ messages.Get, @@ -29,7 +32,7 @@ module.exports = function (db, opts) { return stream function ready () { - var down = db.db + var down = reachdown(db, matchdown, false) var iterators = [] eos(stream, function () { @@ -71,8 +74,8 @@ module.exports = function (db, opts) { }) function callback (id, err, value) { - var msg = {id: id, error: err && err.message, value: value} - var buf = new Buffer(messages.Callback.encodingLength(msg) + 1) + var msg = { id: id, error: err && err.message, value: value } + var buf = Buffer.allocUnsafe(messages.Callback.encodingLength(msg) + 1) buf[0] = 0 messages.Callback.encode(msg, buf, 1) encode.write(buf) @@ -109,6 +112,7 @@ module.exports = function (db, opts) { function onbatch (req) { prebatch(req.ops, function (err) { if (err) return callback(err) + down.batch(req.ops, function (err) { callback(req.id, err) }) @@ -136,15 +140,7 @@ function Iterator (down, req, encode) { var self = this this.batch = req.batch || 0 - - if (req.options) { - if (req.options.gt === null) req.options.gt = undefined - if (req.options.gte === null) req.options.gte = undefined - if (req.options.lt === null) req.options.lt = undefined - if (req.options.lte === null) req.options.lte = undefined - } - - this._iterator = down.iterator(req.options) + this._iterator = down.iterator(cleanRangeOptions(req.options)) this._encode = encode this._send = send this._nexting = false @@ -163,7 +159,7 @@ function Iterator (down, req, encode) { self._data.key = key self._data.value = value self.batch-- - var buf = new Buffer(messages.IteratorData.encodingLength(self._data) + 1) + var buf = Buffer.allocUnsafe(messages.IteratorData.encodingLength(self._data) + 1) buf[0] = 1 messages.IteratorData.encode(self._data, buf, 1) encode.write(buf) @@ -185,3 +181,23 @@ Iterator.prototype.end = function () { } function noop () {} + +function cleanRangeOptions (options) { + if (!options) return + + var result = {} + + for (var k in options) { + if (!hasOwnProperty.call(options, k)) continue + + if (!isRangeOption(k) || options[k] != null) { + result[k] = options[k] + } + } + + return result +} + +function isRangeOption (k) { + return rangeOptions.indexOf(k) !== -1 +} diff --git a/streams.js b/streams.js index 716d57d..9b275ab 100644 --- a/streams.js +++ b/streams.js @@ -1,4 +1,5 @@ var pbs = require('pbs') var fs = require('fs') +var path = require('path') -module.exports = pbs(fs.readFileSync(__dirname + '/schema.proto')) +module.exports = pbs(fs.readFileSync(path.join(__dirname, 'schema.proto'))) diff --git a/test/basic.js b/test/basic.js index 0ef4661..653447c 100644 --- a/test/basic.js +++ b/test/basic.js @@ -2,26 +2,81 @@ var tape = require('tape') var memdown = require('memdown') var concat = require('concat-stream') var levelup = require('levelup') +var encode = require('encoding-down') +var factory = require('level-compose')(memdown, encode, levelup) var multileveldown = require('../') tape('get', function (t) { - var db = levelup('no-location', {db: mem}) + t.plan(7) + + var db = factory() var stream = multileveldown.server(db) var client = multileveldown.client() stream.pipe(client.createRpcStream()).pipe(stream) - db.put('hello', 'world', function () { + db.put('hello', 'world', function (err) { + t.error(err, 'no err') + client.get('hello', function (err, value) { t.error(err, 'no err') t.same(value, 'world') + }) + + client.get(Buffer.from('hello'), function (err, value) { + t.error(err, 'no err') + t.same(value, 'world') + }) + + client.get('hello', { valueEncoding: 'binary' }, function (err, value) { + t.error(err, 'no err') + t.same(value, Buffer.from('world')) + }) + }) +}) + +tape('get with valueEncoding: json in constructor', function (t) { + var db = factory() + var stream = multileveldown.server(db) + var client = multileveldown.client({ valueEncoding: 'json' }) + + stream.pipe(client.createRpcStream()).pipe(stream) + + db.put('hello', '{"foo":"world"}', function () { + client.get('hello', function (err, value) { + t.error(err, 'no err') + t.same(value, { foo: 'world' }) t.end() }) }) }) +tape('get with valueEncoding: json in get options', function (t) { + t.plan(5) + + var db = factory() + var stream = multileveldown.server(db) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + db.put('hello', '{"foo":"world"}', function (err) { + t.error(err, 'no err') + + client.get('hello', { valueEncoding: 'json' }, function (err, value) { + t.error(err, 'no err') + t.same(value, { foo: 'world' }) + }) + + client.get('hello', function (err, value) { + t.error(err, 'no err') + t.same(value, '{"foo":"world"}') + }) + }) +}) + tape('put', function (t) { - var db = levelup('no-location', {db: mem}) + var db = factory() var stream = multileveldown.server(db) var client = multileveldown.client() @@ -37,12 +92,60 @@ tape('put', function (t) { }) }) +tape('put with valueEncoding: json in constructor', function (t) { + t.plan(5) + + var db = factory() + var stream = multileveldown.server(db) + var client = multileveldown.client({ valueEncoding: 'json' }) + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('hello', { foo: 'world' }, function (err) { + t.error(err, 'no err') + + db.get('hello', function (err, value) { + t.error(err, 'no err') + t.same(value, '{"foo":"world"}') + }) + + client.get('hello', function (err, value) { + t.error(err, 'no err') + t.same(value, { foo: 'world' }) + }) + }) +}) + +tape('put with valueEncoding: json in put options', function (t) { + t.plan(5) + + var db = factory() + var stream = multileveldown.server(db) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('hello', { foo: 'world' }, { valueEncoding: 'json' }, function (err) { + t.error(err, 'no err') + + db.get('hello', function (err, value) { + t.error(err, 'no err') + t.same(value, '{"foo":"world"}') + }) + + client.get('hello', function (err, value) { + t.error(err, 'no err') + t.same(value, '{"foo":"world"}') + }) + }) +}) + tape('readonly', function (t) { - var db = levelup('no-location', {db: mem}) + var db = factory() db.put('hello', 'verden') - var stream = multileveldown.server(db, {readonly: true}) + var stream = multileveldown.server(db, { readonly: true }) var client = multileveldown.client() stream.pipe(client.createRpcStream()).pipe(stream) @@ -58,7 +161,7 @@ tape('readonly', function (t) { }) tape('del', function (t) { - var db = levelup('no-location', {db: mem}) + var db = factory() var stream = multileveldown.server(db) var client = multileveldown.client() @@ -78,13 +181,13 @@ tape('del', function (t) { }) tape('batch', function (t) { - var db = levelup('no-location', {db: mem}) + var db = factory() var stream = multileveldown.server(db) var client = multileveldown.client() stream.pipe(client.createRpcStream()).pipe(stream) - client.batch([{type: 'put', key: 'hello', value: 'world'}, {type: 'put', key: 'hej', value: 'verden'}], function (err) { + client.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }], function (err) { t.error(err, 'no err') client.get('hello', function (err, value) { t.error(err, 'no err') @@ -99,42 +202,38 @@ tape('batch', function (t) { }) tape('read stream', function (t) { - var db = levelup('no-location', {db: mem}) + var db = factory() var stream = multileveldown.server(db) var client = multileveldown.client() stream.pipe(client.createRpcStream()).pipe(stream) - client.batch([{type: 'put', key: 'hello', value: 'world'}, {type: 'put', key: 'hej', value: 'verden'}], function (err) { + client.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }], function (err) { t.error(err, 'no err') var rs = client.createReadStream() rs.pipe(concat(function (datas) { t.same(datas.length, 2) - t.same(datas[0], {key: 'hej', value: 'verden'}) - t.same(datas[1], {key: 'hello', value: 'world'}) + t.same(datas[0], { key: 'hej', value: 'verden' }) + t.same(datas[1], { key: 'hello', value: 'world' }) t.end() })) }) }) tape('read stream (gt)', function (t) { - var db = levelup('no-location', {db: mem}) + var db = factory() var stream = multileveldown.server(db) var client = multileveldown.client() stream.pipe(client.createRpcStream()).pipe(stream) - client.batch([{type: 'put', key: 'hello', value: 'world'}, {type: 'put', key: 'hej', value: 'verden'}], function (err) { + client.batch([{ type: 'put', key: 'hello', value: 'world' }, { type: 'put', key: 'hej', value: 'verden' }], function (err) { t.error(err, 'no err') - var rs = client.createReadStream({gt: 'hej'}) + var rs = client.createReadStream({ gt: 'hej' }) rs.pipe(concat(function (datas) { t.same(datas.length, 1) - t.same(datas[0], {key: 'hello', value: 'world'}) + t.same(datas[0], { key: 'hello', value: 'world' }) t.end() })) }) }) - -function mem () { - return memdown() -} diff --git a/test/concurrent.js b/test/concurrent.js index 50afaa2..e3290c7 100644 --- a/test/concurrent.js +++ b/test/concurrent.js @@ -3,16 +3,18 @@ var memdown = require('memdown') var levelup = require('levelup') var concat = require('concat-stream') var multileveldown = require('../') +var encode = require('encoding-down') +var factory = require('level-compose')(memdown, encode, levelup) tape('two concurrent iterators', function (t) { - var db = levelup('', {db: memdown}) + var db = factory() var server = multileveldown.server(db) var client = multileveldown.client() server.pipe(client.connect()).pipe(server) var batch = [] - for (var i = 0; i < 100; i++) batch.push({type: 'put', key: 'key-' + i, value: 'value-' + i}) + for (var i = 0; i < 100; i++) batch.push({ type: 'put', key: 'key-' + i, value: 'value-' + i }) client.batch(batch, function (err) { t.error(err) diff --git a/test/encryptdown.js b/test/encryptdown.js new file mode 100644 index 0000000..795f19f --- /dev/null +++ b/test/encryptdown.js @@ -0,0 +1,69 @@ +var tape = require('tape') +var memdown = require('memdown') +var concat = require('concat-stream') +var levelup = require('levelup') +var encode = require('encoding-down') +var encrypt = require('@adorsys/encrypt-down') +var factory = require('level-compose')(memdown, encrypt, encode, levelup) +var multileveldown = require('../') + +var jwk = { + kty: 'oct', + alg: 'A256GCM', + use: 'enc', + k: '123456789abcdefghijklmnopqrstuvwxyz12345678' +} + +// The reason we test encrypt-down is that multileveldown should in this case +// peel off the levelup, deferred-leveldown and encoding-down layers from db, +// but stop peeling at the encrypt-down layer. This case is also different +// from subleveldown because encrypt-down doesn't export levelup. +tape('multileveldown server on encrypt-down', function (t) { + t.plan(3) + + var db = factory({ jwk }) + var stream = multileveldown.server(db) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('a', 'client', function (err) { + t.error(err, 'no err') + + db.put('b', 'server', function (err) { + t.error(err, 'no err') + + client.createReadStream().pipe(concat(function (entries) { + t.same(entries, [ + { key: 'a', value: 'client' }, + { key: 'b', value: 'server' } + ]) + })) + }) + }) +}) + +tape('multileveldown server on encrypt-down with encoding', function (t) { + t.plan(3) + + var db = factory({ jwk, valueEncoding: 'json' }) + var stream = multileveldown.server(db) + var client = multileveldown.client({ valueEncoding: 'json' }) + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('a', { from: 'client' }, function (err) { + t.error(err, 'no err') + + db.put('b', { from: 'server' }, function (err) { + t.error(err, 'no err') + + client.createReadStream().pipe(concat(function (entries) { + t.same(entries, [ + { key: 'a', value: { from: 'client' } }, + { key: 'b', value: { from: 'server' } } + ]) + })) + }) + }) +}) diff --git a/test/retry.js b/test/retry.js index 18e641a..8dee6df 100644 --- a/test/retry.js +++ b/test/retry.js @@ -2,11 +2,13 @@ var tape = require('tape') var memdown = require('memdown') var levelup = require('levelup') var multileveldown = require('../') +var encode = require('encoding-down') +var factory = require('level-compose')(memdown, encode, levelup) tape('retry get', function (t) { - var db = levelup('no-location', {db: memdown}) + var db = factory() var stream = multileveldown.server(db) - var client = multileveldown.client({retry: true}) + var client = multileveldown.client({ retry: true }) db.put('hello', 'world', function () { client.get('hello', function (err, value) { @@ -20,9 +22,9 @@ tape('retry get', function (t) { }) tape('no retry get', function (t) { - var db = levelup('no-location', {db: memdown}) + var db = factory() var stream = multileveldown.server(db) - var client = multileveldown.client({retry: false}) + var client = multileveldown.client({ retry: false }) client.open(function () { db.put('hello', 'world', function () { @@ -44,9 +46,9 @@ tape('no retry get', function (t) { }) tape('retry get', function (t) { - var db = levelup('no-location', {db: memdown}) + var db = factory() var stream = multileveldown.server(db) - var client = multileveldown.client({retry: true}) + var client = multileveldown.client({ retry: true }) client.open(function () { db.put('hello', 'world', function () { @@ -69,8 +71,8 @@ tape('retry get', function (t) { }) tape('retry read stream', function (t) { - var db = levelup('no-location', {db: memdown}) - var client = multileveldown.client({retry: true}) + var db = factory() + var client = multileveldown.client({ retry: true }) client.open(function () { db.batch([{ @@ -124,8 +126,8 @@ tape('retry read stream', function (t) { }) tape('retry read stream and limit', function (t) { - var db = levelup('no-location', {db: memdown}) - var client = multileveldown.client({retry: true}) + var db = factory() + var client = multileveldown.client({ retry: true }) client.open(function () { db.batch([{ @@ -141,7 +143,7 @@ tape('retry read stream and limit', function (t) { key: 'hola', value: 'mundo' }], function () { - var rs = client.createReadStream({limit: 2}) + var rs = client.createReadStream({ limit: 2 }) var expected = [{ key: 'hej', value: 'verden' diff --git a/test/streams.js b/test/streams.js index 50afaa2..e3290c7 100644 --- a/test/streams.js +++ b/test/streams.js @@ -3,16 +3,18 @@ var memdown = require('memdown') var levelup = require('levelup') var concat = require('concat-stream') var multileveldown = require('../') +var encode = require('encoding-down') +var factory = require('level-compose')(memdown, encode, levelup) tape('two concurrent iterators', function (t) { - var db = levelup('', {db: memdown}) + var db = factory() var server = multileveldown.server(db) var client = multileveldown.client() server.pipe(client.connect()).pipe(server) var batch = [] - for (var i = 0; i < 100; i++) batch.push({type: 'put', key: 'key-' + i, value: 'value-' + i}) + for (var i = 0; i < 100; i++) batch.push({ type: 'put', key: 'key-' + i, value: 'value-' + i }) client.batch(batch, function (err) { t.error(err) diff --git a/test/subleveldown.js b/test/subleveldown.js new file mode 100644 index 0000000..e4cafe7 --- /dev/null +++ b/test/subleveldown.js @@ -0,0 +1,167 @@ +var tape = require('tape') +var memdown = require('memdown') +var concat = require('concat-stream') +var levelup = require('levelup') +var encode = require('encoding-down') +var factory = require('level-compose')(memdown, encode, levelup) +var sub = require('subleveldown') +var multileveldown = require('../') + +tape('subleveldown on deferred multileveldown client', function (t) { + t.plan(5) + + var db = factory() + var stream = multileveldown.server(db) + var client = multileveldown.client() + var sub1 = sub(client, 'test', { valueEncoding: 'json' }) + var sub2 = sub(client, 'test') + + t.is(client.isOpen(), false) + stream.pipe(client.createRpcStream()).pipe(stream) + + sub1.put('hello', { test: 'world' }, function (err) { + t.error(err, 'no err') + + sub1.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'hello', value: { test: 'world' } }]) + })) + + sub2.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'hello', value: '{"test":"world"}' }]) + })) + + db.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: '!test!hello', value: '{"test":"world"}' }]) + })) + }) +}) + +tape('subleveldown on non-deferred multileveldown client', function (t) { + t.plan(5) + + var db = factory() + var stream = multileveldown.server(db) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.once('open', function () { + t.is(client.isOpen(), true) + + var sub1 = sub(client, 'test', { valueEncoding: 'json' }) + var sub2 = sub(client, 'test') + + sub1.put('hello', { test: 'world' }, function (err) { + t.error(err, 'no err') + + sub1.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'hello', value: { test: 'world' } }]) + })) + + sub2.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'hello', value: '{"test":"world"}' }]) + })) + + db.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: '!test!hello', value: '{"test":"world"}' }]) + })) + }) + }) +}) + +tape('multileveldown server on deferred subleveldown', function (t) { + t.plan(4) + + var db = factory() + var sub1 = sub(db, 'test1') + var sub2 = sub(db, 'test2') + var stream = multileveldown.server(sub1) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('from', 'client', function (err) { + t.error(err, 'no err') + + sub2.put('from', 'server', function (err) { + t.error(err, 'no err') + + client.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'from', value: 'client' }]) + })) + + db.createReadStream().pipe(concat(function (entries) { + t.same(entries, [ + { key: '!test1!from', value: 'client' }, + { key: '!test2!from', value: 'server' } + ]) + })) + }) + }) +}) + +tape('multileveldown server on non-deferred subleveldown', function (t) { + t.plan(4) + + var db = factory() + var sub1 = sub(db, 'test1') + var sub2 = sub(db, 'test2') + + sub1.once('open', function () { + var stream = multileveldown.server(sub1) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('from', 'client', function (err) { + t.error(err, 'no err') + + sub2.put('from', 'server', function (err) { + t.error(err, 'no err') + + client.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'from', value: 'client' }]) + })) + + db.createReadStream().pipe(concat(function (entries) { + t.same(entries, [ + { key: '!test1!from', value: 'client' }, + { key: '!test2!from', value: 'server' } + ]) + })) + }) + }) + }) +}) + +tape('multileveldown server on nested subleveldown', function (t) { + t.plan(4) + + var db = factory() + var sub1 = sub(db, 'test1') + var sub2 = sub(sub1, 'test2') + var sub3 = sub(db, 'test3') + var stream = multileveldown.server(sub2) + var client = multileveldown.client() + + stream.pipe(client.createRpcStream()).pipe(stream) + + client.put('from', 'client', function (err) { + t.error(err, 'no err') + + sub3.put('from', 'server', function (err) { + t.error(err, 'no err') + + client.createReadStream().pipe(concat(function (entries) { + t.same(entries, [{ key: 'from', value: 'client' }]) + })) + + db.createReadStream().pipe(concat(function (entries) { + t.same(entries, [ + { key: '!test1!!test2!from', value: 'client' }, + { key: '!test3!from', value: 'server' } + ]) + })) + }) + }) +})