diff --git a/extension/k8sleaderelector/README.md b/extension/k8sleaderelector/README.md index 72867d6a0887..fe66317397dd 100644 --- a/extension/k8sleaderelector/README.md +++ b/extension/k8sleaderelector/README.md @@ -42,4 +42,26 @@ service: | **lease_namespace** | The namespace of the lease object. | none (required) | | **lease_duration** | The duration of the lease. | 15s | | **renew_deadline** | The deadline for renewing the lease. It must be less than the lease duration. | 10s | -| **retry_period** | The period for retrying the leader election. | 2s | \ No newline at end of file +| **retry_period** | The period for retrying the leader election. | 2s | + +### Suggested RBAC +```yaml +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: my-lease + namespace: default +rules: +- apiGroups: + - coordination.k8s.io + resources: + - leases + verbs: + - get + - list + - watch + - create + - update + - patch + - delete +``` \ No newline at end of file diff --git a/extension/k8sleaderelector/extension.go b/extension/k8sleaderelector/extension.go index 49fb4ce25f79..c643a3470822 100644 --- a/extension/k8sleaderelector/extension.go +++ b/extension/k8sleaderelector/extension.go @@ -5,6 +5,7 @@ package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-coll import ( "context" + "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" @@ -36,17 +37,64 @@ type leaderElectionExtension struct { client kubernetes.Interface logger *zap.Logger leaseHolderID string + cancel context.CancelFunc + waitGroup sync.WaitGroup onStartedLeading []StartCallback onStoppedLeading []StopCallback } +// If the receiver sets a callback function then it would be invoked when the leader wins the election +func (lee *leaderElectionExtension) startedLeading(ctx context.Context) { + for _, callback := range lee.onStartedLeading { + callback(ctx) + } +} + +// If the receiver sets a callback function then it would be invoked when the leader loss the election +func (lee *leaderElectionExtension) stoppedLeading() { + for _, callback := range lee.onStoppedLeading { + callback() + } +} + // Start begins the extension's processing. func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) error { + lee.logger.Info("Starting k8s leader elector with UUID", zap.String("UUID", lee.leaseHolderID)) + + ctx := context.Background() + ctx, lee.cancel = context.WithCancel(ctx) + // Create the K8s leader elector + leaderElector, err := newK8sLeaderElector(lee.config, lee.client, lee.startedLeading, lee.stoppedLeading, lee.leaseHolderID) + if err != nil { + lee.logger.Error("Failed to create k8s leader elector", zap.Error(err)) + return err + } + lee.waitGroup.Add(1) + go func() { + // Leader election loop stops if context is canceled or the leader elector loses the lease. + // The loop allows continued participation in leader election, even if the lease is lost. + defer lee.waitGroup.Done() + for { + leaderElector.Run(ctx) + + if ctx.Err() != nil { + break + } + + lee.logger.Info("Leader lease lost. Returning to standby mode...") + } + }() + return nil } // Shutdown ends the extension's processing. func (lee *leaderElectionExtension) Shutdown(context.Context) error { + lee.logger.Info("Stopping k8s leader elector with UUID", zap.String("UUID", lee.leaseHolderID)) + if lee.cancel != nil { + lee.cancel() + } + lee.waitGroup.Wait() return nil } diff --git a/extension/k8sleaderelector/extension_test.go b/extension/k8sleaderelector/extension_test.go new file mode 100644 index 000000000000..baa5ea5887bd --- /dev/null +++ b/extension/k8sleaderelector/extension_test.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sleaderelector + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/ptr" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" +) + +func TestExtension(t *testing.T) { + config := &Config{ + LeaseName: "foo", + LeaseNamespace: "default", + LeaseDuration: 15 * time.Second, + RenewDuration: 10 * time.Second, + RetryPeriod: 2 * time.Second, + } + + iamInvokedOnLeading := false + + ctx := context.TODO() + fakeClient := fake.NewClientset() + config.makeClient = func(_ k8sconfig.APIConfig) (kubernetes.Interface, error) { + return fakeClient, nil + } + + observedZapCore, _ := observer.New(zap.WarnLevel) + + leaderElection := leaderElectionExtension{ + config: config, + client: fakeClient, + logger: zap.New(observedZapCore), + leaseHolderID: "foo", + } + + leaderElection.SetCallBackFuncs( + func(_ context.Context) { + iamInvokedOnLeading = true + fmt.Printf("LeaderElection started leading") + }, + func() { + fmt.Printf("LeaderElection stopped leading") + }, + ) + + require.NoError(t, leaderElection.Start(ctx, componenttest.NewNopHost())) + + expectedLeaseDurationSeconds := ptr.To(int32(15)) + + require.Eventually(t, func() bool { + lease, err := fakeClient.CoordinationV1().Leases("default").Get(ctx, "foo", metav1.GetOptions{}) + require.NoError(t, err) + require.NotNil(t, lease) + require.Equal(t, expectedLeaseDurationSeconds, lease.Spec.LeaseDurationSeconds) + return true + }, 10*time.Second, 100*time.Millisecond) + + require.True(t, iamInvokedOnLeading) + require.NoError(t, leaderElection.Shutdown(ctx)) +} diff --git a/extension/k8sleaderelector/factory.go b/extension/k8sleaderelector/factory.go index 3aa1acbe4649..d26568ee3415 100644 --- a/extension/k8sleaderelector/factory.go +++ b/extension/k8sleaderelector/factory.go @@ -6,9 +6,10 @@ package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-coll import ( "context" "errors" - "os" + "sync" "time" + "github.com/google/uuid" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension" @@ -52,16 +53,14 @@ func createExtension( return nil, errors.New("failed to create k8s client") } - leaseHolderID, err := os.Hostname() - if err != nil { - return nil, err - } + leaseHolderID := uuid.New().String() return &leaderElectionExtension{ config: baseCfg, logger: set.Logger, client: client, leaseHolderID: leaseHolderID, + waitGroup: sync.WaitGroup{}, }, nil } diff --git a/extension/k8sleaderelector/go.mod b/extension/k8sleaderelector/go.mod index a016289e4533..ef7b6e9eec7e 100644 --- a/extension/k8sleaderelector/go.mod +++ b/extension/k8sleaderelector/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sle go 1.23.0 require ( + github.com/google/uuid v1.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.120.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.120.1-0.20250226024140-8099e51f9a77 @@ -12,7 +13,9 @@ require ( go.opentelemetry.io/collector/extension/extensiontest v0.120.1-0.20250226024140-8099e51f9a77 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + k8s.io/apimachinery v0.32.2 k8s.io/client-go v0.32.2 + k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 ) require ( @@ -30,7 +33,6 @@ require ( github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect @@ -69,10 +71,8 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/api v0.32.2 // indirect - k8s.io/apimachinery v0.32.2 // indirect k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect - k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect sigs.k8s.io/yaml v1.4.0 // indirect diff --git a/extension/k8sleaderelector/leader_elector.go b/extension/k8sleaderelector/leader_elector.go new file mode 100644 index 000000000000..9a80f4ad3158 --- /dev/null +++ b/extension/k8sleaderelector/leader_elector.go @@ -0,0 +1,46 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector" + +import ( + "context" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" +) + +func newK8sLeaderElector( + cfg *Config, + client kubernetes.Interface, + onStartedLeading func(context.Context), + onStoppedLeading func(), + identity string, +) (*leaderelection.LeaderElector, error) { + resourceLock, err := resourcelock.New( + resourcelock.LeasesResourceLock, + cfg.LeaseNamespace, + cfg.LeaseName, + client.CoreV1(), + client.CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: identity, + }) + if err != nil { + return nil, err + } + + leConfig := leaderelection.LeaderElectionConfig{ + Lock: resourceLock, + LeaseDuration: cfg.LeaseDuration, + RenewDeadline: cfg.RenewDuration, + RetryPeriod: cfg.RetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: onStartedLeading, + OnStoppedLeading: onStoppedLeading, + }, + } + + return leaderelection.NewLeaderElector(leConfig) +} diff --git a/extension/k8sleaderelector/leader_elector_test.go b/extension/k8sleaderelector/leader_elector_test.go new file mode 100644 index 000000000000..e0aaca7574b3 --- /dev/null +++ b/extension/k8sleaderelector/leader_elector_test.go @@ -0,0 +1,30 @@ +package k8sleaderelector + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes/fake" +) + +func TestLeaderElector(t *testing.T) { + fakeClient := fake.NewClientset() + onStartedLeading := func(_ context.Context) {} + onStoppedLeading := func() {} + leConfig := Config{ + LeaseName: "foo", + LeaseNamespace: "bar", + LeaseDuration: 20 * time.Second, + RenewDuration: 10 * time.Second, + RetryPeriod: 2 * time.Second, + } + + leaderElector, err := newK8sLeaderElector(&leConfig, fakeClient, onStartedLeading, onStoppedLeading, "host1") + require.NoError(t, err) + require.NotNil(t, leaderElector) +}