Skip to content

Commit

Permalink
Add a generic grpc server settings config, cleanup client config
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Jun 24, 2020
1 parent cddc04f commit 22a70c5
Show file tree
Hide file tree
Showing 19 changed files with 375 additions and 328 deletions.
125 changes: 108 additions & 17 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"

"go.opentelemetry.io/collector/config/configprotocol"
"go.opentelemetry.io/collector/config/configtls"
)

Expand All @@ -33,6 +34,21 @@ const (
CompressionGzip = "gzip"
)

var (
// Map of opentelemetry compression types to grpc registered compression types
grpcCompressionKeyMap = map[string]string{
CompressionGzip: gzip.Name,
}
)

// KeepaliveClientConfig exposes the keepalive.ClientParameters to be used by the exporter.
// Refer to the original data-structure for the meaning of each parameter.
type KeepaliveClientConfig struct {
Time time.Duration `mapstructure:"time,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
PermitWithoutStream bool `mapstructure:"permit_without_stream,omitempty"`
}

// GRPCClientSettings defines common settings for a gRPC client configuration.
type GRPCClientSettings struct {
// The headers associated with gRPC requests.
Expand All @@ -52,23 +68,54 @@ type GRPCClientSettings struct {

// The keepalive parameters for client gRPC. See grpc.WithKeepaliveParams
// (https://godoc.org/google.golang.org/grpc#WithKeepaliveParams).
KeepaliveParameters *KeepaliveConfig `mapstructure:"keepalive"`
Keepalive *KeepaliveClientConfig `mapstructure:"keepalive"`

// WaitForReady parameter configures client to wait for ready state before sending data.
// (https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md)
WaitForReady bool `mapstructure:"wait_for_ready"`
}

// KeepaliveConfig exposes the keepalive.ClientParameters to be used by the exporter.
// Refer to the original data-structure for the meaning of each parameter.
type KeepaliveConfig struct {
Time time.Duration `mapstructure:"time,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
type KeepaliveServerConfig struct {
ServerParameters *KeepaliveServerParameters `mapstructure:"server_parameters,omitempty"`
EnforcementPolicy *KeepaliveEnforcementPolicy `mapstructure:"enforcement_policy,omitempty"`
}

// keepaliveServerParameters allow configuration of the keepalive.ServerParameters.
// The same default values as keepalive.ServerParameters are applicable and get applied by the server.
// See https://godoc.org/google.golang.org/grpc/keepalive#ServerParameters for details.
type KeepaliveServerParameters struct {
MaxConnectionIdle time.Duration `mapstructure:"max_connection_idle,omitempty"`
MaxConnectionAge time.Duration `mapstructure:"max_connection_age,omitempty"`
MaxConnectionAgeGrace time.Duration `mapstructure:"max_connection_age_grace,omitempty"`
Time time.Duration `mapstructure:"time,omitempty"`
Timeout time.Duration `mapstructure:"timeout,omitempty"`
}

// keepaliveEnforcementPolicy allow configuration of the keepalive.EnforcementPolicy.
// The same default values as keepalive.EnforcementPolicy are applicable and get applied by the server.
// See https://godoc.org/google.golang.org/grpc/keepalive#EnforcementPolicy for details.
type KeepaliveEnforcementPolicy struct {
MinTime time.Duration `mapstructure:"min_time,omitempty"`
PermitWithoutStream bool `mapstructure:"permit_without_stream,omitempty"`
}

// GrpcSettingsToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC
func GrpcSettingsToDialOptions(settings GRPCClientSettings) ([]grpc.DialOption, error) {
type GRPCServerSettings struct {
// Configures the generic server protocol.
configprotocol.ProtocolServerSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct

// MaxRecvMsgSizeMiB sets the maximum size (in MiB) of messages accepted by the server.
MaxRecvMsgSizeMiB uint64 `mapstructure:"max_recv_msg_size_mib,omitempty"`

// MaxConcurrentStreams sets the limit on the number of concurrent streams to each ServerTransport.
// TODO(nilebox): This setting affecting HTTP/2 streams need to be tested
MaxConcurrentStreams uint32 `mapstructure:"max_concurrent_streams,omitempty"`

// Keepalive anchor for all the settings related to keepalive.
Keepalive *KeepaliveServerConfig `mapstructure:"keepalive,omitempty"`
}

// ToDialOptions maps configgrpc.GRPCClientSettings to a slice of dial options for gRPC
func (settings *GRPCClientSettings) ToDialOptions() ([]grpc.DialOption, error) {
opts := []grpc.DialOption{}

if settings.Compression != "" {
Expand All @@ -85,24 +132,68 @@ func GrpcSettingsToDialOptions(settings GRPCClientSettings) ([]grpc.DialOption,
}
opts = append(opts, tlsDialOption)

if settings.KeepaliveParameters != nil {
if settings.Keepalive != nil {
keepAliveOption := grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: settings.KeepaliveParameters.Time,
Timeout: settings.KeepaliveParameters.Timeout,
PermitWithoutStream: settings.KeepaliveParameters.PermitWithoutStream,
Time: settings.Keepalive.Time,
Timeout: settings.Keepalive.Timeout,
PermitWithoutStream: settings.Keepalive.PermitWithoutStream,
})
opts = append(opts, keepAliveOption)
}

return opts, nil
}

var (
// Map of opentelemetry compression types to grpc registered compression types
grpcCompressionKeyMap = map[string]string{
CompressionGzip: gzip.Name,
// ToDialOptions maps configgrpc.GRPCServerSettings to a slice of dial options for gRPC
func (gss *GRPCServerSettings) ToDialOptions() ([]grpc.ServerOption, error) {
var opts []grpc.ServerOption

if gss.TLSCredentials != nil {
tlsOpt, err := gss.TLSCredentials.LoadgRPCTLSServerCredentials()
if err != nil {
return nil, fmt.Errorf("error initializing TLS Credentials: %v", err)
}
opts = append(opts, tlsOpt)
}
)

if gss.MaxRecvMsgSizeMiB > 0 {
opts = append(opts, grpc.MaxRecvMsgSize(int(gss.MaxRecvMsgSizeMiB*1024*1024)))
}

if gss.MaxConcurrentStreams > 0 {
opts = append(opts, grpc.MaxConcurrentStreams(gss.MaxConcurrentStreams))
}

// The default values referenced in the GRPC docs are set within the server, so this code doesn't need
// to apply them over zero/nil values before passing these as grpc.ServerOptions.
// The following shows the server code for applying default grpc.ServerOptions.
// https://sourcegraph.com/github.com/grpc/grpc-go@120728e1f775e40a2a764341939b78d666b08260/-/blob/internal/transport/http2_server.go#L184-200
if gss.Keepalive != nil {
if gss.Keepalive.ServerParameters != nil {
svrParams := gss.Keepalive.ServerParameters
opts = append(opts, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: svrParams.MaxConnectionIdle,
MaxConnectionAge: svrParams.MaxConnectionAge,
MaxConnectionAgeGrace: svrParams.MaxConnectionAgeGrace,
Time: svrParams.Time,
Timeout: svrParams.Timeout,
}))
}
// The default values referenced in the GRPC are set within the server, so this code doesn't need
// to apply them over zero/nil values before passing these as grpc.ServerOptions.
// The following shows the server code for applying default grpc.ServerOptions.
// https://sourcegraph.com/github.com/grpc/grpc-go@120728e1f775e40a2a764341939b78d666b08260/-/blob/internal/transport/http2_server.go#L202-205
if gss.Keepalive.EnforcementPolicy != nil {
enfPol := gss.Keepalive.EnforcementPolicy
opts = append(opts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: enfPol.MinTime,
PermitWithoutStream: enfPol.PermitWithoutStream,
}))
}
}

return opts, nil
}

// GetGRPCCompressionKey returns the grpc registered compression key if the
// passed in compression key is supported, and CompressionUnsupported otherwise
Expand Down
36 changes: 18 additions & 18 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
)

func TestBasicGrpcSettings(t *testing.T) {

_, err := GrpcSettingsToDialOptions(GRPCClientSettings{
Headers: nil,
Endpoint: "",
Compression: "",
KeepaliveParameters: nil,
})
gcs := &GRPCClientSettings{
Headers: nil,
Endpoint: "",
Compression: "",
Keepalive: nil,
}
_, err := gcs.ToDialOptions()

assert.NoError(t, err)
}
Expand All @@ -52,7 +52,7 @@ func TestInvalidPemFile(t *testing.T) {
Insecure: false,
ServerName: "",
},
KeepaliveParameters: nil,
Keepalive: nil,
},
},
{
Expand All @@ -68,27 +68,27 @@ func TestInvalidPemFile(t *testing.T) {
Insecure: false,
ServerName: "",
},
KeepaliveParameters: nil,
Keepalive: nil,
},
},
}
for _, test := range tests {
t.Run(test.err, func(t *testing.T) {
_, err := GrpcSettingsToDialOptions(test.settings)
_, err := test.settings.ToDialOptions()
assert.Regexp(t, test.err, err)
})
}
}

func TestUseSecure(t *testing.T) {
dialOpts, err := GrpcSettingsToDialOptions(GRPCClientSettings{
Headers: nil,
Endpoint: "",
Compression: "",
TLSSetting: configtls.TLSClientSetting{},
KeepaliveParameters: nil,
})

gcs := &GRPCClientSettings{
Headers: nil,
Endpoint: "",
Compression: "",
TLSSetting: configtls.TLSClientSetting{},
Keepalive: nil,
}
dialOpts, err := gcs.ToDialOptions()
assert.NoError(t, err)
assert.Equal(t, len(dialOpts), 1)
}
Expand Down
3 changes: 1 addition & 2 deletions exporter/jaegerexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
Expand All @@ -34,7 +33,7 @@ import (
// The collectorEndpoint should be of the form "hostname:14250" (a gRPC target).
func New(config *Config) (component.TraceExporter, error) {

opts, err := configgrpc.GrpcSettingsToDialOptions(config.GRPCClientSettings)
opts, err := config.GRPCClientSettings.ToDialOptions()
if err != nil {
return nil, err
}
Expand Down
24 changes: 12 additions & 12 deletions exporter/jaegerexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestNew(t *testing.T) {
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
KeepaliveParameters: nil,
Keepalive: nil,
},
},
},
Expand All @@ -66,10 +66,10 @@ func TestNew(t *testing.T) {
args: args{
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Headers: map[string]string{"extra-header": "header-value"},
Endpoint: "foo.bar",
Compression: "",
KeepaliveParameters: nil,
Headers: map[string]string{"extra-header": "header-value"},
Endpoint: "foo.bar",
Compression: "",
Keepalive: nil,
},
},
},
Expand All @@ -79,10 +79,10 @@ func TestNew(t *testing.T) {
args: args{
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Headers: nil,
Endpoint: "foo.bar",
Compression: "",
KeepaliveParameters: nil,
Headers: nil,
Endpoint: "foo.bar",
Compression: "",
Keepalive: nil,
},
},
},
Expand All @@ -101,7 +101,7 @@ func TestNew(t *testing.T) {
},
Insecure: false,
},
KeepaliveParameters: nil,
Keepalive: nil,
},
},
},
Expand All @@ -121,7 +121,7 @@ func TestNew(t *testing.T) {
Insecure: false,
ServerName: "",
},
KeepaliveParameters: &configgrpc.KeepaliveConfig{
Keepalive: &configgrpc.KeepaliveClientConfig{
Time: 0,
Timeout: 0,
PermitWithoutStream: false,
Expand All @@ -144,7 +144,7 @@ func TestNew(t *testing.T) {
},
Insecure: false,
},
KeepaliveParameters: nil,
Keepalive: nil,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion exporter/opencensusexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestLoadConfig(t *testing.T) {
},
Insecure: false,
},
KeepaliveParameters: &configgrpc.KeepaliveConfig{
Keepalive: &configgrpc.KeepaliveClientConfig{
Time: 20,
PermitWithoutStream: true,
Timeout: 30,
Expand Down
8 changes: 4 additions & 4 deletions exporter/opencensusexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ func (f *Factory) OCAgentOptions(logger *zap.Logger, ocac *Config) ([]ocagent.Ex
if ocac.ReconnectionDelay > 0 {
opts = append(opts, ocagent.WithReconnectionPeriod(ocac.ReconnectionDelay))
}
if ocac.KeepaliveParameters != nil {
if ocac.Keepalive != nil {
opts = append(opts, ocagent.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: ocac.KeepaliveParameters.Time,
Timeout: ocac.KeepaliveParameters.Timeout,
PermitWithoutStream: ocac.KeepaliveParameters.PermitWithoutStream,
Time: ocac.Keepalive.Time,
Timeout: ocac.Keepalive.Timeout,
PermitWithoutStream: ocac.Keepalive.PermitWithoutStream,
})))
}
return opts, nil
Expand Down
4 changes: 2 additions & 2 deletions exporter/opencensusexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ func TestCreateTraceExporter(t *testing.T) {
},
},
{
name: "KeepaliveParameters",
name: "Keepalive",
config: Config{
GRPCClientSettings: configgrpc.GRPCClientSettings{
Endpoint: rcvCfg.Endpoint,
KeepaliveParameters: &configgrpc.KeepaliveConfig{
Keepalive: &configgrpc.KeepaliveClientConfig{
Time: 30 * time.Second,
Timeout: 25 * time.Second,
PermitWithoutStream: true,
Expand Down
2 changes: 1 addition & 1 deletion exporter/otlpexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestLoadConfig(t *testing.T) {
},
Insecure: false,
},
KeepaliveParameters: &configgrpc.KeepaliveConfig{
Keepalive: &configgrpc.KeepaliveClientConfig{
Time: 20 * time.Second,
PermitWithoutStream: true,
Timeout: 30 * time.Second,
Expand Down
3 changes: 1 addition & 2 deletions exporter/otlpexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/config/configgrpc"
otlpmetriccol "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/metrics/v1"
otlptracecol "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1"
otlplogcol "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/logs/v1"
Expand Down Expand Up @@ -62,7 +61,7 @@ func newExporter(config *Config) (*exporterImp, error) {
e.config = config

var err error
e.dialOpts, err = configgrpc.GrpcSettingsToDialOptions(e.config.GRPCClientSettings)
e.dialOpts, err = e.config.GRPCClientSettings.ToDialOptions()
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 22a70c5

Please sign in to comment.