Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for unix multiaddr listen #10

Merged
merged 12 commits into from
Feb 26, 2019
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"test": "aegir test -t node -f test/**/*.js",
"test:node": "aegir test -t node -f test/**/*.js",
"test": "aegir test -t node -f test/*.js test/**/*.js",
"test:node": "aegir test -t node -f test/*.js test/**/*.js",
"release": "aegir release --no-build -t node",
"release-minor": "aegir release --no-build --type minor -t node",
"release-major": "aegir release --no-build --type major -t node"
Expand Down Expand Up @@ -54,7 +54,8 @@
"libp2p-mplex": "~0.8.4",
"libp2p-secio": "~0.11.1",
"libp2p-tcp": "~0.13.0",
"multiaddr": "^6.0.3",
"libp2p-websockets": "~0.12.2",
"multiaddr": "^6.0.5",
"peer-book": "~0.9.0",
"peer-id": "~0.12.0",
"peer-info": "~0.15.0",
Expand Down
16 changes: 13 additions & 3 deletions src/cli/bin.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ const log = console.log

const main = async (processArgs) => {
parser.yargs
.option('sock', {
desc: 'daemon control socket path',
.option('listen', {
desc: 'daemon control listen multiaddr',
type: 'string',
default: '/tmp/p2pd.sock'
default: '/unix/tmp/p2pd.sock'
})
.option('quiet', {
alias: 'q',
Expand All @@ -28,6 +28,16 @@ const main = async (processArgs) => {
type: 'string',
default: ''
})
.option('hostAddrs', {
desc: 'Comma separated list of multiaddrs the host should listen on',
type: 'string',
default: ''
})
.option('announceAddrs', {
desc: 'Comma separated list of multiaddrs the host should announce to the network',
type: 'string',
default: ''
})
.option('bootstrap', {
alias: 'b',
desc: 'Connects to bootstrap peers and bootstraps the dht if enabled',
Expand Down
17 changes: 9 additions & 8 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@

const net = require('net')
const Socket = net.Socket
const path = require('path')
const { encode, decode } = require('length-prefixed-stream')
const { Request } = require('./protocol')
const LIMIT = 1 << 22 // 4MB

const { ends } = require('../src/util')
const { ends, multiaddrToNetConfig } = require('./util')

class Client {
constructor (socketPath) {
this.path = path.resolve(socketPath)
constructor (addr) {
this.multiaddr = addr
this.server = null
this.socket = new Socket({
readable: true,
Expand All @@ -27,7 +26,8 @@ class Client {
*/
attach () {
return new Promise((resolve, reject) => {
this.socket.connect(this.path, (err) => {
const options = multiaddrToNetConfig(this.multiaddr)
this.socket.connect(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand All @@ -37,11 +37,11 @@ class Client {
/**
* Starts a server listening at `socketPath`. New connections
* will be sent to the `connectionHandler`.
* @param {string} socketPath
* @param {Multiaddr} addr
* @param {function(Stream)} connectionHandler
* @returns {Promise}
*/
async startServer (socketPath, connectionHandler) {
async startServer (addr, connectionHandler) {
if (this.server) {
await this.stopServer()
}
Expand All @@ -50,7 +50,8 @@ class Client {
allowHalfOpen: true
}, connectionHandler)

this.server.listen(path.resolve(socketPath), (err) => {
const options = multiaddrToNetConfig(addr)
this.server.listen(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand Down
41 changes: 26 additions & 15 deletions src/daemon.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
'use strict'

const net = require('net')
const path = require('path')
const Libp2p = require('./libp2p')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const ma = require('multiaddr')
const CID = require('cids')
const { encode, decode } = require('length-prefixed-stream')
const { multiaddrToNetConfig } = require('./util')
const {
Request,
DHTRequest,
Expand All @@ -23,14 +23,14 @@ class Daemon {
/**
* @constructor
* @param {object} options
* @param {string} options.socketPath
* @param {Multiaddr} options.multiaddr
* @param {Libp2p} options.libp2pNode
*/
constructor ({
socketPath,
multiaddr,
libp2pNode
}) {
this.socketPath = socketPath
this.multiaddr = ma(multiaddr)
this.libp2p = libp2pNode
this.server = net.createServer({
allowHalfOpen: true
Expand All @@ -53,7 +53,7 @@ class Daemon {
PeerId.createFromBytes(peer)
)
addrs.forEach((a) => {
peerInfo.multiaddrs.add(multiaddr(a))
peerInfo.multiaddrs.add(ma(a))
})

return this.libp2p.dial(peerInfo)
Expand Down Expand Up @@ -116,15 +116,16 @@ class Daemon {
registerStreamHandler (request) {
return new Promise((resolve, reject) => {
const protocols = request.streamHandler.proto
const socketPath = path.resolve(request.streamHandler.path)
const addr = ma(request.streamHandler.addr)
const addrString = addr.toString()

// If we have a handler, end it
if (this.streamHandlers[socketPath]) {
this.streamHandlers[socketPath].end()
delete this.streamHandlers[socketPath]
if (this.streamHandlers[addrString]) {
this.streamHandlers[addrString].end()
delete this.streamHandlers[addrString]
}

const socket = this.streamHandlers[socketPath] = new net.Socket({
const socket = this.streamHandlers[addrString] = new net.Socket({
readable: true,
writable: true,
allowHalfOpen: true
Expand Down Expand Up @@ -153,7 +154,8 @@ class Daemon {
})
})

socket.connect(socketPath, (err) => {
const options = multiaddrToNetConfig(addr)
socket.connect(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand Down Expand Up @@ -181,7 +183,8 @@ class Daemon {
async start () {
await this.libp2p.start()
return new Promise((resolve, reject) => {
this.server.listen(path.resolve(this.socketPath), (err) => {
const options = multiaddrToNetConfig(this.multiaddr)
this.server.listen(options, (err) => {
if (err) return reject(err)
resolve()
})
Expand Down Expand Up @@ -368,7 +371,15 @@ class Daemon {
id: this.libp2p.peerInfo.id.toBytes(),
// temporary removal of "/ipfs/..." from multiaddrs
// this will be solved in: https://github.com/libp2p/js-libp2p/issues/323
addrs: this.libp2p.peerInfo.multiaddrs.toArray().map(m => m.decapsulate('ipfs').buffer)
addrs: this.libp2p.peerInfo.multiaddrs.toArray().map(m => {
let buffer
try {
buffer = m.decapsulate('ipfs').buffer
} catch (_) {
buffer = m.buffer
}
return buffer
})
}
}))
break
Expand Down Expand Up @@ -486,7 +497,7 @@ function ErrorResponse (message) {
const createDaemon = async (options) => {
const libp2pNode = await Libp2p.createLibp2p(options)
const daemon = new Daemon({
socketPath: options.sock,
multiaddr: options.listen,
libp2pNode: libp2pNode
})

Expand Down
33 changes: 29 additions & 4 deletions src/libp2p.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const Libp2p = require('libp2p')
const TCP = require('libp2p-tcp')
const WS = require('libp2p-websockets')
const Bootstrap = require('libp2p-bootstrap')
const MPLEX = require('libp2p-mplex')
const SECIO = require('libp2p-secio')
Expand Down Expand Up @@ -199,6 +200,10 @@ class DHT {
}

class DaemonLibp2p extends Libp2p {
constructor (libp2pOpts, { announceAddrs }) {
super(libp2pOpts)
this.announceAddrs = announceAddrs
}
get contentRouting () {
return this._contentRouting
}
Expand Down Expand Up @@ -227,6 +232,14 @@ class DaemonLibp2p extends Libp2p {
return new Promise((resolve, reject) => {
super.start((err) => {
if (err) return reject(err)

// replace with announce addrs until libp2p supports this directly
if (this.announceAddrs.length > 0) {
this.peerInfo.multiaddrs.clear()
this.announceAddrs.forEach(addr => {
this.peerInfo.multiaddrs.add(addr)
})
}
resolve()
})
})
Expand Down Expand Up @@ -295,27 +308,34 @@ class DaemonLibp2p extends Libp2p {
* @param {boolean} opts.connMgr
* @param {number} opts.connMgrLo
* @param {number} opts.connMgrHi
* @param {string} opts.sock
* @param {string} opts.id
* @param {string} opts.bootstrapPeers
* @param {string} opts.hostAddrs
* @returns {Libp2p}
*/
const createLibp2p = async ({
bootstrap,
bootstrapPeers,
hostAddrs,
announceAddrs,
dht,
dhtClient,
connMgr,
connMgrLo,
connMgrHi,
sock,
id
} = {}) => {
const peerInfo = await getPeerInfo(id)
const peerBook = new PeerBook()
const bootstrapList = bootstrapPeers ? bootstrapPeers.split(',').filter(s => s !== '') : null
const listenAddrs = hostAddrs ? hostAddrs.split(',').filter(s => s !== '') : ['/ip4/0.0.0.0/tcp/0']

peerInfo.multiaddrs.add(multiaddr('/ip4/0.0.0.0/tcp/0'))
announceAddrs = announceAddrs ? announceAddrs.split(',').filter(s => s !== '') : []
announceAddrs = announceAddrs.map(addr => multiaddr(addr))

listenAddrs.forEach(addr => {
peerInfo.multiaddrs.add(multiaddr(addr))
})

const libp2p = new DaemonLibp2p({
peerBook,
Expand All @@ -326,7 +346,8 @@ const createLibp2p = async ({
},
modules: {
transport: [
TCP
TCP,
WS
],
streamMuxer: [
MPLEX
Expand Down Expand Up @@ -362,6 +383,10 @@ const createLibp2p = async ({
pubsub: false
}
}
}, {
// using a secondary config until https://github.com/libp2p/js-libp2p/issues/202
// is completed
announceAddrs
})

return libp2p
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ message StreamOpenRequest {
}

message StreamHandlerRequest {
required string path = 1;
required bytes addr = 1;
repeated string proto = 2;
}

Expand Down
21 changes: 21 additions & 0 deletions src/util/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
'use strict'

const os = require('os')
const { resolve } = require('path')

exports.first = async iterator => {
for await (const value of iterator) return value
}
Expand All @@ -15,3 +18,21 @@ exports.ends = iterator => {
iterator.last = () => exports.last(iterator)
return iterator
}

/**
* Converts the multiaddr to a nodejs NET compliant option
* for .coonect or .listen
* @param {Multiaddr} addr
* @returns {string|object} A nodejs NET compliant option
*/
exports.multiaddrToNetConfig = function multiaddrToNetConfig (addr) {
const listenPath = addr.getPath()
// unix socket listening
if (listenPath) {
return resolve(listenPath)
}
// tcp listening
return addr.nodeAddress()
}

exports.isWindows = os.platform() === 'win32'
Loading