-
-
Notifications
You must be signed in to change notification settings - Fork 20
Upgrade to latest Level modules #15
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,5 @@ | ||
language: node_js | ||
node_js: | ||
- '0.10' | ||
- '0.12' | ||
- '4.0' | ||
- '5.0' | ||
- 8 | ||
- 10 | ||
- 12 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,32 @@ | ||
var levelup = require('levelup') | ||
var duplexify = require('duplexify') | ||
var encode = require('encoding-down') | ||
var leveldown = require('./leveldown') | ||
|
||
module.exports = function (opts) { | ||
if (!opts) opts = {} | ||
|
||
var down | ||
opts.db = createLeveldown | ||
opts.onflush = onflush | ||
var db = levelup('multileveldown', opts) | ||
var down = leveldown(Object.assign({}, opts, { onflush: onflush })) | ||
var db = levelup(encode(down, opts), opts) | ||
|
||
db.createRpcStream = db.connect = connect | ||
db.isFlushed = isFlushed | ||
db.forward = forward | ||
|
||
return db | ||
|
||
function createLeveldown (path) { | ||
down = leveldown(path, opts) | ||
return down | ||
} | ||
|
||
function onflush () { | ||
db.emit('flush') | ||
} | ||
|
||
function connect (opts) { | ||
if (down) return down.createRpcStream(opts, null) | ||
return down.createRpcStream(opts, null) | ||
} | ||
|
||
var proxy = duplexify() | ||
db.open(function () { | ||
down.createRpcStream(opts, proxy) | ||
}) | ||
function isFlushed () { | ||
return down.isFlushed() | ||
} | ||
|
||
return proxy | ||
function forward (db) { | ||
down.forward(db) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,9 @@ var util = require('util') | |
var eos = require('end-of-stream') | ||
var ids = require('numeric-id-map') | ||
var lpstream = require('length-prefixed-stream') | ||
var reachdown = require('reachdown') | ||
var messages = require('./messages') | ||
var matchdown = require('./matchdown') | ||
|
||
var ENCODERS = [ | ||
messages.Get, | ||
|
@@ -21,9 +23,9 @@ var DECODERS = [ | |
|
||
module.exports = Multilevel | ||
|
||
function Multilevel (path, opts) { | ||
if (!(this instanceof Multilevel)) return new Multilevel(path, opts) | ||
abstract.AbstractLevelDOWN.call(this, path) | ||
function Multilevel (opts) { | ||
if (!(this instanceof Multilevel)) return new Multilevel(opts) | ||
abstract.AbstractLevelDOWN.call(this) | ||
|
||
if (!opts) opts = {} | ||
this._iterators = ids() | ||
|
@@ -38,6 +40,8 @@ function Multilevel (path, opts) { | |
|
||
util.inherits(Multilevel, abstract.AbstractLevelDOWN) | ||
|
||
Multilevel.prototype.type = 'multileveldown' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is for compatibility with |
||
|
||
Multilevel.prototype.createRpcStream = function (opts, proxy) { | ||
if (this._streaming) throw new Error('Only one rpc stream can be active') | ||
if (!opts) opts = {} | ||
|
@@ -112,12 +116,12 @@ Multilevel.prototype.createRpcStream = function (opts, proxy) { | |
|
||
function oncallback (res) { | ||
var req = self._requests.remove(res.id) | ||
if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueEncoding)) | ||
if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueAsBuffer)) | ||
} | ||
} | ||
|
||
Multilevel.prototype.forward = function (down) { | ||
this._db = down | ||
this._db = reachdown(down, matchdown, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This peels off |
||
} | ||
|
||
Multilevel.prototype.isFlushed = function () { | ||
|
@@ -145,14 +149,22 @@ Multilevel.prototype._clearRequests = function (closing) { | |
} | ||
} | ||
|
||
Multilevel.prototype._serializeKey = function (key) { | ||
return Buffer.isBuffer(key) ? key : Buffer.from(String(key)) | ||
} | ||
|
||
Multilevel.prototype._serializeValue = function (value) { | ||
return Buffer.isBuffer(value) ? value : Buffer.from(String(value)) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This ensures we're only working with buffers internally. |
||
|
||
Multilevel.prototype._get = function (key, opts, cb) { | ||
if (this._db) return this._db._get(key, opts, cb) | ||
|
||
var req = { | ||
tag: 0, | ||
id: 0, | ||
key: key, | ||
valueEncoding: opts.valueEncoding || (opts.asBuffer === false ? 'utf-8' : 'binary'), | ||
valueAsBuffer: opts.asBuffer, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We no longer have access to |
||
callback: cb || noop | ||
} | ||
|
||
|
@@ -206,7 +218,7 @@ Multilevel.prototype._batch = function (batch, opts, cb) { | |
Multilevel.prototype._write = function (req) { | ||
if (this._requests.length + this._iterators.length === 1) ref(this._ref) | ||
var enc = ENCODERS[req.tag] | ||
var buf = new Buffer(enc.encodingLength(req) + 1) | ||
var buf = Buffer.allocUnsafe(enc.encodingLength(req) + 1) | ||
buf[0] = req.tag | ||
enc.encode(req, buf, 1) | ||
this._encode.write(buf) | ||
|
@@ -231,10 +243,11 @@ Multilevel.prototype._iterator = function (opts) { | |
|
||
function noop () {} | ||
|
||
// TODO: extend AbstractIterator, passing db to ctor | ||
function Iterator (parent, opts) { | ||
this._parent = parent | ||
this._keyEncoding = opts.keyEncoding | ||
this._valueEncoding = opts.valueEncoding | ||
this._keyAsBuffer = opts.keyAsBuffer | ||
this._valueAsBuffer = opts.valueAsBuffer | ||
this._options = opts | ||
|
||
var req = { | ||
|
@@ -274,8 +287,8 @@ Iterator.prototype.next = function (cb) { | |
this._options.gt = next.key | ||
if (this._options.limit > 0) this._options.limit-- | ||
|
||
var key = decodeValue(next.key, this._keyEncoding) | ||
var val = decodeValue(next.value, this._valueEncoding) | ||
var key = decodeValue(next.key, this._keyAsBuffer) | ||
var val = decodeValue(next.value, this._valueAsBuffer) | ||
return cb(undefined, key, val) | ||
} | ||
|
||
|
@@ -294,9 +307,9 @@ function decodeError (err) { | |
return err ? new Error(err) : null | ||
} | ||
|
||
function decodeValue (val, enc) { | ||
function decodeValue (val, asBuffer) { | ||
if (!val) return undefined | ||
return (enc === 'utf8' || enc === 'utf-8') ? val.toString() : val | ||
return asBuffer ? val : val.toString() | ||
} | ||
|
||
function ref (r) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
module.exports = function matchdown (db, type) { | ||
// Skip layers that we handle ourselves | ||
if (type === 'levelup') return false | ||
if (type === 'encoding-down') return false | ||
if (type === 'deferred-leveldown') return false | ||
|
||
return true | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,10 @@ | ||
var lpstream = require('length-prefixed-stream') | ||
var eos = require('end-of-stream') | ||
var duplexify = require('duplexify') | ||
var reachdown = require('reachdown') | ||
var messages = require('./messages') | ||
var rangeOptions = 'gt gte lt lte'.split(' ') | ||
var matchdown = require('./matchdown') | ||
|
||
var DECODERS = [ | ||
messages.Get, | ||
|
@@ -29,7 +32,7 @@ module.exports = function (db, opts) { | |
return stream | ||
|
||
function ready () { | ||
var down = db.db | ||
var down = reachdown(db, matchdown, false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This peels off |
||
var iterators = [] | ||
|
||
eos(stream, function () { | ||
|
@@ -71,8 +74,8 @@ module.exports = function (db, opts) { | |
}) | ||
|
||
function callback (id, err, value) { | ||
var msg = {id: id, error: err && err.message, value: value} | ||
var buf = new Buffer(messages.Callback.encodingLength(msg) + 1) | ||
var msg = { id: id, error: err && err.message, value: value } | ||
var buf = Buffer.allocUnsafe(messages.Callback.encodingLength(msg) + 1) | ||
buf[0] = 0 | ||
messages.Callback.encode(msg, buf, 1) | ||
encode.write(buf) | ||
|
@@ -109,6 +112,7 @@ module.exports = function (db, opts) { | |
function onbatch (req) { | ||
prebatch(req.ops, function (err) { | ||
if (err) return callback(err) | ||
|
||
down.batch(req.ops, function (err) { | ||
callback(req.id, err) | ||
}) | ||
|
@@ -136,15 +140,7 @@ function Iterator (down, req, encode) { | |
var self = this | ||
|
||
this.batch = req.batch || 0 | ||
|
||
if (req.options) { | ||
if (req.options.gt === null) req.options.gt = undefined | ||
if (req.options.gte === null) req.options.gte = undefined | ||
if (req.options.lt === null) req.options.lt = undefined | ||
if (req.options.lte === null) req.options.lte = undefined | ||
} | ||
|
||
this._iterator = down.iterator(req.options) | ||
this._iterator = down.iterator(cleanRangeOptions(req.options)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because |
||
this._encode = encode | ||
this._send = send | ||
this._nexting = false | ||
|
@@ -163,7 +159,7 @@ function Iterator (down, req, encode) { | |
self._data.key = key | ||
self._data.value = value | ||
self.batch-- | ||
var buf = new Buffer(messages.IteratorData.encodingLength(self._data) + 1) | ||
var buf = Buffer.allocUnsafe(messages.IteratorData.encodingLength(self._data) + 1) | ||
buf[0] = 1 | ||
messages.IteratorData.encode(self._data, buf, 1) | ||
encode.write(buf) | ||
|
@@ -185,3 +181,23 @@ Iterator.prototype.end = function () { | |
} | ||
|
||
function noop () {} | ||
|
||
function cleanRangeOptions (options) { | ||
if (!options) return | ||
|
||
var result = {} | ||
|
||
for (var k in options) { | ||
if (!hasOwnProperty.call(options, k)) continue | ||
|
||
if (!isRangeOption(k) || options[k] != null) { | ||
result[k] = options[k] | ||
} | ||
} | ||
|
||
return result | ||
} | ||
|
||
function isRangeOption (k) { | ||
return rangeOptions.indexOf(k) !== -1 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
var pbs = require('pbs') | ||
var fs = require('fs') | ||
var path = require('path') | ||
|
||
module.exports = pbs(fs.readFileSync(__dirname + '/schema.proto')) | ||
module.exports = pbs(fs.readFileSync(path.join(__dirname, 'schema.proto'))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
abstract-leveldown
no longer needs a location (akapath
here).