Skip to content
This repository was archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
Upgrade to latest Level modules (#15)
Browse files Browse the repository at this point in the history
* Upgrade to latest Level modules
* Add subleveldown test
* Add encrypt-down test
  • Loading branch information
vweevers authored Oct 25, 2019
1 parent 45b257c commit cfaa4c7
Show file tree
Hide file tree
Showing 13 changed files with 471 additions and 91 deletions.
7 changes: 3 additions & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
language: node_js
node_js:
- '0.10'
- '0.12'
- '4.0'
- '5.0'
- 8
- 10
- 12
29 changes: 13 additions & 16 deletions client.js
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
var levelup = require('levelup')
var duplexify = require('duplexify')
var encode = require('encoding-down')
var leveldown = require('./leveldown')

module.exports = function (opts) {
if (!opts) opts = {}

var down
opts.db = createLeveldown
opts.onflush = onflush
var db = levelup('multileveldown', opts)
var down = leveldown(Object.assign({}, opts, { onflush: onflush }))
var db = levelup(encode(down, opts), opts)

db.createRpcStream = db.connect = connect
db.isFlushed = isFlushed
db.forward = forward

return db

function createLeveldown (path) {
down = leveldown(path, opts)
return down
}

function onflush () {
db.emit('flush')
}

function connect (opts) {
if (down) return down.createRpcStream(opts, null)
return down.createRpcStream(opts, null)
}

var proxy = duplexify()
db.open(function () {
down.createRpcStream(opts, proxy)
})
function isFlushed () {
return down.isFlushed()
}

return proxy
function forward (db) {
down.forward(db)
}
}
39 changes: 26 additions & 13 deletions leveldown.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ var util = require('util')
var eos = require('end-of-stream')
var ids = require('numeric-id-map')
var lpstream = require('length-prefixed-stream')
var reachdown = require('reachdown')
var messages = require('./messages')
var matchdown = require('./matchdown')

var ENCODERS = [
messages.Get,
Expand All @@ -21,9 +23,9 @@ var DECODERS = [

module.exports = Multilevel

function Multilevel (path, opts) {
if (!(this instanceof Multilevel)) return new Multilevel(path, opts)
abstract.AbstractLevelDOWN.call(this, path)
function Multilevel (opts) {
if (!(this instanceof Multilevel)) return new Multilevel(opts)
abstract.AbstractLevelDOWN.call(this)

if (!opts) opts = {}
this._iterators = ids()
Expand All @@ -38,6 +40,8 @@ function Multilevel (path, opts) {

util.inherits(Multilevel, abstract.AbstractLevelDOWN)

Multilevel.prototype.type = 'multileveldown'

Multilevel.prototype.createRpcStream = function (opts, proxy) {
if (this._streaming) throw new Error('Only one rpc stream can be active')
if (!opts) opts = {}
Expand Down Expand Up @@ -112,12 +116,12 @@ Multilevel.prototype.createRpcStream = function (opts, proxy) {

function oncallback (res) {
var req = self._requests.remove(res.id)
if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueEncoding))
if (req) req.callback(decodeError(res.error), decodeValue(res.value, req.valueAsBuffer))
}
}

Multilevel.prototype.forward = function (down) {
this._db = down
this._db = reachdown(down, matchdown, false)
}

Multilevel.prototype.isFlushed = function () {
Expand Down Expand Up @@ -145,14 +149,22 @@ Multilevel.prototype._clearRequests = function (closing) {
}
}

Multilevel.prototype._serializeKey = function (key) {
return Buffer.isBuffer(key) ? key : Buffer.from(String(key))
}

Multilevel.prototype._serializeValue = function (value) {
return Buffer.isBuffer(value) ? value : Buffer.from(String(value))
}

Multilevel.prototype._get = function (key, opts, cb) {
if (this._db) return this._db._get(key, opts, cb)

var req = {
tag: 0,
id: 0,
key: key,
valueEncoding: opts.valueEncoding || (opts.asBuffer === false ? 'utf-8' : 'binary'),
valueAsBuffer: opts.asBuffer,
callback: cb || noop
}

Expand Down Expand Up @@ -206,7 +218,7 @@ Multilevel.prototype._batch = function (batch, opts, cb) {
Multilevel.prototype._write = function (req) {
if (this._requests.length + this._iterators.length === 1) ref(this._ref)
var enc = ENCODERS[req.tag]
var buf = new Buffer(enc.encodingLength(req) + 1)
var buf = Buffer.allocUnsafe(enc.encodingLength(req) + 1)
buf[0] = req.tag
enc.encode(req, buf, 1)
this._encode.write(buf)
Expand All @@ -231,10 +243,11 @@ Multilevel.prototype._iterator = function (opts) {

function noop () {}

// TODO: extend AbstractIterator, passing db to ctor
function Iterator (parent, opts) {
this._parent = parent
this._keyEncoding = opts.keyEncoding
this._valueEncoding = opts.valueEncoding
this._keyAsBuffer = opts.keyAsBuffer
this._valueAsBuffer = opts.valueAsBuffer
this._options = opts

var req = {
Expand Down Expand Up @@ -274,8 +287,8 @@ Iterator.prototype.next = function (cb) {
this._options.gt = next.key
if (this._options.limit > 0) this._options.limit--

var key = decodeValue(next.key, this._keyEncoding)
var val = decodeValue(next.value, this._valueEncoding)
var key = decodeValue(next.key, this._keyAsBuffer)
var val = decodeValue(next.value, this._valueAsBuffer)
return cb(undefined, key, val)
}

Expand All @@ -294,9 +307,9 @@ function decodeError (err) {
return err ? new Error(err) : null
}

function decodeValue (val, enc) {
function decodeValue (val, asBuffer) {
if (!val) return undefined
return (enc === 'utf8' || enc === 'utf-8') ? val.toString() : val
return asBuffer ? val : val.toString()
}

function ref (r) {
Expand Down
8 changes: 8 additions & 0 deletions matchdown.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = function matchdown (db, type) {
// Skip layers that we handle ourselves
if (type === 'levelup') return false
if (type === 'encoding-down') return false
if (type === 'deferred-leveldown') return false

return true
}
23 changes: 14 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,25 @@
"description": "multilevel implemented using leveldowns with reconnect support",
"main": "index.js",
"dependencies": {
"abstract-leveldown": "^2.4.1",
"duplexify": "^3.4.2",
"abstract-leveldown": "^6.1.1",
"duplexify": "^4.1.1",
"encoding-down": "^6.2.0",
"end-of-stream": "^1.1.0",
"length-prefixed-stream": "^1.4.0",
"levelup": "^1.3.1",
"length-prefixed-stream": "^2.0.0",
"levelup": "^4.1.0",
"numeric-id-map": "^1.1.0",
"protocol-buffers-encodings": "^1.1.0"
"protocol-buffers-encodings": "^1.1.0",
"reachdown": "^1.0.0"
},
"devDependencies": {
"concat-stream": "^1.5.1",
"memdown": "^1.1.0",
"@adorsys/encrypt-down": "^2.0.1",
"concat-stream": "^2.0.0",
"level-compose": "^0.0.2",
"memdown": "^5.0.0",
"protocol-buffers": "^4.0.2",
"standard": "^5.4.1",
"tape": "^4.4.0"
"standard": "^14.1.0",
"subleveldown": "^4.1.1",
"tape": "^4.11.0"
},
"scripts": {
"test": "standard && tape test/*.js",
Expand Down
42 changes: 29 additions & 13 deletions server.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
var lpstream = require('length-prefixed-stream')
var eos = require('end-of-stream')
var duplexify = require('duplexify')
var reachdown = require('reachdown')
var messages = require('./messages')
var rangeOptions = 'gt gte lt lte'.split(' ')
var matchdown = require('./matchdown')

var DECODERS = [
messages.Get,
Expand Down Expand Up @@ -29,7 +32,7 @@ module.exports = function (db, opts) {
return stream

function ready () {
var down = db.db
var down = reachdown(db, matchdown, false)
var iterators = []

eos(stream, function () {
Expand Down Expand Up @@ -71,8 +74,8 @@ module.exports = function (db, opts) {
})

function callback (id, err, value) {
var msg = {id: id, error: err && err.message, value: value}
var buf = new Buffer(messages.Callback.encodingLength(msg) + 1)
var msg = { id: id, error: err && err.message, value: value }
var buf = Buffer.allocUnsafe(messages.Callback.encodingLength(msg) + 1)
buf[0] = 0
messages.Callback.encode(msg, buf, 1)
encode.write(buf)
Expand Down Expand Up @@ -109,6 +112,7 @@ module.exports = function (db, opts) {
function onbatch (req) {
prebatch(req.ops, function (err) {
if (err) return callback(err)

down.batch(req.ops, function (err) {
callback(req.id, err)
})
Expand Down Expand Up @@ -136,15 +140,7 @@ function Iterator (down, req, encode) {
var self = this

this.batch = req.batch || 0

if (req.options) {
if (req.options.gt === null) req.options.gt = undefined
if (req.options.gte === null) req.options.gte = undefined
if (req.options.lt === null) req.options.lt = undefined
if (req.options.lte === null) req.options.lte = undefined
}

this._iterator = down.iterator(req.options)
this._iterator = down.iterator(cleanRangeOptions(req.options))
this._encode = encode
this._send = send
this._nexting = false
Expand All @@ -163,7 +159,7 @@ function Iterator (down, req, encode) {
self._data.key = key
self._data.value = value
self.batch--
var buf = new Buffer(messages.IteratorData.encodingLength(self._data) + 1)
var buf = Buffer.allocUnsafe(messages.IteratorData.encodingLength(self._data) + 1)
buf[0] = 1
messages.IteratorData.encode(self._data, buf, 1)
encode.write(buf)
Expand All @@ -185,3 +181,23 @@ Iterator.prototype.end = function () {
}

function noop () {}

function cleanRangeOptions (options) {
if (!options) return

var result = {}

for (var k in options) {
if (!hasOwnProperty.call(options, k)) continue

if (!isRangeOption(k) || options[k] != null) {
result[k] = options[k]
}
}

return result
}

function isRangeOption (k) {
return rangeOptions.indexOf(k) !== -1
}
3 changes: 2 additions & 1 deletion streams.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var pbs = require('pbs')
var fs = require('fs')
var path = require('path')

module.exports = pbs(fs.readFileSync(__dirname + '/schema.proto'))
module.exports = pbs(fs.readFileSync(path.join(__dirname, 'schema.proto')))
Loading

0 comments on commit cfaa4c7

Please sign in to comment.