Skip to content

Commit

Permalink
feat(state channels): persist connection by pinging every 10 seconds (#…
Browse files Browse the repository at this point in the history
…571)

* feat(state channels): persist connection by pinging every 10 seconds

* Set timeout to 2 minutes for channel tests

* Skip channelId check if channelId is not known yet

* Fix lint error
  • Loading branch information
mpowaga authored and nduchak committed Aug 8, 2019
1 parent 040c040 commit a70f919
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 13 deletions.
62 changes: 51 additions & 11 deletions es/channel/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import * as R from 'ramda'
import { pascalToSnake } from '../utils/string'
import { awaitingConnection } from './handlers'

// Send ping message every 10 seconds
const PING_TIMEOUT_MS = 10000
// Close connection if pong message is not received within 5 seconds
const PONG_TIMEOUT_MS = 5000

const options = new WeakMap()
const status = new WeakMap()
const state = new WeakMap()
Expand All @@ -34,6 +39,8 @@ const actionQueueLocked = new WeakMap()
const sequence = new WeakMap()
const channelId = new WeakMap()
const rpcCallbacks = new WeakMap()
const pingTimeoutId = new WeakMap()
const pongTimeoutId = new WeakMap()

function channelURL (url, params) {
const paramString = R.join('&', R.values(R.mapObjIndexed((value, key) =>
Expand Down Expand Up @@ -118,6 +125,24 @@ async function dequeueMessage (channel) {
dequeueMessage(channel)
}

function ping (channel) {
clearTimeout(pingTimeoutId.get(channel))
clearTimeout(pongTimeoutId.get(channel))
pingTimeoutId.set(channel, setTimeout(() => {
send(channel, {
jsonrpc: '2.0',
method: 'channels.system',
params: {
action: 'ping'
}
})
pongTimeoutId.set(channel, setTimeout(() => {
disconnect(channel)
emit(channel, 'error', Error('Server pong timed out'))
}, PONG_TIMEOUT_MS))
}, PING_TIMEOUT_MS))
}

function onMessage (channel, data) {
const message = JSON.parse(data)
if (message.id) {
Expand All @@ -129,6 +154,14 @@ function onMessage (channel, data) {
}
} else if (message.method === 'channels.message') {
emit(channel, 'message', message.params.data.message)
} else if (message.method === 'channels.system.pong') {
if (
(message.params.channel_id === channelId.get(channel)) ||
// Skip channelId check if channelId is not known yet
(channelId.get(channel) == null)
) {
ping(channel)
}
} else {
messageQueue.set(channel, [...(messageQueue.get(channel) || []), message])
dequeueMessage(channel)
Expand All @@ -155,19 +188,19 @@ function call (channel, method, params) {
}

function disconnect (channel) {
const ws = websockets.get(channel)
if (ws.readyState === ws.OPEN) {
ws._connection.close()
}
websockets.get(channel).close()
clearTimeout(pingTimeoutId.get(channel))
clearTimeout(pongTimeoutId.get(channel))
}

function WebSocket (url, callbacks) {
function fireOnce (target, key, always) {
function fireOnce (target, key, fn) {
const once = R.once(fn)
const original = target[key]
target[key] = (...args) => {
always(...args)
target[key] = callbacks[key]
if (typeof target === 'function') {
target(...args)
once(...args)
if (typeof original === 'function') {
original(...args)
}
}
}
Expand All @@ -193,8 +226,15 @@ async function initialize (channel, channelOptions) {
sequence.set(channel, 0)
rpcCallbacks.set(channel, new Map())
const ws = await WebSocket(wsUrl, {
onopen: () => changeStatus(channel, 'connected'),
onclose: () => changeStatus(channel, 'disconnected'),
onopen: () => {
changeStatus(channel, 'connected')
ping(channel)
},
onclose: () => {
changeStatus(channel, 'disconnected')
clearTimeout(pingTimeoutId.get(channel))
clearTimeout(pongTimeoutId.get(channel))
},
onmessage: ({ data }) => onMessage(channel, data)
})
websockets.set(channel, ws)
Expand Down
18 changes: 16 additions & 2 deletions test/integration/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ function waitForChannel (channel) {

describe('Channel', function () {
configure(this)
this.timeout(120000)

let initiator
let responder
Expand Down Expand Up @@ -499,6 +500,8 @@ describe('Channel', function () {
})

it('can leave a channel', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand Down Expand Up @@ -537,11 +540,11 @@ describe('Channel', function () {
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
sinon.assert.notCalled(initiatorSign)
sinon.assert.notCalled(responderSign)
initiatorCh.disconnect()
responderCh.disconnect()
})

it('can solo close a channel', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand Down Expand Up @@ -599,6 +602,8 @@ describe('Channel', function () {
it('can dispute via slash tx', async () => {
const initiatorAddr = await initiator.address()
const responderAddr = await responder.address()
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
lockPeriod: 5,
Expand Down Expand Up @@ -660,6 +665,8 @@ describe('Channel', function () {
})

it('can create a contract and accept', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand Down Expand Up @@ -803,6 +810,8 @@ describe('Channel', function () {

describe('throws errors', function () {
before(async function () {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand All @@ -818,6 +827,11 @@ describe('Channel', function () {
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
})

after(() => {
initiatorCh.disconnect()
responderCh.disconnect()
})

async function update ({ from, to, amount, sign }) {
return initiatorCh.update(
from || await initiator.address(),
Expand Down

0 comments on commit a70f919

Please sign in to comment.