From 3c90cbf0f087b9177659cafe4ccca11f6a32cf05 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 29 Aug 2018 18:10:30 +0100 Subject: [PATCH] feat: ipns over pubsub --- README.md | 4 + package.json | 7 +- src/cli/commands/daemon.js | 4 + src/cli/commands/name/pubsub.js | 18 ++ src/cli/commands/name/pubsub/cancel.js | 19 ++ src/cli/commands/name/pubsub/state.js | 19 ++ src/cli/commands/name/pubsub/subs.js | 21 ++ src/core/components/init.js | 10 + src/core/components/libp2p.js | 2 +- src/core/components/name-pubsub.js | 60 +++++ src/core/components/name.js | 11 +- src/core/components/start.js | 25 ++ src/core/config.js | 1 + src/core/index.js | 7 +- src/core/ipns/index.js | 7 +- src/core/ipns/path.js | 4 +- src/core/ipns/publisher.js | 12 +- src/core/ipns/resolver.js | 54 ++--- src/core/ipns/routing/offline-datastore.js | 67 ++++++ src/core/ipns/routing/pubsub.js | 208 +++++++++++++++++ src/core/ipns/routing/utils.js | 8 + src/http/api/resources/name.js | 63 +++++ src/http/api/routes/name.js | 25 ++ src/http/index.js | 1 + test/cli/commands.js | 2 +- test/cli/name-pubsub.js | 253 +++++++++++++++++++++ test/cli/name.js | 2 +- test/core/name-pubsub.js | 204 +++++++++++++++++ 28 files changed, 1062 insertions(+), 56 deletions(-) create mode 100644 src/cli/commands/name/pubsub.js create mode 100644 src/cli/commands/name/pubsub/cancel.js create mode 100644 src/cli/commands/name/pubsub/state.js create mode 100644 src/cli/commands/name/pubsub/subs.js create mode 100644 src/core/components/name-pubsub.js create mode 100644 src/core/ipns/routing/offline-datastore.js create mode 100644 src/core/ipns/routing/pubsub.js create mode 100644 src/core/ipns/routing/utils.js create mode 100644 test/cli/name-pubsub.js create mode 100644 test/core/name-pubsub.js diff --git a/README.md b/README.md index 12cd9c5e3c..b81bcdc329 100644 --- a/README.md +++ b/README.md @@ -304,6 +304,7 @@ Configure external nodes that will preload content added to this node. Enable and configure experimental features. - `pubsub` (boolean): Enable libp2p pub-sub. (Default: `false`) +- `ipnsPubsub` (boolean): Enable pub-sub on IPNS. (Default: `false`) - `sharding` (boolean): Enable directory sharding. Directories that have many child objects will be represented by multiple DAG nodes instead of just one. It can improve lookup performance when a directory has several thousand files or more. (Default: `false`) - `dht` (boolean): Enable KadDHT. **This is currently not interopable with `go-ipfs`.** @@ -554,6 +555,9 @@ The core API is grouped into several areas: - [name](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md) - [`ipfs.name.publish(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepublish) - [`ipfs.name.resolve(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#nameresolve) + - [`ipfs.name.pubsub.cancel(arg, [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubcancel) + - [`ipfs.name.pubsub.state([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubstate) + - [`ipfs.name.pubsub.subs([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubsubs) #### Crypto and Key Management diff --git a/package.json b/package.json index 5fb97aa240..8254d5eb8d 100644 --- a/package.json +++ b/package.json @@ -71,7 +71,7 @@ "form-data": "^2.3.2", "hat": "0.0.3", "interface-ipfs-core": "~0.76.1", - "ipfsd-ctl": "~0.39.1", + "ipfsd-ctl": "~0.39.2", "mocha": "^5.2.0", "ncp": "^2.0.0", "nexpect": "~0.5.0", @@ -85,6 +85,7 @@ "dependencies": { "@nodeutils/defaults-deep": "^1.1.0", "async": "^2.6.1", + "base32.js": "~0.1.0", "big.js": "^5.1.2", "binary-querystring": "~0.1.2", "bl": "^2.0.1", @@ -92,6 +93,8 @@ "bs58": "^4.0.1", "byteman": "^1.3.5", "cids": "~0.5.3", + "datastore-core": "~0.4.0", + "datastore-pubsub": "~0.0.2", "debug": "^3.1.0", "err-code": "^1.1.2", "file-type": "^8.1.0", @@ -118,7 +121,7 @@ "ipld": "~0.17.3", "ipld-dag-cbor": "~0.12.1", "ipld-dag-pb": "~0.14.6", - "ipns": "~0.1.3", + "ipns": "~0.1.5", "is-ipfs": "~0.4.2", "is-pull-stream": "~0.0.0", "is-stream": "^1.1.0", diff --git a/src/cli/commands/daemon.js b/src/cli/commands/daemon.js index 78a2213e78..02c6604b3c 100644 --- a/src/cli/commands/daemon.js +++ b/src/cli/commands/daemon.js @@ -25,6 +25,10 @@ module.exports = { type: 'boolean', default: false }) + .option('enable-namesys-pubsub', { + type: 'boolean', + default: false + }) }, handler (argv) { diff --git a/src/cli/commands/name/pubsub.js b/src/cli/commands/name/pubsub.js new file mode 100644 index 0000000000..218afb6c4b --- /dev/null +++ b/src/cli/commands/name/pubsub.js @@ -0,0 +1,18 @@ +'use strict' + +/* +Manage and inspect the state of the IPNS pubsub resolver. +Note: this command is experimental and subject to change as the system is refined. +*/ +module.exports = { + command: 'pubsub', + + description: 'IPNS pubsub management.', + + builder (yargs) { + return yargs.commandDir('pubsub') + }, + + handler (argv) { + } +} diff --git a/src/cli/commands/name/pubsub/cancel.js b/src/cli/commands/name/pubsub/cancel.js new file mode 100644 index 0000000000..5f0d709294 --- /dev/null +++ b/src/cli/commands/name/pubsub/cancel.js @@ -0,0 +1,19 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'cancel ', + + describe: 'Cancel a name subscription.', + + handler (argv) { + argv.ipfs.name.pubsub.cancel(argv.name, (err, result) => { + if (err) { + throw err + } else { + print(result.canceled ? 'canceled' : 'no subscription') + } + }) + } +} diff --git a/src/cli/commands/name/pubsub/state.js b/src/cli/commands/name/pubsub/state.js new file mode 100644 index 0000000000..08c65fbed1 --- /dev/null +++ b/src/cli/commands/name/pubsub/state.js @@ -0,0 +1,19 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'state', + + describe: 'Query the state of IPNS pubsub.', + + handler (argv) { + argv.ipfs.name.pubsub.state((err, result) => { + if (err) { + throw err + } else { + print(result.enabled ? 'enabled' : 'disabled') + } + }) + } +} diff --git a/src/cli/commands/name/pubsub/subs.js b/src/cli/commands/name/pubsub/subs.js new file mode 100644 index 0000000000..e5539aa558 --- /dev/null +++ b/src/cli/commands/name/pubsub/subs.js @@ -0,0 +1,21 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'subs', + + describe: 'Show current name subscriptions.', + + handler (argv) { + argv.ipfs.name.pubsub.subs((err, result) => { + if (err) { + throw err + } else { + result.strings.forEach((s) => { + print(s) + }) + } + }) + } +} diff --git a/src/core/components/init.js b/src/core/components/init.js index eb91a09979..9ad682c55b 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -7,6 +7,9 @@ const promisify = require('promisify-es6') const defaultConfig = require('../runtime/config-nodejs.js') const Keychain = require('libp2p-keychain') +const IPNS = require('../ipns') +const OfflineDatastore = require('../ipns/routing/offline-datastore') + const addDefaultAssets = require('./init-assets') module.exports = function init (self) { @@ -103,6 +106,13 @@ module.exports = function init (self) { cb(null, true) } }, + // Setup offline routing for IPNS. This is primarily used for offline ipns modifications, such as the initializeKeyspace feature. + (_, cb) => { + const offlineDatastore = new OfflineDatastore(self._repo) + + self._ipns = new IPNS(offlineDatastore, self) + cb(null, true) + }, // add empty unixfs dir object (go-ipfs assumes this exists) (_, cb) => { if (opts.emptyRepo) { diff --git a/src/core/components/libp2p.js b/src/core/components/libp2p.js index 87838cb1e7..3d129d59c2 100644 --- a/src/core/components/libp2p.js +++ b/src/core/components/libp2p.js @@ -45,7 +45,7 @@ module.exports = function libp2p (self) { }, EXPERIMENTAL: { dht: get(opts.options, 'EXPERIMENTAL.dht', false), - pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false) + pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false) || get(opts.options, 'EXPERIMENTAL.ipnsPubsub', false) } }, connectionManager: get(opts.options, 'connectionManager', diff --git a/src/core/components/name-pubsub.js b/src/core/components/name-pubsub.js new file mode 100644 index 0000000000..d5f71ef520 --- /dev/null +++ b/src/core/components/name-pubsub.js @@ -0,0 +1,60 @@ +'use strict' + +const debug = require('debug') +const errcode = require('err-code') +const promisify = require('promisify-es6') + +const log = debug('jsipfs:name-pubsub') +log.error = debug('jsipfs:name-pubsub:error') + +const isNamePubsubEnabled = (node) => ( + node._options.EXPERIMENTAL.ipnsPubsub && node._libp2pNode._floodSub +) + +module.exports = function namePubsub (self) { + return { + /** + * Query the state of IPNS pubsub. + * + * @returns {Promise|void} + */ + state: promisify((callback) => { + callback(null, { + enabled: Boolean(isNamePubsubEnabled(self)) + }) + }), + /** + * Cancel a name subscription. + * + * @param {String} name subscription name. + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + cancel: promisify((name, callback) => { + if (!isNamePubsubEnabled(self)) { + const errMsg = 'IPNS pubsub subsystem is not enabled' + + log.error(errMsg) + return callback(errcode(errMsg, 'ERR_IPNS_PS_NOT_ENABLED')) + } + + self._ipns.pubsub.cancel(name, callback) + }), + /** + * Show current name subscriptions. + * + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + subs: promisify((callback) => { + if (!isNamePubsubEnabled(self)) { + const errMsg = 'IPNS pubsub subsystem is not enabled' + + log.error(errMsg) + return callback(errcode(errMsg, 'ERR_IPNS_PS_NOT_ENABLED')) + } + + self._ipns.pubsub.getSubscriptions(callback) + }) + } +} diff --git a/src/core/components/name.js b/src/core/components/name.js index f53878f84e..6c734a35b2 100644 --- a/src/core/components/name.js +++ b/src/core/components/name.js @@ -11,6 +11,7 @@ const errcode = require('err-code') const log = debug('jsipfs:name') log.error = debug('jsipfs:name:error') +const namePubsub = require('./name-pubsub') const utils = require('../utils') const path = require('../ipns/path') @@ -128,7 +129,7 @@ module.exports = function name (self) { const nocache = options.nocache && options.nocache.toString() === 'true' const recursive = options.recursive && options.recursive.toString() === 'true' - const local = true // TODO ROUTING - use self._options.local + const local = self._options.local if (!self.isOnline() && !local) { const errMsg = utils.OFFLINE_ERROR @@ -157,11 +158,11 @@ module.exports = function name (self) { const resolveOptions = { nocache, - recursive, - local + recursive } - self._ipns.resolve(name, self._peerInfo.id, resolveOptions, callback) - }) + self._ipns.resolve(name, resolveOptions, callback) + }), + pubsub: namePubsub(self) } } diff --git a/src/core/components/start.js b/src/core/components/start.js index 7b4df7cb61..44a69b9bad 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,8 +2,14 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') +const get = require('lodash/get') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') +const { TieredDatastore } = require('datastore-core') + +const IPNS = require('../ipns') +const Pubsub = require('../ipns/routing/pubsub') +const OfflineDatastore = require('../ipns/routing/offline-datastore') module.exports = (self) => { return promisify((callback) => { @@ -34,6 +40,25 @@ module.exports = (self) => { }, (cb) => self.libp2p.start(cb), (cb) => { + // Setup online routing for IPNS with a tiered routing composed by a DHT and a Pubsub router (if properly enabled) + const ipnsStores = [] + + // Add IPNS pubsub if enabled + let pubsub + if (get(self._options, 'EXPERIMENTAL.ipnsPubsub', false)) { + pubsub = new Pubsub(self) + + ipnsStores.push(pubsub) + } + + // NOTE: Until the IPNS over DHT is not ready, it is being replaced by the local repo datastore + // When DHT is added, If local option enabled, should receive offlineDatastore as well + const offlineDatastore = new OfflineDatastore(self._repo) + ipnsStores.push(offlineDatastore) + + const routing = new TieredDatastore(ipnsStores) + self._ipns = new IPNS(routing, self, pubsub) + self._bitswap = new Bitswap( self._libp2pNode, self._repo.blocks, diff --git a/src/core/config.js b/src/core/config.js index 8c34ad51cd..547a20e019 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -28,6 +28,7 @@ const schema = Joi.object().keys({ }).allow(null), EXPERIMENTAL: Joi.object().keys({ pubsub: Joi.boolean(), + namesysPubsub: Joi.boolean(), sharding: Joi.boolean(), dht: Joi.boolean() }).allow(null), diff --git a/src/core/index.js b/src/core/index.js index b20262459b..081b1bde49 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -20,7 +20,7 @@ const EventEmitter = require('events') const config = require('./config') const boot = require('./boot') const components = require('./components') -const IPNS = require('./ipns') + // replaced by repo-browser when running in the browser const defaultRepo = require('./runtime/repo-nodejs') const preload = require('./preload') @@ -90,7 +90,7 @@ class IPFS extends EventEmitter { this._pubsub = undefined this._preload = preload(this) this._mfsPreload = mfsPreload(this) - this._ipns = new IPNS(null, this) + this._ipns = undefined // IPFS Core exposed components // - for booting up a node @@ -128,6 +128,9 @@ class IPFS extends EventEmitter { if (this._options.EXPERIMENTAL.pubsub) { this.log('EXPERIMENTAL pubsub is enabled') } + if (this._options.EXPERIMENTAL.ipnsPubsub) { + this.log('EXPERIMENTAL IPNS pubsub is enabled') + } if (this._options.EXPERIMENTAL.sharding) { this.log('EXPERIMENTAL sharding is enabled') } diff --git a/src/core/ipns/index.js b/src/core/ipns/index.js index 8bfc7cd718..4aa6da681c 100644 --- a/src/core/ipns/index.js +++ b/src/core/ipns/index.js @@ -16,11 +16,12 @@ const path = require('./path') const defaultRecordTtl = 60 * 1000 class IPNS { - constructor (routing, ipfs) { + constructor (routing, ipfs, pubsub) { this.publisher = new IpnsPublisher(routing, ipfs._repo) this.republisher = new IpnsRepublisher(this.publisher, ipfs) this.resolver = new IpnsResolver(routing, ipfs._repo) this.cache = new Receptacle({ max: 1000 }) // Create an LRU cache with max 1000 items + this.pubsub = pubsub } // Publish @@ -53,7 +54,7 @@ class IPNS { } // Resolve - resolve (name, peerId, options, callback) { + resolve (name, options, callback) { // If recursive, we should not try to get the cached value if (!options.nocache && !options.recursive) { // Try to get the record from cache @@ -67,7 +68,7 @@ class IPNS { } } - this.resolver.resolve(name, peerId, options, (err, result) => { + this.resolver.resolve(name, options, (err, result) => { if (err) { log.error(err) return callback(err) diff --git a/src/core/ipns/path.js b/src/core/ipns/path.js index 1bb05a8f90..4a83888125 100644 --- a/src/core/ipns/path.js +++ b/src/core/ipns/path.js @@ -13,13 +13,13 @@ const resolvePath = (ipfsNode, name, callback) => { if (isIPFS.ipnsPath(name)) { log(`resolve ipns path ${name}`) - const local = true // TODO ROUTING - use self._options.local + const local = ipfsNode._options.local const options = { local: local } - return ipfsNode._ipns.resolve(name, ipfsNode._peerInfo.id, options, callback) + return ipfsNode._ipns.resolve(name, options, callback) } // ipfs path diff --git a/src/core/ipns/publisher.js b/src/core/ipns/publisher.js index c848ef3227..e9472c51cd 100644 --- a/src/core/ipns/publisher.js +++ b/src/core/ipns/publisher.js @@ -74,10 +74,10 @@ class IpnsPublisher { } series([ - (cb) => this._publishEntry(keys.ipnsKey, embedPublicKeyRecord || record, peerId, privKey, cb), + (cb) => this._publishEntry(keys.routingKey, embedPublicKeyRecord || record, peerId, privKey, cb), // Publish the public key if a public key cannot be extracted from the ID // We will be able to deprecate this part in the future, since the public keys will be only in the peerId - (cb) => embedPublicKeyRecord ? this._publishPublicKey(keys.pkKey, publicKey, peerId, privKey, cb) : cb() + (cb) => embedPublicKeyRecord ? this._publishPublicKey(keys.routingPubKey, publicKey, peerId, privKey, cb) : cb() ], (err) => { if (err) { log.error(err) @@ -114,8 +114,8 @@ class IpnsPublisher { return callback(err) } - // TODO Routing - this should be replaced by a put to the DHT - this._repo.datastore.put(key, serializedRecord, (err, res) => { + // Add record to routing (buffer key) + this._routing.put(key.toBuffer(), serializedRecord, (err, res) => { if (err) { const errMsg = `ipns record for ${key.toString()} could not be stored in the routing` @@ -159,8 +159,8 @@ class IpnsPublisher { return callback(err) } - // TODO Routing - this should be replaced by a put to the DHT - this._repo.datastore.put(key, serializedRecord, (err, res) => { + // Add public key to routing (buffer key) + this._routing.put(key.toBuffer(), serializedRecord, (err, res) => { if (err) { const errMsg = `public key for ${key.toString()} could not be stored in the routing` diff --git a/src/core/ipns/resolver.js b/src/core/ipns/resolver.js index 525471a548..ca98195216 100644 --- a/src/core/ipns/resolver.js +++ b/src/core/ipns/resolver.js @@ -3,6 +3,7 @@ const ipns = require('ipns') const { fromB58String } = require('multihashes') const Record = require('libp2p-record').Record +const PeerId = require('peer-id') const errcode = require('err-code') const debug = require('debug') @@ -15,10 +16,9 @@ class IpnsResolver { constructor (routing, repo) { this._routing = routing this._repo = repo - this._resolver = undefined // TODO Routing - add Router resolver } - resolve (name, peerId, options, callback) { + resolve (name, options, callback) { if (typeof options === 'function') { callback = options options = {} @@ -33,7 +33,6 @@ class IpnsResolver { options = options || {} const recursive = options.recursive && options.recursive.toString() === 'true' - const local = !(options.local === false) const nameSegments = name.split('/') @@ -53,20 +52,7 @@ class IpnsResolver { depth = defaultMaximumRecursiveDepth } - // Get the intended resoulver function - // TODO Routing - set default resolverFn - - let resolverFn - - if (local) { - resolverFn = this._resolveLocal - } - - if (!resolverFn) { - return callback(new Error('not implemented yet')) - } - - this.resolver(key, depth, peerId, resolverFn, (err, res) => { + this.resolver(key, depth, (err, res) => { if (err) { return callback(err) } @@ -77,10 +63,7 @@ class IpnsResolver { } // Recursive resolver according to the specified depth - resolver (name, depth, peerId, resolverFn, callback) { - // bind resolver function - this._resolver = resolverFn - + resolver (name, depth, callback) { // Exceeded recursive maximum depth if (depth === 0) { const errMsg = `could not resolve name (recursion limit of ${defaultMaximumRecursiveDepth} exceeded)` @@ -89,7 +72,7 @@ class IpnsResolver { return callback(errcode(new Error(errMsg), 'ERR_RESOLVE_RECURSION_LIMIT')) } - this._resolver(name, peerId, (err, res) => { + this._resolveName(name, (err, res) => { if (err) { return callback(err) } @@ -102,30 +85,35 @@ class IpnsResolver { } // continue recursively until depth equals 0 - this.resolver(nameSegments[2], depth - 1, peerId, resolverFn, callback) + this.resolver(nameSegments[2], depth - 1, callback) }) } - // resolve ipns entries locally using the datastore - _resolveLocal (name, peerId, callback) { - const { ipnsKey } = ipns.getIdKeys(fromB58String(name)) + // resolve ipns entries from the provided routing + _resolveName (name, callback) { + const peerId = PeerId.createFromB58String(name) + const { routingKey } = ipns.getIdKeys(fromB58String(name)) - this._repo.datastore.get(ipnsKey, (err, dsVal) => { - if (err) { - const errMsg = `local record requested was not found for ${name} (${ipnsKey})` + // TODO DHT - get public key from routing? + // https://github.com/ipfs/go-ipfs/blob/master/namesys/routing.go#L70 + // https://github.com/libp2p/go-libp2p-routing/blob/master/routing.go#L99 + + this._routing.get(routingKey.toBuffer(), (err, res) => { + if (err || !res) { + const errMsg = `record requested was not found for ${name} (${routingKey}) in the network` log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_NO_LOCAL_RECORD_FOUND')) + return callback(errcode(new Error(errMsg), 'ERR_NO_NETWORK_RECORD_FOUND')) } - if (!Buffer.isBuffer(dsVal)) { + if (!Buffer.isBuffer(res)) { const errMsg = `found ipns record that we couldn't convert to a value` log.error(errMsg) return callback(errcode(new Error(errMsg), 'ERR_INVALID_RECORD_RECEIVED')) } - const record = Record.deserialize(dsVal) + const record = Record.deserialize(res) const ipnsEntry = ipns.unmarshal(record.value) ipns.extractPublicKey(peerId, ipnsEntry, (err, pubKey) => { @@ -139,7 +127,7 @@ class IpnsResolver { return callback(err) } - // IPNS entry validation + // Record validation ipns.validate(pubKey, ipnsEntry, (err) => { if (err) { return callback(err) diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js new file mode 100644 index 0000000000..b2b790531d --- /dev/null +++ b/src/core/ipns/routing/offline-datastore.js @@ -0,0 +1,67 @@ +'use strict' + +const { Key } = require('interface-datastore') +const { encodeBase32 } = require('./utils') + +const errcode = require('err-code') +const debug = require('debug') +const log = debug('jsipfs:ipns:offline-datastore') +log.error = debug('jsipfs:ipns:offline-datastore:error') + +// Offline datastore aims to mimic the same encoding as routing when storing records +// to the local datastore +class OfflineDatastore { + constructor (repo) { + this._repo = repo + } + + /** + * Put a value to the local datastore indexed by the received key properly encoded. + * @param {Buffer} key identifier of the value. + * @param {Buffer} value value to be stored. + * @param {function(Error)} callback + * @returns {void} + */ + put (key, value, callback) { + if (!Buffer.isBuffer(key)) { + const errMsg = `key does not have a valid format` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + if (!Buffer.isBuffer(value)) { + const errMsg = `received value is not a buffer` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) + } + + // encode key properly - base32(/ipns/{cid}) + const routingKey = new Key('/' + encodeBase32(key), false) + + this._repo.datastore.put(routingKey, value, callback) + } + + /** + * Get a value from the local datastore indexed by the received key properly encoded. + * @param {Buffer} key identifier of the value to be obtained. + * @param {function(Error, Buffer)} callback + * @returns {void} + */ + get (key, callback) { + if (!Buffer.isBuffer(key)) { + const errMsg = `key does not have a valid format` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + // encode key properly - base32(/ipns/{cid}) + const routingKey = new Key('/' + encodeBase32(key), false) + + this._repo.datastore.get(routingKey, callback) + } +} + +exports = module.exports = OfflineDatastore diff --git a/src/core/ipns/routing/pubsub.js b/src/core/ipns/routing/pubsub.js new file mode 100644 index 0000000000..a1701bc2a0 --- /dev/null +++ b/src/core/ipns/routing/pubsub.js @@ -0,0 +1,208 @@ +'use strict' + +const ipns = require('ipns') +const { fromB58String, toB58String } = require('multihashes') +const PeerId = require('peer-id') +const DatastorePubsub = require('datastore-pubsub') + +const errcode = require('err-code') +const debug = require('debug') +const log = debug('jsipfs:ipns:pubsub') +log.error = debug('jsipfs:ipns:pubsub:error') + +const ipnsNS = '/ipns/' +const ipnsNSLength = ipnsNS.length + +// Pubsub aims to manage the pubsub subscriptions for IPNS +class Pubsub { + constructor (node) { + const pubsub = node._libp2pNode.pubsub + const localDatastore = node._repo.datastore + const peerId = node._peerInfo.id + this._pubsub = pubsub + + this._subscriptions = {} + + // Bind _handleSubscriptionKey function, which is called by datastorePubsub. + this._handleSubscriptionKey = this._handleSubscriptionKey.bind(this) + this._dsPubsub = new DatastorePubsub(pubsub, localDatastore, peerId, ipns.validator, this._handleSubscriptionKey) + } + + /** + * Put a value to the pubsub datastore indexed by the received key properly encoded. + * @param {Buffer} key identifier of the value. + * @param {Buffer} value value to be stored. + * @param {function(Error)} callback + * @returns {void} + */ + put (key, value, callback) { + if (!Buffer.isBuffer(key)) { + const errMsg = `key does not have a valid format` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + if (!Buffer.isBuffer(value)) { + const errMsg = `received value is not a buffer` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) + } + + this._dsPubsub.put(key, value, callback) + } + + /** + * Get a value from the pubsub datastore indexed by the received key properly encoded. + * Moreover, the identifier topic is subscribed and the pubsub datastore records will be + * updated once new publishes occur. + * @param {Buffer} key identifier of the value to be obtained. + * @param {function(Error, Buffer)} callback + * @returns {void} + */ + get (key, callback) { + if (!Buffer.isBuffer(key)) { + const errMsg = `key does not have a valid format` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + this._dsPubsub.get(key, (err, res) => { + // Add topic subscribed + const ns = key.slice(0, ipnsNSLength) + + if (ns.toString() === ipnsNS) { + const stringifiedTopic = key.toString() + const id = toB58String(key.slice(ipnsNSLength)) + + this._subscriptions[stringifiedTopic] = id + + log(`subscribed pubsub ${stringifiedTopic}: ${id}`) + } + + if (err) { + return callback(err) + } + + callback(null, res) + }) + } + + // Modify subscription key to have a proper encoding + // Without this, the utf-8 encoding gets the key broken + _handleSubscriptionKey (key, callback) { + const subscriber = this._subscriptions[key] + + if (!subscriber) { + const errMsg = `key ${key} does not correspond to a subscription` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + let keys + try { + keys = ipns.getIdKeys(fromB58String(subscriber)) + } catch (err) { + log.error(err) + return callback(err) + } + + callback(null, keys.routingKey.toBuffer()) + } + + /** + * Get pubsub subscriptions related to ipns. + * @param {function(Error, Object)} callback + * @returns {void} + */ + getSubscriptions (callback) { + this._pubsub.ls((err, res) => { + if (err || !res) { + log.error(err) + return callback(err) + } + + // Iterate over subscriptions + const strings = [] + res.forEach((subs) => { + const ns = subs.substring(0, ipnsNSLength) + const subscription = this._subscriptions[subs] + + if (ns === ipnsNS && subscription) { + // Verify valid PeerID + try { + PeerId.createFromBytes(Buffer.from(subscription)) + // add to the list + strings.push(`/ipns/${subscription}`) + } catch (err) { + log.error('ipns key not a valid peer ID') + } + } + }) + + callback(null, { + strings + }) + }) + } + + /** + * Get pubsub subscriptions related to ipns. + * @param {String} name ipns path to cancel the pubsub subscription. + * @param {function(Error, Object)} callback + * @returns {void} + */ + cancel (name, callback) { + if (typeof name !== 'string') { + const errMsg = `received subscription name is not valid` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_SUBSCRIPTION_NAME')) + } + + // Trim /ipns/ prefix from the name + if (name.startsWith(ipnsNS)) { + name = name.substring(ipnsNSLength) + } + + // Verify peerId validity + try { + PeerId.createFromBytes(Buffer.from(name)) + } catch (err) { + const errMsg = `ipns key is not a valid peer ID` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_IPNS_KEY')) + } + + const stringifiedTopic = Object.keys(this._subscriptions).find((key) => this._subscriptions[key] === name) + + // Not found topic + if (!stringifiedTopic) { + return callback(null, { + canceled: false + }) + } + + // Unsubscribe topic + try { + const bufTopic = Buffer.from(stringifiedTopic) + + this._dsPubsub.unsubscribe(bufTopic) + } catch (err) { + return callback(err) + } + + delete this._subscriptions[stringifiedTopic] + log(`unsubscribed pubsub ${stringifiedTopic}: ${name}`) + + callback(null, { + canceled: true + }) + } +} + +exports = module.exports = Pubsub diff --git a/src/core/ipns/routing/utils.js b/src/core/ipns/routing/utils.js new file mode 100644 index 0000000000..8c0cd278b9 --- /dev/null +++ b/src/core/ipns/routing/utils.js @@ -0,0 +1,8 @@ +'use strict' + +const base32 = require('base32.js') + +module.exports.encodeBase32 = (buf) => { + const enc = new base32.Encoder() + return enc.write(buf).finalize() +} diff --git a/src/http/api/resources/name.js b/src/http/api/resources/name.js index 23fc8bbab1..090b2ce630 100644 --- a/src/http/api/resources/name.js +++ b/src/http/api/resources/name.js @@ -59,3 +59,66 @@ exports.publish = { }) } } + +exports.pubsub = { + state: { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.name.pubsub.state((err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Enabled: res.enabled + }).code(200) + }) + } + }, + subs: { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.name.pubsub.subs((err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Strings: res.strings + }).code(200) + }) + } + }, + cancel: { + validate: { + query: Joi.object().keys({ + arg: Joi.string().required() + }).unknown() + }, + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + const { arg } = request.query + + ipfs.name.pubsub.cancel(arg, (err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Canceled: res.canceled + }).code(200) + }) + } + } +} diff --git a/src/http/api/routes/name.js b/src/http/api/routes/name.js index 29647f3a6a..f49d0bd6ca 100644 --- a/src/http/api/routes/name.js +++ b/src/http/api/routes/name.js @@ -22,4 +22,29 @@ module.exports = (server) => { validate: resources.name.publish.validate } }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/state', + config: { + handler: resources.name.pubsub.state.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/subs', + config: { + handler: resources.name.pubsub.subs.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/cancel', + config: { + handler: resources.name.pubsub.cancel.handler, + validate: resources.name.pubsub.cancel.validate + } + }) } diff --git a/src/http/index.js b/src/http/index.js index ed1edce219..bda8ee04f7 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -75,6 +75,7 @@ function HttpApi (repo, config, cliArgs) { pass: cliArgs && cliArgs.pass, EXPERIMENTAL: { pubsub: cliArgs && cliArgs.enablePubsubExperiment, + ipnsPubsub: cliArgs && cliArgs.enableNamesysPubsub, dht: cliArgs && cliArgs.enableDhtExperiment, sharding: cliArgs && cliArgs.enableShardingExperiment }, diff --git a/test/cli/commands.js b/test/cli/commands.js index e6ae40498a..101fba3881 100644 --- a/test/cli/commands.js +++ b/test/cli/commands.js @@ -4,7 +4,7 @@ const expect = require('chai').expect const runOnAndOff = require('../utils/on-and-off') -const commandCount = 81 +const commandCount = 85 describe('commands', () => runOnAndOff((thing) => { let ipfs diff --git a/test/cli/name-pubsub.js b/test/cli/name-pubsub.js new file mode 100644 index 0000000000..79416ea40f --- /dev/null +++ b/test/cli/name-pubsub.js @@ -0,0 +1,253 @@ +/* eslint max-nested-callbacks: ["error", 7] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const series = require('async/series') +const ipfsExec = require('../utils/ipfs-exec') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'js' }) + +const checkAll = (bits) => string => bits.every(bit => string.includes(bit)) +const emptyDirCid = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' + +const spawnDaemon = (callback) => { + df.spawn({ + exec: `./src/cli/bin.js`, + args: ['--enable-namesys-pubsub'], + initOptions: { bits: 512 } + }, callback) +} + +describe('name-pubsub', () => { + describe('enabled', () => { + let ipfsA + let ipfsB + let nodeAId + let nodeBId + let bMultiaddr + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + series([ + (cb) => { + spawnDaemon((err, node) => { + expect(err).to.not.exist() + ipfsA = ipfsExec(node.repoPath) + nodes.push(node) + cb() + }) + }, + (cb) => { + spawnDaemon((err, node) => { + expect(err).to.not.exist() + ipfsB = ipfsExec(node.repoPath) + nodes.push(node) + cb() + }) + } + ], done) + }) + + // Get node ids + before(function (done) { + parallel([ + (cb) => { + ipfsA('id').then((res) => { + nodeAId = JSON.parse(res) + cb() + }) + }, + (cb) => { + ipfsB('id').then((res) => { + const id = JSON.parse(res) + + nodeBId = id + bMultiaddr = id.addresses[0] + cb() + }) + } + ], done) + }) + + // Connect + before(function (done) { + ipfsA('swarm', 'connect', bMultiaddr).then((out) => { + expect(out).to.eql(`connect ${bMultiaddr} success\n`) + done() + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + describe('pubsub commands', () => { + before(function (done) { + this.timeout(50 * 1000) + done() + }) + + it('should get enabled state of pubsub', function (done) { + ipfsA('name pubsub state').then((res) => { + expect(res).to.exist() + expect(res).to.have.string('enabled') // enabled + + done() + }) + }) + + it('should subscribe on name resolve', function (done) { + this.timeout(80 * 1000) + + ipfsB(`name resolve ${nodeAId.id}`).catch((err) => { + expect(err).to.exist() // Not available (subscribed) + + ipfsB('pubsub ls').then((res) => { + expect(res).to.exist() + expect(res).to.have.string('/ipns/') // have an ipns subscribtion + + ipfsB('name pubsub subs').then((res) => { + expect(res).to.exist() + expect(res).to.have.string(`/ipns/${nodeAId.id}`) // have subscription + + done() + }) + }) + }) + }) + + it('should be able to cancel subscriptions', function (done) { + this.timeout(80 * 1000) + + ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.have.string('no subscription') // tried to cancel a not yet subscribed id + + ipfsA(`name resolve ${nodeBId.id}`).catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.have.string('canceled') // canceled now + + ipfsA('pubsub ls').then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string('/ipns/') // ipns subscribtion not available + + ipfsA('name pubsub subs').then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available + + done() + }) + }) + }) + }) + }) + }) + }) + + describe('pubsub records', () => { + let cidAdded + + before(function (done) { + this.timeout(50 * 1000) + ipfsA('add src/init-files/init-docs/readme').then((out) => { + cidAdded = out.split(' ')[1] + done() + }) + }) + + it('should publish the received record to the subscriber', function (done) { + this.timeout(80 * 1000) + + ipfsB(`name resolve ${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([emptyDirCid])) // Empty dir received (subscribed) + + ipfsA(`name resolve ${nodeBId.id}`).catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + ipfsB(`name publish ${cidAdded}`).then((res) => { + // published to IpfsB and published through pubsub to ipfsa + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) + + ipfsB(`name resolve ${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) + + ipfsA(`name resolve ${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) // value propagated to node B + + done() + }) + }) + }) + }) + }) + }) + }) + }) + + describe('disabled', () => { + let ipfsA + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + df.spawn({ + exec: `./src/cli/bin.js`, + config: {}, + initOptions: { bits: 512 } + }, (err, node) => { + expect(err).to.not.exist() + ipfsA = ipfsExec(node.repoPath) + nodes.push(node) + done() + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should get disabled state of pubsub', function (done) { + ipfsA('name pubsub state').then((res) => { + expect(res).to.exist() + expect(res).to.have.string('disabled') + + done() + }) + }) + + it('should get error getting the available subscriptions', function (done) { + ipfsA('name pubsub subs').catch((err) => { + expect(err).to.exist() // error as it is disabled + + done() + }) + }) + + it('should get error canceling a subscription', function (done) { + ipfsA('name pubsub cancel /ipns/QmSWxaPcGgf4TDnFEBDWz2JnbHywF14phmY9hNcAeBEK5v').catch((err) => { + expect(err).to.exist() // error as it is disabled + + done() + }) + }) + }) +}) diff --git a/test/cli/name.js b/test/cli/name.js index d4ab1dbcee..bf537eee57 100644 --- a/test/cli/name.js +++ b/test/cli/name.js @@ -33,7 +33,7 @@ describe('name', () => { df.spawn({ exec: `./src/cli/bin.js`, config: {}, - args: pass.split(' '), + args: ['--pass', passPhrase, '--local'], initOptions: { bits: 512 } }, (err, _ipfsd) => { expect(err).to.not.exist() diff --git a/test/core/name-pubsub.js b/test/core/name-pubsub.js new file mode 100644 index 0000000000..23344892de --- /dev/null +++ b/test/core/name-pubsub.js @@ -0,0 +1,204 @@ +/* eslint max-nested-callbacks: ["error", 6] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const retry = require('async/retry') +const series = require('async/series') + +const isNode = require('detect-node') +const IPFS = require('../../src') +const { fromB58String } = require('multihashes') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'proc', exec: IPFS }) + +const spawnNode = (callback) => { + df.spawn({ + args: ['--enable-namesys-pubsub', '--enable-pubsub-experiment'], + disposable: true, + bits: 512 + }, callback) +} + +const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' + +describe('name-pubsub', function () { + if (!isNode) { + return + } + + let ipfsA + let ipfsB + let nodeAId + let nodeBId + let bMultiaddr + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + series([ + (cb) => { + spawnNode((err, node) => { + expect(err).to.not.exist() + ipfsA = node.api + nodes.push(node) + cb() + }) + }, + (cb) => { + spawnNode((err, node) => { + expect(err).to.not.exist() + ipfsB = node.api + nodes.push(node) + cb() + }) + } + ], done) + }) + + // Get node ids + before(function (done) { + parallel([ + (cb) => { + ipfsA.id((err, res) => { + expect(err).to.not.exist() + expect(res.id).to.exist() + nodeAId = res + cb() + }) + }, + (cb) => { + ipfsB.id((err, res) => { + expect(err).to.not.exist() + expect(res.id).to.exist() + nodeBId = res + bMultiaddr = res.addresses[0] + cb() + }) + } + ], done) + }) + + // Connect + before(function (done) { + this.timeout(60 * 1000) + ipfsA.swarm.connect(bMultiaddr, done) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should get enabled state of pubsub', function (done) { + ipfsA.name.pubsub.state((err, state) => { + expect(err).to.not.exist() + expect(state).to.exist() + expect(state.enabled).to.equal(true) + + done() + }) + }) + + it('should subscribe on resolve', function (done) { + this.timeout(80 * 1000) + + ipfsB.name.pubsub.subs((err, subs) => { + expect(err).to.not.exist() + expect(subs).to.exist() + expect(subs.strings).to.deep.equal([]) + + ipfsB.name.resolve(nodeBId.id, (err) => { + expect(err).to.not.exist() + + ipfsB.name.pubsub.subs((err, subs) => { + expect(err).to.not.exist() + expect(subs).to.exist() + expect(subs.strings).to.include(`/ipns/${nodeBId.id}`) + + done() + }) + }) + }) + }) + + it('should be able to cancel subscriptions', function (done) { + this.timeout(80 * 1000) + const path = `/ipns/${nodeAId.id}` + + ipfsA.name.pubsub.cancel(path, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + expect(res.canceled).to.equal(false) + + ipfsA.name.resolve(nodeAId.id, (err) => { + expect(err).to.not.exist() + + ipfsA.name.pubsub.cancel(path, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + expect(res.canceled).to.equal(true) + + ipfsA.name.pubsub.subs((err, subs) => { + expect(err).to.not.exist() + expect(subs).to.exist() + expect(subs.strings).to.not.include(path) + + done() + }) + }) + }) + }) + }) + + it('should publish the received record to the subscriber', function (done) { + this.timeout(80 * 1000) + const topic = `/ipns/${fromB58String(nodeAId.id).toString()}` + + ipfsB.name.resolve(nodeAId.id, (err, res) => { + expect(err).to.exist() + + series([ + (cb) => waitForPeerToSubscribe(topic, nodeBId, ipfsA, cb), + (cb) => ipfsA.name.publish(ipfsRef, { resolve: false }, cb), + (cb) => ipfsA.name.resolve(nodeAId.id, cb), + (cb) => ipfsB.name.resolve(nodeAId.id, cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + + expect(res[1].name).to.equal(nodeAId.id) // Published to Node A ID + expect(res[2].path).to.equal(ipfsRef) + expect(res[3].path).to.equal(ipfsRef) + done() + }) + }) + }) +}) + +// Wait until a peer subscribes a topic +const waitForPeerToSubscribe = (topic, peer, daemon, callback) => { + retry({ + times: 5, + interval: 2000 + }, (next) => { + daemon.pubsub.peers(topic, (error, peers) => { + if (error) { + return next(error) + } + + if (!peers.includes(peer.id)) { + return next(new Error(`Could not find peer ${peer.id}`)) + } + + return next() + }) + }, callback) +}