Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(state channels): persist connection by pinging every 10 seconds #571

Merged
merged 6 commits into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 51 additions & 11 deletions es/channel/internal.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import * as R from 'ramda'
import { pascalToSnake } from '../utils/string'
import { awaitingConnection } from './handlers'

// Send ping message every 10 seconds
const PING_TIMEOUT_MS = 10000
// Close connection if pong message is not received within 5 seconds
const PONG_TIMEOUT_MS = 5000

const options = new WeakMap()
const status = new WeakMap()
const state = new WeakMap()
Expand All @@ -34,6 +39,8 @@ const actionQueueLocked = new WeakMap()
const sequence = new WeakMap()
const channelId = new WeakMap()
const rpcCallbacks = new WeakMap()
const pingTimeoutId = new WeakMap()
const pongTimeoutId = new WeakMap()

function channelURL (url, params) {
const paramString = R.join('&', R.values(R.mapObjIndexed((value, key) =>
Expand Down Expand Up @@ -118,6 +125,24 @@ async function dequeueMessage (channel) {
dequeueMessage(channel)
}

function ping (channel) {
clearTimeout(pingTimeoutId.get(channel))
clearTimeout(pongTimeoutId.get(channel))
pingTimeoutId.set(channel, setTimeout(() => {
send(channel, {
jsonrpc: '2.0',
method: 'channels.system',
params: {
action: 'ping'
}
})
pongTimeoutId.set(channel, setTimeout(() => {
disconnect(channel)
emit(channel, 'error', Error('Server pong timed out'))
}, PONG_TIMEOUT_MS))
}, PING_TIMEOUT_MS))
}

function onMessage (channel, data) {
const message = JSON.parse(data)
if (message.id) {
Expand All @@ -129,6 +154,14 @@ function onMessage (channel, data) {
}
} else if (message.method === 'channels.message') {
emit(channel, 'message', message.params.data.message)
} else if (message.method === 'channels.system.pong') {
if (
(message.params.channel_id === channelId.get(channel)) ||
// Skip channelId check if channelId is not known yet
(channelId.get(channel) == null)
) {
ping(channel)
}
} else {
messageQueue.set(channel, [...(messageQueue.get(channel) || []), message])
dequeueMessage(channel)
Expand All @@ -155,19 +188,19 @@ function call (channel, method, params) {
}

function disconnect (channel) {
const ws = websockets.get(channel)
if (ws.readyState === ws.OPEN) {
ws._connection.close()
}
websockets.get(channel).close()
clearTimeout(pingTimeoutId.get(channel))
clearTimeout(pongTimeoutId.get(channel))
}

function WebSocket (url, callbacks) {
function fireOnce (target, key, always) {
function fireOnce (target, key, fn) {
const once = R.once(fn)
const original = target[key]
target[key] = (...args) => {
always(...args)
target[key] = callbacks[key]
if (typeof target === 'function') {
target(...args)
once(...args)
if (typeof original === 'function') {
original(...args)
}
}
}
Expand All @@ -193,8 +226,15 @@ async function initialize (channel, channelOptions) {
sequence.set(channel, 0)
rpcCallbacks.set(channel, new Map())
const ws = await WebSocket(wsUrl, {
onopen: () => changeStatus(channel, 'connected'),
onclose: () => changeStatus(channel, 'disconnected'),
onopen: () => {
changeStatus(channel, 'connected')
ping(channel)
},
onclose: () => {
changeStatus(channel, 'disconnected')
clearTimeout(pingTimeoutId.get(channel))
clearTimeout(pongTimeoutId.get(channel))
},
onmessage: ({ data }) => onMessage(channel, data)
})
websockets.set(channel, ws)
Expand Down
18 changes: 16 additions & 2 deletions test/integration/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ function waitForChannel (channel) {

describe('Channel', function () {
configure(this)
this.timeout(120000)

let initiator
let responder
Expand Down Expand Up @@ -499,6 +500,8 @@ describe('Channel', function () {
})

it('can leave a channel', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand Down Expand Up @@ -537,11 +540,11 @@ describe('Channel', function () {
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
sinon.assert.notCalled(initiatorSign)
sinon.assert.notCalled(responderSign)
initiatorCh.disconnect()
responderCh.disconnect()
})

it('can solo close a channel', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand Down Expand Up @@ -599,6 +602,8 @@ describe('Channel', function () {
it('can dispute via slash tx', async () => {
const initiatorAddr = await initiator.address()
const responderAddr = await responder.address()
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
lockPeriod: 5,
Expand Down Expand Up @@ -660,6 +665,8 @@ describe('Channel', function () {
})

it('can create a contract and accept', async () => {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand Down Expand Up @@ -803,6 +810,8 @@ describe('Channel', function () {

describe('throws errors', function () {
before(async function () {
initiatorCh.disconnect()
responderCh.disconnect()
initiatorCh = await Channel({
...sharedParams,
role: 'initiator',
Expand All @@ -818,6 +827,11 @@ describe('Channel', function () {
await Promise.all([waitForChannel(initiatorCh), waitForChannel(responderCh)])
})

after(() => {
initiatorCh.disconnect()
responderCh.disconnect()
})

async function update ({ from, to, amount, sign }) {
return initiatorCh.update(
from || await initiator.address(),
Expand Down