Skip to content
This repository was archived by the owner on Feb 26, 2021. It is now read-only.

Commit

Permalink
feat: Initial server implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mkg20001 committed Jan 6, 2019
1 parent 1668550 commit 16faa44
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
node_modules
package-lock.json
23 changes: 23 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
{
"name": "libp2p-stardust",
"version": "0.0.1",
"description": "Basically ws-star but without the bugs",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [
"ws-star",
"rendezvous",
"relay"
],
"author": "Maciej Krüger <mkg20001@gmail.com>",
"license": "MPL-2.0",
"dependencies": {
"libp2p-mplex": "^0.8.4",
"libp2p-websockets": "^0.12.0",
"multistream-select": "^0.14.3",
"peer-id": "^0.12.1",
"protons": "^1.0.1"
}
}
64 changes: 64 additions & 0 deletions src/micro-switch/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
'use strict'

const multiaddr = require('multiaddr')

const WS = require('libp2p-websockets')
const MPLEX = require('libp2p-mplex')

class MicroSwitch {
constructor ({ muxers, transports, addresses, handler }) {
this.transports = transports || [new WS()]
this.muxers = muxers || [MPLEX]
this.addresses = addresses || [multiaddr('/ip6/::/tcp/5892')]
this.handler = handler || console.log
}

/*
* Wraps a connection in a muxer
* @returns MuxedConn
*/
async wrapInMuxer (conn, isServer) {

}

async dial (addr) {
this.transports // TODO: get first that succeeds or throw if none do
.filter(transport => Boolean(transport.filter([addr]).length))
.map(transport => new Promise((resolve, reject) => {
const conn = transport.dial(addr, (err) => {
if (err) {
reject(err)
} else {
resolve(conn)
}
})
}))
}

async startListen () {
this.listeners = this.transports
.map(transport => [transport, transport.filter(this.addresses)])
.filter(res => Boolean(res[1].length))
.map(res => {
const [transport, addresses] = res
return addresses.map(address => new Promise((resolve, reject) => {
const listener = transport.createListener(this.handler.bind(this))
listener.listen(address, (err) => {
if (err) {
reject(err)
} else {
resolve(address)
}
})
}))
})
.reduce((a, b) => a.concat(b), [])
await Promise.all(this.listeners)
}

async stopListen () {
await Promise.all(this.listeners.map(listener => new Promise((resolve, reject) => listener.close(err => err ? reject(err) : resolve()))))
}
}

module.exports = MicroSwitch
36 changes: 36 additions & 0 deletions src/rpc/lp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'use strict'

const lp = require('pull-length-prefixed')
const pushable = require('pull-pushable')
const reader = require('pull-reader')
const pull = require('pull-stream/pull')

module.exports = (conn, time) => {
const read = reader(time || 30000) // 30sec, because connections can sometimes be really slow
const write = pushable()

pull(
write,
lp.encode(),
conn,
read
)

return module.exports.wrap(read, write)
}

module.exports.wrap = (read, write) => {
const S = {
write: (msg) => write.push(msg),
read: () => new Promise((resolve, reject) => {
lp.decodeFromReader(read, (err, msg) => err ? reject(err) : resolve(msg))
}),
readProto: async (proto) => {
const msg = await S.read()
return proto.decode(msg)
},
writeProto: (proto, msg) => S.write(proto.encode(msg))
}

return S
}
45 changes: 45 additions & 0 deletions src/rpc/proto.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict'

const protons = require('protons')

module.exports = protons(`
enum Error {
OK = 0;
E_RAND_LENGTH = 100; // random128 has mismatching length
E_INCORRECT_SOLUTION = 101;
E_GENERIC = 999; // something(TM) went wrong
}
message PeerID {
bytes id = 1;
bytes pubKey = 2;
}
message JoinInit {
bytes random128 = 1; // must be exactly 128 bytes
PeerID peerID = 2;
}
message JoinChallenge {
Error error = 1;
bytes xorEncrypted = 2;
}
message JoinChallengeSolution {
bytes solution = 1; // xor(random128, decrypt(xorEncrypted, id.priv))
}
message JoinVerify {
Error error = 1;
}
message DialRequest {
bytes id = 1;
}
message DialResponse {
Error error = 1;
}
`)
124 changes: 124 additions & 0 deletions src/server/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
'use strict'

const MicroSwitch = require('../micro-switch')
const LP = require('../rpc/lp')
const pull = require('pull-stream/pull')
const handshake = require('pull-handshake')
const {JoinInit, JoinChallenge, JoinChallengeSolution, JoinVerify, Error} = require('../rpc/proto')

const prom = (f) => new Promise((resolve, reject) => f((err, res) => err ? reject(err) : resolve(res)))

const xor = (a, b) => {
const r = Buffer.allocUnsafe(a.length)

for (var i = 0; i < a.length; i++) {
r[i] = a[i] ^ b[i]
}

return r
}

const crypto = require('crypto')
const ID = require('peer-id')

const debug = require('debug')
const log = debug('libp2p:stardust:server')

class Client {
constructor ({muxed, rpc, id, server}) {
this.muxed = muxed
this.rpc = rpc
this.id = id
this.server = server

muxed.on('stream', this.handler.bind(this))
}

async handler (conn) {
const stream = handshake()
pull(
conn,
stream,
conn
)

const shake = stream.handshake
const rpc = LP.wrap(shake, {push: shake.write})

const {target} = rpc.readProto(DialRequest)
const targetB58 = new ID(target).toB58String()

const targetPeer = this.server.network[targetB58]

if (!targetPeer) {
return rpc.writeProto(DialResponse, {error: Error.E_TARGET_UNREACHABLE})
}

try {
const conn = await targetPeer.openConn()
rpc.writeProto(DialResponse, {})

pull(conn, shake.rest(), conn)
} catch (e) {
return rpc.writeProto(DialResponse, {error: Error.E_GENERIC})
}
}

async openConn () {
return prom(cb => this.muxed.newStream(cb))
}
}

class Server {
constructor ({ transports, addresses, muxers }) {
this.switch = new MicroSwitch({ transports, addresses, muxers, handler: this.handler.bind(this) })

this.clients = {}
}

handler (conn) {
const muxed = this.switch.wrapInMuxer(conn, true)
muxed.newStream(async (err, conn) => {
if (err) {
return log(err)
}

const rpc = LP(conn)

try {
const {random, peerID} = await rpc.readProto(JoinInit)
const id = await prom(cb => ID.createFromJSON(peerID))

const xorSecret = crypto.randomBytes(128)
const xorEncrypted = prom(cb => id.encrypt(xorSecret, cb))

rpc.writeProto(JoinChallenge, {xor: xorEncrypted})

const solution = xor(random, xorSecret)

const {solution: solutionClient} = await rpc.readProto(JoinChallengeSolution)

if (!Buffer.compare(solution, solutionClient)) {
return rpc.writeProto(JoinVerify, {error: Error.E_INCORRECT_SOLUTION}) // TODO: connection close
}

rpc.writeProto(JoinVerify, {})

this.addToNetwork(new Client({muxed, rpc, id, server: this}))
} catch (e) {
log(e)
rpc.writeProto(JoinVerify, {error: Error.E_GENERIC}) // if anything fails, respond
}
})
}

addToNetwork (client) {

}

async start () {
await this.switch.startListen()
}
}

module.exports = Server

0 comments on commit 16faa44

Please sign in to comment.