Skip to content

Commit

Permalink
websocket: separate connection logic from websocket (#1973)
Browse files Browse the repository at this point in the history
  • Loading branch information
KhafraDev authored Feb 26, 2023
1 parent 06f77a9 commit 49c6b48
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 73 deletions.
73 changes: 12 additions & 61 deletions lib/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ const diagnosticsChannel = require('diagnostics_channel')
const { uid, states } = require('./constants')
const {
kReadyState,
kResponse,
kExtensions,
kProtocol,
kSentClose,
kByteParser,
kReceivedClose
} = require('./symbols')
const { fireEvent, failWebsocketConnection } = require('./util')
const { CloseEvent } = require('./events')
const { ByteParser } = require('./receiver')
const { makeRequest } = require('../fetch/request')
const { fetching } = require('../fetch/index')
const { getGlobalDispatcher } = require('../..')
const { getGlobalDispatcher } = require('../global')

const channels = {}
channels.open = diagnosticsChannel.channel('undici:websocket:open')
Expand All @@ -29,8 +25,9 @@ channels.socketError = diagnosticsChannel.channel('undici:websocket:socket_error
* @param {URL} url
* @param {string|string[]} protocols
* @param {import('./websocket').WebSocket} ws
* @param {(response: any) => void} onEstablish
*/
function establishWebSocketConnection (url, protocols, ws) {
function establishWebSocketConnection (url, protocols, ws, onEstablish) {
// 1. Let requestURL be a copy of url, with its scheme set to "http", if url’s
// scheme is "ws", and to "https" otherwise.
const requestURL = url
Expand Down Expand Up @@ -173,67 +170,25 @@ function establishWebSocketConnection (url, protocols, ws) {
return
}

// processResponse is called when the "response’s header list has been received and initialized."
// once this happens, the connection is open
ws[kResponse] = response

const parser = new ByteParser(ws)
response.socket.ws = ws // TODO: use symbol
ws[kByteParser] = parser

whenConnectionEstablished(ws)

response.socket.on('data', onSocketData)
response.socket.on('close', onSocketClose)
response.socket.on('error', onSocketError)

parser.on('drain', onParserDrain)
if (channels.open.hasSubscribers) {
channels.open.publish({
address: response.socket.address(),
protocol: secProtocol,
extensions: secExtension
})
}

onEstablish(response)
}
})

return controller
}

/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
* @param {import('./websocket').WebSocket} ws
*/
function whenConnectionEstablished (ws) {
const { [kResponse]: response } = ws

// 1. Change the ready state to OPEN (1).
ws[kReadyState] = states.OPEN

// 2. Change the extensions attribute’s value to the extensions in use, if
// it is not the null value.
// https://datatracker.ietf.org/doc/html/rfc6455#section-9.1
const extensions = response.headersList.get('sec-websocket-extensions')

if (extensions !== null) {
ws[kExtensions] = extensions
}

// 3. Change the protocol attribute’s value to the subprotocol in use, if
// it is not the null value.
// https://datatracker.ietf.org/doc/html/rfc6455#section-1.9
const protocol = response.headersList.get('sec-websocket-protocol')

if (protocol !== null) {
ws[kProtocol] = protocol
}

// 4. Fire an event named open at the WebSocket object.
fireEvent('open', ws)

if (channels.open.hasSubscribers) {
channels.open.publish({
address: response.socket.address(),
protocol,
extensions
})
}
}

/**
* @param {Buffer} chunk
*/
Expand All @@ -243,10 +198,6 @@ function onSocketData (chunk) {
}
}

function onParserDrain () {
this.ws[kResponse].socket.resume()
}

/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
* @see https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.4
Expand Down
3 changes: 0 additions & 3 deletions lib/websocket/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ module.exports = {
kReadyState: Symbol('ready state'),
kController: Symbol('controller'),
kResponse: Symbol('response'),
kExtensions: Symbol('extensions'),
kProtocol: Symbol('protocol'),
kBinaryType: Symbol('binary type'),
kClosingFrame: Symbol('closing frame'),
kSentClose: Symbol('sent close'),
kReceivedClose: Symbol('received close'),
kByteParser: Symbol('byte parser')
Expand Down
64 changes: 55 additions & 9 deletions lib/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ const {
kWebSocketURL,
kReadyState,
kController,
kExtensions,
kProtocol,
kBinaryType,
kResponse,
kSentClose
kSentClose,
kByteParser
} = require('./symbols')
const { isEstablished, isClosing, isValidSubprotocol, failWebsocketConnection } = require('./util')
const { isEstablished, isClosing, isValidSubprotocol, failWebsocketConnection, fireEvent } = require('./util')
const { establishWebSocketConnection } = require('./connection')
const { WebsocketFrameSend } = require('./frame')
const { ByteParser } = require('./receiver')
const { kEnumerableProperty, isBlobLike } = require('../core/util')
const { types } = require('util')

Expand All @@ -32,6 +32,8 @@ class WebSocket extends EventTarget {
}

#bufferedAmount = 0
#protocol = ''
#extensions = ''

/**
* @param {string} url
Expand Down Expand Up @@ -104,18 +106,21 @@ class WebSocket extends EventTarget {

// 1. Establish a WebSocket connection given urlRecord, protocols,
// and client.
this[kController] = establishWebSocketConnection(urlRecord, protocols, this)
this[kController] = establishWebSocketConnection(
urlRecord,
protocols,
this,
(response) => this.#onConnectionEstablished(response)
)

// Each WebSocket object has an associated ready state, which is a
// number representing the state of the connection. Initially it must
// be CONNECTING (0).
this[kReadyState] = WebSocket.CONNECTING

// The extensions attribute must initially return the empty string.
this[kExtensions] = ''

// The protocol attribute must initially return the empty string.
this[kProtocol] = ''

// Each WebSocket object has an associated binary type, which is a
// BinaryType. Initially it must be "blob".
Expand Down Expand Up @@ -368,13 +373,13 @@ class WebSocket extends EventTarget {
get extensions () {
webidl.brandCheck(this, WebSocket)

return this[kExtensions]
return this.#extensions
}

get protocol () {
webidl.brandCheck(this, WebSocket)

return this[kProtocol]
return this.#protocol
}

get onopen () {
Expand Down Expand Up @@ -476,6 +481,47 @@ class WebSocket extends EventTarget {
this[kBinaryType] = type
}
}

/**
* @see https://websockets.spec.whatwg.org/#feedback-from-the-protocol
*/
#onConnectionEstablished (response) {
// processResponse is called when the "response’s header list has been received and initialized."
// once this happens, the connection is open
this[kResponse] = response

const parser = new ByteParser(this)
parser.on('drain', function onParserDrain () {
this.ws[kResponse].socket.resume()
})

response.socket.ws = this
this[kByteParser] = parser

// 1. Change the ready state to OPEN (1).
this[kReadyState] = states.OPEN

// 2. Change the extensions attribute’s value to the extensions in use, if
// it is not the null value.
// https://datatracker.ietf.org/doc/html/rfc6455#section-9.1
const extensions = response.headersList.get('sec-websocket-extensions')

if (extensions !== null) {
this.#extensions = extensions
}

// 3. Change the protocol attribute’s value to the subprotocol in use, if
// it is not the null value.
// https://datatracker.ietf.org/doc/html/rfc6455#section-1.9
const protocol = response.headersList.get('sec-websocket-protocol')

if (protocol !== null) {
this.#protocol = protocol
}

// 4. Fire an event named open at the WebSocket object.
fireEvent('open', this)
}
}

// https://websockets.spec.whatwg.org/#dom-websocket-connecting
Expand Down

0 comments on commit 49c6b48

Please sign in to comment.