Skip to content

Commit

Permalink
fix(grpc-transport): reuse grpc-js client
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Goncharenko committed Jun 22, 2021
1 parent e24eacb commit 6ec85cf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
4 changes: 4 additions & 0 deletions packages/grpc-transport/src/grpc-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ export interface GrpcOptions extends RpcOptions {
*/
clientOptions?: ClientOptions;

}

export interface GrpcCallOptions extends RpcOptions {

/**
* This option can be provided when calling a client method.
* The CallOptions are passed to request factory method of the
Expand Down
44 changes: 26 additions & 18 deletions packages/grpc-transport/src/grpc-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
ServerStreamingCall,
UnaryCall
} from "@protobuf-ts/runtime-rpc";
import {GrpcOptions} from "./grpc-options";
import {GrpcCallOptions, GrpcOptions} from "./grpc-options";
import {CallOptions, Client, ClientWritableStream, Metadata, status as GrpcStatus} from "@grpc/grpc-js";
import {assert} from "@protobuf-ts/runtime";
import {metadataFromGrpc, isServiceError, metadataToGrpc} from "./util";
Expand All @@ -25,16 +25,23 @@ export class GrpcTransport implements RpcTransport {


private readonly defaultOptions: GrpcOptions;
private readonly client: Client;

constructor(defaultOptions: GrpcOptions) {
this.defaultOptions = defaultOptions;

this.client = new Client(
defaultOptions.host,
defaultOptions.channelCredentials,
defaultOptions.clientOptions
);
}

mergeOptions(options?: Partial<RpcOptions>): RpcOptions {
return mergeRpcOptions(this.defaultOptions, options);
}

private pickCallOptions(options: GrpcOptions): CallOptions {
private pickCallOptions(options: GrpcCallOptions): CallOptions {
if (options.callOptions) {
return options.callOptions;
}
Expand All @@ -43,22 +50,21 @@ export class GrpcTransport implements RpcTransport {
};
}

unary<I extends object, O extends object>(method: MethodInfo<I, O>, input: I, options: RpcOptions): UnaryCall<I, O> {
unary<I extends object, O extends object>(method: MethodInfo<I, O>, input: I, options: GrpcCallOptions): UnaryCall<I, O> {

const opt = options as GrpcOptions,
const opt = options,
meta = opt.meta ?? {},
gMeta = metadataToGrpc(meta, new Metadata({
idempotentRequest: method.idempotency === "IDEMPOTENT"
})),
client = new Client(opt.host, opt.channelCredentials, opt.clientOptions),
defHeader = new Deferred<RpcMetadata>(),
defMessage = new Deferred<O>(),
defStatus = new Deferred<RpcStatus>(),
defTrailer = new Deferred<RpcMetadata>(),
call = new UnaryCall<I, O>(method, meta, input, defHeader.promise, defMessage.promise, defStatus.promise, defTrailer.promise)
;

const gCall = client.makeUnaryRequest<I, O>(
const gCall = this.client.makeUnaryRequest<I, O>(
`/${method.service.typeName}/${method.name}`,
(value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)),
(value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions),
Expand Down Expand Up @@ -111,22 +117,21 @@ export class GrpcTransport implements RpcTransport {
}


serverStreaming<I extends object, O extends object>(method: MethodInfo<I, O>, input: I, options: RpcOptions): ServerStreamingCall<I, O> {
serverStreaming<I extends object, O extends object>(method: MethodInfo<I, O>, input: I, options: GrpcCallOptions): ServerStreamingCall<I, O> {

const opt = options as GrpcOptions,
const opt = options,
meta = opt.meta ?? {},
gMeta = metadataToGrpc(meta, new Metadata({
idempotentRequest: method.idempotency === "IDEMPOTENT"
})),
client = new Client(opt.host, opt.channelCredentials, opt.clientOptions),
defHeader = new Deferred<RpcMetadata>(),
outStream = new RpcOutputStreamController<O>(),
defStatus = new Deferred<RpcStatus>(),
defTrailer = new Deferred<RpcMetadata>(),
call = new ServerStreamingCall<I, O>(method, meta, input, defHeader.promise, outStream, defStatus.promise, defTrailer.promise)
;

const gCall = client.makeServerStreamRequest<I, O>(
const gCall = this.client.makeServerStreamRequest<I, O>(
`/${method.service.typeName}/${method.name}`,
(value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)),
(value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions),
Expand Down Expand Up @@ -178,19 +183,18 @@ export class GrpcTransport implements RpcTransport {
}


clientStreaming<I extends object, O extends object>(method: MethodInfo<I, O>, options: RpcOptions): ClientStreamingCall<I, O> {
clientStreaming<I extends object, O extends object>(method: MethodInfo<I, O>, options: GrpcCallOptions): ClientStreamingCall<I, O> {

const opt = options as GrpcOptions,
const opt = options,
meta = opt.meta ?? {},
gMeta = metadataToGrpc(meta, new Metadata({
idempotentRequest: method.idempotency === "IDEMPOTENT"
})),
client = new Client(opt.host, opt.channelCredentials, opt.clientOptions),
defHeader = new Deferred<RpcMetadata>(),
defMessage = new Deferred<O>(),
defStatus = new Deferred<RpcStatus>(),
defTrailer = new Deferred<RpcMetadata>(),
gCall = client.makeClientStreamRequest<I, O>(
gCall = this.client.makeClientStreamRequest<I, O>(
`/${method.service.typeName}/${method.name}`,
(value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)),
(value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions),
Expand Down Expand Up @@ -235,19 +239,18 @@ export class GrpcTransport implements RpcTransport {
}


duplex<I extends object, O extends object>(method: MethodInfo<I, O>, options: RpcOptions): DuplexStreamingCall<I, O> {
duplex<I extends object, O extends object>(method: MethodInfo<I, O>, options: GrpcCallOptions): DuplexStreamingCall<I, O> {

const opt = options as GrpcOptions,
const opt = options,
meta = opt.meta ?? {},
gMeta = metadataToGrpc(meta, new Metadata({
idempotentRequest: method.idempotency === "IDEMPOTENT"
})),
client = new Client(opt.host, opt.channelCredentials, opt.clientOptions),
defHeader = new Deferred<RpcMetadata>(),
outStream = new RpcOutputStreamController<O>(),
defStatus = new Deferred<RpcStatus>(),
defTrailer = new Deferred<RpcMetadata>(),
gCall = client.makeBidiStreamRequest<I, O>(
gCall = this.client.makeBidiStreamRequest<I, O>(
`/${method.service.typeName}/${method.name}`,
(value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)),
(value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions),
Expand Down Expand Up @@ -300,6 +303,11 @@ export class GrpcTransport implements RpcTransport {
return call;
}


close(): void {
this.client.close();
}

}


Expand Down

0 comments on commit 6ec85cf

Please sign in to comment.