From b07a06d7f1b85b97a0d046576776e11fe08bbbcb Mon Sep 17 00:00:00 2001 From: Aryan Goyal <137564277+ary82@users.noreply.github.com> Date: Sat, 28 Dec 2024 23:56:20 +0530 Subject: [PATCH] [extension/jaegerremotesampling] remove jaeger sampling dependency Signed-off-by: Aryan Goyal <137564277+ary82@users.noreply.github.com> --- extension/jaegerremotesampling/constants.go | 41 ++ extension/jaegerremotesampling/extension.go | 8 +- ...StrategiesDeprecatedBehavior_ServiceA.json | 16 + ...StrategiesDeprecatedBehavior_ServiceB.json | 6 + ...viceNoPerOperationStrategies_ServiceA.json | 16 + ...viceNoPerOperationStrategies_ServiceB.json | 17 + .../fixtures/bad_strategies.json | 1 + .../fixtures/missing-service-types.json | 33 ++ .../fixtures/operation_strategies.json | 74 +++ .../fixtures/service_no_per_operation.json | 25 + .../fixtures/strategies.json | 18 + .../jaegerremotesampling/internal/grpc.go | 8 +- .../internal/grpc_handler.go | 27 + .../internal/grpc_handler_test.go | 52 ++ .../jaegerremotesampling/internal/http.go | 5 +- .../internal/interface.go | 20 + .../internal/remote_strategy_store.go | 3 +- extension/jaegerremotesampling/options.go | 20 + extension/jaegerremotesampling/provider.go | 380 ++++++++++++ .../jaegerremotesampling/provider_test.go | 557 ++++++++++++++++++ extension/jaegerremotesampling/strategy.go | 30 + 21 files changed, 1342 insertions(+), 15 deletions(-) create mode 100644 extension/jaegerremotesampling/constants.go create mode 100644 extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json create mode 100644 extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json create mode 100644 extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json create mode 100644 extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json create mode 100644 
extension/jaegerremotesampling/fixtures/bad_strategies.json create mode 100644 extension/jaegerremotesampling/fixtures/missing-service-types.json create mode 100644 extension/jaegerremotesampling/fixtures/operation_strategies.json create mode 100644 extension/jaegerremotesampling/fixtures/service_no_per_operation.json create mode 100644 extension/jaegerremotesampling/fixtures/strategies.json create mode 100644 extension/jaegerremotesampling/internal/grpc_handler.go create mode 100644 extension/jaegerremotesampling/internal/grpc_handler_test.go create mode 100644 extension/jaegerremotesampling/internal/interface.go create mode 100644 extension/jaegerremotesampling/options.go create mode 100644 extension/jaegerremotesampling/provider.go create mode 100644 extension/jaegerremotesampling/provider_test.go create mode 100644 extension/jaegerremotesampling/strategy.go diff --git a/extension/jaegerremotesampling/constants.go b/extension/jaegerremotesampling/constants.go new file mode 100644 index 000000000000..f6bd9221b483 --- /dev/null +++ b/extension/jaegerremotesampling/constants.go @@ -0,0 +1,41 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package jaegerremotesampling + +import ( + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +const ( + // samplerTypeProbabilistic is the type of sampler that samples traces + // with a certain fixed probability. + samplerTypeProbabilistic = "probabilistic" + + // samplerTypeRateLimiting is the type of sampler that samples + // only up to a fixed number of traces per second. + samplerTypeRateLimiting = "ratelimiting" + + // defaultSamplingProbability is the default sampling probability the + // Strategy Store will use if none is provided. + defaultSamplingProbability = 0.001 +) + +// defaultStrategy is the default sampling strategy the Strategy Store will return +// if none is provided. 
+func defaultStrategyResponse() *api_v2.SamplingStrategyResponse { + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: defaultSamplingProbability, + }, + } +} + +func defaultStrategies() *storedStrategies { + s := &storedStrategies{ + serviceStrategies: make(map[string]*api_v2.SamplingStrategyResponse), + } + s.defaultStrategy = defaultStrategyResponse() + return s +} diff --git a/extension/jaegerremotesampling/extension.go b/extension/jaegerremotesampling/extension.go index 74f7d9467b99..67fb929bed70 100644 --- a/extension/jaegerremotesampling/extension.go +++ b/extension/jaegerremotesampling/extension.go @@ -7,8 +7,6 @@ import ( "context" "fmt" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" - "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/static" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" "go.uber.org/zap" @@ -24,7 +22,7 @@ type jrsExtension struct { httpServer component.Component grpcServer component.Component - samplingStore samplingstrategy.Provider + samplingStore internal.Provider closers []func() error } @@ -44,11 +42,11 @@ func (jrse *jrsExtension) Start(ctx context.Context, host component.Host) error // - local file // we can then use a simplified logic here to assign the appropriate store if jrse.cfg.Source.File != "" { - opts := static.Options{ + opts := Options{ StrategiesFile: jrse.cfg.Source.File, ReloadInterval: jrse.cfg.Source.ReloadInterval, } - ss, err := static.NewProvider(opts, jrse.telemetry.Logger) + ss, err := NewProvider(opts, jrse.telemetry.Logger) if err != nil { return fmt.Errorf("failed to create the local file strategy store: %w", err) } diff --git a/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json 
b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json new file mode 100644 index 000000000000..6834df079eb6 --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceA.json @@ -0,0 +1,16 @@ +{ + "probabilisticSampling": { + "samplingRate": 1 + }, + "operationSampling": { + "defaultSamplingProbability": 1, + "perOperationStrategies": [ + { + "operation": "/health", + "probabilisticSampling": { + "samplingRate": 0.1 + } + } + ] + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json new file mode 100644 index 000000000000..56e51c78391f --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategiesDeprecatedBehavior_ServiceB.json @@ -0,0 +1,6 @@ +{ + "strategyType": 1, + "rateLimitingSampling": { + "maxTracesPerSecond": 3 + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json new file mode 100644 index 000000000000..6834df079eb6 --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceA.json @@ -0,0 +1,16 @@ +{ + "probabilisticSampling": { + "samplingRate": 1 + }, + "operationSampling": { + "defaultSamplingProbability": 1, + "perOperationStrategies": [ + { + "operation": "/health", + "probabilisticSampling": { + "samplingRate": 0.1 + } + } + ] + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json new file mode 100644 index 
000000000000..cc28f904fefa --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/TestServiceNoPerOperationStrategies_ServiceB.json @@ -0,0 +1,17 @@ +{ + "strategyType": 1, + "rateLimitingSampling": { + "maxTracesPerSecond": 3 + }, + "operationSampling": { + "defaultSamplingProbability": 0.2, + "perOperationStrategies": [ + { + "operation": "/health", + "probabilisticSampling": { + "samplingRate": 0.1 + } + } + ] + } +} \ No newline at end of file diff --git a/extension/jaegerremotesampling/fixtures/bad_strategies.json b/extension/jaegerremotesampling/fixtures/bad_strategies.json new file mode 100644 index 000000000000..209a97341c53 --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/bad_strategies.json @@ -0,0 +1 @@ +"nonsense" diff --git a/extension/jaegerremotesampling/fixtures/missing-service-types.json b/extension/jaegerremotesampling/fixtures/missing-service-types.json new file mode 100644 index 000000000000..0d3d5f2a3c00 --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/missing-service-types.json @@ -0,0 +1,33 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "operation_strategies": [ + { + "operation": "op1", + "type": "probabilistic", + "param": 0.2 + } + ] + }, + { + "service": "bar", + "operation_strategies": [ + { + "operation": "op3", + "type": "probabilistic", + "param": 0.3 + }, + { + "operation": "op5", + "type": "probabilistic", + "param": 0.4 + } + ] + } + ] +} diff --git a/extension/jaegerremotesampling/fixtures/operation_strategies.json b/extension/jaegerremotesampling/fixtures/operation_strategies.json new file mode 100644 index 000000000000..8a1b7677aab1 --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/operation_strategies.json @@ -0,0 +1,74 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.5, + "operation_strategies": [ + { + "operation": "op0", + "type": "probabilistic", + "param": 0.2 + }, + { + "operation": 
"op6", + "type": "probabilistic", + "param": 0 + }, + { + "operation": "spam", + "type": "ratelimiting", + "param": 1 + }, + { + "operation": "op7", + "type": "probabilistic", + "param": 1 + } + ] + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8, + "operation_strategies": [ + { + "operation": "op6", + "type": "probabilistic", + "param": 0.5 + }, + { + "operation": "op1", + "type": "probabilistic", + "param": 0.2 + }, + { + "operation": "op2", + "type": "ratelimiting", + "param": 10 + } + ] + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 5, + "operation_strategies": [ + { + "operation": "op3", + "type": "probabilistic", + "param": 0.3 + }, + { + "operation": "op4", + "type": "ratelimiting", + "param": 100 + }, + { + "operation": "op5", + "type": "probabilistic", + "param": 0.4 + } + ] + } + ] +} diff --git a/extension/jaegerremotesampling/fixtures/service_no_per_operation.json b/extension/jaegerremotesampling/fixtures/service_no_per_operation.json new file mode 100644 index 000000000000..29b50d9f4d3f --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/service_no_per_operation.json @@ -0,0 +1,25 @@ +{ + "service_strategies": [ + { + "service": "ServiceA", + "type": "probabilistic", + "param": 1.0 + }, + { + "service": "ServiceB", + "type": "ratelimiting", + "param": 3 + } + ], + "default_strategy": { + "type": "probabilistic", + "param": 0.2, + "operation_strategies": [ + { + "operation": "/health", + "type": "probabilistic", + "param": 0.1 + } + ] + } +} diff --git a/extension/jaegerremotesampling/fixtures/strategies.json b/extension/jaegerremotesampling/fixtures/strategies.json new file mode 100644 index 000000000000..e81d43984963 --- /dev/null +++ b/extension/jaegerremotesampling/fixtures/strategies.json @@ -0,0 +1,18 @@ +{ + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": 0.8 + }, + 
{ + "service": "bar", + "type": "ratelimiting", + "param": 5 + } + ] +} diff --git a/extension/jaegerremotesampling/internal/grpc.go b/extension/jaegerremotesampling/internal/grpc.go index 5b614f90fae4..2dcdea97fa99 100644 --- a/extension/jaegerremotesampling/internal/grpc.go +++ b/extension/jaegerremotesampling/internal/grpc.go @@ -9,8 +9,6 @@ import ( "fmt" "net" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" @@ -34,7 +32,7 @@ type grpcServer interface { func NewGRPC( telemetry component.TelemetrySettings, settings configgrpc.ServerConfig, - strategyStore samplingstrategy.Provider, + strategyStore Provider, ) (*SamplingGRPCServer, error) { if strategyStore == nil { return nil, errMissingStrategyStore @@ -51,7 +49,7 @@ func NewGRPC( type SamplingGRPCServer struct { telemetry component.TelemetrySettings settings configgrpc.ServerConfig - strategyStore samplingstrategy.Provider + strategyStore Provider grpcServer grpcServer } @@ -64,7 +62,7 @@ func (s *SamplingGRPCServer) Start(ctx context.Context, host component.Host) err reflection.Register(server) s.grpcServer = server - api_v2.RegisterSamplingManagerServer(server, sampling.NewGRPCHandler(s.strategyStore)) + api_v2.RegisterSamplingManagerServer(server, NewGRPCHandler(s.strategyStore)) healthServer := health.NewServer() healthServer.SetServingStatus("jaeger.api_v2.SamplingManager", grpc_health_v1.HealthCheckResponse_SERVING) diff --git a/extension/jaegerremotesampling/internal/grpc_handler.go b/extension/jaegerremotesampling/internal/grpc_handler.go new file mode 100644 index 000000000000..dc142f928d13 --- /dev/null +++ b/extension/jaegerremotesampling/internal/grpc_handler.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + 
+package internal + +import ( + "context" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +// GRPCHandler is sampling strategy handler for gRPC. +type GRPCHandler struct { + samplingProvider Provider +} + +// NewGRPCHandler creates a handler that controls sampling strategies for services. +func NewGRPCHandler(provider Provider) GRPCHandler { + return GRPCHandler{ + samplingProvider: provider, + } +} + +// GetSamplingStrategy returns sampling decision from store. +func (s GRPCHandler) GetSamplingStrategy(ctx context.Context, param *api_v2.SamplingStrategyParameters) (*api_v2.SamplingStrategyResponse, error) { + return s.samplingProvider.GetSamplingStrategy(ctx, param.GetServiceName()) +} diff --git a/extension/jaegerremotesampling/internal/grpc_handler_test.go b/extension/jaegerremotesampling/internal/grpc_handler_test.go new file mode 100644 index 000000000000..e27f830c6515 --- /dev/null +++ b/extension/jaegerremotesampling/internal/grpc_handler_test.go @@ -0,0 +1,52 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "errors" + "testing" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" +) + +type mockSamplingStore struct{} + +func (mockSamplingStore) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { + if serviceName == "error" { + return nil, errors.New("some error") + } else if serviceName == "nil" { + return nil, nil + } + return &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}, nil +} + +func (mockSamplingStore) Close() error { + return nil +} + +func TestNewGRPCHandler(t *testing.T) { + tests := []struct { + req *api_v2.SamplingStrategyParameters + resp *api_v2.SamplingStrategyResponse + err string + }{ + {req: &api_v2.SamplingStrategyParameters{ServiceName: "error"}, err: "some 
error"}, + {req: &api_v2.SamplingStrategyParameters{ServiceName: "nil"}, resp: nil}, + {req: &api_v2.SamplingStrategyParameters{ServiceName: "foo"}, resp: &api_v2.SamplingStrategyResponse{StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC}}, + } + h := NewGRPCHandler(mockSamplingStore{}) + for _, test := range tests { + resp, err := h.GetSamplingStrategy(context.Background(), test.req) + if test.err != "" { + require.EqualError(t, err, test.err) + require.Nil(t, resp) + } else { + require.NoError(t, err) + assert.Equal(t, test.resp, resp) + } + } +} diff --git a/extension/jaegerremotesampling/internal/http.go b/extension/jaegerremotesampling/internal/http.go index 6ab9f75cba78..59bab16673cc 100644 --- a/extension/jaegerremotesampling/internal/http.go +++ b/extension/jaegerremotesampling/internal/http.go @@ -12,7 +12,6 @@ import ( "net/http" "sync" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confighttp" @@ -25,14 +24,14 @@ var _ component.Component = (*SamplingHTTPServer)(nil) type SamplingHTTPServer struct { telemetry component.TelemetrySettings settings confighttp.ServerConfig - strategyStore samplingstrategy.Provider + strategyStore Provider mux *http.ServeMux srv *http.Server shutdownWG *sync.WaitGroup } -func NewHTTP(telemetry component.TelemetrySettings, settings confighttp.ServerConfig, strategyStore samplingstrategy.Provider) (*SamplingHTTPServer, error) { +func NewHTTP(telemetry component.TelemetrySettings, settings confighttp.ServerConfig, strategyStore Provider) (*SamplingHTTPServer, error) { if strategyStore == nil { return nil, errMissingStrategyStore } diff --git a/extension/jaegerremotesampling/internal/interface.go b/extension/jaegerremotesampling/internal/interface.go new file mode 100644 index 000000000000..238001bdcc5b --- /dev/null +++ 
b/extension/jaegerremotesampling/internal/interface.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "context" + "io" + + "github.com/jaegertracing/jaeger/proto-gen/api_v2" +) + +// Provider keeps track of service specific sampling strategies. +type Provider interface { + // Close() from io.Closer stops the processor from calculating probabilities. + io.Closer + + // GetSamplingStrategy retrieves the sampling strategy for the specified service. + GetSamplingStrategy(ctx context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) +} diff --git a/extension/jaegerremotesampling/internal/remote_strategy_store.go b/extension/jaegerremotesampling/internal/remote_strategy_store.go index 68ce1be8843b..a4e789392d60 100644 --- a/extension/jaegerremotesampling/internal/remote_strategy_store.go +++ b/extension/jaegerremotesampling/internal/remote_strategy_store.go @@ -10,7 +10,6 @@ import ( "time" grpcstore "github.com/jaegertracing/jaeger/cmd/agent/app/configmanager/grpc" - "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configopaque" @@ -32,7 +31,7 @@ func NewRemoteStrategyStore( conn *grpc.ClientConn, grpcClientSettings *configgrpc.ClientConfig, reloadInterval time.Duration, -) (samplingstrategy.Provider, io.Closer) { +) (Provider, io.Closer) { cache := newNoopStrategyCache() if reloadInterval > 0 { cache = newServiceStrategyCache(reloadInterval) diff --git a/extension/jaegerremotesampling/options.go b/extension/jaegerremotesampling/options.go new file mode 100644 index 000000000000..ad93f5bffd26 --- /dev/null +++ b/extension/jaegerremotesampling/options.go @@ -0,0 +1,20 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package jaegerremotesampling + +import ( + "time" 
+) + +// Options holds configuration for the static sampling strategy store. +type Options struct { + // StrategiesFile is the path for the sampling strategies file in JSON format + StrategiesFile string + // ReloadInterval is the time interval to check and reload sampling strategies file + ReloadInterval time.Duration + // Flag for enabling possibly breaking change which includes default operations level + // strategies when calculating Ratelimiting type service level strategy + // more information https://github.com/jaegertracing/jaeger/issues/5270 + IncludeDefaultOpStrategies bool +} diff --git a/extension/jaegerremotesampling/provider.go b/extension/jaegerremotesampling/provider.go new file mode 100644 index 000000000000..8e8e46714631 --- /dev/null +++ b/extension/jaegerremotesampling/provider.go @@ -0,0 +1,380 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package jaegerremotesampling + +import ( + "bytes" + "context" + "encoding/gob" + "encoding/json" + "fmt" + "net/http" + "net/url" + "os" + "path/filepath" + "sync/atomic" + "time" + + ss "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "go.uber.org/zap" +) + +// null represents "null" JSON value and +// it un-marshals to nil pointer. +var nullJSON = []byte("null") + +type samplingProvider struct { + logger *zap.Logger + + storedStrategies atomic.Value // holds *storedStrategies + + cancelFunc context.CancelFunc + + options Options +} + +type storedStrategies struct { + defaultStrategy *api_v2.SamplingStrategyResponse + serviceStrategies map[string]*api_v2.SamplingStrategyResponse +} + +type strategyLoader func() ([]byte, error) + +// NewProvider creates a strategy store that holds static sampling strategies. 
+//
+// When options.StrategiesFile is empty, or the loader yields JSON null (so
+// loadStrategies returns a nil *strategies), the provider keeps serving the
+// defaults stored from defaultStrategies(). When options.ReloadInterval is
+// positive, a background goroutine re-reads the source on that interval until
+// Close cancels its context.
+//
+// An error is returned when the source cannot be loaded or unmarshalled.
+//
+// NOTE(review): the return type still names the Jaeger samplingstrategy
+// package (imported as ss) — confirm whether it should become the local
+// internal.Provider interface.
+func NewProvider(options Options, logger *zap.Logger) (ss.Provider, error) {
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	h := &samplingProvider{
+		logger:     logger,
+		cancelFunc: cancelFunc,
+		options:    options,
+	}
+	h.storedStrategies.Store(defaultStrategies())
+
+	if options.StrategiesFile == "" {
+		h.logger.Info("No sampling strategies source provided, using defaults")
+		return h, nil
+	}
+
+	loadFn := h.samplingStrategyLoader(options.StrategiesFile)
+	strategies, err := loadStrategies(loadFn)
+	if err != nil {
+		// No provider is handed back, so nobody can call Close later;
+		// release the context here instead.
+		cancelFunc()
+		return nil, err
+	}
+	if strategies == nil {
+		h.logger.Info("No sampling strategies found or URL is unavailable, using defaults")
+		return h, nil
+	}
+
+	if !h.options.IncludeDefaultOpStrategies {
+		// Every fragment ends with a space so the sentences do not run together.
+		h.logger.Warn("Default operations level strategies will not be included for Ratelimiting service strategies. " +
+			"This behavior will be changed in future releases. " +
+			"Cf. https://github.com/jaegertracing/jaeger/issues/5270")
+		h.parseStrategies_deprecated(strategies)
+	} else {
+		h.parseStrategies(strategies)
+	}
+
+	if options.ReloadInterval > 0 {
+		go h.autoUpdateStrategies(ctx, options.ReloadInterval, loadFn)
+	}
+	return h, nil
+}
+
+// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
+func (h *samplingProvider) GetSamplingStrategy(_ context.Context, serviceName string) (*api_v2.SamplingStrategyResponse, error) { + storedStrategies := h.storedStrategies.Load().(*storedStrategies) + serviceStrategies := storedStrategies.serviceStrategies + if strategy, ok := serviceStrategies[serviceName]; ok { + return strategy, nil + } + h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName)) + return storedStrategies.defaultStrategy, nil +} + +// Close stops updating the strategies +func (h *samplingProvider) Close() error { + h.cancelFunc() + return nil +} + +func (h *samplingProvider) downloadSamplingStrategies(samplingURL string) ([]byte, error) { + h.logger.Info("Downloading sampling strategies", zap.String("url", samplingURL)) + + ctx, cx := context.WithTimeout(context.Background(), time.Second) + defer cx() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, samplingURL, nil) + if err != nil { + return nil, fmt.Errorf("cannot construct HTTP request: %w", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to download sampling strategies: %w", err) + } + defer resp.Body.Close() + + buf := new(bytes.Buffer) + if _, err = buf.ReadFrom(resp.Body); err != nil { + return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err) + } + + if resp.StatusCode == http.StatusServiceUnavailable { + return nullJSON, nil + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf( + "receiving %s while downloading strategies file: %s", + resp.Status, + buf.String(), + ) + } + + return buf.Bytes(), nil +} + +func isURL(str string) bool { + u, err := url.Parse(str) + return err == nil && u.Scheme != "" && u.Host != "" +} + +func (h *samplingProvider) samplingStrategyLoader(strategiesFile string) strategyLoader { + if isURL(strategiesFile) { + return func() ([]byte, error) { + return h.downloadSamplingStrategies(strategiesFile) + } + } + + 
return func() ([]byte, error) { + h.logger.Info("Loading sampling strategies", zap.String("filename", strategiesFile)) + currBytes, err := os.ReadFile(filepath.Clean(strategiesFile)) + if err != nil { + return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err) + } + return currBytes, nil + } +} + +func (h *samplingProvider) autoUpdateStrategies(ctx context.Context, interval time.Duration, loader strategyLoader) { + lastValue := string(nullJSON) + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + lastValue = h.reloadSamplingStrategy(loader, lastValue) + case <-ctx.Done(): + return + } + } +} + +func (h *samplingProvider) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { + newValue, err := loadFn() + if err != nil { + h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) + return lastValue + } + if lastValue == string(newValue) { + return lastValue + } + if err := h.updateSamplingStrategy(newValue); err != nil { + h.logger.Error("failed to update sampling strategies", zap.Error(err)) + return lastValue + } + return string(newValue) +} + +func (h *samplingProvider) updateSamplingStrategy(dataBytes []byte) error { + var strategies strategies + if err := json.Unmarshal(dataBytes, &strategies); err != nil { + return fmt.Errorf("failed to unmarshal sampling strategies: %w", err) + } + h.parseStrategies(&strategies) + h.logger.Info("Updated sampling strategies:" + string(dataBytes)) + return nil +} + +// TODO good candidate for a global util function +func loadStrategies(loadFn strategyLoader) (*strategies, error) { + strategyBytes, err := loadFn() + if err != nil { + return nil, err + } + + var strategies *strategies + if err := json.Unmarshal(strategyBytes, &strategies); err != nil { + return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) + } + return strategies, nil +} + +func (h *samplingProvider) parseStrategies_deprecated(strategies 
*strategies) { + newStore := defaultStrategies() + if strategies.DefaultStrategy != nil { + newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) + } + + merge := true + if newStore.defaultStrategy.OperationSampling == nil || + newStore.defaultStrategy.OperationSampling.PerOperationStrategies == nil { + merge = false + } + + for _, s := range strategies.ServiceStrategies { + newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s) + + // Merge with the default operation strategies, because only merging with + // the default strategy has no effect on service strategies (the default strategy + // is not merged with and only used as a fallback). + opS := newStore.serviceStrategies[s.Service].OperationSampling + if opS == nil { + if newStore.defaultStrategy.OperationSampling == nil || + newStore.serviceStrategies[s.Service].ProbabilisticSampling == nil { + continue + } + // Service has no per-operation strategies, so just reference the default settings and change default samplingRate. 
+ newOpS := *newStore.defaultStrategy.OperationSampling + newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate + newStore.serviceStrategies[s.Service].OperationSampling = &newOpS + continue + } + if merge { + opS.PerOperationStrategies = mergePerOperationSamplingStrategies( + opS.PerOperationStrategies, + newStore.defaultStrategy.OperationSampling.PerOperationStrategies) + } + } + h.storedStrategies.Store(newStore) +} + +func (h *samplingProvider) parseStrategies(strategies *strategies) { + newStore := defaultStrategies() + if strategies.DefaultStrategy != nil { + newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy) + } + + for _, s := range strategies.ServiceStrategies { + newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s) + + // Config for this service may not have per-operation strategies, + // but if the default strategy has them they should still apply. + + if newStore.defaultStrategy.OperationSampling == nil { + // Default strategy doesn't have them either, nothing to do. + continue + } + + opS := newStore.serviceStrategies[s.Service].OperationSampling + if opS == nil { + // Service does not have its own per-operation rules, so copy (by value) from the default strategy. + newOpS := *newStore.defaultStrategy.OperationSampling + + // If the service's own default is probabilistic, then its sampling rate should take precedence. + if newStore.serviceStrategies[s.Service].ProbabilisticSampling != nil { + newOpS.DefaultSamplingProbability = newStore.serviceStrategies[s.Service].ProbabilisticSampling.SamplingRate + } + newStore.serviceStrategies[s.Service].OperationSampling = &newOpS + continue + } + + // If the service did have its own per-operation strategies, then merge them with the default ones. 
+ opS.PerOperationStrategies = mergePerOperationSamplingStrategies( + opS.PerOperationStrategies, + newStore.defaultStrategy.OperationSampling.PerOperationStrategies) + } + h.storedStrategies.Store(newStore) +} + +// mergePerOperationSamplingStrategies merges two operation strategies a and b, where a takes precedence over b. +func mergePerOperationSamplingStrategies( + a, b []*api_v2.OperationSamplingStrategy, +) []*api_v2.OperationSamplingStrategy { + m := make(map[string]bool) + for _, aOp := range a { + m[aOp.Operation] = true + } + for _, bOp := range b { + if m[bOp.Operation] { + continue + } + a = append(a, bOp) + } + return a +} + +func (h *samplingProvider) parseServiceStrategies(strategy *serviceStrategy) *api_v2.SamplingStrategyResponse { + resp := h.parseStrategy(&strategy.strategy) + if len(strategy.OperationStrategies) == 0 { + return resp + } + opS := &api_v2.PerOperationSamplingStrategies{ + DefaultSamplingProbability: defaultSamplingProbability, + } + if resp.StrategyType == api_v2.SamplingStrategyType_PROBABILISTIC { + opS.DefaultSamplingProbability = resp.ProbabilisticSampling.SamplingRate + } + for _, operationStrategy := range strategy.OperationStrategies { + s, ok := h.parseOperationStrategy(operationStrategy, opS) + if !ok { + continue + } + + opS.PerOperationStrategies = append(opS.PerOperationStrategies, + &api_v2.OperationSamplingStrategy{ + Operation: operationStrategy.Operation, + ProbabilisticSampling: s.ProbabilisticSampling, + }) + } + resp.OperationSampling = opS + return resp +} + +func (h *samplingProvider) parseOperationStrategy( + strategy *operationStrategy, + parent *api_v2.PerOperationSamplingStrategies, +) (s *api_v2.SamplingStrategyResponse, ok bool) { + s = h.parseStrategy(&strategy.strategy) + if s.StrategyType == api_v2.SamplingStrategyType_RATE_LIMITING { + // TODO OperationSamplingStrategy only supports probabilistic sampling + h.logger.Warn( + fmt.Sprintf( + "Operation strategies only supports probabilistic sampling at 
the moment,"+ + "'%s' defaulting to probabilistic sampling with probability %f", + strategy.Operation, parent.DefaultSamplingProbability), + zap.Any("strategy", strategy)) + return nil, false + } + return s, true +} + +func (h *samplingProvider) parseStrategy(strategy *strategy) *api_v2.SamplingStrategyResponse { + switch strategy.Type { + case samplerTypeProbabilistic: + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: strategy.Param, + }, + } + case samplerTypeRateLimiting: + return &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_RATE_LIMITING, + RateLimitingSampling: &api_v2.RateLimitingSamplingStrategy{ + MaxTracesPerSecond: int32(strategy.Param), + }, + } + default: + h.logger.Warn("Failed to parse sampling strategy", zap.Any("strategy", strategy)) + return defaultStrategyResponse() + } +} + +func deepCopy(s *api_v2.SamplingStrategyResponse) *api_v2.SamplingStrategyResponse { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + dec := gob.NewDecoder(&buf) + enc.Encode(*s) + var copyValue api_v2.SamplingStrategyResponse + dec.Decode(&copyValue) + return &copyValue +} diff --git a/extension/jaegerremotesampling/provider_test.go b/extension/jaegerremotesampling/provider_test.go new file mode 100644 index 000000000000..10bae001f555 --- /dev/null +++ b/extension/jaegerremotesampling/provider_test.go @@ -0,0 +1,557 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package jaegerremotesampling + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + 
"go.uber.org/zap/zaptest/observer" +) + +const snapshotLocation = "./fixtures/" + +// Snapshots can be regenerated via: +// +// REGENERATE_SNAPSHOTS=true go test -v ./plugin/sampling/strategyprovider/static/provider_test.go +var regenerateSnapshots = os.Getenv("REGENERATE_SNAPSHOTS") == "true" + +// strategiesJSON returns the strategy with +// a given probability. +func strategiesJSON(probability float32) string { + strategy := fmt.Sprintf(` + { + "default_strategy": { + "type": "probabilistic", + "param": 0.5 + }, + "service_strategies": [ + { + "service": "foo", + "type": "probabilistic", + "param": %.1f + }, + { + "service": "bar", + "type": "ratelimiting", + "param": 5 + } + ] + } + `, + probability, + ) + return strategy +} + +// Returns strategies in JSON format. Used for testing +// URL option for sampling strategies. +func mockStrategyServer(t *testing.T) (*httptest.Server, *atomic.Pointer[string]) { + var strategy atomic.Pointer[string] + value := strategiesJSON(0.8) + strategy.Store(&value) + f := func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/bad-content": + w.Write([]byte("bad-content")) + return + + case "/bad-status": + w.WriteHeader(http.StatusNotFound) + return + + case "/service-unavailable": + w.WriteHeader(http.StatusServiceUnavailable) + return + + default: + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(*strategy.Load())) + } + } + mockserver := httptest.NewServer(http.HandlerFunc(f)) + t.Cleanup(func() { + mockserver.Close() + }) + return mockserver, &strategy +} + +func TestStrategyStoreWithFile(t *testing.T) { + _, err := NewProvider(Options{StrategiesFile: "fileNotFound.json"}, zap.NewNop()) + require.ErrorContains(t, err, "failed to read strategies file fileNotFound.json") + + _, err = NewProvider(Options{StrategiesFile: "fixtures/bad_strategies.json"}, zap.NewNop()) + require.EqualError(t, err, + "failed to unmarshal strategies: json: cannot unmarshal 
// TestStrategyStoreWithURL checks that a provider pointed at a URL falls back
// to the default strategy when the URL answers with "service unavailable",
// and serves the downloaded strategies when the URL returns a document.
func TestStrategyStoreWithURL(t *testing.T) {
	// Test default strategy when URL is temporarily unavailable.
	logger, buf := testutils.NewLogger()
	mockServer, _ := mockStrategyServer(t)
	provider, err := NewProvider(Options{StrategiesFile: mockServer.URL + "/service-unavailable"}, logger)
	require.NoError(t, err)
	assert.Contains(t, buf.String(), "No sampling strategies found or URL is unavailable, using defaults")
	s, err := provider.GetSamplingStrategy(context.Background(), "foo")
	require.NoError(t, err)
	assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.001), *s)

	// Test downloading strategies from a URL.
	provider, err = NewProvider(Options{StrategiesFile: mockServer.URL}, logger)
	require.NoError(t, err)

	s, err = provider.GetSamplingStrategy(context.Background(), "foo")
	require.NoError(t, err)
	assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s)

	s, err = provider.GetSamplingStrategy(context.Background(), "bar")
	require.NoError(t, err)
	assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 5), *s)
}

// TestPerOperationSamplingStrategies loads fixtures/operation_strategies.json
// with and without IncludeDefaultOpStrategies and checks, for the services
// "foo", "bar" and "default", the service-level strategy and the exact order
// and rates of the per-operation strategies. The same expectations are
// asserted for both option sets.
func TestPerOperationSamplingStrategies(t *testing.T) {
	tests := []struct {
		options Options
	}{
		{Options{StrategiesFile: "fixtures/operation_strategies.json"}},
		{Options{
			StrategiesFile:             "fixtures/operation_strategies.json",
			IncludeDefaultOpStrategies: true,
		}},
	}

	for _, tc := range tests {
		// A fresh logger per case keeps the log assertions below independent.
		logger, buf := testutils.NewLogger()
		provider, err := NewProvider(tc.options, logger)
		// Rate-limiting operation strategies are expected to be reported while loading.
		assert.Contains(t, buf.String(), "Operation strategies only supports probabilistic sampling at the moment,"+
			"'op2' defaulting to probabilistic sampling with probability 0.8")
		assert.Contains(t, buf.String(), "Operation strategies only supports probabilistic sampling at the moment,"+
			"'op4' defaulting to probabilistic sampling with probability 0.001")
		require.NoError(t, err)

		expected := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8)

		s, err := provider.GetSamplingStrategy(context.Background(), "foo")
		require.NoError(t, err)
		assert.Equal(t, api_v2.SamplingStrategyType_PROBABILISTIC, s.StrategyType)
		assert.Equal(t, *expected.ProbabilisticSampling, *s.ProbabilisticSampling)

		require.NotNil(t, s.OperationSampling)
		opSampling := s.OperationSampling
		assert.InDelta(t, 0.8, opSampling.DefaultSamplingProbability, 0.01)
		require.Len(t, opSampling.PerOperationStrategies, 4)

		// The assertions are positional: the order of the entries matters.
		assert.Equal(t, "op6", opSampling.PerOperationStrategies[0].Operation)
		assert.InDelta(t, 0.5, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op1", opSampling.PerOperationStrategies[1].Operation)
		assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op0", opSampling.PerOperationStrategies[2].Operation)
		assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[2].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op7", opSampling.PerOperationStrategies[3].Operation)
		assert.InDelta(t, 1.0, opSampling.PerOperationStrategies[3].ProbabilisticSampling.SamplingRate, 0.01)

		expected = makeResponse(api_v2.SamplingStrategyType_RATE_LIMITING, 5)

		s, err = provider.GetSamplingStrategy(context.Background(), "bar")
		require.NoError(t, err)
		assert.Equal(t, api_v2.SamplingStrategyType_RATE_LIMITING, s.StrategyType)
		assert.Equal(t, *expected.RateLimitingSampling, *s.RateLimitingSampling)

		require.NotNil(t, s.OperationSampling)
		opSampling = s.OperationSampling
		assert.InDelta(t, 0.001, opSampling.DefaultSamplingProbability, 1e-4)
		require.Len(t, opSampling.PerOperationStrategies, 5)
		assert.Equal(t, "op3", opSampling.PerOperationStrategies[0].Operation)
		assert.InDelta(t, 0.3, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op5", opSampling.PerOperationStrategies[1].Operation)
		assert.InDelta(t, 0.4, opSampling.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op0", opSampling.PerOperationStrategies[2].Operation)
		assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[2].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op6", opSampling.PerOperationStrategies[3].Operation)
		assert.InDelta(t, 0.0, opSampling.PerOperationStrategies[3].ProbabilisticSampling.SamplingRate, 0.01)
		assert.Equal(t, "op7", opSampling.PerOperationStrategies[4].Operation)
		assert.InDelta(t, 1.0, opSampling.PerOperationStrategies[4].ProbabilisticSampling.SamplingRate, 0.01)

		// A service name without its own entry ("default") is expected to
		// receive the default strategy with its per-operation section.
		s, err = provider.GetSamplingStrategy(context.Background(), "default")
		require.NoError(t, err)
		expectedRsp := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.5)
		expectedRsp.OperationSampling = &api_v2.PerOperationSamplingStrategies{
			DefaultSamplingProbability: 0.5,
			PerOperationStrategies: []*api_v2.OperationSamplingStrategy{
				{
					Operation: "op0",
					ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
						SamplingRate: 0.2,
					},
				},
				{
					Operation: "op6",
					ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
						SamplingRate: 0,
					},
				},
				{
					Operation: "op7",
					ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{
						SamplingRate: 1,
					},
				},
			},
		}
		assert.EqualValues(t, expectedRsp, *s)
	}
}

// TestMissingServiceSamplingStrategyTypes loads fixtures/missing-service-types.json
// and checks that loading logs "Failed to parse sampling strategy", that the
// services "foo" and "bar" are served the default probabilistic strategy, and
// that their per-operation strategies are still present.
func TestMissingServiceSamplingStrategyTypes(t *testing.T) {
	logger, buf := testutils.NewLogger()
	provider, err := NewProvider(Options{StrategiesFile: "fixtures/missing-service-types.json"}, logger)
	assert.Contains(t, buf.String(), "Failed to parse sampling strategy")
	require.NoError(t, err)

	expected := makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, defaultSamplingProbability)

	s, err := provider.GetSamplingStrategy(context.Background(), "foo")
	require.NoError(t, err)
	assert.Equal(t, api_v2.SamplingStrategyType_PROBABILISTIC, s.StrategyType)
	assert.Equal(t, *expected.ProbabilisticSampling, *s.ProbabilisticSampling)

	require.NotNil(t, s.OperationSampling)
	opSampling := s.OperationSampling
	assert.InDelta(t, defaultSamplingProbability, opSampling.DefaultSamplingProbability, 1e-4)
	require.Len(t, opSampling.PerOperationStrategies, 1)
	assert.Equal(t, "op1", opSampling.PerOperationStrategies[0].Operation)
	assert.InDelta(t, 0.2, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.001)

	expected = makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, defaultSamplingProbability)

	s, err = provider.GetSamplingStrategy(context.Background(), "bar")
	require.NoError(t, err)
	assert.Equal(t, api_v2.SamplingStrategyType_PROBABILISTIC, s.StrategyType)
	assert.Equal(t, *expected.ProbabilisticSampling, *s.ProbabilisticSampling)

	require.NotNil(t, s.OperationSampling)
	opSampling = s.OperationSampling
	assert.InDelta(t, 0.001, opSampling.DefaultSamplingProbability, 1e-4)
	require.Len(t, opSampling.PerOperationStrategies, 2)
	assert.Equal(t, "op3", opSampling.PerOperationStrategies[0].Operation)
	assert.InDelta(t, 0.3, opSampling.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate, 0.01)
	assert.Equal(t, "op5", opSampling.PerOperationStrategies[1].Operation)
	assert.InDelta(t, 0.4, opSampling.PerOperationStrategies[1].ProbabilisticSampling.SamplingRate, 0.01)

	s, err = provider.GetSamplingStrategy(context.Background(), "default")
	require.NoError(t, err)
	assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.5), *s)
}
buf.String(), "Failed to parse sampling strategy") +} + +func makeResponse(samplerType api_v2.SamplingStrategyType, param float64) (resp api_v2.SamplingStrategyResponse) { + resp.StrategyType = samplerType + if samplerType == api_v2.SamplingStrategyType_PROBABILISTIC { + resp.ProbabilisticSampling = &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: param, + } + } else if samplerType == api_v2.SamplingStrategyType_RATE_LIMITING { + resp.RateLimitingSampling = &api_v2.RateLimitingSamplingStrategy{ + MaxTracesPerSecond: int32(param), + } + } + return resp +} + +func TestDeepCopy(t *testing.T) { + s := &api_v2.SamplingStrategyResponse{ + StrategyType: api_v2.SamplingStrategyType_PROBABILISTIC, + ProbabilisticSampling: &api_v2.ProbabilisticSamplingStrategy{ + SamplingRate: 0.5, + }, + } + cp := deepCopy(s) + assert.NotSame(t, cp, s) + assert.EqualValues(t, cp, s) +} + +func TestAutoUpdateStrategyWithFile(t *testing.T) { + tempFile, _ := os.CreateTemp("", "for_go_test_*.json") + require.NoError(t, tempFile.Close()) + defer func() { + require.NoError(t, os.Remove(tempFile.Name())) + }() + + // copy known fixture content into temp file which we can later overwrite + srcFile, dstFile := "fixtures/strategies.json", tempFile.Name() + srcBytes, err := os.ReadFile(srcFile) + require.NoError(t, err) + require.NoError(t, os.WriteFile(dstFile, srcBytes, 0o644)) + + ss, err := NewProvider(Options{ + StrategiesFile: dstFile, + ReloadInterval: time.Millisecond * 10, + }, zap.NewNop()) + require.NoError(t, err) + provider := ss.(*samplingProvider) + defer provider.Close() + + // confirm baseline value + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading is a no-op + value := provider.reloadSamplingStrategy(provider.samplingStrategyLoader(dstFile), string(srcBytes)) + assert.Equal(t, string(srcBytes), value) + + // update 
file with new probability of 0.9 + newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1) + require.NoError(t, os.WriteFile(dstFile, []byte(newStr), 0o644)) + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.9), *s) +} + +func TestAutoUpdateStrategyWithURL(t *testing.T) { + mockServer, mockStrategy := mockStrategyServer(t) + ss, err := NewProvider(Options{ + StrategiesFile: mockServer.URL, + ReloadInterval: 10 * time.Millisecond, + }, zap.NewNop()) + require.NoError(t, err) + provider := ss.(*samplingProvider) + defer provider.Close() + + // confirm baseline value + s, err := provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.8), *s) + + // verify that reloading in no-op + value := provider.reloadSamplingStrategy( + provider.samplingStrategyLoader(mockServer.URL), + *mockStrategy.Load(), + ) + assert.Equal(t, *mockStrategy.Load(), value) + + // update original strategies with new probability of 0.9 + { + v09 := strategiesJSON(0.9) + mockStrategy.Store(&v09) + } + + // wait for reload timer + for i := 0; i < 1000; i++ { // wait up to 1sec + s, err = provider.GetSamplingStrategy(context.Background(), "foo") + require.NoError(t, err) + if s.ProbabilisticSampling != nil && s.ProbabilisticSampling.SamplingRate == 0.9 { + break + } + time.Sleep(1 * time.Millisecond) + } + assert.EqualValues(t, makeResponse(api_v2.SamplingStrategyType_PROBABILISTIC, 0.9), *s) +} + +func TestAutoUpdateStrategyErrors(t *testing.T) { + tempFile, _ := os.CreateTemp("", "for_go_test_*.json") + require.NoError(t, tempFile.Close()) + defer 
func() { + _ = os.Remove(tempFile.Name()) + }() + + zapCore, logs := observer.New(zap.InfoLevel) + logger := zap.New(zapCore) + + s, err := NewProvider(Options{ + StrategiesFile: "fixtures/strategies.json", + ReloadInterval: time.Hour, + }, logger) + require.NoError(t, err) + provider := s.(*samplingProvider) + defer provider.Close() + + // check invalid file path or read failure + assert.Equal(t, "blah", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(tempFile.Name()+"bad-path"), "blah")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 1) + + // check bad file content + require.NoError(t, os.WriteFile(tempFile.Name(), []byte("bad value"), 0o644)) + assert.Equal(t, "blah", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(tempFile.Name()), "blah")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 1) + + // check invalid url + assert.Equal(t, "duh", provider.reloadSamplingStrategy(provider.samplingStrategyLoader("bad-url"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 2) + + // check status code other than 200 + mockServer, _ := mockStrategyServer(t) + assert.Equal(t, "duh", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(mockServer.URL+"/bad-status"), "duh")) + assert.Len(t, logs.FilterMessage("failed to re-load sampling strategies").All(), 3) + + // check bad content from url + assert.Equal(t, "duh", provider.reloadSamplingStrategy(provider.samplingStrategyLoader(mockServer.URL+"/bad-content"), "duh")) + assert.Len(t, logs.FilterMessage("failed to update sampling strategies").All(), 2) +} + +func TestServiceNoPerOperationStrategies(t *testing.T) { + // given setup of strategy provider with no specific per operation sampling strategies + // and option "sampling.strategies.bugfix-5270=true" + provider, err := NewProvider(Options{ + StrategiesFile: "fixtures/service_no_per_operation.json", + 
IncludeDefaultOpStrategies: true, + }, zap.NewNop()) + require.NoError(t, err) + + for _, service := range []string{"ServiceA", "ServiceB"} { + t.Run(service, func(t *testing.T) { + strategy, err := provider.GetSamplingStrategy(context.Background(), service) + require.NoError(t, err) + strategyJson, err := json.MarshalIndent(strategy, "", " ") + require.NoError(t, err) + + testName := strings.ReplaceAll(t.Name(), "/", "_") + snapshotFile := filepath.Join(snapshotLocation, testName+".json") + expectedServiceResponse, err := os.ReadFile(snapshotFile) + require.NoError(t, err) + + assert.JSONEq(t, string(expectedServiceResponse), string(strategyJson), + "comparing against stored snapshot. Use REGENERATE_SNAPSHOTS=true to rebuild snapshots.") + + if regenerateSnapshots { + os.WriteFile(snapshotFile, strategyJson, 0o644) + } + }) + } +} + +func TestServiceNoPerOperationStrategiesDeprecatedBehavior(t *testing.T) { + // test case to be removed along with removal of strategy_store.parseStrategies_deprecated, + // see https://github.com/jaegertracing/jaeger/issues/5270 for more details + + // given setup of strategy provider with no specific per operation sampling strategies + provider, err := NewProvider(Options{ + StrategiesFile: "fixtures/service_no_per_operation.json", + }, zap.NewNop()) + require.NoError(t, err) + + for _, service := range []string{"ServiceA", "ServiceB"} { + t.Run(service, func(t *testing.T) { + strategy, err := provider.GetSamplingStrategy(context.Background(), service) + require.NoError(t, err) + strategyJson, err := json.MarshalIndent(strategy, "", " ") + require.NoError(t, err) + + testName := strings.ReplaceAll(t.Name(), "/", "_") + snapshotFile := filepath.Join(snapshotLocation, testName+".json") + expectedServiceResponse, err := os.ReadFile(snapshotFile) + require.NoError(t, err) + + assert.JSONEq(t, string(expectedServiceResponse), string(strategyJson), + "comparing against stored snapshot. 
type (
	// strategy is a single sampling rule. Type is either "probabilistic" or
	// "ratelimiting"; Param carries the sampling probability for the former
	// and the maximum number of traces per second for the latter.
	strategy struct {
		Type  string  `json:"type"`
		Param float64 `json:"param"`
	}

	// operationStrategy is a sampling rule that applies to one named
	// operation. The embedded strategy supplies its "type" and "param" keys.
	operationStrategy struct {
		Operation string `json:"operation"`
		strategy
	}

	// serviceStrategy is a sampling rule that applies to one named service,
	// optionally refined by a list of operation-specific rules. The embedded
	// strategy supplies its "type" and "param" keys.
	serviceStrategy struct {
		Service             string               `json:"service"`
		OperationStrategies []*operationStrategy `json:"operation_strategies"`
		strategy
	}

	// strategies is the top-level strategies document: one default rule plus
	// the list of service-specific rules.
	strategies struct {
		DefaultStrategy   *serviceStrategy   `json:"default_strategy"`
		ServiceStrategies []*serviceStrategy `json:"service_strategies"`
	}
)