diff --git a/packages/grpc-transport/src/grpc-options.ts b/packages/grpc-transport/src/grpc-options.ts index 51b99589..4c187b54 100644 --- a/packages/grpc-transport/src/grpc-options.ts +++ b/packages/grpc-transport/src/grpc-options.ts @@ -25,6 +25,10 @@ export interface GrpcOptions extends RpcOptions { */ clientOptions?: ClientOptions; +} + +export interface GrpcCallOptions extends RpcOptions { + /** * This option can be provided when calling a client method. * The CallOptions are passed to request factory method of the diff --git a/packages/grpc-transport/src/grpc-transport.ts b/packages/grpc-transport/src/grpc-transport.ts index 5b925888..3a1d978d 100644 --- a/packages/grpc-transport/src/grpc-transport.ts +++ b/packages/grpc-transport/src/grpc-transport.ts @@ -15,7 +15,7 @@ import { ServerStreamingCall, UnaryCall } from "@protobuf-ts/runtime-rpc"; -import {GrpcOptions} from "./grpc-options"; +import {GrpcCallOptions, GrpcOptions} from "./grpc-options"; import {CallOptions, Client, ClientWritableStream, Metadata, status as GrpcStatus} from "@grpc/grpc-js"; import {assert} from "@protobuf-ts/runtime"; import {metadataFromGrpc, isServiceError, metadataToGrpc} from "./util"; @@ -25,16 +25,23 @@ export class GrpcTransport implements RpcTransport { private readonly defaultOptions: GrpcOptions; + private readonly client: Client; constructor(defaultOptions: GrpcOptions) { this.defaultOptions = defaultOptions; + + this.client = new Client( + defaultOptions.host, + defaultOptions.channelCredentials, + defaultOptions.clientOptions + ); } mergeOptions(options?: Partial): RpcOptions { return mergeRpcOptions(this.defaultOptions, options); } - private pickCallOptions(options: GrpcOptions): CallOptions { + private pickCallOptions(options: GrpcCallOptions): CallOptions { if (options.callOptions) { return options.callOptions; } @@ -43,14 +50,13 @@ export class GrpcTransport implements RpcTransport { }; } - unary(method: MethodInfo, input: I, options: RpcOptions): UnaryCall { + unary(method: MethodInfo, input: I, options: GrpcCallOptions): UnaryCall { - const opt = options as GrpcOptions, + const opt = options, meta = opt.meta ?? {}, gMeta = metadataToGrpc(meta, new Metadata({ idempotentRequest: method.idempotency === "IDEMPOTENT" })), - client = new Client(opt.host, opt.channelCredentials, opt.clientOptions), defHeader = new Deferred(), defMessage = new Deferred(), defStatus = new Deferred(), @@ -58,7 +64,7 @@ export class GrpcTransport implements RpcTransport { call = new UnaryCall(method, meta, input, defHeader.promise, defMessage.promise, defStatus.promise, defTrailer.promise) ; - const gCall = client.makeUnaryRequest( + const gCall = this.client.makeUnaryRequest( `/${method.service.typeName}/${method.name}`, (value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)), (value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions), @@ -111,14 +117,13 @@ export class GrpcTransport implements RpcTransport { } - serverStreaming(method: MethodInfo, input: I, options: RpcOptions): ServerStreamingCall { + serverStreaming(method: MethodInfo, input: I, options: GrpcCallOptions): ServerStreamingCall { - const opt = options as GrpcOptions, + const opt = options, meta = opt.meta ?? {}, gMeta = metadataToGrpc(meta, new Metadata({ idempotentRequest: method.idempotency === "IDEMPOTENT" })), - client = new Client(opt.host, opt.channelCredentials, opt.clientOptions), defHeader = new Deferred(), outStream = new RpcOutputStreamController(), defStatus = new Deferred(), @@ -126,7 +131,7 @@ export class GrpcTransport implements RpcTransport { call = new ServerStreamingCall(method, meta, input, defHeader.promise, outStream, defStatus.promise, defTrailer.promise) ; - const gCall = client.makeServerStreamRequest( + const gCall = this.client.makeServerStreamRequest( `/${method.service.typeName}/${method.name}`, (value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)), (value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions), @@ -178,19 +183,18 @@ export class GrpcTransport implements RpcTransport { } - clientStreaming(method: MethodInfo, options: RpcOptions): ClientStreamingCall { + clientStreaming(method: MethodInfo, options: GrpcCallOptions): ClientStreamingCall { - const opt = options as GrpcOptions, + const opt = options, meta = opt.meta ?? {}, gMeta = metadataToGrpc(meta, new Metadata({ idempotentRequest: method.idempotency === "IDEMPOTENT" })), - client = new Client(opt.host, opt.channelCredentials, opt.clientOptions), defHeader = new Deferred(), defMessage = new Deferred(), defStatus = new Deferred(), defTrailer = new Deferred(), - gCall = client.makeClientStreamRequest( + gCall = this.client.makeClientStreamRequest( `/${method.service.typeName}/${method.name}`, (value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)), (value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions), @@ -235,19 +239,18 @@ export class GrpcTransport implements RpcTransport { } - duplex(method: MethodInfo, options: RpcOptions): DuplexStreamingCall { + duplex(method: MethodInfo, options: GrpcCallOptions): DuplexStreamingCall { - const opt = options as GrpcOptions, + const opt = options, meta = opt.meta ?? {}, gMeta = metadataToGrpc(meta, new Metadata({ idempotentRequest: method.idempotency === "IDEMPOTENT" })), - client = new Client(opt.host, opt.channelCredentials, opt.clientOptions), defHeader = new Deferred(), outStream = new RpcOutputStreamController(), defStatus = new Deferred(), defTrailer = new Deferred(), - gCall = client.makeBidiStreamRequest( + gCall = this.client.makeBidiStreamRequest( `/${method.service.typeName}/${method.name}`, (value: I): Buffer => Buffer.from(method.I.toBinary(value, opt.binaryOptions)), (value: Buffer): O => method.O.fromBinary(value, opt.binaryOptions), @@ -300,6 +303,11 @@ export class GrpcTransport implements RpcTransport { return call; } + + close(): void { + this.client.close(); + } + }