diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d5f19d8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +package-lock.json diff --git a/package.json b/package.json new file mode 100644 index 0000000..1f5fb87 --- /dev/null +++ b/package.json @@ -0,0 +1,23 @@ +{ + "name": "libp2p-stardust", + "version": "0.0.1", + "description": "Basically ws-star but without the bugs", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "ws-star", + "rendezvous", + "relay" + ], + "author": "Maciej Krüger ", + "license": "MPL-2.0", + "dependencies": { + "libp2p-mplex": "^0.8.4", + "libp2p-websockets": "^0.12.0", + "multistream-select": "^0.14.3", + "peer-id": "^0.12.1", + "protons": "^1.0.1" + } +} diff --git a/src/micro-switch/index.js b/src/micro-switch/index.js new file mode 100644 index 0000000..396f35c --- /dev/null +++ b/src/micro-switch/index.js @@ -0,0 +1,64 @@ +'use strict' + +const multiaddr = require('multiaddr') + +const WS = require('libp2p-websockets') +const MPLEX = require('libp2p-mplex') + +class MicroSwitch { + constructor ({ muxers, transports, addresses, handler }) { + this.transports = transports || [new WS()] + this.muxers = muxers || [MPLEX] + this.addresses = addresses || [multiaddr('/ip6/::/tcp/5892')] + this.handler = handler || console.log + } + + /* + * Wraps a connection in a muxer + * @returns MuxedConn + */ + async wrapInMuxer (conn, isServer) { + + } + + async dial (addr) { + this.transports // TODO: get first that succeeds or throw if none do + .filter(transport => Boolean(transport.filter([addr]).length)) + .map(transport => new Promise((resolve, reject) => { + const conn = transport.dial(addr, (err) => { + if (err) { + reject(err) + } else { + resolve(conn) + } + }) + })) + } + + async startListen () { + this.listeners = this.transports + .map(transport => [transport, transport.filter(this.addresses)]) + .filter(res => Boolean(res[1].length)) + .map(res => { + const [transport, addresses] = res + return addresses.map(address => new Promise((resolve, reject) => { + const listener = transport.createListener(this.handler.bind(this)) + listener.listen(address, (err) => { + if (err) { + reject(err) + } else { + resolve(address) + } + }) + })) + }) + .reduce((a, b) => a.concat(b), []) + await Promise.all(this.listeners) + } + + async stopListen () { + await Promise.all(this.listeners.map(listener => new Promise((resolve, reject) => listener.close(err => err ? reject(err) : resolve())))) + } +} + +module.exports = MicroSwitch diff --git a/src/rpc/lp.js b/src/rpc/lp.js new file mode 100644 index 0000000..a6c06e3 --- /dev/null +++ b/src/rpc/lp.js @@ -0,0 +1,36 @@ +'use strict' + +const lp = require('pull-length-prefixed') +const pushable = require('pull-pushable') +const reader = require('pull-reader') +const pull = require('pull-stream/pull') + +module.exports = (conn, time) => { + const read = reader(time || 30000) // 30sec, because connections can sometimes be really slow + const write = pushable() + + pull( + write, + lp.encode(), + conn, + read + ) + + return module.exports.wrap(read, write) +} + +module.exports.wrap = (read, write) => { + const S = { + write: (msg) => write.push(msg), + read: () => new Promise((resolve, reject) => { + lp.decodeFromReader(read, (err, msg) => err ? reject(err) : resolve(msg)) + }), + readProto: async (proto) => { + const msg = await S.read() + return proto.decode(msg) + }, + writeProto: (proto, msg) => S.write(proto.encode(msg)) + } + + return S +} diff --git a/src/rpc/proto.js b/src/rpc/proto.js new file mode 100644 index 0000000..83236eb --- /dev/null +++ b/src/rpc/proto.js @@ -0,0 +1,45 @@ +'use strict' + +const protons = require('protons') + +module.exports = protons(` + +enum Error { + OK = 0; + E_RAND_LENGTH = 100; // random128 has mismatching length + E_INCORRECT_SOLUTION = 101; + E_GENERIC = 999; // something(TM) went wrong +} + +message PeerID { + bytes id = 1; + bytes pubKey = 2; +} + +message JoinInit { + bytes random128 = 1; // must be exactly 128 bytes + PeerID peerID = 2; +} + +message JoinChallenge { + Error error = 1; + bytes xorEncrypted = 2; +} + +message JoinChallengeSolution { + bytes solution = 1; // xor(random128, decrypt(xorEncrypted, id.priv)) +} + +message JoinVerify { + Error error = 1; +} + +message DialRequest { + bytes id = 1; +} + +message DialResponse { + Error error = 1; +} + +`) diff --git a/src/server/index.js b/src/server/index.js new file mode 100644 index 0000000..042173b --- /dev/null +++ b/src/server/index.js @@ -0,0 +1,124 @@ +'use strict' + +const MicroSwitch = require('../micro-switch') +const LP = require('../rpc/lp') +const pull = require('pull-stream/pull') +const handshake = require('pull-handshake') +const {JoinInit, JoinChallenge, JoinChallengeSolution, JoinVerify, Error} = require('../rpc/proto') + +const prom = (f) => new Promise((resolve, reject) => f((err, res) => err ? reject(err) : resolve(res))) + +const xor = (a, b) => { + const r = Buffer.allocUnsafe(a.length) + + for (var i = 0; i < a.length; i++) { + r[i] = a[i] ^ b[i] + } + + return r +} + +const crypto = require('crypto') +const ID = require('peer-id') + +const debug = require('debug') +const log = debug('libp2p:stardust:server') + +class Client { + constructor ({muxed, rpc, id, server}) { + this.muxed = muxed + this.rpc = rpc + this.id = id + this.server = server + + muxed.on('stream', this.handler.bind(this)) + } + + async handler (conn) { + const stream = handshake() + pull( + conn, + stream, + conn + ) + + const shake = stream.handshake + const rpc = LP.wrap(shake, {push: shake.write}) + + const {target} = rpc.readProto(DialRequest) + const targetB58 = new ID(target).toB58String() + + const targetPeer = this.server.network[targetB58] + + if (!targetPeer) { + return rpc.writeProto(DialResponse, {error: Error.E_TARGET_UNREACHABLE}) + } + + try { + const conn = await targetPeer.openConn() + rpc.writeProto(DialResponse, {}) + + pull(conn, shake.rest(), conn) + } catch (e) { + return rpc.writeProto(DialResponse, {error: Error.E_GENERIC}) + } + } + + async openConn () { + return prom(cb => this.muxed.newStream(cb)) + } +} + +class Server { + constructor ({ transports, addresses, muxers }) { + this.switch = new MicroSwitch({ transports, addresses, muxers, handler: this.handler.bind(this) }) + + this.clients = {} + } + + handler (conn) { + const muxed = this.switch.wrapInMuxer(conn, true) + muxed.newStream(async (err, conn) => { + if (err) { + return log(err) + } + + const rpc = LP(conn) + + try { + const {random, peerID} = await rpc.readProto(JoinInit) + const id = await prom(cb => ID.createFromJSON(peerID)) + + const xorSecret = crypto.randomBytes(128) + const xorEncrypted = prom(cb => id.encrypt(xorSecret, cb)) + + rpc.writeProto(JoinChallenge, {xor: xorEncrypted}) + + const solution = xor(random, xorSecret) + + const {solution: solutionClient} = await rpc.readProto(JoinChallengeSolution) + + if (!Buffer.compare(solution, solutionClient)) { + return rpc.writeProto(JoinVerify, {error: Error.E_INCORRECT_SOLUTION}) // TODO: connection close + } + + rpc.writeProto(JoinVerify, {}) + + this.addToNetwork(new Client({muxed, rpc, id, server: this})) + } catch (e) { + log(e) + rpc.writeProto(JoinVerify, {error: Error.E_GENERIC}) // if anything fails, respond + } + }) + } + + addToNetwork (client) { + + } + + async start () { + await this.switch.startListen() + } +} + +module.exports = Server