Skip to content

Commit

Permalink
[extension/k8s leader elector] Extension leader election implementati…
Browse files Browse the repository at this point in the history
…on (#38015)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Added the actual implementation for leader election

The initial structure was added in [basic structure](#37266).

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #34460
<!--Describe what testing was performed and which tests were added.-->
#### Testing
unit tests added
<!--Describe the documentation added.-->
#### Documentation
documentation is provided
<!--Please delete paragraphs that you did not use before submitting.-->
  • Loading branch information
rakesh-garimella authored Feb 27, 2025
1 parent 20a49b7 commit f036dd9
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 9 deletions.
24 changes: 23 additions & 1 deletion extension/k8sleaderelector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,26 @@ service:
| **lease_namespace** | The namespace of the lease object. | none (required) |
| **lease_duration** | The duration of the lease. | 15s |
| **renew_deadline** | The deadline for renewing the lease. It must be less than the lease duration. | 10s |
| **retry_period** | The period for retrying the leader election. | 2s |
| **retry_period** | The period for retrying the leader election. | 2s |
### Suggested RBAC
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: my-lease
  namespace: default
rules:
  - apiGroups:
      - coordination.k8s.io
    resources:
      - leases
    verbs:
      - get
      - list
      - watch
      - create
      - update
      - patch
      - delete
```
48 changes: 48 additions & 0 deletions extension/k8sleaderelector/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-coll

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
Expand Down Expand Up @@ -36,17 +37,64 @@ type leaderElectionExtension struct {
client kubernetes.Interface
logger *zap.Logger
leaseHolderID string
cancel context.CancelFunc
waitGroup sync.WaitGroup

onStartedLeading []StartCallback
onStoppedLeading []StopCallback
}

// startedLeading is handed to the leader elector as its "started leading"
// hook. It invokes every registered start callback, in registration order,
// passing along the context it was given.
func (lee *leaderElectionExtension) startedLeading(ctx context.Context) {
	callbacks := lee.onStartedLeading
	for i := range callbacks {
		callbacks[i](ctx)
	}
}

// stoppedLeading is handed to the leader elector as its "stopped leading"
// hook. It invokes every registered stop callback, in registration order.
func (lee *leaderElectionExtension) stoppedLeading() {
	callbacks := lee.onStoppedLeading
	for i := range callbacks {
		callbacks[i]()
	}
}

// Start builds the Kubernetes leader elector and launches the election loop
// on a background goroutine, then returns immediately.
//
// The context passed to Start is intentionally unused: the loop runs on a
// context derived from context.Background() and is stopped only through
// lee.cancel, which Shutdown calls.
//
// It returns the error from newK8sLeaderElector if the elector cannot be
// created; in that case no goroutine is started and lee.cancel is left
// untouched.
func (lee *leaderElectionExtension) Start(_ context.Context, _ component.Host) error {
	lee.logger.Info("Starting k8s leader elector with UUID", zap.String("UUID", lee.leaseHolderID))

	// Build the elector before creating the cancellable context so that a
	// construction failure does not leave behind a cancel func for a context
	// nothing is running on.
	leaderElector, err := newK8sLeaderElector(lee.config, lee.client, lee.startedLeading, lee.stoppedLeading, lee.leaseHolderID)
	if err != nil {
		lee.logger.Error("Failed to create k8s leader elector", zap.Error(err))
		return err
	}

	ctx, cancel := context.WithCancel(context.Background())
	lee.cancel = cancel

	lee.waitGroup.Add(1)
	go func() {
		defer lee.waitGroup.Done()
		// Each Run call is one participation in the election. When it returns
		// with the context still live, the lease was lost, so re-enter the
		// election instead of giving up. Only cancellation ends the loop.
		for {
			leaderElector.Run(ctx)

			if ctx.Err() != nil {
				return
			}

			lee.logger.Info("Leader lease lost. Returning to standby mode...")
		}
	}()

	return nil
}

// Shutdown cancels the election loop's context, if Start created one, and
// blocks until the background goroutine has exited. It always returns nil.
func (lee *leaderElectionExtension) Shutdown(context.Context) error {
	lee.logger.Info("Stopping k8s leader elector with UUID", zap.String("UUID", lee.leaseHolderID))

	// cancel is nil when Start was never called, so guard the call.
	if stop := lee.cancel; stop != nil {
		stop()
	}

	lee.waitGroup.Wait()
	return nil
}
74 changes: 74 additions & 0 deletions extension/k8sleaderelector/extension_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sleaderelector

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)

// TestExtension starts the extension against a fake clientset and verifies
// that the configured lease is created with the expected duration and that
// the started-leading callback is invoked.
func TestExtension(t *testing.T) {
	config := &Config{
		LeaseName:      "foo",
		LeaseNamespace: "default",
		LeaseDuration:  15 * time.Second,
		RenewDuration:  10 * time.Second,
		RetryPeriod:    2 * time.Second,
	}

	// The callback runs on a goroutine owned by the leader elector, so the
	// signal is handed to the test goroutine through a channel rather than a
	// shared bool. The buffer of one plus a non-blocking send means the
	// callback never blocks, even if it fires more than once.
	startedLeading := make(chan struct{}, 1)

	ctx := context.TODO()
	fakeClient := fake.NewClientset()
	config.makeClient = func(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
		return fakeClient, nil
	}

	observedZapCore, _ := observer.New(zap.WarnLevel)

	leaderElection := leaderElectionExtension{
		config:        config,
		client:        fakeClient,
		logger:        zap.New(observedZapCore),
		leaseHolderID: "foo",
	}

	leaderElection.SetCallBackFuncs(
		func(_ context.Context) {
			select {
			case startedLeading <- struct{}{}:
			default:
			}
			fmt.Printf("LeaderElection started leading")
		},
		func() {
			fmt.Printf("LeaderElection stopped leading")
		},
	)

	require.NoError(t, leaderElection.Start(ctx, componenttest.NewNopHost()))

	expectedLeaseDurationSeconds := ptr.To(int32(15))

	// The condition is polled on a separate goroutine, so it must report
	// "not yet" by returning false instead of calling require (FailNow is
	// only valid on the test goroutine, and a missing lease is expected
	// until the elector has created it).
	var gotLeaseDurationSeconds *int32
	require.Eventually(t, func() bool {
		lease, err := fakeClient.CoordinationV1().Leases("default").Get(ctx, "foo", metav1.GetOptions{})
		if err != nil || lease == nil {
			return false
		}
		gotLeaseDurationSeconds = lease.Spec.LeaseDurationSeconds
		return true
	}, 10*time.Second, 100*time.Millisecond)
	require.Equal(t, expectedLeaseDurationSeconds, gotLeaseDurationSeconds)

	// The callback is asynchronous with respect to lease creation, so wait
	// for it instead of assuming it has already run.
	select {
	case <-startedLeading:
	case <-time.After(10 * time.Second):
		t.Fatal("started-leading callback was not invoked")
	}

	require.NoError(t, leaderElection.Shutdown(ctx))
}
9 changes: 4 additions & 5 deletions extension/k8sleaderelector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-coll
import (
"context"
"errors"
"os"
"sync"
"time"

"github.com/google/uuid"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"

Expand Down Expand Up @@ -52,16 +53,14 @@ func createExtension(
return nil, errors.New("failed to create k8s client")
}

leaseHolderID, err := os.Hostname()
if err != nil {
return nil, err
}
leaseHolderID := uuid.New().String()

return &leaderElectionExtension{
config: baseCfg,
logger: set.Logger,
client: client,
leaseHolderID: leaseHolderID,
waitGroup: sync.WaitGroup{},
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions extension/k8sleaderelector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sle
go 1.23.0

require (
github.com/google/uuid v1.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.120.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.120.1-0.20250226024140-8099e51f9a77
Expand All @@ -12,7 +13,9 @@ require (
go.opentelemetry.io/collector/extension/extensiontest v0.120.1-0.20250226024140-8099e51f9a77
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
k8s.io/apimachinery v0.32.2
k8s.io/client-go v0.32.2
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
)

require (
Expand All @@ -30,7 +33,6 @@ require (
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
Expand Down Expand Up @@ -69,10 +71,8 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.32.2 // indirect
k8s.io/apimachinery v0.32.2 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.3 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
46 changes: 46 additions & 0 deletions extension/k8sleaderelector/leader_elector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package k8sleaderelector // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/k8sleaderelector"

import (
"context"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// newK8sLeaderElector builds a client-go LeaderElector that competes for a
// Lease-based resource lock.
//
// The lock is identified by cfg.LeaseNamespace and cfg.LeaseName and is held
// under the given identity. Timing comes from cfg.LeaseDuration,
// cfg.RenewDuration (used as the renew deadline) and cfg.RetryPeriod. The two
// callbacks are installed as the elector's started/stopped leading hooks.
//
// An error is returned if either the resource lock or the elector itself
// cannot be constructed.
func newK8sLeaderElector(
	cfg *Config,
	client kubernetes.Interface,
	onStartedLeading func(context.Context),
	onStoppedLeading func(),
	identity string,
) (*leaderelection.LeaderElector, error) {
	lockCfg := resourcelock.ResourceLockConfig{Identity: identity}

	lock, err := resourcelock.New(
		resourcelock.LeasesResourceLock,
		cfg.LeaseNamespace,
		cfg.LeaseName,
		client.CoreV1(),
		client.CoordinationV1(),
		lockCfg,
	)
	if err != nil {
		return nil, err
	}

	callbacks := leaderelection.LeaderCallbacks{
		OnStartedLeading: onStartedLeading,
		OnStoppedLeading: onStoppedLeading,
	}

	return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: cfg.LeaseDuration,
		RenewDeadline: cfg.RenewDuration,
		RetryPeriod:   cfg.RetryPeriod,
		Callbacks:     callbacks,
	})
}
30 changes: 30 additions & 0 deletions extension/k8sleaderelector/leader_elector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package k8sleaderelector

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes/fake"
)

func TestLeaderElector(t *testing.T) {
fakeClient := fake.NewClientset()
onStartedLeading := func(_ context.Context) {}
onStoppedLeading := func() {}
leConfig := Config{
LeaseName: "foo",
LeaseNamespace: "bar",
LeaseDuration: 20 * time.Second,
RenewDuration: 10 * time.Second,
RetryPeriod: 2 * time.Second,
}

leaderElector, err := newK8sLeaderElector(&leConfig, fakeClient, onStartedLeading, onStoppedLeading, "host1")
require.NoError(t, err)
require.NotNil(t, leaderElector)
}

0 comments on commit f036dd9

Please sign in to comment.