This repository was archived by the owner on Feb 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
Copy pathbidi-to-duplex.js
76 lines (64 loc) · 1.93 KB
/
bidi-to-duplex.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
'use strict'
const pushable = require('it-pushable')
const errCode = require('err-code')
const toHeaders = require('./to-headers')
const transport = require('../grpc/transport')
/**
 * Forwards every item produced by `source` to the gRPC client, in order.
 *
 * Each item is handed to `client.send` wrapped in an object exposing a
 * `serializeBinary` method; the actual encoding through
 * `service.requestType.serializeBinary` only happens when that method is
 * invoked.
 *
 * @param {any} service - must provide `requestType.serializeBinary(obj)`
 * @param {any} client - receives one `send` call per item
 * @param {AsyncIterable<any> | Iterable<any>} source - the items to send
 * @returns {Promise<void>} settles once `source` has been fully consumed
 */
async function sendMessages (service, client, source) {
  // Defer serialization: `service.requestType` is only looked up when the
  // receiver of the wrapper calls `serializeBinary`
  const wrap = (payload) => ({
    serializeBinary: () => service.requestType.serializeBinary(payload)
  })

  for await (const payload of source) {
    client.send(wrap(payload))
  }
}
/**
* Bidirectional streams are many-to-many operations so returns a sink
* for the caller to write client messages into and a source to read
* server messages from.
*
* @param {object} grpc - an @improbable-eng/grpc-web instance
* @param {object} service - an @improbable-eng/grpc-web service
* @param {object} options - RPC options
* @param {string} options.host - The remote host
* @param {boolean} [options.debug] - Whether to print debug messages
* @param {object} [options.metadata] - Metadata sent as headers
* @param {import('http').Agent} [options.agent] - http.Agent used to control HTTP client behaviour (node.js only)
* @returns {{ source: AsyncIterable<object>, sink: { push: Function, end: Function } }}
**/
module.exports = function bidiToDuplex (grpc, service, options) {
// @ts-ignore
const source = pushable()
// @ts-ignore
const sink = pushable()
const client = grpc.client(service, {
...options,
transport: transport({
agent: options.agent
})
})
client.onMessage(message => {
sink.push(message)
})
client.onEnd((status, message, trailers) => {
let err
if (status) {
const error = new Error(message)
err = errCode(error, trailers.get('grpc-code')[0], {
status
})
err.stack = trailers.get('grpc-stack')[0] || error.stack
}
sink.end(err)
})
sendMessages(service, client, source)
.catch(err => {
sink.end(err)
})
.finally(() => {
client.finishSend()
})
client.start(toHeaders(options.metadata))
return {
sink: source,
source: sink
}
}