This repository was archived by the owner on Feb 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathtransport.node.js
146 lines (124 loc) · 3.86 KB
/
transport.node.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
'use strict'
// copied from https://github.com/improbable-eng/grpc-web/blob/master/client/grpc-web/src/transports/websocket/websocket.ts
// but uses the ws implementation of WebSockets
// see: https://github.com/improbable-eng/grpc-web/issues/796
const WebSocket = require('ws')
const debug = require('debug')('ipfs:grpc-client:websocket-transport')
// Marker values that may be queued/sent in place of message bytes
const WebsocketSignal = { FINISH_SEND: 1 }

// The single-byte frame written to the socket for a FINISH_SEND signal
const finishSendFrame = new Uint8Array([WebsocketSignal.FINISH_SEND])
/**
 * Builds a transport factory: a function that takes the per-request transport
 * options and returns the transport produced by `websocketRequest`.
 *
 * The `options` given here are merged with the per-request options on every
 * call; where both define the same key the per-request value wins.
 *
 * @param {object} options
 * @param {import('http').Agent} [options.agent] - http.Agent used to control HTTP client behaviour
 */
function WebsocketTransport (options) {
  /**
   * @param {import('@improbable-eng/grpc-web').grpc.TransportOptions} opts
   */
  const websocketTransportFactory = (opts) => {
    // later spread wins, so request-level options override the shared ones
    const mergedOptions = { ...options, ...opts }

    return websocketRequest(mergedOptions)
  }

  return websocketTransportFactory
}
/**
 * Creates a transport object (`sendMessage`, `finishSend`, `start`, `cancel`)
 * that exchanges frames over a WebSocket opened with the `grpc-websockets`
 * sub-protocol.
 *
 * Anything passed to `sendMessage`/`finishSend` before the socket has opened is
 * queued and flushed, in order, right after the headers frame is sent.
 *
 * @typedef {object} NodeTransportOptions
 * @property {import('http').Agent} [agent]
 *
 * @typedef {NodeTransportOptions & import('@improbable-eng/grpc-web').grpc.TransportOptions} WebSocketTransportOptions
 *
 * @param {WebSocketTransportOptions} options
 */
function websocketRequest (options) {
  const webSocketAddress = constructWebSocketAddress(options.url)
  // messages and signals received before the socket was ready to send
  let sendQueue = []
  // only assigned by start(); undefined until then
  let ws

  /**
   * Writes one queued item to the socket: the finish-send frame for the
   * FINISH_SEND signal, otherwise the message bytes prefixed with a 0 byte.
   */
  function sendToWebsocket (toSend) {
    if (toSend === WebsocketSignal.FINISH_SEND) {
      ws.send(finishSendFrame)
    } else {
      const byteArray = toSend
      const c = new Int8Array(byteArray.byteLength + 1)
      c.set(new Uint8Array([0]))
      c.set(byteArray, 1)
      ws.send(c)
    }
  }

  return {
    sendMessage: (msgBytes) => {
      if (!ws || ws.readyState === ws.CONNECTING) {
        sendQueue.push(msgBytes)
      } else {
        sendToWebsocket(msgBytes)
      }
    },
    finishSend: () => {
      if (!ws || ws.readyState === ws.CONNECTING) {
        sendQueue.push(WebsocketSignal.FINISH_SEND)
      } else {
        sendToWebsocket(WebsocketSignal.FINISH_SEND)
      }
    },
    start: (metadata) => {
      ws = new WebSocket(webSocketAddress, ['grpc-websockets'], options)
      ws.binaryType = 'arraybuffer'

      ws.onopen = function () {
        options.debug && debug('websocketRequest.onopen')
        // the encoded headers are always the first frame on the socket
        ws.send(headersToBytes(metadata))

        // send any messages that were passed to sendMessage before the connection was ready
        sendQueue.forEach(toSend => {
          sendToWebsocket(toSend)
        })
        sendQueue = []
      }

      ws.onclose = function (closeEvent) {
        options.onEnd()
      }

      ws.onerror = function (error) {
        // errors are only logged here; onEnd is invoked from onclose
        options.debug && debug('websocketRequest.onerror', error)
      }

      ws.onmessage = function (e) {
        options.onChunk(new Uint8Array(e.data, 0, e.data.byteLength))
      }
    },
    cancel: () => {
      // cancel() can be called before start() has created the socket, in which
      // case there is nothing to close
      if (ws) {
        ws.close()
      }
    }
  }
}
/**
 * Maps a request URL onto the WebSocket URL to connect to.
 *
 * `ws://` and `wss://` URLs are returned unchanged; `https://` is rewritten to
 * `wss://` and `http://` to `ws://`, keeping the rest of the URL as it is.
 *
 * @param {string} url
 * @returns {string}
 * @throws {Error} when the URL starts with none of the four supported schemes
 */
function constructWebSocketAddress (url) {
  if (url.startsWith('wss://') || url.startsWith('ws://')) {
    return url
  }

  // startsWith/slice replace the deprecated String.prototype.substr
  if (url.startsWith('https://')) {
    return `wss://${url.slice('https://'.length)}`
  }

  if (url.startsWith('http://')) {
    return `ws://${url.slice('http://'.length)}`
  }

  throw new Error('Websocket transport url must start with ws:// or wss:// or http:// or https://')
}
/**
 * Serialises headers into `key: value1, value2\r\n` lines and returns the
 * result of passing the concatenated text through `encodeASCII`.
 *
 * @param {{ forEach: (callback: (key: string, values: string[]) => void) => void }} headers
 */
function headersToBytes (headers) {
  const lines = []

  headers.forEach((name, valueList) => {
    const joinedValues = valueList.join(', ')
    lines.push(`${name}: ${joinedValues}\r\n`)
  })

  return encodeASCII(lines.join(''))
}
/**
 * Converts a string into a Uint8Array holding one char code per element.
 *
 * @param {string} input
 * @returns {Uint8Array}
 * @throws {Error} if any char code is rejected by `isValidHeaderAscii`
 */
function encodeASCII (input) {
  // one output byte per UTF-16 code unit, validated in order from the start
  return Uint8Array.from({ length: input.length }, (_, position) => {
    const code = input.charCodeAt(position)

    if (!isValidHeaderAscii(code)) {
      throw new Error('Metadata contains invalid ASCII')
    }

    return code
  })
}
/**
 * True for the three control char codes that are accepted: 0x9, 0xa and 0xd.
 */
const isAllowedControlChars = (char) => {
  switch (char) {
    case 0x9:
    case 0xa:
    case 0xd:
      return true
    default:
      return false
  }
}

/**
 * True when the char code is one of the allowed control characters or lies in
 * the inclusive range 0x20-0x7e.
 *
 * @param {number} val
 * @returns {boolean}
 */
function isValidHeaderAscii (val) {
  if (isAllowedControlChars(val)) {
    return true
  }

  const isInPrintableRange = val >= 0x20 && val <= 0x7e
  return isInPrintableRange
}
// WebsocketTransport is the only value this module exports
module.exports = WebsocketTransport