From 94c22e425c204d8a99a73e22b2e3f7241fc55411 Mon Sep 17 00:00:00 2001 From: Michal Powaga Date: Tue, 30 Jul 2019 12:42:58 +0100 Subject: [PATCH 1/4] feat(state channels): persist connection by pinging every 10 seconds --- es/channel/internal.js | 58 ++++++++++++++++++++++++++++++------- test/integration/channel.js | 17 +++++++++-- 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/es/channel/internal.js b/es/channel/internal.js index 435a21c1f6..c4dda21205 100644 --- a/es/channel/internal.js +++ b/es/channel/internal.js @@ -21,6 +21,11 @@ import * as R from 'ramda' import { pascalToSnake } from '../utils/string' import { awaitingConnection } from './handlers' +// Send ping message every 10 seconds +const PING_TIMEOUT_MS = 10000 +// Close connection if pong message is not received within 5 seconds +const PONG_TIMEOUT_MS = 5000 + const options = new WeakMap() const status = new WeakMap() const state = new WeakMap() @@ -34,6 +39,8 @@ const actionQueueLocked = new WeakMap() const sequence = new WeakMap() const channelId = new WeakMap() const rpcCallbacks = new WeakMap() +const pingTimeoutId = new WeakMap() +const pongTimeoutId = new WeakMap() function channelURL (url, params) { const paramString = R.join('&', R.values(R.mapObjIndexed((value, key) => @@ -118,6 +125,24 @@ async function dequeueMessage (channel) { dequeueMessage(channel) } +function ping (channel) { + clearTimeout(pingTimeoutId.get(channel)) + clearTimeout(pongTimeoutId.get(channel)) + pingTimeoutId.set(channel, setTimeout(() => { + send(channel, { + jsonrpc: '2.0', + method: 'channels.system', + params: { + action: 'ping' + } + }) + pongTimeoutId.set(channel, setTimeout(() => { + disconnect(channel) + emit(channel, 'error', Error('Server pong timed out')) + }, PONG_TIMEOUT_MS)) + }, PING_TIMEOUT_MS)) +} + function onMessage (channel, data) { const message = JSON.parse(data) if (message.id) { @@ -129,6 +154,10 @@ function onMessage (channel, data) { } } else if (message.method === 'channels.message') { emit(channel, 'message', message.params.data.message) + } else if (message.method === 'channels.system.pong') { + if (message.params.channel_id === channelId.get(channel)) { + ping(channel) + } } else { messageQueue.set(channel, [...(messageQueue.get(channel) || []), message]) dequeueMessage(channel) @@ -155,19 +184,19 @@ function call (channel, method, params) { } function disconnect (channel) { - const ws = websockets.get(channel) - if (ws.readyState === ws.OPEN) { - ws._connection.close() - } + websockets.get(channel).close() + clearTimeout(pingTimeoutId.get(channel)) + clearTimeout(pongTimeoutId.get(channel)) } function WebSocket (url, callbacks) { - function fireOnce (target, key, always) { + function fireOnce (target, key, fn) { + const once = R.once(fn) + const original = target[key] target[key] = (...args) => { - always(...args) - target[key] = callbacks[key] - if (typeof target === 'function') { - target(...args) + once(...args) + if (typeof original === 'function') { + original(...args) } } } @@ -193,8 +222,15 @@ async function initialize (channel, channelOptions) { sequence.set(channel, 0) rpcCallbacks.set(channel, new Map()) const ws = await WebSocket(wsUrl, { - onopen: () => changeStatus(channel, 'connected'), - onclose: () => changeStatus(channel, 'disconnected'), + onopen: () => { + changeStatus(channel, 'connected') + ping(channel) + }, + onclose: () => { + changeStatus(channel, 'disconnected') + clearTimeout(pingTimeoutId.get(channel)) + clearTimeout(pongTimeoutId.get(channel)) + }, onmessage: ({ data }) => onMessage(channel, data) }) websockets.set(channel, ws) diff --git a/test/integration/channel.js b/test/integration/channel.js index 30e88a652b..484c11ebbd 100644 --- a/test/integration/channel.js +++ b/test/integration/channel.js @@ -499,6 +499,8 @@ describe('Channel', function () { }) it('can leave a channel', async () => { + initiatorCh.disconnect() + responderCh.disconnect() initiatorCh = await Channel({ ...sharedParams, role: 'initiator', @@ -537,11 +539,11 @@ describe('Channel', function () { await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)]) sinon.assert.notCalled(initiatorSign) sinon.assert.notCalled(responderSign) - initiatorCh.disconnect() - responderCh.disconnect() }) it('can solo close a channel', async () => { + initiatorCh.disconnect() + responderCh.disconnect() initiatorCh = await Channel({ ...sharedParams, role: 'initiator', @@ -599,6 +601,8 @@ describe('Channel', function () { it('can dispute via slash tx', async () => { const initiatorAddr = await initiator.address() const responderAddr = await responder.address() + initiatorCh.disconnect() + responderCh.disconnect() initiatorCh = await Channel({ ...sharedParams, lockPeriod: 5, @@ -660,6 +664,8 @@ describe('Channel', function () { }) it('can create a contract and accept', async () => { + initiatorCh.disconnect() + responderCh.disconnect() initiatorCh = await Channel({ ...sharedParams, role: 'initiator', @@ -803,6 +809,8 @@ describe('Channel', function () { describe('throws errors', function () { before(async function () { + initiatorCh.disconnect() + responderCh.disconnect() initiatorCh = await Channel({ ...sharedParams, role: 'initiator', @@ -818,6 +826,11 @@ describe('Channel', function () { await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)]) }) + after(() => { + initiatorCh.disconnect() + responderCh.disconnect() + }) + async function update ({ from, to, amount, sign }) { return initiatorCh.update( from || await initiator.address(), From 65b0efcef0e59091b2302829b84c4c4e9afaf3f0 Mon Sep 17 00:00:00 2001 From: Michal Powaga Date: Tue, 30 Jul 2019 19:40:32 +0100 Subject: [PATCH 2/4] Set timeout to 2 minutes for channel tests --- test/integration/channel.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/integration/channel.js b/test/integration/channel.js index 484c11ebbd..587dcfe80d 100644 --- a/test/integration/channel.js +++ b/test/integration/channel.js @@ -46,6 +46,7 @@ function waitForChannel (channel) { describe('Channel', function () { configure(this) + this.timeout(120000) let initiator let responder From 84a7790e1ad0044c4a391cd16ff95765b496f28e Mon Sep 17 00:00:00 2001 From: Michal Powaga Date: Tue, 6 Aug 2019 18:11:11 +0100 Subject: [PATCH 3/4] Skip channelId check if channelId is not known yet --- es/channel/internal.js | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/es/channel/internal.js b/es/channel/internal.js index c4dda21205..82496f4207 100644 --- a/es/channel/internal.js +++ b/es/channel/internal.js @@ -155,7 +155,11 @@ function onMessage (channel, data) { } else if (message.method === 'channels.message') { emit(channel, 'message', message.params.data.message) } else if (message.method === 'channels.system.pong') { - if (message.params.channel_id === channelId.get(channel)) { + if ( + (message.params.channel_id === channelId.get(channel)) || + // Skip channelId check if channelId is not known yet + (channelId.get(channel) == null) + ) { ping(channel) } } else { From 77a0808410f9fdf5d2b2ae882f31da518ac4bc2c Mon Sep 17 00:00:00 2001 From: Michal Powaga Date: Tue, 6 Aug 2019 18:14:57 +0100 Subject: [PATCH 4/4] Fix lint error --- es/channel/internal.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/es/channel/internal.js b/es/channel/internal.js index 82496f4207..edd586fe4a 100644 --- a/es/channel/internal.js +++ b/es/channel/internal.js @@ -159,7 +159,7 @@ function onMessage (channel, data) { (message.params.channel_id === channelId.get(channel)) || // Skip channelId check if channelId is not known yet (channelId.get(channel) == null) - ) { + ) { ping(channel) } } else {