From 22a70c544bac11a455035b16ab349bd6d3fa3556 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Wed, 24 Jun 2020 09:28:30 -0700 Subject: [PATCH] Add a generic grpc server settings config, cleanup client config Signed-off-by: Bogdan Drutu --- config/configgrpc/configgrpc.go | 125 +++++++++++++++++--- config/configgrpc/configgrpc_test.go | 36 +++--- exporter/jaegerexporter/exporter.go | 3 +- exporter/jaegerexporter/exporter_test.go | 24 ++-- exporter/opencensusexporter/config_test.go | 2 +- exporter/opencensusexporter/factory.go | 8 +- exporter/opencensusexporter/factory_test.go | 4 +- exporter/otlpexporter/config_test.go | 2 +- exporter/otlpexporter/exporter.go | 3 +- exporter/otlpexporter/factory_test.go | 4 +- receiver/jaegerreceiver/trace_receiver.go | 2 +- receiver/opencensusreceiver/config.go | 38 +----- receiver/opencensusreceiver/config_test.go | 99 +++++++++------- receiver/opencensusreceiver/factory.go | 7 +- receiver/opencensusreceiver/factory_test.go | 69 +++++++---- receiver/otlpreceiver/config.go | 95 +-------------- receiver/otlpreceiver/config_test.go | 105 +++++++++------- receiver/otlpreceiver/factory.go | 7 +- receiver/otlpreceiver/factory_test.go | 70 +++++++---- 19 files changed, 375 insertions(+), 328 deletions(-) diff --git a/config/configgrpc/configgrpc.go b/config/configgrpc/configgrpc.go index 915883839975..dd4300632a5b 100644 --- a/config/configgrpc/configgrpc.go +++ b/config/configgrpc/configgrpc.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/encoding/gzip" "google.golang.org/grpc/keepalive" + "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/config/configtls" ) @@ -33,6 +34,21 @@ const ( CompressionGzip = "gzip" ) +var ( + // Map of opentelemetry compression types to grpc registered compression types + grpcCompressionKeyMap = map[string]string{ + CompressionGzip: gzip.Name, + } +) + +// KeepaliveClientConfig exposes the keepalive.ClientParameters to be used by the exporter. +// Refer to the original data-structure for the meaning of each parameter. +type KeepaliveClientConfig struct { + Time time.Duration `mapstructure:"time,omitempty"` + Timeout time.Duration `mapstructure:"timeout,omitempty"` + PermitWithoutStream bool `mapstructure:"permit_without_stream,omitempty"` +} + // GRPCClientSettings defines common settings for a gRPC client configuration. type GRPCClientSettings struct { // The headers associated with gRPC requests. @@ -52,23 +68,54 @@ type GRPCClientSettings struct { // The keepalive parameters for client gRPC. See grpc.WithKeepaliveParams // (https://godoc.org/google.golang.org/grpc#WithKeepaliveParams). - KeepaliveParameters *KeepaliveConfig `mapstructure:"keepalive"` + Keepalive *KeepaliveClientConfig `mapstructure:"keepalive"` // WaitForReady parameter configures client to wait for ready state before sending data. // (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md) WaitForReady bool `mapstructure:"wait_for_ready"` } -// KeepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter. -// Refer to the original data-structure for the meaning of each parameter. -type KeepaliveConfig struct { - Time time.Duration `mapstructure:"time,omitempty"` - Timeout time.Duration `mapstructure:"timeout,omitempty"` +type KeepaliveServerConfig struct { + ServerParameters *KeepaliveServerParameters `mapstructure:"server_parameters,omitempty"` + EnforcementPolicy *KeepaliveEnforcementPolicy `mapstructure:"enforcement_policy,omitempty"` +} + +// keepaliveServerParameters allow configuration of the keepalive.ServerParameters. +// The same default values as keepalive.ServerParameters are applicable and get applied by the server. +// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details. +type KeepaliveServerParameters struct { + MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle,omitempty"` + MaxConnectionAge time.Duration `mapstructure:"max_connection_age,omitempty"` + MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace,omitempty"` + Time time.Duration `mapstructure:"time,omitempty"` + Timeout time.Duration `mapstructure:"timeout,omitempty"` +} + +// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy. +// The same default values as keepalive.EnforcementPolicy are applicable and get applied by the server. +// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details. +type KeepaliveEnforcementPolicy struct { + MinTime time.Duration `mapstructure:"min_time,omitempty"` PermitWithoutStream bool `mapstructure:"permit_without_stream,omitempty"` } -// GrpcSettingsToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC -func GrpcSettingsToDialOptions(settings GRPCClientSettings) ([]grpc.DialOption, error) { +type GRPCServerSettings struct { + // Configures the generic server protocol. + configprotocol.ProtocolServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + + // MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server. + MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib,omitempty"` + + // MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport. + // TODO(nilebox): This setting affecting HTTP/2 streams need to be tested + MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams,omitempty"` + + // Keepalive anchor for all the settings related to keepalive. + Keepalive *KeepaliveServerConfig `mapstructure:"keepalive,omitempty"` +} + +// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC +func (settings *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) { opts := []grpc.DialOption{} if settings.Compression != "" { @@ -85,11 +132,11 @@ func GrpcSettingsToDialOptions(settings GRPCClientSettings) ([]grpc.DialOption, } opts = append(opts, tlsDialOption) - if settings.KeepaliveParameters != nil { + if settings.Keepalive != nil { keepAliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: settings.KeepaliveParameters.Time, - Timeout: settings.KeepaliveParameters.Timeout, - PermitWithoutStream: settings.KeepaliveParameters.PermitWithoutStream, + Time: settings.Keepalive.Time, + Timeout: settings.Keepalive.Timeout, + PermitWithoutStream: settings.Keepalive.PermitWithoutStream, }) opts = append(opts, keepAliveOption) } @@ -97,12 +144,56 @@ func GrpcSettingsToDialOptions(settings GRPCClientSettings) ([]grpc.DialOption, return opts, nil } -var ( - // Map of opentelemetry compression types to grpc registered compression types - grpcCompressionKeyMap = map[string]string{ - CompressionGzip: gzip.Name, +// ToDialOptions maps configgrpc.GRPCServerSettings to a slice of dial options for gRPC +func (gss *GRPCServerSettings) ToDialOptions() ([]grpc.ServerOption, error) { + var opts []grpc.ServerOption + + if gss.TLSCredentials != nil { + tlsOpt, err := gss.TLSCredentials.LoadgRPCTLSServerCredentials() + if err != nil { + return nil, fmt.Errorf("error initializing TLS Credentials: %v", err) + } + opts = append(opts, tlsOpt) } -) + + if gss.MaxRecvMsgSizeMiB > 0 { + opts = append(opts, grpc.MaxRecvMsgSize(int(gss.MaxRecvMsgSizeMiB*1024*1024))) + } + + if gss.MaxConcurrentStreams > 0 { + opts = append(opts, grpc.MaxConcurrentStreams(gss.MaxConcurrentStreams)) + } + + // The default values referenced in the GRPC docs are set within the server, so this code doesn't need + // to apply them over zero/nil values before passing these as grpc.ServerOptions. + // The following shows the server code for applying default grpc.ServerOptions. + // https://sourcegraph.com/github.com/grpc/grpc-go@120728e1f775e40a2a764341939b78d666b08260/-/blob/internal/transport/http2_server.go#L184-200 + if gss.Keepalive != nil { + if gss.Keepalive.ServerParameters != nil { + svrParams := gss.Keepalive.ServerParameters + opts = append(opts, grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: svrParams.MaxConnectionIdle, + MaxConnectionAge: svrParams.MaxConnectionAge, + MaxConnectionAgeGrace: svrParams.MaxConnectionAgeGrace, + Time: svrParams.Time, + Timeout: svrParams.Timeout, + })) + } + // The default values referenced in the GRPC are set within the server, so this code doesn't need + // to apply them over zero/nil values before passing these as grpc.ServerOptions. + // The following shows the server code for applying default grpc.ServerOptions. + // https://sourcegraph.com/github.com/grpc/grpc-go@120728e1f775e40a2a764341939b78d666b08260/-/blob/internal/transport/http2_server.go#L202-205 + if gss.Keepalive.EnforcementPolicy != nil { + enfPol := gss.Keepalive.EnforcementPolicy + opts = append(opts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: enfPol.MinTime, + PermitWithoutStream: enfPol.PermitWithoutStream, + })) + } + } + + return opts, nil +} // GetGRPCCompressionKey returns the grpc registered compression key if the // passed in compression key is supported, and CompressionUnsupported otherwise diff --git a/config/configgrpc/configgrpc_test.go b/config/configgrpc/configgrpc_test.go index a98114d64a65..91ce48d3b202 100644 --- a/config/configgrpc/configgrpc_test.go +++ b/config/configgrpc/configgrpc_test.go @@ -23,13 +23,13 @@ import ( ) func TestBasicGrpcSettings(t *testing.T) { - - _, err := GrpcSettingsToDialOptions(GRPCClientSettings{ - Headers: nil, - Endpoint: "", - Compression: "", - KeepaliveParameters: nil, - }) + gcs := &GRPCClientSettings{ + Headers: nil, + Endpoint: "", + Compression: "", + Keepalive: nil, + } + _, err := gcs.ToDialOptions() assert.NoError(t, err) } @@ -52,7 +52,7 @@ func TestInvalidPemFile(t *testing.T) { Insecure: false, ServerName: "", }, - KeepaliveParameters: nil, + Keepalive: nil, }, }, { @@ -68,27 +68,27 @@ func TestInvalidPemFile(t *testing.T) { Insecure: false, ServerName: "", }, - KeepaliveParameters: nil, + Keepalive: nil, }, }, } for _, test := range tests { t.Run(test.err, func(t *testing.T) { - _, err := GrpcSettingsToDialOptions(test.settings) + _, err := test.settings.ToDialOptions() assert.Regexp(t, test.err, err) }) } } func TestUseSecure(t *testing.T) { - dialOpts, err := GrpcSettingsToDialOptions(GRPCClientSettings{ - Headers: nil, - Endpoint: "", - Compression: "", - TLSSetting: configtls.TLSClientSetting{}, - KeepaliveParameters: nil, - }) - + gcs := &GRPCClientSettings{ + Headers: nil, + Endpoint: "", + Compression: "", + TLSSetting: configtls.TLSClientSetting{}, + Keepalive: nil, + } + dialOpts, err := gcs.ToDialOptions() assert.NoError(t, err) assert.Equal(t, len(dialOpts), 1) } diff --git a/exporter/jaegerexporter/exporter.go b/exporter/jaegerexporter/exporter.go index 89c19d97a76a..0409ca5c0543 100644 --- a/exporter/jaegerexporter/exporter.go +++ b/exporter/jaegerexporter/exporter.go @@ -22,7 +22,6 @@ import ( "google.golang.org/grpc/metadata" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/pdata" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -34,7 +33,7 @@ import ( // The collectorEndpoint should be of the form "hostname:14250" (a gRPC target). func New(config *Config) (component.TraceExporter, error) { - opts, err := configgrpc.GrpcSettingsToDialOptions(config.GRPCClientSettings) + opts, err := config.GRPCClientSettings.ToDialOptions() if err != nil { return nil, err } diff --git a/exporter/jaegerexporter/exporter_test.go b/exporter/jaegerexporter/exporter_test.go index ee95fa58299f..696b18aa4882 100644 --- a/exporter/jaegerexporter/exporter_test.go +++ b/exporter/jaegerexporter/exporter_test.go @@ -56,7 +56,7 @@ func TestNew(t *testing.T) { TLSSetting: configtls.TLSClientSetting{ Insecure: true, }, - KeepaliveParameters: nil, + Keepalive: nil, }, }, }, @@ -66,10 +66,10 @@ func TestNew(t *testing.T) { args: args{ config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Headers: map[string]string{"extra-header": "header-value"}, - Endpoint: "foo.bar", - Compression: "", - KeepaliveParameters: nil, + Headers: map[string]string{"extra-header": "header-value"}, + Endpoint: "foo.bar", + Compression: "", + Keepalive: nil, }, }, }, @@ -79,10 +79,10 @@ func TestNew(t *testing.T) { args: args{ config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ - Headers: nil, - Endpoint: "foo.bar", - Compression: "", - KeepaliveParameters: nil, + Headers: nil, + Endpoint: "foo.bar", + Compression: "", + Keepalive: nil, }, }, }, @@ -101,7 +101,7 @@ func TestNew(t *testing.T) { }, Insecure: false, }, - KeepaliveParameters: nil, + Keepalive: nil, }, }, }, @@ -121,7 +121,7 @@ func TestNew(t *testing.T) { Insecure: false, ServerName: "", }, - KeepaliveParameters: &configgrpc.KeepaliveConfig{ + Keepalive: &configgrpc.KeepaliveClientConfig{ Time: 0, Timeout: 0, PermitWithoutStream: false, @@ -144,7 +144,7 @@ func TestNew(t *testing.T) { }, Insecure: false, }, - KeepaliveParameters: nil, + Keepalive: nil, }, }, }, diff --git a/exporter/opencensusexporter/config_test.go b/exporter/opencensusexporter/config_test.go index a85139cdff3d..f8bd024a0ce9 100644 --- a/exporter/opencensusexporter/config_test.go +++ b/exporter/opencensusexporter/config_test.go @@ -62,7 +62,7 @@ func TestLoadConfig(t *testing.T) { }, Insecure: false, }, - KeepaliveParameters: &configgrpc.KeepaliveConfig{ + Keepalive: &configgrpc.KeepaliveClientConfig{ Time: 20, PermitWithoutStream: true, Timeout: 30, diff --git a/exporter/opencensusexporter/factory.go b/exporter/opencensusexporter/factory.go index 2919c7d74cea..f1132cd71597 100644 --- a/exporter/opencensusexporter/factory.go +++ b/exporter/opencensusexporter/factory.go @@ -110,11 +110,11 @@ func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.Ex if ocac.ReconnectionDelay > 0 { opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay)) } - if ocac.KeepaliveParameters != nil { + if ocac.Keepalive != nil { opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: ocac.KeepaliveParameters.Time, - Timeout: ocac.KeepaliveParameters.Timeout, - PermitWithoutStream: ocac.KeepaliveParameters.PermitWithoutStream, + Time: ocac.Keepalive.Time, + Timeout: ocac.Keepalive.Timeout, + PermitWithoutStream: ocac.Keepalive.PermitWithoutStream, }))) } return opts, nil diff --git a/exporter/opencensusexporter/factory_test.go b/exporter/opencensusexporter/factory_test.go index d0ef4fc9a2c8..ec059b603482 100644 --- a/exporter/opencensusexporter/factory_test.go +++ b/exporter/opencensusexporter/factory_test.go @@ -104,11 +104,11 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "KeepaliveParameters", + name: "Keepalive", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: rcvCfg.Endpoint, - KeepaliveParameters: &configgrpc.KeepaliveConfig{ + Keepalive: &configgrpc.KeepaliveClientConfig{ Time: 30 * time.Second, Timeout: 25 * time.Second, PermitWithoutStream: true, diff --git a/exporter/otlpexporter/config_test.go b/exporter/otlpexporter/config_test.go index 4dacabe1b8e4..9df0859cd191 100644 --- a/exporter/otlpexporter/config_test.go +++ b/exporter/otlpexporter/config_test.go @@ -63,7 +63,7 @@ func TestLoadConfig(t *testing.T) { }, Insecure: false, }, - KeepaliveParameters: &configgrpc.KeepaliveConfig{ + Keepalive: &configgrpc.KeepaliveClientConfig{ Time: 20 * time.Second, PermitWithoutStream: true, Timeout: 30 * time.Second, diff --git a/exporter/otlpexporter/exporter.go b/exporter/otlpexporter/exporter.go index b2833dd52bef..98cd968f5a2d 100644 --- a/exporter/otlpexporter/exporter.go +++ b/exporter/otlpexporter/exporter.go @@ -27,7 +27,6 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "go.opentelemetry.io/collector/config/configgrpc" otlpmetriccol "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1" otlptracecol "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1" otlplogcol "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1" @@ -62,7 +61,7 @@ func newExporter(config *Config) (*exporterImp, error) { e.config = config var err error - e.dialOpts, err = configgrpc.GrpcSettingsToDialOptions(e.config.GRPCClientSettings) + e.dialOpts, err = e.config.GRPCClientSettings.ToDialOptions() if err != nil { return nil, err } diff --git a/exporter/otlpexporter/factory_test.go b/exporter/otlpexporter/factory_test.go index d5e533150105..52164d7d11fa 100644 --- a/exporter/otlpexporter/factory_test.go +++ b/exporter/otlpexporter/factory_test.go @@ -77,11 +77,11 @@ func TestCreateTraceExporter(t *testing.T) { }, }, { - name: "KeepaliveParameters", + name: "Keepalive", config: Config{ GRPCClientSettings: configgrpc.GRPCClientSettings{ Endpoint: endpoint, - KeepaliveParameters: &configgrpc.KeepaliveConfig{ + Keepalive: &configgrpc.KeepaliveClientConfig{ Time: 30 * time.Second, Timeout: 25 * time.Second, PermitWithoutStream: true, diff --git a/receiver/jaegerreceiver/trace_receiver.go b/receiver/jaegerreceiver/trace_receiver.go index bce2bd63413a..09dafc2ba558 100644 --- a/receiver/jaegerreceiver/trace_receiver.go +++ b/receiver/jaegerreceiver/trace_receiver.go @@ -395,7 +395,7 @@ func (jr *jReceiver) startAgent(_ component.Host) error { // Start upstream grpc client before serving sampling endpoints over HTTP if jr.config.RemoteSamplingClientSettings.Endpoint != "" { - grpcOpts, err := configgrpc.GrpcSettingsToDialOptions(jr.config.RemoteSamplingClientSettings) + grpcOpts, err := jr.config.RemoteSamplingClientSettings.ToDialOptions() if err != nil { jr.logger.Error("Error creating grpc dial options for remote sampling endpoint", zap.Error(err)) return err diff --git a/receiver/opencensusreceiver/config.go b/receiver/opencensusreceiver/config.go index 5777ef0589bd..5acc1d9e203e 100644 --- a/receiver/opencensusreceiver/config.go +++ b/receiver/opencensusreceiver/config.go @@ -16,13 +16,12 @@ package opencensusreceiver import ( "fmt" - "time" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/config/configprotocol" ) // Config defines configuration for OpenCensus receiver. @@ -30,7 +29,7 @@ type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct // Configures the receiver server protocol. - configprotocol.ProtocolServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + configgrpc.GRPCServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct // Transport to use: one of tcp or unix, defaults to tcp Transport string `mapstructure:"transport"` @@ -40,39 +39,6 @@ type Config struct { // An empty list means that CORS is not enabled at all. A wildcard (*) can be // used to match any origin or one or more characters of an origin. CorsOrigins []string `mapstructure:"cors_allowed_origins"` - - // Keepalive anchor for all the settings related to keepalive. - Keepalive *serverParametersAndEnforcementPolicy `mapstructure:"keepalive,omitempty"` - - // MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server. - MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib,omitempty"` - - // MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport. - MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams,omitempty"` -} - -type serverParametersAndEnforcementPolicy struct { - ServerParameters *keepaliveServerParameters `mapstructure:"server_parameters,omitempty"` - EnforcementPolicy *keepaliveEnforcementPolicy `mapstructure:"enforcement_policy,omitempty"` -} - -// keepaliveServerParameters allow configuration of the keepalive.ServerParameters. -// The same default values as keepalive.ServerParameters are applicable and get applied by the server. -// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details. -type keepaliveServerParameters struct { - MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle,omitempty"` - MaxConnectionAge time.Duration `mapstructure:"max_connection_age,omitempty"` - MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace,omitempty"` - Time time.Duration `mapstructure:"time,omitempty"` - Timeout time.Duration `mapstructure:"timeout,omitempty"` -} - -// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy. -// The same default values as keepalive.EnforcementPolicy are applicable and get applied by the server. -// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details. -type keepaliveEnforcementPolicy struct { - MinTime time.Duration `mapstructure:"min_time,omitempty"` - PermitWithoutStream bool `mapstructure:"permit_without_stream,omitempty"` } func (rOpts *Config) buildOptions() ([]Option, error) { diff --git a/receiver/opencensusreceiver/config_test.go b/receiver/opencensusreceiver/config_test.go index 6ae0dabaa24d..89199c8033cf 100644 --- a/receiver/opencensusreceiver/config_test.go +++ b/receiver/opencensusreceiver/config_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/config/configtls" @@ -51,9 +52,11 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "opencensus/customname", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:9090", - TLSCredentials: nil, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:9090", + TLSCredentials: nil, + }, }, Transport: "tcp", }) @@ -65,24 +68,26 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "opencensus/keepalive", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - TLSCredentials: nil, - Endpoint: "0.0.0.0:55678", - }, - Transport: "tcp", - Keepalive: &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - MaxConnectionIdle: 11 * time.Second, - MaxConnectionAge: 12 * time.Second, - MaxConnectionAgeGrace: 13 * time.Second, - Time: 30 * time.Second, - Timeout: 5 * time.Second, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + TLSCredentials: nil, + Endpoint: "0.0.0.0:55678", }, - EnforcementPolicy: &keepaliveEnforcementPolicy{ - MinTime: 10 * time.Second, - PermitWithoutStream: true, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 11 * time.Second, + MaxConnectionAge: 12 * time.Second, + MaxConnectionAgeGrace: 13 * time.Second, + Time: 30 * time.Second, + Timeout: 5 * time.Second, + }, + EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }, }, }, + Transport: "tcp", }) r3 := cfg.Receivers["opencensus/msg-size-conc-connect-max-idle"].(*Config) @@ -92,18 +97,20 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "opencensus/msg-size-conc-connect-max-idle", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55678", - TLSCredentials: nil, - }, - Transport: "tcp", - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, - Keepalive: &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - MaxConnectionIdle: 10 * time.Second, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55678", + TLSCredentials: nil, + }, + MaxRecvMsgSizeMiB: 32, + MaxConcurrentStreams: 16, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 10 * time.Second, + }, }, }, + Transport: "tcp", }) // TODO(ccaraman): Once the config loader checks for the files existence, this test may fail and require @@ -115,12 +122,14 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "opencensus/tlscredentials", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55678", - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "test.crt", - KeyFile: "test.key", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55678", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "test.crt", + KeyFile: "test.key", + }, }, }, }, @@ -134,8 +143,10 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "opencensus/cors", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55678", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55678", + }, }, Transport: "tcp", CorsOrigins: []string{"https://*.test.com", "https://test.com"}, @@ -148,8 +159,10 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "opencensus/uds", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "/tmp/opencensus.sock", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "/tmp/opencensus.sock", + }, }, Transport: "unix", }) @@ -160,10 +173,12 @@ func TestBuildOptions_TLSCredentials(t *testing.T) { ReceiverSettings: configmodels.ReceiverSettings{ NameVal: "IncorrectTLS", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "willfail", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, }, }, }, diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index ca7c24f88f49..dc9fac239313 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/consumer" @@ -51,8 +52,10 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { TypeVal: typeStr, NameVal: typeStr, }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55678", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55678", + }, }, Transport: "tcp", } diff --git a/receiver/opencensusreceiver/factory_test.go b/receiver/opencensusreceiver/factory_test.go index a7d681425a34..5fb710ffe5f9 100644 --- a/receiver/opencensusreceiver/factory_test.go +++ b/receiver/opencensusreceiver/factory_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/exporter/exportertest" @@ -61,8 +62,10 @@ func TestCreateTraceReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, } - defaultProtocolSettings := configprotocol.ProtocolServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, + }, } tests := []struct { name string @@ -72,9 +75,9 @@ func TestCreateTraceReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - ProtocolServerSettings: defaultProtocolSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + GRPCServerSettings: defaultGRPCSettings, + Transport: "tcp", }, }, { @@ -84,8 +87,10 @@ func TestCreateTraceReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "localhost:112233", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "localhost:112233", + }, }, Transport: "tcp", }, @@ -94,10 +99,15 @@ func TestCreateTraceReceiver(t *testing.T) { { name: "max-msg-size-and-concurrent-connections", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - Transport: "tcp", - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, + ReceiverSettings: defaultReceiverSettings, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, + }, + MaxRecvMsgSizeMiB: 32, + MaxConcurrentStreams: 16, + }, + Transport: "tcp", }, }, } @@ -127,8 +137,10 @@ func TestCreateMetricReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, } - defaultProtocolSettings := configprotocol.ProtocolServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, + }, } tests := []struct { @@ -139,9 +151,9 @@ func TestCreateMetricReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - ProtocolServerSettings: defaultProtocolSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + GRPCServerSettings: defaultGRPCSettings, + Transport: "tcp", }, }, { @@ -151,8 +163,10 @@ func TestCreateMetricReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "327.0.0.1:1122", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "327.0.0.1:1122", + }, }, Transport: "tcp", }, @@ -162,16 +176,21 @@ func TestCreateMetricReceiver(t *testing.T) { name: "keepalive", cfg: &Config{ ReceiverSettings: defaultReceiverSettings, - Transport: "tcp", - Keepalive: &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - MaxConnectionAge: 60 * time.Second, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, }, - EnforcementPolicy: &keepaliveEnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: true, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionAge: 60 * time.Second, + }, + EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ + MinTime: 30 * time.Second, + PermitWithoutStream: true, + }, }, }, + Transport: "tcp", }, }, } diff --git a/receiver/otlpreceiver/config.go b/receiver/otlpreceiver/config.go index 957d092fcedb..b584d0a6583e 100644 --- a/receiver/otlpreceiver/config.go +++ b/receiver/otlpreceiver/config.go @@ -15,14 +15,8 @@ package otlpreceiver import ( - "fmt" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" - + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" - "go.opentelemetry.io/collector/config/configprotocol" ) // Config defines configuration for OTLP receiver. @@ -30,7 +24,7 @@ type Config struct { configmodels.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct // Configures the receiver server protocol. - configprotocol.ProtocolServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct + configgrpc.GRPCServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct // Transport to use: one of tcp or unix, defaults to tcp Transport string `mapstructure:"transport"` @@ -40,98 +34,21 @@ type Config struct { // An empty list means that CORS is not enabled at all. A wildcard (*) can be // used to match any origin or one or more characters of an origin. CorsOrigins []string `mapstructure:"cors_allowed_origins"` - - // Keepalive anchor for all the settings related to keepalive. - Keepalive *serverParametersAndEnforcementPolicy `mapstructure:"keepalive,omitempty"` - - // MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server. - MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib,omitempty"` - - // MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport. - // TODO(nilebox): This setting affecting HTTP/2 streams need to be tested - MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams,omitempty"` -} - -type serverParametersAndEnforcementPolicy struct { - ServerParameters *keepaliveServerParameters `mapstructure:"server_parameters,omitempty"` - EnforcementPolicy *keepaliveEnforcementPolicy `mapstructure:"enforcement_policy,omitempty"` -} - -// keepaliveServerParameters allow configuration of the keepalive.ServerParameters. -// The same default values as keepalive.ServerParameters are applicable and get applied by the server. -// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details. -type keepaliveServerParameters struct { - MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle,omitempty"` - MaxConnectionAge time.Duration `mapstructure:"max_connection_age,omitempty"` - MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace,omitempty"` - Time time.Duration `mapstructure:"time,omitempty"` - Timeout time.Duration `mapstructure:"timeout,omitempty"` -} - -// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy. -// The same default values as keepalive.EnforcementPolicy are applicable and get applied by the server. -// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details. -type keepaliveEnforcementPolicy struct { - MinTime time.Duration `mapstructure:"min_time,omitempty"` - PermitWithoutStream bool `mapstructure:"permit_without_stream,omitempty"` } func (rOpts *Config) buildOptions() ([]Option, error) { var opts []Option - if rOpts.TLSCredentials != nil { - tlsCredsOptions, err := rOpts.TLSCredentials.LoadgRPCTLSServerCredentials() - if err != nil { - return nil, fmt.Errorf("error initializing OTLP receiver %q TLS Credentials: %v", rOpts.NameVal, err) - } - opts = append(opts, WithGRPCServerOptions(tlsCredsOptions)) - } if len(rOpts.CorsOrigins) > 0 { opts = append(opts, WithCorsOrigins(rOpts.CorsOrigins)) } - grpcServerOptions := rOpts.grpcServerOptions() + grpcServerOptions, err := rOpts.GRPCServerSettings.ToDialOptions() + if err != nil { + return nil, err + } if len(grpcServerOptions) > 0 { opts = append(opts, WithGRPCServerOptions(grpcServerOptions...)) } return opts, nil } - -func (rOpts *Config) grpcServerOptions() []grpc.ServerOption { - var grpcServerOptions []grpc.ServerOption - if rOpts.MaxRecvMsgSizeMiB > 0 { - grpcServerOptions = append(grpcServerOptions, grpc.MaxRecvMsgSize(int(rOpts.MaxRecvMsgSizeMiB*1024*1024))) - } - if rOpts.MaxConcurrentStreams > 0 { - grpcServerOptions = append(grpcServerOptions, grpc.MaxConcurrentStreams(rOpts.MaxConcurrentStreams)) - } - // The default values referenced in the GRPC docs are set within the server, so this code doesn't need - // to apply them over zero/nil values before passing these as grpc.ServerOptions. - // The following shows the server code for applying default grpc.ServerOptions. - // https://sourcegraph.com/github.com/grpc/grpc-go@120728e1f775e40a2a764341939b78d666b08260/-/blob/internal/transport/http2_server.go#L184-200 - if rOpts.Keepalive != nil { - if rOpts.Keepalive.ServerParameters != nil { - svrParams := rOpts.Keepalive.ServerParameters - grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: svrParams.MaxConnectionIdle, - MaxConnectionAge: svrParams.MaxConnectionAge, - MaxConnectionAgeGrace: svrParams.MaxConnectionAgeGrace, - Time: svrParams.Time, - Timeout: svrParams.Timeout, - })) - } - // The default values referenced in the GRPC are set within the server, so this code doesn't need - // to apply them over zero/nil values before passing these as grpc.ServerOptions. - // The following shows the server code for applying default grpc.ServerOptions. - // https://sourcegraph.com/github.com/grpc/grpc-go@120728e1f775e40a2a764341939b78d666b08260/-/blob/internal/transport/http2_server.go#L202-205 - if rOpts.Keepalive.EnforcementPolicy != nil { - enfPol := rOpts.Keepalive.EnforcementPolicy - grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: enfPol.MinTime, - PermitWithoutStream: enfPol.PermitWithoutStream, - })) - } - } - - return grpcServerOptions -} diff --git a/receiver/otlpreceiver/config_test.go b/receiver/otlpreceiver/config_test.go index 114edb9d42f3..481348b90e76 100644 --- a/receiver/otlpreceiver/config_test.go +++ b/receiver/otlpreceiver/config_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/config/configtls" @@ -51,9 +52,11 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/customname", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "localhost:9090", - TLSCredentials: nil, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "localhost:9090", + TLSCredentials: nil, + }, }, Transport: "tcp", }) @@ -65,24 +68,26 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/keepalive", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55680", - TLSCredentials: nil, - }, - Transport: "tcp", - Keepalive: &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - MaxConnectionIdle: 11 * time.Second, - MaxConnectionAge: 12 * time.Second, - MaxConnectionAgeGrace: 13 * time.Second, - Time: 30 * time.Second, - Timeout: 5 * time.Second, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55680", + TLSCredentials: nil, }, - EnforcementPolicy: &keepaliveEnforcementPolicy{ - MinTime: 10 * time.Second, - PermitWithoutStream: true, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 11 * time.Second, + MaxConnectionAge: 12 * time.Second, + MaxConnectionAgeGrace: 13 * time.Second, + Time: 30 * time.Second, + Timeout: 5 * time.Second, + }, + EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ + MinTime: 10 * time.Second, + PermitWithoutStream: true, + }, }, }, + Transport: "tcp", }) r3 := cfg.Receivers["otlp/msg-size-conc-connect-max-idle"].(*Config) @@ -92,18 +97,20 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/msg-size-conc-connect-max-idle", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55680", - TLSCredentials: nil, - }, - Transport: "tcp", - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, - Keepalive: &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - MaxConnectionIdle: 10 * time.Second, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55680", + TLSCredentials: nil, + }, + MaxRecvMsgSizeMiB: 32, + MaxConcurrentStreams: 16, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionIdle: 10 * time.Second, + }, }, }, + Transport: "tcp", }) // NOTE: Once the config loader checks for the files existence, this test may fail and require @@ -115,12 +122,14 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/tlscredentials", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55680", - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "test.crt", - KeyFile: "test.key", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55680", + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "test.crt", + KeyFile: "test.key", + }, }, }, }, @@ -134,9 +143,11 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/cors", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55680", - TLSCredentials: nil, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55680", + TLSCredentials: nil, + }, }, Transport: "tcp", CorsOrigins: []string{"https://*.test.com", "https://test.com"}, @@ -149,9 +160,11 @@ func TestLoadConfig(t *testing.T) { TypeVal: typeStr, NameVal: "otlp/uds", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "/tmp/otlp.sock", - TLSCredentials: nil, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "/tmp/otlp.sock", + TLSCredentials: nil, + }, }, Transport: "unix", }) @@ -162,16 +175,18 @@ func TestBuildOptions_TLSCredentials(t *testing.T) { ReceiverSettings: configmodels.ReceiverSettings{ NameVal: "IncorrectTLS", }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - TLSCredentials: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ - CertFile: "willfail", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + TLSCredentials: &configtls.TLSServerSetting{ + TLSSetting: configtls.TLSSetting{ + CertFile: "willfail", + }, }, }, }, } _, err := cfg.buildOptions() - assert.EqualError(t, err, `error initializing OTLP receiver "IncorrectTLS" TLS Credentials: failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) + assert.EqualError(t, err, `error initializing TLS Credentials: failed to load TLS config: for auth via TLS, either both certificate and key must be supplied, or neither`) cfg.TLSCredentials = &configtls.TLSServerSetting{} opt, err := cfg.buildOptions() diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 034d2e8cbb64..b7fbe335c34a 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -18,6 +18,7 @@ import ( "context" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/consumer" @@ -49,8 +50,10 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver { TypeVal: typeStr, NameVal: typeStr, }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "0.0.0.0:55680", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "0.0.0.0:55680", + }, }, Transport: "tcp", } diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index d9a21904ca38..86ead8ade662 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcheck" + "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/exporter/exportertest" @@ -63,8 +64,10 @@ func TestCreateTraceReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, } - defaultProtocolSettings := configprotocol.ProtocolServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, + }, } tests := []struct { @@ -75,9 +78,9 @@ func TestCreateTraceReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - ProtocolServerSettings: defaultProtocolSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + GRPCServerSettings: defaultGRPCSettings, + Transport: "tcp", }, }, { @@ -87,8 +90,10 @@ func TestCreateTraceReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "localhost:112233", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "localhost:112233", + }, }, Transport: "tcp", }, @@ -97,10 +102,15 @@ func TestCreateTraceReceiver(t *testing.T) { { name: "max-msg-size-and-concurrent-connections", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - Transport: "tcp", - MaxRecvMsgSizeMiB: 32, - MaxConcurrentStreams: 16, + ReceiverSettings: defaultReceiverSettings, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, + }, + MaxRecvMsgSizeMiB: 32, + MaxConcurrentStreams: 16, + }, + Transport: "tcp", }, }, } @@ -129,9 +139,12 @@ func TestCreateMetricReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, } - defaultProtocolSettings := configprotocol.ProtocolServerSettings{ - Endpoint: endpoint, + defaultGRPCSettings := configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, + }, } + tests := []struct { name string cfg *Config @@ -140,9 +153,9 @@ func TestCreateMetricReceiver(t *testing.T) { { name: "default", cfg: &Config{ - ReceiverSettings: defaultReceiverSettings, - ProtocolServerSettings: defaultProtocolSettings, - Transport: "tcp", + ReceiverSettings: defaultReceiverSettings, + GRPCServerSettings: defaultGRPCSettings, + Transport: "tcp", }, }, { @@ -152,8 +165,10 @@ func TestCreateMetricReceiver(t *testing.T) { TypeVal: typeStr, NameVal: typeStr, }, - ProtocolServerSettings: configprotocol.ProtocolServerSettings{ - Endpoint: "327.0.0.1:1122", + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: "327.0.0.1:1122", + }, }, Transport: "tcp", }, @@ -163,16 +178,21 @@ func TestCreateMetricReceiver(t *testing.T) { name: "keepalive", cfg: &Config{ ReceiverSettings: defaultReceiverSettings, - Transport: "tcp", - Keepalive: &serverParametersAndEnforcementPolicy{ - ServerParameters: &keepaliveServerParameters{ - MaxConnectionAge: 60 * time.Second, + GRPCServerSettings: configgrpc.GRPCServerSettings{ + ProtocolServerSettings: configprotocol.ProtocolServerSettings{ + Endpoint: endpoint, }, - EnforcementPolicy: &keepaliveEnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: true, + Keepalive: &configgrpc.KeepaliveServerConfig{ + ServerParameters: &configgrpc.KeepaliveServerParameters{ + MaxConnectionAge: 60 * time.Second, + }, + EnforcementPolicy: &configgrpc.KeepaliveEnforcementPolicy{ + MinTime: 30 * time.Second, + PermitWithoutStream: true, + }, }, }, + Transport: "tcp", }, }, }