From 310c3790ad90914215e512647e90ccc655c1aba7 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 29 Aug 2018 18:10:30 +0100 Subject: [PATCH 1/9] feat: ipns over pubsub --- README.md | 4 + package.json | 4 + src/cli/commands/daemon.js | 3 + src/cli/commands/name/pubsub.js | 18 ++ src/cli/commands/name/pubsub/cancel.js | 19 ++ src/cli/commands/name/pubsub/state.js | 19 ++ src/cli/commands/name/pubsub/subs.js | 21 ++ src/core/components/init.js | 3 + src/core/components/libp2p.js | 2 +- src/core/components/name-pubsub.js | 60 ++++++ src/core/components/name.js | 4 +- src/core/components/start.js | 12 +- src/core/config.js | 1 + src/core/index.js | 3 + src/core/ipns/index.js | 3 +- src/core/ipns/routing/pubsub.js | 208 ++++++++++++++++++++ src/http/api/resources/name.js | 63 ++++++ src/http/api/routes/name.js | 25 +++ src/http/index.js | 1 + test/cli/commands.js | 2 +- test/cli/name-pubsub.js | 253 +++++++++++++++++++++++++ test/core/name-pubsub.js | 204 ++++++++++++++++++++ 22 files changed, 926 insertions(+), 6 deletions(-) create mode 100644 src/cli/commands/name/pubsub.js create mode 100644 src/cli/commands/name/pubsub/cancel.js create mode 100644 src/cli/commands/name/pubsub/state.js create mode 100644 src/cli/commands/name/pubsub/subs.js create mode 100644 src/core/components/name-pubsub.js create mode 100644 src/core/ipns/routing/pubsub.js create mode 100644 test/cli/name-pubsub.js create mode 100644 test/core/name-pubsub.js diff --git a/README.md b/README.md index f839130a50..ea5abdb05c 100644 --- a/README.md +++ b/README.md @@ -305,6 +305,7 @@ Configure remote preload nodes. The remote will preload content added on this no Enable and configure experimental features. - `pubsub` (boolean): Enable libp2p pub-sub. (Default: `false`) +- `ipnsPubsub` (boolean): Enable pub-sub on IPNS. (Default: `false`) - `sharding` (boolean): Enable directory sharding. Directories that have many child objects will be represented by multiple DAG nodes instead of just one. It can improve lookup performance when a directory has several thousand files or more. (Default: `false`) - `dht` (boolean): Enable KadDHT. **This is currently not interoperable with `go-ipfs`.** @@ -564,6 +565,9 @@ The core API is grouped into several areas: - [name](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md) - [`ipfs.name.publish(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepublish) - [`ipfs.name.resolve(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#nameresolve) + - [`ipfs.name.pubsub.cancel(arg, [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubcancel) + - [`ipfs.name.pubsub.state([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubstate) + - [`ipfs.name.pubsub.subs([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubsubs) #### Crypto and Key Management diff --git a/package.json b/package.json index a1a3b40fd4..bbdf310d3d 100644 --- a/package.json +++ b/package.json @@ -88,7 +88,11 @@ "byteman": "^1.3.5", "cid-tool": "~0.2.0", "cids": "~0.5.5", +<<<<<<< HEAD "datastore-core": "~0.6.0", +======= + "datastore-pubsub": "~0.0.2", +>>>>>>> feat: ipns over pubsub "debug": "^4.1.0", "deep-extend": "~0.6.0", "err-code": "^1.1.2", diff --git a/src/cli/commands/daemon.js b/src/cli/commands/daemon.js index cca6845bf5..e71d3a5d38 100644 --- a/src/cli/commands/daemon.js +++ b/src/cli/commands/daemon.js @@ -27,6 +27,9 @@ module.exports = { }) .option('local', { desc: 'Run commands locally to the daemon', + default: false + }) + .option('enable-namesys-pubsub', { type: 'boolean', default: false }) diff --git a/src/cli/commands/name/pubsub.js b/src/cli/commands/name/pubsub.js new file mode 100644 index 0000000000..218afb6c4b --- /dev/null +++ b/src/cli/commands/name/pubsub.js @@ -0,0 +1,18 @@ +'use strict' + +/* +Manage and inspect the state of the IPNS pubsub resolver. +Note: this command is experimental and subject to change as the system is refined. +*/ +module.exports = { + command: 'pubsub', + + description: 'IPNS pubsub management.', + + builder (yargs) { + return yargs.commandDir('pubsub') + }, + + handler (argv) { + } +} diff --git a/src/cli/commands/name/pubsub/cancel.js b/src/cli/commands/name/pubsub/cancel.js new file mode 100644 index 0000000000..5f0d709294 --- /dev/null +++ b/src/cli/commands/name/pubsub/cancel.js @@ -0,0 +1,19 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'cancel ', + + describe: 'Cancel a name subscription.', + + handler (argv) { + argv.ipfs.name.pubsub.cancel(argv.name, (err, result) => { + if (err) { + throw err + } else { + print(result.canceled ? 'canceled' : 'no subscription') + } + }) + } +} diff --git a/src/cli/commands/name/pubsub/state.js b/src/cli/commands/name/pubsub/state.js new file mode 100644 index 0000000000..08c65fbed1 --- /dev/null +++ b/src/cli/commands/name/pubsub/state.js @@ -0,0 +1,19 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'state', + + describe: 'Query the state of IPNS pubsub.', + + handler (argv) { + argv.ipfs.name.pubsub.state((err, result) => { + if (err) { + throw err + } else { + print(result.enabled ? 'enabled' : 'disabled') + } + }) + } +} diff --git a/src/cli/commands/name/pubsub/subs.js b/src/cli/commands/name/pubsub/subs.js new file mode 100644 index 0000000000..e5539aa558 --- /dev/null +++ b/src/cli/commands/name/pubsub/subs.js @@ -0,0 +1,21 @@ +'use strict' + +const print = require('../../../utils').print + +module.exports = { + command: 'subs', + + describe: 'Show current name subscriptions.', + + handler (argv) { + argv.ipfs.name.pubsub.subs((err, result) => { + if (err) { + throw err + } else { + result.strings.forEach((s) => { + print(s) + }) + } + }) + } +} diff --git a/src/core/components/init.js b/src/core/components/init.js index 51c197e977..4069092a38 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -15,6 +15,9 @@ const UnixFs = require('ipfs-unixfs') const IPNS = require('../ipns') const OfflineDatastore = require('../ipns/routing/offline-datastore') +const IPNS = require('../ipns') +const OfflineDatastore = require('../ipns/routing/offline-datastore') + const addDefaultAssets = require('./init-assets') module.exports = function init (self) { diff --git a/src/core/components/libp2p.js b/src/core/components/libp2p.js index 87838cb1e7..3d129d59c2 100644 --- a/src/core/components/libp2p.js +++ b/src/core/components/libp2p.js @@ -45,7 +45,7 @@ module.exports = function libp2p (self) { }, EXPERIMENTAL: { dht: get(opts.options, 'EXPERIMENTAL.dht', false), - pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false) + pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false) || get(opts.options, 'EXPERIMENTAL.ipnsPubsub', false) } }, connectionManager: get(opts.options, 'connectionManager', diff --git a/src/core/components/name-pubsub.js b/src/core/components/name-pubsub.js new file mode 100644 index 0000000000..d5f71ef520 --- /dev/null +++ b/src/core/components/name-pubsub.js @@ -0,0 +1,60 @@ +'use strict' + +const debug = require('debug') +const errcode = require('err-code') +const promisify = require('promisify-es6') + +const log = debug('jsipfs:name-pubsub') +log.error = debug('jsipfs:name-pubsub:error') + +const isNamePubsubEnabled = (node) => ( + node._options.EXPERIMENTAL.ipnsPubsub && node._libp2pNode._floodSub +) + +module.exports = function namePubsub (self) { + return { + /** + * Query the state of IPNS pubsub. + * + * @returns {Promise|void} + */ + state: promisify((callback) => { + callback(null, { + enabled: Boolean(isNamePubsubEnabled(self)) + }) + }), + /** + * Cancel a name subscription. + * + * @param {String} name subscription name. + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + cancel: promisify((name, callback) => { + if (!isNamePubsubEnabled(self)) { + const errMsg = 'IPNS pubsub subsystem is not enabled' + + log.error(errMsg) + return callback(errcode(errMsg, 'ERR_IPNS_PS_NOT_ENABLED')) + } + + self._ipns.pubsub.cancel(name, callback) + }), + /** + * Show current name subscriptions. + * + * @param {function(Error)} [callback] + * @returns {Promise|void} + */ + subs: promisify((callback) => { + if (!isNamePubsubEnabled(self)) { + const errMsg = 'IPNS pubsub subsystem is not enabled' + + log.error(errMsg) + return callback(errcode(errMsg, 'ERR_IPNS_PS_NOT_ENABLED')) + } + + self._ipns.pubsub.getSubscriptions(callback) + }) + } +} diff --git a/src/core/components/name.js b/src/core/components/name.js index 8acb5c89c2..6c734a35b2 100644 --- a/src/core/components/name.js +++ b/src/core/components/name.js @@ -11,6 +11,7 @@ const errcode = require('err-code') const log = debug('jsipfs:name') log.error = debug('jsipfs:name:error') +const namePubsub = require('./name-pubsub') const utils = require('../utils') const path = require('../ipns/path') @@ -161,6 +162,7 @@ module.exports = function name (self) { } self._ipns.resolve(name, resolveOptions, callback) - }) + }), + pubsub: namePubsub(self) } } diff --git a/src/core/components/start.js b/src/core/components/start.js index 518d613b0e..0b9b8dcce2 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,11 +2,13 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') +const get = require('lodash/get') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const { TieredDatastore } = require('datastore-core') const IPNS = require('../ipns') +const Pubsub = require('../ipns/routing/pubsub') const OfflineDatastore = require('../ipns/routing/offline-datastore') module.exports = (self) => { @@ -41,7 +43,13 @@ module.exports = (self) => { // Setup online routing for IPNS with a tiered routing composed by a DHT and a Pubsub router (if properly enabled) const ipnsStores = [] - // TODO Add IPNS pubsub if enabled + // Add IPNS pubsub if enabled + let pubsub + if (get(self._options, 'EXPERIMENTAL.ipnsPubsub', false)) { + pubsub = new Pubsub(self) + + ipnsStores.push(pubsub) + } // NOTE: IPNS routing is being replaced by the local repo datastore while the IPNS over DHT is not ready // When DHT is added, if local option enabled, should receive offlineDatastore as well @@ -50,7 +58,7 @@ module.exports = (self) => { // Create ipns routing with a set of datastores const routing = new TieredDatastore(ipnsStores) - self._ipns = new IPNS(routing, self._repo, self._peerInfo, self._keychain, self._options) + self._ipns = new IPNS(routing, self._repo, self._peerInfo, self._keychain, self._options, pubsub) self._bitswap = new Bitswap( self._libp2pNode, diff --git a/src/core/config.js b/src/core/config.js index a39432d53e..73ee98038e 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -29,6 +29,7 @@ const schema = Joi.object().keys({ }).allow(null), EXPERIMENTAL: Joi.object().keys({ pubsub: Joi.boolean(), + namesysPubsub: Joi.boolean(), sharding: Joi.boolean(), dht: Joi.boolean() }).allow(null), diff --git a/src/core/index.js b/src/core/index.js index 566cec8404..f00d29e00c 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -165,6 +165,9 @@ class IPFS extends EventEmitter { if (this._options.EXPERIMENTAL.pubsub) { this.log('EXPERIMENTAL pubsub is enabled') } + if (this._options.EXPERIMENTAL.ipnsPubsub) { + this.log('EXPERIMENTAL IPNS pubsub is enabled') + } if (this._options.EXPERIMENTAL.sharding) { this.log('EXPERIMENTAL sharding is enabled') } diff --git a/src/core/ipns/index.js b/src/core/ipns/index.js index 06b43ca330..9245c4ae83 100644 --- a/src/core/ipns/index.js +++ b/src/core/ipns/index.js @@ -17,12 +17,13 @@ const path = require('./path') const defaultRecordTtl = 60 * 1000 class IPNS { - constructor (routing, repo, peerInfo, keychain, options) { + constructor (routing, repo, peerInfo, keychain, options, pubsub) { this.publisher = new IpnsPublisher(routing, repo) this.republisher = new IpnsRepublisher(this.publisher, repo, peerInfo, keychain, options) this.resolver = new IpnsResolver(routing) this.cache = new Receptacle({ max: 1000 }) // Create an LRU cache with max 1000 items this.routing = routing + this.pubsub = pubsub } // Publish diff --git a/src/core/ipns/routing/pubsub.js b/src/core/ipns/routing/pubsub.js new file mode 100644 index 0000000000..a1701bc2a0 --- /dev/null +++ b/src/core/ipns/routing/pubsub.js @@ -0,0 +1,208 @@ +'use strict' + +const ipns = require('ipns') +const { fromB58String, toB58String } = require('multihashes') +const PeerId = require('peer-id') +const DatastorePubsub = require('datastore-pubsub') + +const errcode = require('err-code') +const debug = require('debug') +const log = debug('jsipfs:ipns:pubsub') +log.error = debug('jsipfs:ipns:pubsub:error') + +const ipnsNS = '/ipns/' +const ipnsNSLength = ipnsNS.length + +// Pubsub aims to manage the pubsub subscriptions for IPNS +class Pubsub { + constructor (node) { + const pubsub = node._libp2pNode.pubsub + const localDatastore = node._repo.datastore + const peerId = node._peerInfo.id + this._pubsub = pubsub + + this._subscriptions = {} + + // Bind _handleSubscriptionKey function, which is called by datastorePubsub. + this._handleSubscriptionKey = this._handleSubscriptionKey.bind(this) + this._dsPubsub = new DatastorePubsub(pubsub, localDatastore, peerId, ipns.validator, this._handleSubscriptionKey) + } + + /** + * Put a value to the pubsub datastore indexed by the received key properly encoded. + * @param {Buffer} key identifier of the value. + * @param {Buffer} value value to be stored. + * @param {function(Error)} callback + * @returns {void} + */ + put (key, value, callback) { + if (!Buffer.isBuffer(key)) { + const errMsg = `key does not have a valid format` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + if (!Buffer.isBuffer(value)) { + const errMsg = `received value is not a buffer` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) + } + + this._dsPubsub.put(key, value, callback) + } + + /** + * Get a value from the pubsub datastore indexed by the received key properly encoded. + * Moreover, the identifier topic is subscribed and the pubsub datastore records will be + * updated once new publishes occur. + * @param {Buffer} key identifier of the value to be obtained. + * @param {function(Error, Buffer)} callback + * @returns {void} + */ + get (key, callback) { + if (!Buffer.isBuffer(key)) { + const errMsg = `key does not have a valid format` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + this._dsPubsub.get(key, (err, res) => { + // Add topic subscribed + const ns = key.slice(0, ipnsNSLength) + + if (ns.toString() === ipnsNS) { + const stringifiedTopic = key.toString() + const id = toB58String(key.slice(ipnsNSLength)) + + this._subscriptions[stringifiedTopic] = id + + log(`subscribed pubsub ${stringifiedTopic}: ${id}`) + } + + if (err) { + return callback(err) + } + + callback(null, res) + }) + } + + // Modify subscription key to have a proper encoding + // Without this, the utf-8 encoding gets the key broken + _handleSubscriptionKey (key, callback) { + const subscriber = this._subscriptions[key] + + if (!subscriber) { + const errMsg = `key ${key} does not correspond to a subscription` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) + } + + let keys + try { + keys = ipns.getIdKeys(fromB58String(subscriber)) + } catch (err) { + log.error(err) + return callback(err) + } + + callback(null, keys.routingKey.toBuffer()) + } + + /** + * Get pubsub subscriptions related to ipns. + * @param {function(Error, Object)} callback + * @returns {void} + */ + getSubscriptions (callback) { + this._pubsub.ls((err, res) => { + if (err || !res) { + log.error(err) + return callback(err) + } + + // Iterate over subscriptions + const strings = [] + res.forEach((subs) => { + const ns = subs.substring(0, ipnsNSLength) + const subscription = this._subscriptions[subs] + + if (ns === ipnsNS && subscription) { + // Verify valid PeerID + try { + PeerId.createFromBytes(Buffer.from(subscription)) + // add to the list + strings.push(`/ipns/${subscription}`) + } catch (err) { + log.error('ipns key not a valid peer ID') + } + } + }) + + callback(null, { + strings + }) + }) + } + + /** + * Get pubsub subscriptions related to ipns. + * @param {String} name ipns path to cancel the pubsub subscription. + * @param {function(Error, Object)} callback + * @returns {void} + */ + cancel (name, callback) { + if (typeof name !== 'string') { + const errMsg = `received subscription name is not valid` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_SUBSCRIPTION_NAME')) + } + + // Trim /ipns/ prefix from the name + if (name.startsWith(ipnsNS)) { + name = name.substring(ipnsNSLength) + } + + // Verify peerId validity + try { + PeerId.createFromBytes(Buffer.from(name)) + } catch (err) { + const errMsg = `ipns key is not a valid peer ID` + + log.error(errMsg) + return callback(errcode(new Error(errMsg), 'ERR_INVALID_IPNS_KEY')) + } + + const stringifiedTopic = Object.keys(this._subscriptions).find((key) => this._subscriptions[key] === name) + + // Not found topic + if (!stringifiedTopic) { + return callback(null, { + canceled: false + }) + } + + // Unsubscribe topic + try { + const bufTopic = Buffer.from(stringifiedTopic) + + this._dsPubsub.unsubscribe(bufTopic) + } catch (err) { + return callback(err) + } + + delete this._subscriptions[stringifiedTopic] + log(`unsubscribed pubsub ${stringifiedTopic}: ${name}`) + + callback(null, { + canceled: true + }) + } +} + +exports = module.exports = Pubsub diff --git a/src/http/api/resources/name.js b/src/http/api/resources/name.js index 23fc8bbab1..090b2ce630 100644 --- a/src/http/api/resources/name.js +++ b/src/http/api/resources/name.js @@ -59,3 +59,66 @@ exports.publish = { }) } } + +exports.pubsub = { + state: { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.name.pubsub.state((err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Enabled: res.enabled + }).code(200) + }) + } + }, + subs: { + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + + ipfs.name.pubsub.subs((err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Strings: res.strings + }).code(200) + }) + } + }, + cancel: { + validate: { + query: Joi.object().keys({ + arg: Joi.string().required() + }).unknown() + }, + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + const { arg } = request.query + + ipfs.name.pubsub.cancel(arg, (err, res) => { + if (err) { + return reply({ + Message: err.toString(), + Code: 0 + }).code(500) + } + + return reply({ + Canceled: res.canceled + }).code(200) + }) + } + } +} diff --git a/src/http/api/routes/name.js b/src/http/api/routes/name.js index 29647f3a6a..f49d0bd6ca 100644 --- a/src/http/api/routes/name.js +++ b/src/http/api/routes/name.js @@ -22,4 +22,29 @@ module.exports = (server) => { validate: resources.name.publish.validate } }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/state', + config: { + handler: resources.name.pubsub.state.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/subs', + config: { + handler: resources.name.pubsub.subs.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/name/pubsub/cancel', + config: { + handler: resources.name.pubsub.cancel.handler, + validate: resources.name.pubsub.cancel.validate + } + }) } diff --git a/src/http/index.js b/src/http/index.js index 2d7781fc3f..03cfb394c3 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -75,6 +75,7 @@ function HttpApi (repo, config, cliArgs) { pass: cliArgs && cliArgs.pass, EXPERIMENTAL: { pubsub: cliArgs && cliArgs.enablePubsubExperiment, + ipnsPubsub: cliArgs && cliArgs.enableNamesysPubsub, dht: cliArgs && cliArgs.enableDhtExperiment, sharding: cliArgs && cliArgs.enableShardingExperiment }, diff --git a/test/cli/commands.js b/test/cli/commands.js index a14a31e826..9ba3f44519 100644 --- a/test/cli/commands.js +++ b/test/cli/commands.js @@ -4,7 +4,7 @@ const expect = require('chai').expect const runOnAndOff = require('../utils/on-and-off') -const commandCount = 82 +const commandCount = 86 describe('commands', () => runOnAndOff((thing) => { let ipfs diff --git a/test/cli/name-pubsub.js b/test/cli/name-pubsub.js new file mode 100644 index 0000000000..79416ea40f --- /dev/null +++ b/test/cli/name-pubsub.js @@ -0,0 +1,253 @@ +/* eslint max-nested-callbacks: ["error", 7] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const series = require('async/series') +const ipfsExec = require('../utils/ipfs-exec') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'js' }) + +const checkAll = (bits) => string => bits.every(bit => string.includes(bit)) +const emptyDirCid = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' + +const spawnDaemon = (callback) => { + df.spawn({ + exec: `./src/cli/bin.js`, + args: ['--enable-namesys-pubsub'], + initOptions: { bits: 512 } + }, callback) +} + +describe('name-pubsub', () => { + describe('enabled', () => { + let ipfsA + let ipfsB + let nodeAId + let nodeBId + let bMultiaddr + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + series([ + (cb) => { + spawnDaemon((err, node) => { + expect(err).to.not.exist() + ipfsA = ipfsExec(node.repoPath) + nodes.push(node) + cb() + }) + }, + (cb) => { + spawnDaemon((err, node) => { + expect(err).to.not.exist() + ipfsB = ipfsExec(node.repoPath) + nodes.push(node) + cb() + }) + } + ], done) + }) + + // Get node ids + before(function (done) { + parallel([ + (cb) => { + ipfsA('id').then((res) => { + nodeAId = JSON.parse(res) + cb() + }) + }, + (cb) => { + ipfsB('id').then((res) => { + const id = JSON.parse(res) + + nodeBId = id + bMultiaddr = id.addresses[0] + cb() + }) + } + ], done) + }) + + // Connect + before(function (done) { + ipfsA('swarm', 'connect', bMultiaddr).then((out) => { + expect(out).to.eql(`connect ${bMultiaddr} success\n`) + done() + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + describe('pubsub commands', () => { + before(function (done) { + this.timeout(50 * 1000) + done() + }) + + it('should get enabled state of pubsub', function (done) { + ipfsA('name pubsub state').then((res) => { + expect(res).to.exist() + expect(res).to.have.string('enabled') // enabled + + done() + }) + }) + + it('should subscribe on name resolve', function (done) { + this.timeout(80 * 1000) + + ipfsB(`name resolve ${nodeAId.id}`).catch((err) => { + expect(err).to.exist() // Not available (subscribed) + + ipfsB('pubsub ls').then((res) => { + expect(res).to.exist() + expect(res).to.have.string('/ipns/') // have an ipns subscribtion + + ipfsB('name pubsub subs').then((res) => { + expect(res).to.exist() + expect(res).to.have.string(`/ipns/${nodeAId.id}`) // have subscription + + done() + }) + }) + }) + }) + + it('should be able to cancel subscriptions', function (done) { + this.timeout(80 * 1000) + + ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.have.string('no subscription') // tried to cancel a not yet subscribed id + + ipfsA(`name resolve ${nodeBId.id}`).catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.have.string('canceled') // canceled now + + ipfsA('pubsub ls').then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string('/ipns/') // ipns subscribtion not available + + ipfsA('name pubsub subs').then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available + + done() + }) + }) + }) + }) + }) + }) + }) + + describe('pubsub records', () => { + let cidAdded + + before(function (done) { + this.timeout(50 * 1000) + ipfsA('add src/init-files/init-docs/readme').then((out) => { + cidAdded = out.split(' ')[1] + done() + }) + }) + + it('should publish the received record to the subscriber', function (done) { + this.timeout(80 * 1000) + + ipfsB(`name resolve ${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([emptyDirCid])) // Empty dir received (subscribed) + + ipfsA(`name resolve ${nodeBId.id}`).catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + ipfsB(`name publish ${cidAdded}`).then((res) => { + // published to IpfsB and published through pubsub to ipfsa + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) + + ipfsB(`name resolve ${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) + + ipfsA(`name resolve ${nodeBId.id}`).then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) // value propagated to node B + + done() + }) + }) + }) + }) + }) + }) + }) + }) + + describe('disabled', () => { + let ipfsA + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + df.spawn({ + exec: `./src/cli/bin.js`, + config: {}, + initOptions: { bits: 512 } + }, (err, node) => { + expect(err).to.not.exist() + ipfsA = ipfsExec(node.repoPath) + nodes.push(node) + done() + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should get disabled state of pubsub', function (done) { + ipfsA('name pubsub state').then((res) => { + expect(res).to.exist() + expect(res).to.have.string('disabled') + + done() + }) + }) + + it('should get error getting the available subscriptions', function (done) { + ipfsA('name pubsub subs').catch((err) => { + expect(err).to.exist() // error as it is disabled + + done() + }) + }) + + it('should get error canceling a subscription', function (done) { + ipfsA('name pubsub cancel /ipns/QmSWxaPcGgf4TDnFEBDWz2JnbHywF14phmY9hNcAeBEK5v').catch((err) => { + expect(err).to.exist() // error as it is disabled + + done() + }) + }) + }) +}) diff --git a/test/core/name-pubsub.js b/test/core/name-pubsub.js new file mode 100644 index 0000000000..23344892de --- /dev/null +++ b/test/core/name-pubsub.js @@ -0,0 +1,204 @@ +/* eslint max-nested-callbacks: ["error", 6] */ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') +const retry = require('async/retry') +const series = require('async/series') + +const isNode = require('detect-node') +const IPFS = require('../../src') +const { fromB58String } = require('multihashes') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'proc', exec: IPFS }) + +const spawnNode = (callback) => { + df.spawn({ + args: ['--enable-namesys-pubsub', '--enable-pubsub-experiment'], + disposable: true, + bits: 512 + }, callback) +} + +const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' + +describe('name-pubsub', function () { + if (!isNode) { + return + } + + let ipfsA + let ipfsB + let nodeAId + let nodeBId + let bMultiaddr + let nodes = [] + + // Spawn daemons + before(function (done) { + // CI takes longer to instantiate the daemon, so we need to increase the + // timeout for the before step + this.timeout(80 * 1000) + + series([ + (cb) => { + spawnNode((err, node) => { + expect(err).to.not.exist() + ipfsA = node.api + nodes.push(node) + cb() + }) + }, + (cb) => { + spawnNode((err, node) => { + expect(err).to.not.exist() + ipfsB = node.api + nodes.push(node) + cb() + }) + } + ], done) + }) + + // Get node ids + before(function (done) { + parallel([ + (cb) => { + ipfsA.id((err, res) => { + expect(err).to.not.exist() + expect(res.id).to.exist() + nodeAId = res + cb() + }) + }, + (cb) => { + ipfsB.id((err, res) => { + expect(err).to.not.exist() + expect(res.id).to.exist() + nodeBId = res + bMultiaddr = res.addresses[0] + cb() + }) + } + ], done) + }) + + // Connect + before(function (done) { + this.timeout(60 * 1000) + ipfsA.swarm.connect(bMultiaddr, done) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should get enabled state of pubsub', function (done) { + ipfsA.name.pubsub.state((err, state) => { + expect(err).to.not.exist() + expect(state).to.exist() + expect(state.enabled).to.equal(true) + + done() + }) + }) + + it('should subscribe on resolve', function (done) { + this.timeout(80 * 1000) + + ipfsB.name.pubsub.subs((err, subs) => { + expect(err).to.not.exist() + expect(subs).to.exist() + expect(subs.strings).to.deep.equal([]) + + ipfsB.name.resolve(nodeBId.id, (err) => { + expect(err).to.not.exist() + + ipfsB.name.pubsub.subs((err, subs) => { + expect(err).to.not.exist() + expect(subs).to.exist() + expect(subs.strings).to.include(`/ipns/${nodeBId.id}`) + + done() + }) + }) + }) + }) + + it('should be able to cancel subscriptions', function (done) { + this.timeout(80 * 1000) + const path = `/ipns/${nodeAId.id}` + + ipfsA.name.pubsub.cancel(path, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + expect(res.canceled).to.equal(false) + + ipfsA.name.resolve(nodeAId.id, (err) => { + expect(err).to.not.exist() + + ipfsA.name.pubsub.cancel(path, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + expect(res.canceled).to.equal(true) + + ipfsA.name.pubsub.subs((err, subs) => { + expect(err).to.not.exist() + expect(subs).to.exist() + expect(subs.strings).to.not.include(path) + + done() + }) + }) + }) + }) + }) + + it('should publish the received record to the subscriber', function (done) { + this.timeout(80 * 1000) + const topic = `/ipns/${fromB58String(nodeAId.id).toString()}` + + ipfsB.name.resolve(nodeAId.id, (err, res) => { + expect(err).to.exist() + + series([ + (cb) => waitForPeerToSubscribe(topic, nodeBId, ipfsA, cb), + (cb) => ipfsA.name.publish(ipfsRef, { resolve: false }, cb), + (cb) => ipfsA.name.resolve(nodeAId.id, cb), + (cb) => ipfsB.name.resolve(nodeAId.id, cb) + ], (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + + expect(res[1].name).to.equal(nodeAId.id) // Published to Node A ID + expect(res[2].path).to.equal(ipfsRef) + expect(res[3].path).to.equal(ipfsRef) + done() + }) + }) + }) +}) + +// Wait until a peer subscribes a topic +const waitForPeerToSubscribe = (topic, peer, daemon, callback) => { + retry({ + times: 5, + interval: 2000 + }, (next) => { + daemon.pubsub.peers(topic, (error, peers) => { + if (error) { + return next(error) + } + + if (!peers.includes(peer.id)) { + return next(new Error(`Could not find peer ${peer.id}`)) + } + + return next() + }) + }, callback) +} From 4c40fc1ae31efc327d1256f6545b3f1890ca75b6 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 17 Oct 2018 17:24:51 +0100 Subject: [PATCH 2/9] fix: updated according to the modification on the api --- src/cli/commands/name/pubsub/subs.js | 2 +- src/core/ipns/index.js | 2 +- src/core/ipns/routing/pubsub.js | 4 +--- test/core/interface.spec.js | 8 ++++++++ 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/cli/commands/name/pubsub/subs.js b/src/cli/commands/name/pubsub/subs.js index e5539aa558..ced5682626 100644 --- a/src/cli/commands/name/pubsub/subs.js +++ b/src/cli/commands/name/pubsub/subs.js @@ -12,7 +12,7 @@ module.exports = { if (err) { throw err } else { - result.strings.forEach((s) => { + result.forEach((s) => { print(s) }) } diff --git a/src/core/ipns/index.js b/src/core/ipns/index.js index 9245c4ae83..6092c043f9 100644 --- a/src/core/ipns/index.js +++ b/src/core/ipns/index.js @@ -72,7 +72,7 @@ class IPNS { options = options || {} // If recursive, we should not try to get the cached value - if (!options.nocache && !options.recursive) { + if (options.nocache && !options.recursive) { // Try to get the record from cache const id = name.split('/')[2] const result = this.cache.get(id) diff --git a/src/core/ipns/routing/pubsub.js b/src/core/ipns/routing/pubsub.js index a1701bc2a0..770ccb305e 100644 --- a/src/core/ipns/routing/pubsub.js +++ b/src/core/ipns/routing/pubsub.js @@ -143,9 +143,7 @@ class Pubsub { } }) - callback(null, { - strings - }) + callback(null, strings) }) } diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index 6fde7915c9..7067a5733c 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -124,6 +124,14 @@ describe('interface-ipfs-core tests', () => { } })) + /* + tests.namePubsub(CommonFactory.create({ + spawnOptions: { + args: ['--enable-namesys-pubsub'], + initOptions: { bits: 1024 } + } + })) */ + tests.object(defaultCommonFactory) tests.pin(defaultCommonFactory) From 66907b58d2531c5cb3ac96be3ca80976d160b9a3 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 18 Oct 2018 15:34:49 +0100 Subject: [PATCH 3/9] fix: first pass of code review --- README.md | 2 +- src/core/components/name-pubsub.js | 10 +- src/core/components/start.js | 13 +- .../{pubsub.js => pubsub-datastore.js} | 68 ++---- src/http/api/resources/name.js | 2 +- test/core/interface.spec.js | 3 +- test/core/name-pubsub.js | 204 ------------------ 7 files changed, 33 insertions(+), 269 deletions(-) rename src/core/ipns/routing/{pubsub.js => pubsub-datastore.js} (70%) delete mode 100644 test/core/name-pubsub.js diff --git a/README.md b/README.md index ea5abdb05c..d98a608b8d 100644 --- a/README.md +++ b/README.md @@ -564,10 +564,10 @@ The core API is grouped into several areas: - [name](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md) - [`ipfs.name.publish(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepublish) - - [`ipfs.name.resolve(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#nameresolve) - [`ipfs.name.pubsub.cancel(arg, [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubcancel) - [`ipfs.name.pubsub.state([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubstate) - [`ipfs.name.pubsub.subs([callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#namepubsubsubs) + - [`ipfs.name.resolve(value, [options], [callback])`](https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/NAME.md#nameresolve) #### Crypto and Key Management diff --git a/src/core/components/name-pubsub.js b/src/core/components/name-pubsub.js index d5f71ef520..3612b8d292 100644 --- a/src/core/components/name-pubsub.js +++ b/src/core/components/name-pubsub.js @@ -8,7 +8,9 @@ const log = debug('jsipfs:name-pubsub') log.error = debug('jsipfs:name-pubsub:error') const isNamePubsubEnabled = (node) => ( - node._options.EXPERIMENTAL.ipnsPubsub && node._libp2pNode._floodSub + Boolean(node._options.EXPERIMENTAL.ipnsPubsub && + node._libp2pNode && + node._libp2pNode._floodSub) ) module.exports = function namePubsub (self) { @@ -20,7 +22,7 @@ module.exports = function namePubsub (self) { */ state: promisify((callback) => { callback(null, { - enabled: Boolean(isNamePubsubEnabled(self)) + enabled: isNamePubsubEnabled(self) }) }), /** @@ -35,7 +37,7 @@ module.exports = function namePubsub (self) { const errMsg = 'IPNS pubsub subsystem is not enabled' log.error(errMsg) - return callback(errcode(errMsg, 'ERR_IPNS_PS_NOT_ENABLED')) + return callback(errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED')) } self._ipns.pubsub.cancel(name, callback) @@ -51,7 +53,7 @@ module.exports = function namePubsub (self) { const errMsg = 'IPNS pubsub subsystem is not enabled' log.error(errMsg) - return callback(errcode(errMsg, 'ERR_IPNS_PS_NOT_ENABLED')) + return callback(errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED')) } self._ipns.pubsub.getSubscriptions(callback) diff --git a/src/core/components/start.js b/src/core/components/start.js index 0b9b8dcce2..8f58712ca9 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -8,7 +8,7 @@ const promisify = require('promisify-es6') const { TieredDatastore } = require('datastore-core') const IPNS = require('../ipns') -const Pubsub = require('../ipns/routing/pubsub') +const PubsubDatastore = require('../ipns/routing/pubsub-datastore') const OfflineDatastore = require('../ipns/routing/offline-datastore') module.exports = (self) => { @@ -44,11 +44,14 @@ module.exports = (self) => { const ipnsStores = [] // Add IPNS pubsub if enabled - let pubsub + let pubsubDs if (get(self._options, 'EXPERIMENTAL.ipnsPubsub', false)) { - pubsub = new Pubsub(self) + const pubsub = self._libp2pNode.pubsub + const localDatastore = self._repo.datastore + const peerId = self._peerInfo.id - ipnsStores.push(pubsub) + pubsubDs = new PubsubDatastore(pubsub, localDatastore, peerId) + ipnsStores.push(pubsubDs) } // NOTE: IPNS routing is being replaced by the local repo datastore while the IPNS over DHT is not ready @@ -58,7 +61,7 @@ module.exports = (self) => { // Create ipns routing with a set of datastores const routing = new TieredDatastore(ipnsStores) - self._ipns = new IPNS(routing, self._repo, self._peerInfo, self._keychain, self._options, pubsub) + self._ipns = new IPNS(routing, self._repo, self._peerInfo, self._keychain, self._options, pubsubDs) self._bitswap = new Bitswap( self._libp2pNode, diff --git a/src/core/ipns/routing/pubsub.js b/src/core/ipns/routing/pubsub-datastore.js similarity index 70% rename from src/core/ipns/routing/pubsub.js rename to src/core/ipns/routing/pubsub-datastore.js index 770ccb305e..b58ee97483 100644 --- a/src/core/ipns/routing/pubsub.js +++ b/src/core/ipns/routing/pubsub-datastore.js @@ -3,7 +3,7 @@ const ipns = require('ipns') const { fromB58String, toB58String } = require('multihashes') const PeerId = require('peer-id') -const DatastorePubsub = require('datastore-pubsub') +const PubsubDatastore = require('datastore-pubsub') const errcode = require('err-code') const debug = require('debug') @@ -14,18 +14,14 @@ const ipnsNS = '/ipns/' const ipnsNSLength = ipnsNS.length // Pubsub aims to manage the pubsub subscriptions for IPNS -class Pubsub { - constructor (node) { - const pubsub = node._libp2pNode.pubsub - const localDatastore = node._repo.datastore - const peerId = node._peerInfo.id +class IpnsPubsubDatastore { + constructor (pubsub, localDatastore, peerId) { this._pubsub = pubsub - this._subscriptions = {} - // Bind _handleSubscriptionKey function, which is called by datastorePubsub. + // Bind _handleSubscriptionKey function, which is called by PubsubDatastore. this._handleSubscriptionKey = this._handleSubscriptionKey.bind(this) - this._dsPubsub = new DatastorePubsub(pubsub, localDatastore, peerId, ipns.validator, this._handleSubscriptionKey) + this._pubsubDs = new PubsubDatastore(pubsub, localDatastore, peerId, ipns.validator, this._handleSubscriptionKey) } /** @@ -37,20 +33,20 @@ class Pubsub { */ put (key, value, callback) { if (!Buffer.isBuffer(key)) { - const errMsg = `key does not have a valid format` + const errMsg = `key ${key} does not have a valid format` log.error(errMsg) return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) } if (!Buffer.isBuffer(value)) { - const errMsg = `received value is not a buffer` + const errMsg = `received value ${value} is not a buffer` log.error(errMsg) return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) } - this._dsPubsub.put(key, value, callback) + this._pubsubDs.put(key, value, callback) } /** @@ -63,13 +59,13 @@ class Pubsub { */ get (key, callback) { if (!Buffer.isBuffer(key)) { - const errMsg = `key does not have a valid format` + const errMsg = `key ${key} does not have a valid format` log.error(errMsg) return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) } - this._dsPubsub.get(key, (err, res) => { + this._pubsubDs.get(key, (err, res) => { // Add topic subscribed const ns = key.slice(0, ipnsNSLength) @@ -82,6 +78,7 @@ class Pubsub { log(`subscribed pubsub ${stringifiedTopic}: ${id}`) } + // If no data was obtained, after storing the subscription, return the error. if (err) { return callback(err) } @@ -119,36 +116,13 @@ class Pubsub { * @returns {void} */ getSubscriptions (callback) { - this._pubsub.ls((err, res) => { - if (err || !res) { - log.error(err) - return callback(err) - } + const subscriptions = Object.values(this._subscriptions) - // Iterate over subscriptions - const strings = [] - res.forEach((subs) => { - const ns = subs.substring(0, ipnsNSLength) - const subscription = this._subscriptions[subs] - - if (ns === ipnsNS && subscription) { - // Verify valid PeerID - try { - PeerId.createFromBytes(Buffer.from(subscription)) - // add to the list - strings.push(`/ipns/${subscription}`) - } catch (err) { - log.error('ipns key not a valid peer ID') - } - } - }) - - callback(null, strings) - }) + return callback(null, subscriptions.map((sub) => `/ipns/${sub}`)) } /** - * Get pubsub subscriptions related to ipns. + * Cancel pubsub subscriptions related to ipns. * @param {String} name ipns path to cancel the pubsub subscription. * @param {function(Error, Object)} callback * @returns {void} @@ -166,16 +140,6 @@ class Pubsub { name = name.substring(ipnsNSLength) } - // Verify peerId validity - try { - PeerId.createFromBytes(Buffer.from(name)) - } catch (err) { - const errMsg = `ipns key is not a valid peer ID` - - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_IPNS_KEY')) - } - const stringifiedTopic = Object.keys(this._subscriptions).find((key) => this._subscriptions[key] === name) // Not found topic @@ -189,7 +153,7 @@ class Pubsub { try { const bufTopic = Buffer.from(stringifiedTopic) - this._dsPubsub.unsubscribe(bufTopic) + this._pubsubDs.unsubscribe(bufTopic) } catch (err) { return callback(err) } @@ -203,4 +167,4 @@ class Pubsub { } } -exports = module.exports = Pubsub +exports = module.exports = IpnsPubsubDatastore diff --git a/src/http/api/resources/name.js b/src/http/api/resources/name.js index 090b2ce630..be2c9eaad5 100644 --- a/src/http/api/resources/name.js +++ b/src/http/api/resources/name.js @@ -92,7 +92,7 @@ exports.pubsub = { } return reply({ - Strings: res.strings + Strings: res }).code(200) }) } diff --git a/test/core/interface.spec.js b/test/core/interface.spec.js index 7067a5733c..9d28ea3332 100644 --- a/test/core/interface.spec.js +++ b/test/core/interface.spec.js @@ -124,13 +124,12 @@ describe('interface-ipfs-core tests', () => { } })) - /* tests.namePubsub(CommonFactory.create({ spawnOptions: { args: ['--enable-namesys-pubsub'], initOptions: { bits: 1024 } } - })) */ + })) tests.object(defaultCommonFactory) diff --git a/test/core/name-pubsub.js b/test/core/name-pubsub.js deleted file mode 100644 index 23344892de..0000000000 --- a/test/core/name-pubsub.js +++ /dev/null @@ -1,204 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 6] */ -/* eslint-env mocha */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const parallel = require('async/parallel') -const retry = require('async/retry') -const series = require('async/series') - -const isNode = require('detect-node') -const IPFS = require('../../src') -const { fromB58String } = require('multihashes') - -const DaemonFactory = require('ipfsd-ctl') -const df = DaemonFactory.create({ type: 'proc', exec: IPFS }) - -const spawnNode = (callback) => { - df.spawn({ - args: ['--enable-namesys-pubsub', '--enable-pubsub-experiment'], - disposable: true, - bits: 512 - }, callback) -} - -const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' - -describe('name-pubsub', function () { - if (!isNode) { - return - } - - let ipfsA - let ipfsB - let nodeAId - let nodeBId - let bMultiaddr - let nodes = [] - - // Spawn daemons - before(function (done) { - // CI takes longer to instantiate the daemon, so we need to increase the - // timeout for the before step - this.timeout(80 * 1000) - - series([ - (cb) => { - spawnNode((err, node) => { - expect(err).to.not.exist() - ipfsA = node.api - nodes.push(node) - cb() - }) - }, - (cb) => { - spawnNode((err, node) => { - expect(err).to.not.exist() - ipfsB = node.api - nodes.push(node) - cb() - }) - } - ], done) - }) - - // Get node ids - before(function (done) { - parallel([ - (cb) => { - ipfsA.id((err, res) => { - expect(err).to.not.exist() - expect(res.id).to.exist() - nodeAId = res - cb() - }) - }, - (cb) => { - ipfsB.id((err, res) => { - expect(err).to.not.exist() - expect(res.id).to.exist() - nodeBId = res - bMultiaddr = res.addresses[0] - cb() - }) - } - ], done) - }) - - // Connect - before(function (done) { - this.timeout(60 * 1000) - ipfsA.swarm.connect(bMultiaddr, done) - }) - - after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) - - it('should get enabled state of pubsub', function (done) { - ipfsA.name.pubsub.state((err, state) => { - expect(err).to.not.exist() - expect(state).to.exist() - expect(state.enabled).to.equal(true) - - done() - }) - }) - - it('should subscribe on resolve', function (done) { - this.timeout(80 * 1000) - - ipfsB.name.pubsub.subs((err, subs) => { - expect(err).to.not.exist() - expect(subs).to.exist() - expect(subs.strings).to.deep.equal([]) - - ipfsB.name.resolve(nodeBId.id, (err) => { - expect(err).to.not.exist() - - ipfsB.name.pubsub.subs((err, subs) => { - expect(err).to.not.exist() - expect(subs).to.exist() - expect(subs.strings).to.include(`/ipns/${nodeBId.id}`) - - done() - }) - }) - }) - }) - - it('should be able to cancel subscriptions', function (done) { - this.timeout(80 * 1000) - const path = `/ipns/${nodeAId.id}` - - ipfsA.name.pubsub.cancel(path, (err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res.canceled).to.equal(false) - - ipfsA.name.resolve(nodeAId.id, (err) => { - expect(err).to.not.exist() - - ipfsA.name.pubsub.cancel(path, (err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - expect(res.canceled).to.equal(true) - - ipfsA.name.pubsub.subs((err, subs) => { - expect(err).to.not.exist() - expect(subs).to.exist() - expect(subs.strings).to.not.include(path) - - done() - }) - }) - }) - }) - }) - - it('should publish the received record to the subscriber', function (done) { - this.timeout(80 * 1000) - const topic = `/ipns/${fromB58String(nodeAId.id).toString()}` - - ipfsB.name.resolve(nodeAId.id, (err, res) => { - expect(err).to.exist() - - series([ - (cb) => waitForPeerToSubscribe(topic, nodeBId, ipfsA, cb), - (cb) => ipfsA.name.publish(ipfsRef, { resolve: false }, cb), - (cb) => ipfsA.name.resolve(nodeAId.id, cb), - (cb) => ipfsB.name.resolve(nodeAId.id, cb) - ], (err, res) => { - expect(err).to.not.exist() - expect(res).to.exist() - - expect(res[1].name).to.equal(nodeAId.id) // Published to Node A ID - expect(res[2].path).to.equal(ipfsRef) - expect(res[3].path).to.equal(ipfsRef) - done() - }) - }) - }) -}) - -// Wait until a peer subscribes a topic -const waitForPeerToSubscribe = (topic, peer, daemon, callback) => { - retry({ - times: 5, - interval: 2000 - }, (next) => { - daemon.pubsub.peers(topic, (error, peers) => { - if (error) { - return next(error) - } - - if (!peers.includes(peer.id)) { - return next(new Error(`Could not find peer ${peer.id}`)) - } - - return next() - }) - }, callback) -} From 3a4b139b07d7937ec17c82c465320fc043dcef1e Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 18 Oct 2018 18:21:48 +0100 Subject: [PATCH 4/9] fix: second pass of code review --- package.json | 6 + src/core/components/name-pubsub.js | 68 +++++++--- src/core/components/pre-start.js | 11 ++ src/core/ipns/routing/offline-datastore.js | 5 - src/core/ipns/routing/pubsub-datastore.js | 6 +- test/cli/name-pubsub.js | 149 +++++++++++---------- 6 files changed, 152 insertions(+), 93 deletions(-) diff --git a/package.json b/package.json index bbdf310d3d..f920bfee7e 100644 --- a/package.json +++ b/package.json @@ -80,6 +80,7 @@ "dependencies": { "@nodeutils/defaults-deep": "^1.1.0", "async": "^2.6.1", + "base32.js": "~0.1.0", "big.js": "^5.2.2", "binary-querystring": "~0.1.2", "bl": "^2.1.2", @@ -88,9 +89,14 @@ "byteman": "^1.3.5", "cid-tool": "~0.2.0", "cids": "~0.5.5", +<<<<<<< HEAD <<<<<<< HEAD "datastore-core": "~0.6.0", ======= +======= + "class-is": "^1.1.0", + "datastore-core": "~0.4.0", +>>>>>>> fix: second pass of code review "datastore-pubsub": "~0.0.2", >>>>>>> feat: ipns over pubsub "debug": "^4.1.0", diff --git a/src/core/components/name-pubsub.js b/src/core/components/name-pubsub.js index 3612b8d292..706afb832a 100644 --- a/src/core/components/name-pubsub.js +++ b/src/core/components/name-pubsub.js @@ -4,14 +4,48 @@ const debug = require('debug') const errcode = require('err-code') const promisify = require('promisify-es6') +const IpnsPubsubDatastore = require('../ipns/routing/pubsub-datastore') + const log = debug('jsipfs:name-pubsub') log.error = debug('jsipfs:name-pubsub:error') -const isNamePubsubEnabled = (node) => ( - Boolean(node._options.EXPERIMENTAL.ipnsPubsub && - node._libp2pNode && - node._libp2pNode._floodSub) -) +// Is pubsub enabled +const isNamePubsubEnabled = (node) => { + let pubsub + try { + pubsub = getPubsubRouting(node) + } catch (err) { + return false + } + + return Boolean(pubsub) +} + +// Get pubsub from IPNS routing +const getPubsubRouting = (node) => { + if (!node._ipns || !node._options.EXPERIMENTAL.ipnsPubsub) { + const errMsg = 'IPNS pubsub subsystem is not enabled' + + log.error(errMsg) + throw errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED') + } + + // Only one store and it is pubsub + if (IpnsPubsubDatastore.isIpnsPubsubDatastore(node._ipns.routing)) { + return node._ipns.routing + } + + // Find in tiered + const pubsub = (node._ipns.routing.stores || []).find(s => IpnsPubsubDatastore.isIpnsPubsubDatastore(s)) + + if (!pubsub) { + const errMsg = 'IPNS pubsub datastore not found' + + log.error(errMsg) + throw errcode(errMsg, 'ERR_PUBSUB_DATASTORE_NOT_FOUND') + } + return pubsub +} module.exports = function namePubsub (self) { return { @@ -33,14 +67,14 @@ module.exports = function namePubsub (self) { * @returns {Promise|void} */ cancel: promisify((name, callback) => { - if (!isNamePubsubEnabled(self)) { - const errMsg = 'IPNS pubsub subsystem is not enabled' - - log.error(errMsg) - return callback(errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED')) + let pubsub + try { + pubsub = getPubsubRouting(self) + } catch (err) { + return callback(err) } - self._ipns.pubsub.cancel(name, callback) + pubsub.cancel(name, callback) }), /** * Show current name subscriptions. @@ -49,14 +83,14 @@ module.exports = function namePubsub (self) { * @returns {Promise|void} */ subs: promisify((callback) => { - if (!isNamePubsubEnabled(self)) { - const errMsg = 'IPNS pubsub subsystem is not enabled' - - log.error(errMsg) - return callback(errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED')) + let pubsub + try { + pubsub = getPubsubRouting(self) + } catch (err) { + return callback(err) } - self._ipns.pubsub.getSubscriptions(callback) + pubsub.getSubscriptions(callback) }) } } diff --git a/src/core/components/pre-start.js b/src/core/components/pre-start.js index 13d914acc4..9914a8167e 100644 --- a/src/core/components/pre-start.js +++ b/src/core/components/pre-start.js @@ -7,6 +7,10 @@ const waterfall = require('async/waterfall') const Keychain = require('libp2p-keychain') const defaultsDeep = require('@nodeutils/defaults-deep') const NoKeychain = require('./no-keychain') + +const IPNS = require('../ipns') +const OfflineDatastore = require('../ipns/routing/offline-datastore') + /* * Load stuff from Repo into memory */ @@ -95,6 +99,13 @@ module.exports = function preStart (self) { cb() }, + // Setup offline routing for IPNS. + (cb) => { + const offlineDatastore = new OfflineDatastore(self._repo) + + self._ipns = new IPNS(offlineDatastore, self) + cb() + }, (cb) => self.pin._load(cb) ], callback) } diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js index 26de52528c..bc33099e5f 100644 --- a/src/core/ipns/routing/offline-datastore.js +++ b/src/core/ipns/routing/offline-datastore.js @@ -3,11 +3,6 @@ const { Key } = require('interface-datastore') const { encodeBase32 } = require('./utils') -const errcode = require('err-code') -const debug = require('debug') -const log = debug('jsipfs:ipns:offline-datastore') -log.error = debug('jsipfs:ipns:offline-datastore:error') - // Offline datastore aims to mimic the same encoding as routing when storing records // to the local datastore class OfflineDatastore { diff --git a/src/core/ipns/routing/pubsub-datastore.js b/src/core/ipns/routing/pubsub-datastore.js index b58ee97483..0556698c4b 100644 --- a/src/core/ipns/routing/pubsub-datastore.js +++ b/src/core/ipns/routing/pubsub-datastore.js @@ -2,9 +2,10 @@ const ipns = require('ipns') const { fromB58String, toB58String } = require('multihashes') -const PeerId = require('peer-id') const PubsubDatastore = require('datastore-pubsub') +const withIs = require('class-is') + const errcode = require('err-code') const debug = require('debug') const log = debug('jsipfs:ipns:pubsub') @@ -167,4 +168,5 @@ class IpnsPubsubDatastore { } } -exports = module.exports = IpnsPubsubDatastore +// exports = module.exports = IpnsPubsubDatastore +exports = module.exports = withIs(IpnsPubsubDatastore, { className: 'IpnsPubsubDatastore', symbolName: '@js-ipfs/ipns/IpnsPubsubDatastore' }) diff --git a/test/cli/name-pubsub.js b/test/cli/name-pubsub.js index 79416ea40f..dbd89b8698 100644 --- a/test/cli/name-pubsub.js +++ b/test/cli/name-pubsub.js @@ -82,10 +82,9 @@ describe('name-pubsub', () => { }) // Connect - before(function (done) { - ipfsA('swarm', 'connect', bMultiaddr).then((out) => { + before(function () { + return ipfsA('swarm', 'connect', bMultiaddr).then((out) => { expect(out).to.eql(`connect ${bMultiaddr} success\n`) - done() }) }) @@ -97,63 +96,69 @@ describe('name-pubsub', () => { done() }) - it('should get enabled state of pubsub', function (done) { - ipfsA('name pubsub state').then((res) => { + it('should get enabled state of pubsub', function () { + return ipfsA('name pubsub state').then((res) => { expect(res).to.exist() expect(res).to.have.string('enabled') // enabled - - done() }) }) - it('should subscribe on name resolve', function (done) { + it('should subscribe on name resolve', function () { this.timeout(80 * 1000) - ipfsB(`name resolve ${nodeAId.id}`).catch((err) => { - expect(err).to.exist() // Not available (subscribed) + return ipfsB(`name resolve ${nodeAId.id}`) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed) - ipfsB('pubsub ls').then((res) => { - expect(res).to.exist() - expect(res).to.have.string('/ipns/') // have an ipns subscribtion + return Promise.all([ + ipfsB('pubsub ls'), + ipfsB('name pubsub subs') + ]) + .then((res) => { + expect(res).to.exist() - ipfsB('name pubsub subs').then((res) => { - expect(res).to.exist() - expect(res).to.have.string(`/ipns/${nodeAId.id}`) // have subscription + expect(res[0]).to.exist() + expect(res[0]).to.have.string('/ipns/') // have an ipns subscribtion - done() - }) + expect(res[1]).to.exist() + expect(res[1]).to.have.string(`/ipns/${nodeAId.id}`) // have subscription + }) }) - }) }) - it('should be able to cancel subscriptions', function (done) { + it('should be able to cancel subscriptions', function () { this.timeout(80 * 1000) - ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`).then((res) => { - expect(res).to.exist() - expect(res).to.have.string('no subscription') // tried to cancel a not yet subscribed id - - ipfsA(`name resolve ${nodeBId.id}`).catch((err) => { - expect(err).to.exist() // Not available (subscribed now) - - ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`).then((res) => { - expect(res).to.exist() - expect(res).to.have.string('canceled') // canceled now - - ipfsA('pubsub ls').then((res) => { - expect(res).to.exist() - expect(res).to.not.have.string('/ipns/') // ipns subscribtion not available - - ipfsA('name pubsub subs').then((res) => { - expect(res).to.exist() - expect(res).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available - - done() - }) + return ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('no subscription') // tried to cancel a not yet subscribed id + + return ipfsA(`name resolve ${nodeBId.id}`) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + return ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('canceled') // canceled now + + return Promise.all([ + ipfsA('pubsub ls'), + ipfsA('name pubsub subs') + ]) + .then((res) => { + expect(res).to.exist() + + expect(res[0]).to.exist() + expect(res[0]).to.not.have.string('/ipns/') // ipns subscribtion not available + + expect(res[1]).to.exist() + expect(res[1]).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available + }) + }) }) - }) }) - }) }) }) @@ -168,35 +173,40 @@ describe('name-pubsub', () => { }) }) - it('should publish the received record to the subscriber', function (done) { + it('should publish the received record to the subscriber', function () { this.timeout(80 * 1000) - ipfsB(`name resolve ${nodeBId.id}`).then((res) => { - expect(res).to.exist() - expect(res).to.satisfy(checkAll([emptyDirCid])) // Empty dir received (subscribed) - - ipfsA(`name resolve ${nodeBId.id}`).catch((err) => { - expect(err).to.exist() // Not available (subscribed now) - - ipfsB(`name publish ${cidAdded}`).then((res) => { - // published to IpfsB and published through pubsub to ipfsa - expect(res).to.exist() - expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) - - ipfsB(`name resolve ${nodeBId.id}`).then((res) => { - expect(res).to.exist() - expect(res).to.satisfy(checkAll([cidAdded])) - - ipfsA(`name resolve ${nodeBId.id}`).then((res) => { - expect(res).to.exist() - expect(res).to.satisfy(checkAll([cidAdded])) // value propagated to node B - - done() - }) + return ipfsB(`name resolve ${nodeBId.id}`) + .then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([emptyDirCid])) // Empty dir received (subscribed) + + return ipfsA(`name resolve ${nodeBId.id}`) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + return ipfsB(`name publish ${cidAdded}`) + .then((res) => { + // published to IpfsB and published through pubsub to ipfsa + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) + + return Promise.all([ + ipfsB(`name resolve ${nodeBId.id}`), + ipfsA(`name resolve ${nodeBId.id}`) + ]) + .then((res) => { + expect(res).to.exist() + + expect(res[0]).to.exist() + expect(res[0]).to.satisfy(checkAll([cidAdded])) + + expect(res[1]).to.exist() + expect(res[1]).to.satisfy(checkAll([cidAdded])) // value propagated to node B + }) + }) }) - }) }) - }) }) }) }) @@ -237,7 +247,7 @@ describe('name-pubsub', () => { it('should get error getting the available subscriptions', function (done) { ipfsA('name pubsub subs').catch((err) => { expect(err).to.exist() // error as it is disabled - + expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') done() }) }) @@ -245,6 +255,7 @@ describe('name-pubsub', () => { it('should get error canceling a subscription', function (done) { ipfsA('name pubsub cancel /ipns/QmSWxaPcGgf4TDnFEBDWz2JnbHywF14phmY9hNcAeBEK5v').catch((err) => { expect(err).to.exist() // error as it is disabled + expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') done() }) From 9bb8816ae447de15c143aa455cb4ba4252787641 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Tue, 23 Oct 2018 14:55:25 +0100 Subject: [PATCH 5/9] fix: code review for tests --- package.json | 14 +- src/core/components/init.js | 3 - src/core/components/pre-start.js | 11 -- src/core/components/start.js | 2 +- src/core/ipns/index.js | 5 +- src/core/ipns/routing/offline-datastore.js | 5 + test/cli/name-pubsub.js | 180 ++++++++++----------- test/core/name-pubsub.js | 83 ++++++++++ 8 files changed, 179 insertions(+), 124 deletions(-) create mode 100644 test/core/name-pubsub.js diff --git a/package.json b/package.json index f920bfee7e..28d8e62273 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,7 @@ "execa": "^1.0.0", "form-data": "^2.3.3", "hat": "0.0.3", - "interface-ipfs-core": "~0.88.0", + "interface-ipfs-core": "ipfs/interface-ipfs-core#fix/ipns-over-pubsub-tests", "ipfsd-ctl": "~0.40.1", "ncp": "^2.0.0", "qs": "^6.5.2", @@ -80,7 +80,6 @@ "dependencies": { "@nodeutils/defaults-deep": "^1.1.0", "async": "^2.6.1", - "base32.js": "~0.1.0", "big.js": "^5.2.2", "binary-querystring": "~0.1.2", "bl": "^2.1.2", @@ -89,16 +88,9 @@ "byteman": "^1.3.5", "cid-tool": "~0.2.0", "cids": "~0.5.5", -<<<<<<< HEAD -<<<<<<< HEAD - "datastore-core": "~0.6.0", -======= -======= "class-is": "^1.1.0", - "datastore-core": "~0.4.0", ->>>>>>> fix: second pass of code review - "datastore-pubsub": "~0.0.2", ->>>>>>> feat: ipns over pubsub + "datastore-core": "~0.6.0", + "datastore-pubsub": "ipfs/js-datastore-pubsub#feat/encode-record-store-keys", "debug": "^4.1.0", "deep-extend": "~0.6.0", "err-code": "^1.1.2", diff --git a/src/core/components/init.js b/src/core/components/init.js index 4069092a38..51c197e977 100644 --- a/src/core/components/init.js +++ b/src/core/components/init.js @@ -15,9 +15,6 @@ const UnixFs = require('ipfs-unixfs') const IPNS = require('../ipns') const OfflineDatastore = require('../ipns/routing/offline-datastore') -const IPNS = require('../ipns') -const OfflineDatastore = require('../ipns/routing/offline-datastore') - const addDefaultAssets = require('./init-assets') module.exports = function init (self) { diff --git a/src/core/components/pre-start.js b/src/core/components/pre-start.js index 9914a8167e..13d914acc4 100644 --- a/src/core/components/pre-start.js +++ b/src/core/components/pre-start.js @@ -7,10 +7,6 @@ const waterfall = require('async/waterfall') const Keychain = require('libp2p-keychain') const defaultsDeep = require('@nodeutils/defaults-deep') const NoKeychain = require('./no-keychain') - -const IPNS = require('../ipns') -const OfflineDatastore = require('../ipns/routing/offline-datastore') - /* * Load stuff from Repo into memory */ @@ -99,13 +95,6 @@ module.exports = function preStart (self) { cb() }, - // Setup offline routing for IPNS. - (cb) => { - const offlineDatastore = new OfflineDatastore(self._repo) - - self._ipns = new IPNS(offlineDatastore, self) - cb() - }, (cb) => self.pin._load(cb) ], callback) } diff --git a/src/core/components/start.js b/src/core/components/start.js index 8f58712ca9..8a8fe10a85 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -61,7 +61,7 @@ module.exports = (self) => { // Create ipns routing with a set of datastores const routing = new TieredDatastore(ipnsStores) - self._ipns = new IPNS(routing, self._repo, self._peerInfo, self._keychain, self._options, pubsubDs) + self._ipns = new IPNS(routing, self._repo, self._peerInfo, self._keychain, self._options) self._bitswap = new Bitswap( self._libp2pNode, diff --git a/src/core/ipns/index.js b/src/core/ipns/index.js index 6092c043f9..06b43ca330 100644 --- a/src/core/ipns/index.js +++ b/src/core/ipns/index.js @@ -17,13 +17,12 @@ const path = require('./path') const defaultRecordTtl = 60 * 1000 class IPNS { - constructor (routing, repo, peerInfo, keychain, options, pubsub) { + constructor (routing, repo, peerInfo, keychain, options) { this.publisher = new IpnsPublisher(routing, repo) this.republisher = new IpnsRepublisher(this.publisher, repo, peerInfo, keychain, options) this.resolver = new IpnsResolver(routing) this.cache = new Receptacle({ max: 1000 }) // Create an LRU cache with max 1000 items this.routing = routing - this.pubsub = pubsub } // Publish @@ -72,7 +71,7 @@ class IPNS { options = options || {} // If recursive, we should not try to get the cached value - if (options.nocache && !options.recursive) { + if (!options.nocache && !options.recursive) { // Try to get the record from cache const id = name.split('/')[2] const result = this.cache.get(id) diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js index bc33099e5f..26de52528c 100644 --- a/src/core/ipns/routing/offline-datastore.js +++ b/src/core/ipns/routing/offline-datastore.js @@ -3,6 +3,11 @@ const { Key } = require('interface-datastore') const { encodeBase32 } = require('./utils') +const errcode = require('err-code') +const debug = require('debug') +const log = debug('jsipfs:ipns:offline-datastore') +log.error = debug('jsipfs:ipns:offline-datastore:error') + // Offline datastore aims to mimic the same encoding as routing when storing records // to the local datastore class OfflineDatastore { diff --git a/test/cli/name-pubsub.js b/test/cli/name-pubsub.js index dbd89b8698..6b252f67a1 100644 --- a/test/cli/name-pubsub.js +++ b/test/cli/name-pubsub.js @@ -83,24 +83,21 @@ describe('name-pubsub', () => { // Connect before(function () { - return ipfsA('swarm', 'connect', bMultiaddr).then((out) => { - expect(out).to.eql(`connect ${bMultiaddr} success\n`) - }) + return ipfsA('swarm', 'connect', bMultiaddr) + .then((out) => { + expect(out).to.eql(`connect ${bMultiaddr} success\n`) + }) }) after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) describe('pubsub commands', () => { - before(function (done) { - this.timeout(50 * 1000) - done() - }) - it('should get enabled state of pubsub', function () { - return ipfsA('name pubsub state').then((res) => { - expect(res).to.exist() - expect(res).to.have.string('enabled') // enabled - }) + return ipfsA('name pubsub state') + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('enabled') // enabled + }) }) it('should subscribe on name resolve', function () { @@ -110,19 +107,17 @@ describe('name-pubsub', () => { .catch((err) => { expect(err).to.exist() // Not available (subscribed) - return Promise.all([ - ipfsB('pubsub ls'), - ipfsB('name pubsub subs') - ]) - .then((res) => { - expect(res).to.exist() - - expect(res[0]).to.exist() - expect(res[0]).to.have.string('/ipns/') // have an ipns subscribtion + return ipfsB('pubsub ls') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('/record/') // have a record ipns subscribtion - expect(res[1]).to.exist() - expect(res[1]).to.have.string(`/ipns/${nodeAId.id}`) // have subscription - }) + return ipfsB('name pubsub subs') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string(`/ipns/${nodeAId.id}`) // have subscription }) }) @@ -135,29 +130,27 @@ describe('name-pubsub', () => { expect(res).to.have.string('no subscription') // tried to cancel a not yet subscribed id return ipfsA(`name resolve ${nodeBId.id}`) - .catch((err) => { - expect(err).to.exist() // Not available (subscribed now) - - return ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`) - .then((res) => { - expect(res).to.exist() - expect(res).to.have.string('canceled') // canceled now - - return Promise.all([ - ipfsA('pubsub ls'), - ipfsA('name pubsub subs') - ]) - .then((res) => { - expect(res).to.exist() - - expect(res[0]).to.exist() - expect(res[0]).to.not.have.string('/ipns/') // ipns subscribtion not available - - expect(res[1]).to.exist() - expect(res[1]).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available - }) - }) - }) + }) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + return ipfsA(`name pubsub cancel /ipns/${nodeBId.id}`) + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('canceled') // canceled now + + return ipfsA('pubsub ls') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string('/ipns/') // ipns subscribtion not available + + return ipfsA('name pubsub subs') + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.not.have.string(`/ipns/${nodeBId.id}`) // ipns subscribtion not available }) }) }) @@ -167,10 +160,11 @@ describe('name-pubsub', () => { before(function (done) { this.timeout(50 * 1000) - ipfsA('add src/init-files/init-docs/readme').then((out) => { - cidAdded = out.split(' ')[1] - done() - }) + ipfsA('add src/init-files/init-docs/readme') + .then((out) => { + cidAdded = out.split(' ')[1] + done() + }) }) it('should publish the received record to the subscriber', function () { @@ -182,30 +176,28 @@ describe('name-pubsub', () => { expect(res).to.satisfy(checkAll([emptyDirCid])) // Empty dir received (subscribed) return ipfsA(`name resolve ${nodeBId.id}`) - .catch((err) => { - expect(err).to.exist() // Not available (subscribed now) - - return ipfsB(`name publish ${cidAdded}`) - .then((res) => { - // published to IpfsB and published through pubsub to ipfsa - expect(res).to.exist() - expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) - - return Promise.all([ - ipfsB(`name resolve ${nodeBId.id}`), - ipfsA(`name resolve ${nodeBId.id}`) - ]) - .then((res) => { - expect(res).to.exist() - - expect(res[0]).to.exist() - expect(res[0]).to.satisfy(checkAll([cidAdded])) - - expect(res[1]).to.exist() - expect(res[1]).to.satisfy(checkAll([cidAdded])) // value propagated to node B - }) - }) - }) + }) + .catch((err) => { + expect(err).to.exist() // Not available (subscribed now) + + return ipfsB(`name publish ${cidAdded}`) + }) + .then((res) => { + // published to IpfsB and published through pubsub to ipfsa + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded, nodeBId.id])) + + return ipfsB(`name resolve ${nodeBId.id}`) + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) + + return ipfsA(`name resolve ${nodeBId.id}`) + }) + .then((res) => { + expect(res).to.exist() + expect(res).to.satisfy(checkAll([cidAdded])) // value propagated to node B }) }) }) @@ -235,30 +227,28 @@ describe('name-pubsub', () => { after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) - it('should get disabled state of pubsub', function (done) { - ipfsA('name pubsub state').then((res) => { - expect(res).to.exist() - expect(res).to.have.string('disabled') - - done() - }) + it('should get disabled state of pubsub', function () { + return ipfsA('name pubsub state') + .then((res) => { + expect(res).to.exist() + expect(res).to.have.string('disabled') + }) }) - it('should get error getting the available subscriptions', function (done) { - ipfsA('name pubsub subs').catch((err) => { - expect(err).to.exist() // error as it is disabled - expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') - done() - }) + it('should get error getting the available subscriptions', function () { + return ipfsA('name pubsub subs') + .catch((err) => { + expect(err).to.exist() // error as it is disabled + expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') + }) }) - it('should get error canceling a subscription', function (done) { - ipfsA('name pubsub cancel /ipns/QmSWxaPcGgf4TDnFEBDWz2JnbHywF14phmY9hNcAeBEK5v').catch((err) => { - expect(err).to.exist() // error as it is disabled - expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') - - done() - }) + it('should get error canceling a subscription', function () { + return ipfsA('name pubsub cancel /ipns/QmSWxaPcGgf4TDnFEBDWz2JnbHywF14phmY9hNcAeBEK5v') + .catch((err) => { + expect(err).to.exist() // error as it is disabled + expect(err.toString()).to.have.string('IPNS pubsub subsystem is not enabled') + }) }) }) }) diff --git a/test/core/name-pubsub.js b/test/core/name-pubsub.js new file mode 100644 index 0000000000..510fd44599 --- /dev/null +++ b/test/core/name-pubsub.js @@ -0,0 +1,83 @@ +/* eslint max-nested-callbacks: ["error", 6] */ +/* eslint-env mocha */ +'use strict' + +const hat = require('hat') +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const parallel = require('async/parallel') + +const isNode = require('detect-node') +const IPFS = require('../../src') + +const DaemonFactory = require('ipfsd-ctl') +const df = DaemonFactory.create({ type: 'proc' }) + +const ipfsRef = '/ipfs/QmPFVLPmp9zv5Z5KUqLhe2EivAGccQW2r7M7jhVJGLZoZU' + +describe('name-pubsub', function () { + if (!isNode) { + return + } + + let nodes + let nodeA + let nodeB + let idA + + const createNode = (callback) => { + df.spawn({ + exec: IPFS, + args: [`--pass ${hat()}`, '--enable-namesys-pubsub'], + config: { Bootstrap: [] } + }, callback) + } + + before(function (done) { + this.timeout(40 * 1000) + + parallel([ + (cb) => createNode(cb), + (cb) => createNode(cb) + ], (err, _nodes) => { + expect(err).to.not.exist() + + nodes = _nodes + nodeA = _nodes[0].api + nodeB = _nodes[1].api + + parallel([ + (cb) => nodeA.id(cb), + (cb) => nodeB.id(cb) + ], (err, ids) => { + expect(err).to.not.exist() + + idA = ids[0] + nodeA.swarm.connect(ids[1].addresses[0], done) + }) + }) + }) + + after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) + + it('should publish and then resolve correctly', function (done) { + nodeB.name.resolve(idA.id, (err) => { + expect(err).to.exist() + + nodeA.name.publish(ipfsRef, { resolve: false }, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + + nodeB.name.resolve(idA.id, (err, res) => { + expect(err).to.not.exist() + expect(res).to.exist() + expect(res.path).to.equal(ipfsRef) + done() + }) + }) + }) + }) +}) From 62f25a69824d71d2c9c89203ecf1944ca372382b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Thu, 22 Nov 2018 12:15:57 +0000 Subject: [PATCH 6/9] fix: remove libp2p-record for pubsub --- package.json | 4 ++-- src/core/components/pubsub.js | 14 +++++++------ src/core/ipns/publisher.js | 20 ++++--------------- src/core/ipns/resolver.js | 4 +--- src/core/ipns/routing/offline-datastore.js | 23 ++++++++++++++++++++-- src/core/ipns/routing/pubsub-datastore.js | 1 - 6 files changed, 36 insertions(+), 30 deletions(-) diff --git a/package.json b/package.json index 28d8e62273..cd3223f3d1 100644 --- a/package.json +++ b/package.json @@ -90,7 +90,7 @@ "cids": "~0.5.5", "class-is": "^1.1.0", "datastore-core": "~0.6.0", - "datastore-pubsub": "ipfs/js-datastore-pubsub#feat/encode-record-store-keys", + "datastore-pubsub": "~0.1.1", "debug": "^4.1.0", "deep-extend": "~0.6.0", "err-code": "^1.1.2", @@ -120,7 +120,7 @@ "ipld-ethereum": "^2.0.1", "ipld-git": "~0.2.2", "ipld-zcash": "~0.1.6", - "ipns": "~0.3.0", + "ipns": "~0.4.2", "is-ipfs": "~0.4.7", "is-pull-stream": "~0.0.0", "is-stream": "^1.1.0", diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js index f22f34cf34..8bb15910bc 100644 --- a/src/core/components/pubsub.js +++ b/src/core/components/pubsub.js @@ -8,6 +8,8 @@ const errPubsubDisabled = () => { return errCode(new Error('pubsub experiment is not enabled'), 'ERR_PUBSUB_DISABLED') } +const pubsubEnabled = (options) => options.EXPERIMENTAL.pubsub || options.EXPERIMENTAL.ipnsPubsub + module.exports = function pubsub (self) { return { subscribe: (topic, handler, options, callback) => { @@ -16,7 +18,7 @@ module.exports = function pubsub (self) { options = {} } - if (!self._options.EXPERIMENTAL.pubsub) { + if (!pubsubEnabled(self._options)) { return callback ? setImmediate(() => callback(errPubsubDisabled())) : Promise.reject(errPubsubDisabled()) @@ -37,7 +39,7 @@ module.exports = function pubsub (self) { }, unsubscribe: (topic, handler, callback) => { - if (!self._options.EXPERIMENTAL.pubsub) { + if (!pubsubEnabled(self._options)) { return callback ? setImmediate(() => callback(errPubsubDisabled())) : Promise.reject(errPubsubDisabled()) @@ -53,28 +55,28 @@ module.exports = function pubsub (self) { }, publish: promisify((topic, data, callback) => { - if (!self._options.EXPERIMENTAL.pubsub) { + if (!pubsubEnabled(self._options)) { return setImmediate(() => callback(errPubsubDisabled())) } self._libp2pNode.pubsub.publish(topic, data, callback) }), ls: promisify((callback) => { - if (!self._options.EXPERIMENTAL.pubsub) { + if (!pubsubEnabled(self._options)) { return setImmediate(() => callback(errPubsubDisabled())) } self._libp2pNode.pubsub.ls(callback) }), peers: promisify((topic, callback) => { - if (!self._options.EXPERIMENTAL.pubsub) { + if (!pubsubEnabled(self._options)) { return setImmediate(() => callback(errPubsubDisabled())) } self._libp2pNode.pubsub.peers(topic, callback) }), setMaxListeners (n) { - if (!self._options.EXPERIMENTAL.pubsub) { + if (!pubsubEnabled(self._options)) { throw errPubsubDisabled() } self._libp2pNode.pubsub.setMaxListeners(n) diff --git a/src/core/ipns/publisher.js b/src/core/ipns/publisher.js index 9e8fcdf3f3..4be4fc41bb 100644 --- a/src/core/ipns/publisher.js +++ b/src/core/ipns/publisher.js @@ -1,7 +1,6 @@ 'use strict' const PeerId = require('peer-id') -const Record = require('libp2p-record').Record const { Key } = require('interface-datastore') const series = require('async/series') const errcode = require('err-code') @@ -97,19 +96,17 @@ class IpnsPublisher { return callback(errcode(new Error(errMsg), 'ERR_INVALID_DATASTORE_KEY')) } - let rec + let entryData try { // Marshal record - const entryData = ipns.marshal(entry) - // Marshal to libp2p record - rec = new Record(key.toBuffer(), entryData) + entryData = ipns.marshal(entry) } catch (err) { log.error(err) return callback(err) } // Add record to routing (buffer key) - this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => { + this._routing.put(key.toBuffer(), entryData, (err, res) => { if (err) { const errMsg = `ipns record for ${key.toString()} could not be stored in the routing` @@ -137,17 +134,8 @@ class IpnsPublisher { return callback(errcode(new Error(errMsg), 'ERR_UNDEFINED_PARAMETER')) } - let rec - try { - // Marshal to libp2p record - rec = new Record(key.toBuffer(), publicKey.bytes) - } catch (err) { - log.error(err) - return callback(err) - } - // Add public key to routing (buffer key) - this._routing.put(key.toBuffer(), rec.serialize(), (err, res) => { + this._routing.put(key.toBuffer(), publicKey.bytes, (err, res) => { if (err) { const errMsg = `public key for ${key.toString()} could not be stored in the routing` diff --git a/src/core/ipns/resolver.js b/src/core/ipns/resolver.js index f9fb81ccd5..8ff9c38ab3 100644 --- a/src/core/ipns/resolver.js +++ b/src/core/ipns/resolver.js @@ -1,7 +1,6 @@ 'use strict' const ipns = require('ipns') -const Record = require('libp2p-record').Record const PeerId = require('peer-id') const errcode = require('err-code') @@ -119,8 +118,7 @@ class IpnsResolver { let ipnsEntry try { - const record = Record.deserialize(res) - ipnsEntry = ipns.unmarshal(record.value) + ipnsEntry = ipns.unmarshal(res) } catch (err) { const errMsg = `found ipns record that we couldn't convert to a value` diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js index 26de52528c..84a49545eb 100644 --- a/src/core/ipns/routing/offline-datastore.js +++ b/src/core/ipns/routing/offline-datastore.js @@ -1,6 +1,7 @@ 'use strict' const { Key } = require('interface-datastore') +const Record = require('libp2p-record').Record const { encodeBase32 } = require('./utils') const errcode = require('err-code') @@ -48,7 +49,10 @@ class OfflineDatastore { return callback(errcode(new Error(errMsg), 'ERR_GENERATING_ROUTING_KEY')) } - this._repo.datastore.put(routingKey, value, callback) + // Marshal to libp2p record as the DHT does + let record = new Record(key, value) + + this._repo.datastore.put(routingKey, record.serialize(), callback) } /** @@ -76,7 +80,22 @@ class OfflineDatastore { return callback(errcode(new Error(errMsg), 'ERR_GENERATING_ROUTING_KEY')) } - this._repo.datastore.get(routingKey, callback) + this._repo.datastore.get(routingKey, (err, res) => { + if (err) { + return callback(err) + } + + // Unmarshal libp2p record as the DHT does + let record + try { + record = Record.deserialize(res) + } catch (err) { + log.error(err) + return callback(err) + } + + callback(null, record.value) + }) } // encode key properly - base32(/ipns/{cid}) diff --git a/src/core/ipns/routing/pubsub-datastore.js b/src/core/ipns/routing/pubsub-datastore.js index 0556698c4b..17d4b718e9 100644 --- a/src/core/ipns/routing/pubsub-datastore.js +++ b/src/core/ipns/routing/pubsub-datastore.js @@ -89,7 +89,6 @@ class IpnsPubsubDatastore { } // Modify subscription key to have a proper encoding - // Without this, the utf-8 encoding gets the key broken _handleSubscriptionKey (key, callback) { const subscriber = this._subscriptions[key] From b2755a0ad4bbd736663b53d848e47c83b103eb5f Mon Sep 17 00:00:00 2001 From: Hugo Dias Date: Fri, 30 Nov 2018 12:18:08 +0000 Subject: [PATCH 7/9] fix: code review Co-Authored-By: vasco-santos --- package.json | 3 ++- src/core/components/name-pubsub.js | 8 ++------ src/core/components/start.js | 2 +- src/core/config.js | 2 +- src/core/ipns/routing/offline-datastore.js | 2 +- src/core/ipns/routing/pubsub-datastore.js | 22 +++++++++------------- test/core/name-pubsub.js | 2 ++ 7 files changed, 18 insertions(+), 23 deletions(-) diff --git a/package.json b/package.json index cd3223f3d1..ad7c1cbe50 100644 --- a/package.json +++ b/package.json @@ -93,6 +93,7 @@ "datastore-pubsub": "~0.1.1", "debug": "^4.1.0", "deep-extend": "~0.6.0", + "dlv": "^1.1.2", "err-code": "^1.1.2", "file-type": "^10.2.0", "fnv1a": "^1.0.1", @@ -120,7 +121,7 @@ "ipld-ethereum": "^2.0.1", "ipld-git": "~0.2.2", "ipld-zcash": "~0.1.6", - "ipns": "~0.4.2", + "ipns": "~0.4.3", "is-ipfs": "~0.4.7", "is-pull-stream": "~0.0.0", "is-stream": "^1.1.0", diff --git a/src/core/components/name-pubsub.js b/src/core/components/name-pubsub.js index 706afb832a..6e92a23387 100644 --- a/src/core/components/name-pubsub.js +++ b/src/core/components/name-pubsub.js @@ -11,14 +11,11 @@ log.error = debug('jsipfs:name-pubsub:error') // Is pubsub enabled const isNamePubsubEnabled = (node) => { - let pubsub try { - pubsub = getPubsubRouting(node) + return Boolean(getPubsubRouting(node)) } catch (err) { return false } - - return Boolean(pubsub) } // Get pubsub from IPNS routing @@ -26,7 +23,6 @@ const getPubsubRouting = (node) => { if (!node._ipns || !node._options.EXPERIMENTAL.ipnsPubsub) { const errMsg = 'IPNS pubsub subsystem is not enabled' - log.error(errMsg) throw errcode(errMsg, 'ERR_IPNS_PUBSUB_NOT_ENABLED') } @@ -41,9 +37,9 @@ const getPubsubRouting = (node) => { if (!pubsub) { const errMsg = 'IPNS pubsub datastore not found' - log.error(errMsg) throw errcode(errMsg, 'ERR_PUBSUB_DATASTORE_NOT_FOUND') } + return pubsub } diff --git a/src/core/components/start.js b/src/core/components/start.js index 8a8fe10a85..b2436d400a 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,7 +2,7 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') -const get = require('lodash/get') +const get = require('dlv') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const { TieredDatastore } = require('datastore-core') diff --git a/src/core/config.js b/src/core/config.js index 73ee98038e..acad9d877f 100644 --- a/src/core/config.js +++ b/src/core/config.js @@ -29,7 +29,7 @@ const schema = Joi.object().keys({ }).allow(null), EXPERIMENTAL: Joi.object().keys({ pubsub: Joi.boolean(), - namesysPubsub: Joi.boolean(), + ipnsPubsub: Joi.boolean(), sharding: Joi.boolean(), dht: Joi.boolean() }).allow(null), diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js index 84a49545eb..9c788c2f91 100644 --- a/src/core/ipns/routing/offline-datastore.js +++ b/src/core/ipns/routing/offline-datastore.js @@ -1,7 +1,7 @@ 'use strict' const { Key } = require('interface-datastore') -const Record = require('libp2p-record').Record +const { Record } = require('libp2p-record') const { encodeBase32 } = require('./utils') const errcode = require('err-code') diff --git a/src/core/ipns/routing/pubsub-datastore.js b/src/core/ipns/routing/pubsub-datastore.js index 17d4b718e9..2d29b5e4fe 100644 --- a/src/core/ipns/routing/pubsub-datastore.js +++ b/src/core/ipns/routing/pubsub-datastore.js @@ -11,10 +11,7 @@ const debug = require('debug') const log = debug('jsipfs:ipns:pubsub') log.error = debug('jsipfs:ipns:pubsub:error') -const ipnsNS = '/ipns/' -const ipnsNSLength = ipnsNS.length - -// Pubsub aims to manage the pubsub subscriptions for IPNS +// Pubsub datastore aims to manage the pubsub subscriptions for IPNS class IpnsPubsubDatastore { constructor (pubsub, localDatastore, peerId) { this._pubsub = pubsub @@ -68,11 +65,11 @@ class IpnsPubsubDatastore { this._pubsubDs.get(key, (err, res) => { // Add topic subscribed - const ns = key.slice(0, ipnsNSLength) + const ns = key.slice(0, ipns.namespaceLength) - if (ns.toString() === ipnsNS) { + if (ns.toString() === ipns.namespace) { const stringifiedTopic = key.toString() - const id = toB58String(key.slice(ipnsNSLength)) + const id = toB58String(key.slice(ipns.namespaceLength)) this._subscriptions[stringifiedTopic] = id @@ -116,9 +113,9 @@ class IpnsPubsubDatastore { * @returns {void} */ getSubscriptions (callback) { - const subscriptions = Object.values(this._subscriptions) + const subscriptions = Object.values(this._subscriptions).filter(Boolean) - return callback(null, subscriptions.map((sub) => `/ipns/${sub}`)) + return callback(null, subscriptions.map((sub) => `${ipns.namespace}${sub}`)) } /** @@ -136,8 +133,8 @@ class IpnsPubsubDatastore { } // Trim /ipns/ prefix from the name - if (name.startsWith(ipnsNS)) { - name = name.substring(ipnsNSLength) + if (name.startsWith(ipns.namespace)) { + name = name.substring(ipns.namespaceLength) } const stringifiedTopic = Object.keys(this._subscriptions).find((key) => this._subscriptions[key] === name) @@ -158,7 +155,7 @@ class IpnsPubsubDatastore { return callback(err) } - delete this._subscriptions[stringifiedTopic] + this._subscriptions[stringifiedTopic] = undefined log(`unsubscribed pubsub ${stringifiedTopic}: ${name}`) callback(null, { @@ -167,5 +164,4 @@ class IpnsPubsubDatastore { } } -// exports = module.exports = IpnsPubsubDatastore exports = module.exports = withIs(IpnsPubsubDatastore, { className: 'IpnsPubsubDatastore', symbolName: '@js-ipfs/ipns/IpnsPubsubDatastore' }) diff --git a/test/core/name-pubsub.js b/test/core/name-pubsub.js index 510fd44599..a2225392bd 100644 --- a/test/core/name-pubsub.js +++ b/test/core/name-pubsub.js @@ -64,6 +64,8 @@ describe('name-pubsub', function () { after((done) => parallel(nodes.map((node) => (cb) => node.stop(cb)), done)) it('should publish and then resolve correctly', function (done) { + this.timeout(50 * 1000) + nodeB.name.resolve(idA.id, (err) => { expect(err).to.exist() From 7edade9f93d57611bb76a810f2500f3f37f0e513 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 3 Dec 2018 14:47:40 +0000 Subject: [PATCH 8/9] fix: code review --- package.json | 1 - src/core/components/libp2p.js | 2 +- src/core/components/pubsub.js | 14 ++++++-------- src/core/components/start.js | 2 +- src/core/index.js | 5 +++++ src/core/ipns/routing/offline-datastore.js | 2 +- src/core/ipns/routing/pubsub-datastore.js | 21 --------------------- src/http/index.js | 13 +++++++------ 8 files changed, 21 insertions(+), 39 deletions(-) diff --git a/package.json b/package.json index ad7c1cbe50..68efb1ffc5 100644 --- a/package.json +++ b/package.json @@ -93,7 +93,6 @@ "datastore-pubsub": "~0.1.1", "debug": "^4.1.0", "deep-extend": "~0.6.0", - "dlv": "^1.1.2", "err-code": "^1.1.2", "file-type": "^10.2.0", "fnv1a": "^1.0.1", diff --git a/src/core/components/libp2p.js b/src/core/components/libp2p.js index 3d129d59c2..87838cb1e7 100644 --- a/src/core/components/libp2p.js +++ b/src/core/components/libp2p.js @@ -45,7 +45,7 @@ module.exports = function libp2p (self) { }, EXPERIMENTAL: { dht: get(opts.options, 'EXPERIMENTAL.dht', false), - pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false) || get(opts.options, 'EXPERIMENTAL.ipnsPubsub', false) + pubsub: get(opts.options, 'EXPERIMENTAL.pubsub', false) } }, connectionManager: get(opts.options, 'connectionManager', diff --git a/src/core/components/pubsub.js b/src/core/components/pubsub.js index 8bb15910bc..f22f34cf34 100644 --- a/src/core/components/pubsub.js +++ b/src/core/components/pubsub.js @@ -8,8 +8,6 @@ const errPubsubDisabled = () => { return errCode(new Error('pubsub experiment is not enabled'), 'ERR_PUBSUB_DISABLED') } -const pubsubEnabled = (options) => options.EXPERIMENTAL.pubsub || options.EXPERIMENTAL.ipnsPubsub - module.exports = function pubsub (self) { return { subscribe: (topic, handler, options, callback) => { @@ -18,7 +16,7 @@ module.exports = function pubsub (self) { options = {} } - if (!pubsubEnabled(self._options)) { + if (!self._options.EXPERIMENTAL.pubsub) { return callback ? setImmediate(() => callback(errPubsubDisabled())) : Promise.reject(errPubsubDisabled()) @@ -39,7 +37,7 @@ module.exports = function pubsub (self) { }, unsubscribe: (topic, handler, callback) => { - if (!pubsubEnabled(self._options)) { + if (!self._options.EXPERIMENTAL.pubsub) { return callback ? setImmediate(() => callback(errPubsubDisabled())) : Promise.reject(errPubsubDisabled()) @@ -55,28 +53,28 @@ module.exports = function pubsub (self) { }, publish: promisify((topic, data, callback) => { - if (!pubsubEnabled(self._options)) { + if (!self._options.EXPERIMENTAL.pubsub) { return setImmediate(() => callback(errPubsubDisabled())) } self._libp2pNode.pubsub.publish(topic, data, callback) }), ls: promisify((callback) => { - if (!pubsubEnabled(self._options)) { + if (!self._options.EXPERIMENTAL.pubsub) { return setImmediate(() => callback(errPubsubDisabled())) } self._libp2pNode.pubsub.ls(callback) }), peers: promisify((topic, callback) => { - if (!pubsubEnabled(self._options)) { + if (!self._options.EXPERIMENTAL.pubsub) { return setImmediate(() => callback(errPubsubDisabled())) } self._libp2pNode.pubsub.peers(topic, callback) }), setMaxListeners (n) { - if (!pubsubEnabled(self._options)) { + if (!self._options.EXPERIMENTAL.pubsub) { throw errPubsubDisabled() } self._libp2pNode.pubsub.setMaxListeners(n) diff --git a/src/core/components/start.js b/src/core/components/start.js index b2436d400a..8a8fe10a85 100644 --- a/src/core/components/start.js +++ b/src/core/components/start.js @@ -2,7 +2,7 @@ const series = require('async/series') const Bitswap = require('ipfs-bitswap') -const get = require('dlv') +const get = require('lodash/get') const setImmediate = require('async/setImmediate') const promisify = require('promisify-es6') const { TieredDatastore } = require('datastore-core') diff --git a/src/core/index.js b/src/core/index.js index f00d29e00c..8425cab7a2 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -166,6 +166,11 @@ class IPFS extends EventEmitter { this.log('EXPERIMENTAL pubsub is enabled') } if (this._options.EXPERIMENTAL.ipnsPubsub) { + if (!this._options.EXPERIMENTAL.pubsub) { + this.log('EXPERIMENTAL pubsub is enabled to use IPNS pubsub') + this._options.EXPERIMENTAL.pubsub = true + } + this.log('EXPERIMENTAL IPNS pubsub is enabled') } if (this._options.EXPERIMENTAL.sharding) { diff --git a/src/core/ipns/routing/offline-datastore.js b/src/core/ipns/routing/offline-datastore.js index 9c788c2f91..d561a5665f 100644 --- a/src/core/ipns/routing/offline-datastore.js +++ b/src/core/ipns/routing/offline-datastore.js @@ -50,7 +50,7 @@ class OfflineDatastore { } // Marshal to libp2p record as the DHT does - let record = new Record(key, value) + const record = new Record(key, value) this._repo.datastore.put(routingKey, record.serialize(), callback) } diff --git a/src/core/ipns/routing/pubsub-datastore.js b/src/core/ipns/routing/pubsub-datastore.js index 2d29b5e4fe..10cfad8647 100644 --- a/src/core/ipns/routing/pubsub-datastore.js +++ b/src/core/ipns/routing/pubsub-datastore.js @@ -30,20 +30,6 @@ class IpnsPubsubDatastore { * @returns {void} */ put (key, value, callback) { - if (!Buffer.isBuffer(key)) { - const errMsg = `key ${key} does not have a valid format` - - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) - } - - if (!Buffer.isBuffer(value)) { - const errMsg = `received value ${value} is not a buffer` - - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_VALUE_RECEIVED')) - } - this._pubsubDs.put(key, value, callback) } @@ -56,13 +42,6 @@ class IpnsPubsubDatastore { * @returns {void} */ get (key, callback) { - if (!Buffer.isBuffer(key)) { - const errMsg = `key ${key} does not have a valid format` - - log.error(errMsg) - return callback(errcode(new Error(errMsg), 'ERR_INVALID_KEY')) - } - this._pubsubDs.get(key, (err, res) => { // Add topic subscribed const ns = key.slice(0, ipns.namespaceLength) diff --git a/src/http/index.js b/src/http/index.js index 03cfb394c3..dd6b19b430 100644 --- a/src/http/index.js +++ b/src/http/index.js @@ -21,6 +21,7 @@ function uriToMultiaddr (uri) { } function HttpApi (repo, config, cliArgs) { + cliArgs = cliArgs || {} this.node = undefined this.server = undefined @@ -71,13 +72,13 @@ function HttpApi (repo, config, cliArgs) { init: init, start: true, config: config, - local: cliArgs && cliArgs.local, - pass: cliArgs && cliArgs.pass, + local: cliArgs.local, + pass: cliArgs.pass, EXPERIMENTAL: { - pubsub: cliArgs && cliArgs.enablePubsubExperiment, - ipnsPubsub: cliArgs && cliArgs.enableNamesysPubsub, - dht: cliArgs && cliArgs.enableDhtExperiment, - sharding: cliArgs && cliArgs.enableShardingExperiment + pubsub: cliArgs.enablePubsubExperiment, + ipnsPubsub: cliArgs.enableNamesysPubsub, + dht: cliArgs.enableDhtExperiment, + sharding: cliArgs.enableShardingExperiment }, libp2p: libp2p }) From fa5b652ce5ac324460520e800cdc7e1d768accd8 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 3 Dec 2018 19:18:52 +0000 Subject: [PATCH 9/9] chore: upgrade interface-ipfs-core --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 68efb1ffc5..f7c4f5ef9d 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,7 @@ "execa": "^1.0.0", "form-data": "^2.3.3", "hat": "0.0.3", - "interface-ipfs-core": "ipfs/interface-ipfs-core#fix/ipns-over-pubsub-tests", + "interface-ipfs-core": "~0.89.0", "ipfsd-ctl": "~0.40.1", "ncp": "^2.0.0", "qs": "^6.5.2",