Skip to content

Commit cbdb6ee

Browse files
authored
Use k8s events instead of status messages (open-telemetry#707)
* Storing upgrade status on events Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Fixing Lint nits Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Fixing test checks and converting messages to events Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Fixing upgrade test Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Fixing var value of expected version Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Fixing var value of expected version Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Change upgrade script for v0.19.0 Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Created const for RecordBufferSize and Removed commented lines Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Changed function signature and type name Signed-off-by: Yuri Sa <yurimsa@gmail.com> * Changed variable exposure Signed-off-by: Yuri Sa <yurimsa@gmail.com>
1 parent e6cad70 commit cbdb6ee

25 files changed

+232
-116
lines changed

main.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2929
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3030
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
31+
"k8s.io/client-go/tools/record"
3132
ctrl "sigs.k8s.io/controller-runtime"
3233
"sigs.k8s.io/controller-runtime/pkg/cache"
3334
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -225,10 +226,15 @@ func addDependencies(_ context.Context, mgr ctrl.Manager, cfg config.Config, v v
225226
if err != nil {
226227
return fmt.Errorf("failed to start the auto-detect mechanism: %w", err)
227228
}
228-
229229
// adds the upgrade mechanism to be executed once the manager is ready
230230
err = mgr.Add(manager.RunnableFunc(func(c context.Context) error {
231-
return collectorupgrade.ManagedInstances(c, ctrl.Log.WithName("collector-upgrade"), v, mgr.GetClient())
231+
up := &collectorupgrade.VersionUpgrade{
232+
Log: ctrl.Log.WithName("collector-upgrade"),
233+
Version: v,
234+
Client: mgr.GetClient(),
235+
Recorder: record.NewFakeRecorder(collectorupgrade.RecordBufferSize),
236+
}
237+
return up.ManagedInstances(c)
232238
}))
233239
if err != nil {
234240
return fmt.Errorf("failed to upgrade OpenTelemetryCollector instances: %w", err)

pkg/collector/upgrade/upgrade.go

+26-16
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,44 @@ import (
2222

2323
semver "github.com/Masterminds/semver/v3"
2424
"github.com/go-logr/logr"
25+
"k8s.io/client-go/tools/record"
2526
"sigs.k8s.io/controller-runtime/pkg/client"
2627

2728
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
2829
"github.com/open-telemetry/opentelemetry-operator/internal/version"
2930
)
3031

32+
type VersionUpgrade struct {
33+
Log logr.Logger
34+
Version version.Version
35+
Client client.Client
36+
Recorder record.EventRecorder
37+
}
38+
39+
const RecordBufferSize int = 10
40+
3141
// ManagedInstances finds all the otelcol instances for the current operator and upgrades them, if necessary.
32-
func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Version, cl client.Client) error {
33-
logger.Info("looking for managed instances to upgrade")
42+
func (u VersionUpgrade) ManagedInstances(ctx context.Context) error {
43+
u.Log.Info("looking for managed instances to upgrade")
3444

3545
opts := []client.ListOption{
3646
client.MatchingLabels(map[string]string{
3747
"app.kubernetes.io/managed-by": "opentelemetry-operator",
3848
}),
3949
}
4050
list := &v1alpha1.OpenTelemetryCollectorList{}
41-
if err := cl.List(ctx, list, opts...); err != nil {
51+
if err := u.Client.List(ctx, list, opts...); err != nil {
4252
return fmt.Errorf("failed to list: %w", err)
4353
}
4454

4555
for i := range list.Items {
4656
original := list.Items[i]
47-
itemLogger := logger.WithValues("name", original.Name, "namespace", original.Namespace)
57+
itemLogger := u.Log.WithValues("name", original.Name, "namespace", original.Namespace)
4858
if original.Spec.UpgradeStrategy == v1alpha1.UpgradeStrategyNone {
4959
itemLogger.Info("skipping instance upgrade due to UpgradeStrategy")
5060
continue
5161
}
52-
upgraded, err := ManagedInstance(ctx, logger, ver, cl, original)
62+
upgraded, err := u.ManagedInstance(ctx, original)
5363
if err != nil {
5464
// nothing to do at this level, just go to the next instance
5565
continue
@@ -59,14 +69,14 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
5969
// the resource update overrides the status, so, keep it so that we can reset it later
6070
st := upgraded.Status
6171
patch := client.MergeFrom(&original)
62-
if err := cl.Patch(ctx, &upgraded, patch); err != nil {
72+
if err := u.Client.Patch(ctx, &upgraded, patch); err != nil {
6373
itemLogger.Error(err, "failed to apply changes to instance")
6474
continue
6575
}
6676

6777
// the status object requires its own update
6878
upgraded.Status = st
69-
if err := cl.Status().Patch(ctx, &upgraded, patch); err != nil {
79+
if err := u.Client.Status().Patch(ctx, &upgraded, patch); err != nil {
7080
itemLogger.Error(err, "failed to apply changes to instance's status object")
7181
continue
7282
}
@@ -76,48 +86,48 @@ func ManagedInstances(ctx context.Context, logger logr.Logger, ver version.Versi
7686
}
7787

7888
if len(list.Items) == 0 {
79-
logger.Info("no instances to upgrade")
89+
u.Log.Info("no instances to upgrade")
8090
}
8191

8292
return nil
8393
}
8494

8595
// ManagedInstance performs the necessary changes to bring the given otelcol instance to the current version.
86-
func ManagedInstance(ctx context.Context, logger logr.Logger, currentV version.Version, cl client.Client, otelcol v1alpha1.OpenTelemetryCollector) (v1alpha1.OpenTelemetryCollector, error) {
96+
func (u VersionUpgrade) ManagedInstance(ctx context.Context, otelcol v1alpha1.OpenTelemetryCollector) (v1alpha1.OpenTelemetryCollector, error) {
8797
// this is likely a new instance, assume it's already up to date
8898
if otelcol.Status.Version == "" {
8999
return otelcol, nil
90100
}
91101

92102
instanceV, err := semver.NewVersion(otelcol.Status.Version)
93103
if err != nil {
94-
logger.Error(err, "failed to parse version for OpenTelemetry Collector instance", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
104+
u.Log.Error(err, "failed to parse version for OpenTelemetry Collector instance", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
95105
return otelcol, err
96106
}
97107

98108
if instanceV.GreaterThan(&Latest.Version) {
99-
logger.Info("skipping upgrade for OpenTelemetry Collector instance, as it's newer than our latest version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version, "latest", Latest.Version.String())
109+
u.Log.Info("skipping upgrade for OpenTelemetry Collector instance, as it's newer than our latest version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version, "latest", Latest.Version.String())
100110
return otelcol, nil
101111
}
102112

103113
for _, available := range versions {
104114
if available.GreaterThan(instanceV) {
105-
upgraded, err := available.upgrade(cl, &otelcol)
115+
upgraded, err := available.upgrade(u, &otelcol) //available.upgrade(params., &otelcol)
106116

107117
if err != nil {
108-
logger.Error(err, "failed to upgrade managed otelcol instances", "name", otelcol.Name, "namespace", otelcol.Namespace)
118+
u.Log.Error(err, "failed to upgrade managed otelcol instances", "name", otelcol.Name, "namespace", otelcol.Namespace)
109119
return otelcol, err
110120
}
111121

112-
logger.V(1).Info("step upgrade", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", available.String())
122+
u.Log.V(1).Info("step upgrade", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", available.String())
113123
upgraded.Status.Version = available.String()
114124
otelcol = *upgraded
115125
}
116126
}
117127

118128
// at the end of the process, we are up to date with the latest known version, which is what we have from versions.txt
119-
otelcol.Status.Version = currentV.OpenTelemetryCollector
129+
otelcol.Status.Version = u.Version.OpenTelemetryCollector
120130

121-
logger.V(1).Info("final version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
131+
u.Log.V(1).Info("final version", "name", otelcol.Name, "namespace", otelcol.Namespace, "version", otelcol.Status.Version)
122132
return otelcol, nil
123133
}

pkg/collector/upgrade/upgrade_test.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/stretchr/testify/require"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
"k8s.io/apimachinery/pkg/types"
25+
"k8s.io/client-go/tools/record"
2526
logf "sigs.k8s.io/controller-runtime/pkg/log"
2627

2728
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
@@ -60,9 +61,15 @@ func TestShouldUpgradeAllToLatestBasedOnUpgradeStrategy(t *testing.T) {
6061
err = k8sClient.Get(context.Background(), nsn, persisted)
6162
require.NoError(t, err)
6263
require.Equal(t, beginV, persisted.Status.Version)
64+
up := &upgrade.VersionUpgrade{
65+
Log: logger,
66+
Version: currentV,
67+
Client: k8sClient,
68+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
69+
}
6370

6471
// test
65-
err = upgrade.ManagedInstances(context.Background(), logger, currentV, k8sClient)
72+
err = up.ManagedInstances(context.Background())
6673
assert.NoError(t, err)
6774

6875
// verify
@@ -84,9 +91,14 @@ func TestUpgradeUpToLatestKnownVersion(t *testing.T) {
8491

8592
currentV := version.Get()
8693
currentV.OpenTelemetryCollector = "0.10.0" // we don't have a 0.10.0 upgrade, but we have a 0.9.0
87-
94+
up := &upgrade.VersionUpgrade{
95+
Log: logger,
96+
Version: currentV,
97+
Client: k8sClient,
98+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
99+
}
88100
// test
89-
res, err := upgrade.ManagedInstance(context.Background(), logger, currentV, k8sClient, existing)
101+
res, err := up.ManagedInstance(context.Background(), existing)
90102

91103
// verify
92104
assert.NoError(t, err)
@@ -113,8 +125,15 @@ func TestVersionsShouldNotBeChanged(t *testing.T) {
113125
currentV := version.Get()
114126
currentV.OpenTelemetryCollector = upgrade.Latest.String()
115127

128+
up := &upgrade.VersionUpgrade{
129+
Log: logger,
130+
Version: currentV,
131+
Client: k8sClient,
132+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
133+
}
134+
116135
// test
117-
res, err := upgrade.ManagedInstance(context.Background(), logger, currentV, k8sClient, existing)
136+
res, err := up.ManagedInstance(context.Background(), existing)
118137
if tt.failureExpected {
119138
assert.Error(t, err)
120139
} else {

pkg/collector/upgrade/v0_15_0.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,17 @@
1515
package upgrade
1616

1717
import (
18-
"sigs.k8s.io/controller-runtime/pkg/client"
19-
2018
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
19+
20+
corev1 "k8s.io/api/core/v1"
2121
)
2222

23-
func upgrade0_15_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
23+
func upgrade0_15_0(u VersionUpgrade, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
2424
delete(otelcol.Spec.Args, "--new-metrics")
2525
delete(otelcol.Spec.Args, "--legacy-metrics")
26+
existing := &corev1.ConfigMap{}
27+
updated := existing.DeepCopy()
28+
u.Recorder.Event(updated, "Normal", "Upgrade", "upgrade to v0.15.0 dropped the deprecated metrics arguments")
29+
2630
return otelcol, nil
2731
}

pkg/collector/upgrade/v0_15_0_test.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/stretchr/testify/require"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
"k8s.io/apimachinery/pkg/types"
25+
"k8s.io/client-go/tools/record"
2526

2627
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
2728
"github.com/open-telemetry/opentelemetry-operator/internal/version"
@@ -54,7 +55,13 @@ func TestRemoveMetricsTypeFlags(t *testing.T) {
5455
require.Contains(t, existing.Spec.Args, "--legacy-metrics")
5556

5657
// test
57-
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
58+
up := &upgrade.VersionUpgrade{
59+
Log: logger,
60+
Version: version.Get(),
61+
Client: nil,
62+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
63+
}
64+
res, err := up.ManagedInstance(context.Background(), existing)
5865
assert.NoError(t, err)
5966

6067
// verify

pkg/collector/upgrade/v0_19_0.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ import (
1919
"strings"
2020

2121
"gopkg.in/yaml.v2"
22-
"sigs.k8s.io/controller-runtime/pkg/client"
2322

2423
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
2524
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/adapters"
25+
26+
corev1 "k8s.io/api/core/v1"
2627
)
2728

28-
func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
29+
func upgrade0_19_0(u VersionUpgrade, otelcol *v1alpha1.OpenTelemetryCollector) (*v1alpha1.OpenTelemetryCollector, error) {
2930
if len(otelcol.Spec.Config) == 0 {
3031
return otelcol, nil
3132
}
@@ -47,7 +48,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (
4748
// Remove deprecated queued_retry processor
4849
if strings.HasPrefix(k.(string), "queued_retry") {
4950
delete(processors, k)
50-
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 removed the processor %q", k))
51+
existing := &corev1.ConfigMap{}
52+
updated := existing.DeepCopy()
53+
u.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 removed the processor %q", k))
5154
continue
5255
}
5356

@@ -71,7 +74,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (
7174

7275
processor["attributes"] = attributes
7376
delete(processor, "type")
74-
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 migrated the property 'type' for processor %q", k))
77+
existing := &corev1.ConfigMap{}
78+
updated := existing.DeepCopy()
79+
u.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 migrated the property 'type' for processor %q", k))
7580
}
7681

7782
// handle labels
@@ -95,7 +100,9 @@ func upgrade0_19_0(cl client.Client, otelcol *v1alpha1.OpenTelemetryCollector) (
95100

96101
processor["attributes"] = attributes
97102
delete(processor, "labels")
98-
otelcol.Status.Messages = append(otelcol.Status.Messages, fmt.Sprintf("upgrade to v0.19.0 migrated the property 'labels' for processor %q", k))
103+
existing := &corev1.ConfigMap{}
104+
updated := existing.DeepCopy()
105+
u.Recorder.Event(updated, "Normal", "Upgrade", fmt.Sprintf("upgrade to v0.19.0 migrated the property 'labels' for processor %q", k))
99106
}
100107

101108
processors[k] = processor

pkg/collector/upgrade/v0_19_0_test.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/stretchr/testify/require"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2424
"k8s.io/apimachinery/pkg/types"
25+
"k8s.io/client-go/tools/record"
2526

2627
"github.com/open-telemetry/opentelemetry-operator/apis/v1alpha1"
2728
"github.com/open-telemetry/opentelemetry-operator/internal/version"
@@ -58,15 +59,20 @@ func TestRemoveQueuedRetryProcessor(t *testing.T) {
5859
require.Contains(t, existing.Spec.Config, "num_workers: 123") // checking one property is sufficient
5960

6061
// test
61-
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
62+
up := &upgrade.VersionUpgrade{
63+
Log: logger,
64+
Version: version.Get(),
65+
Client: nil,
66+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
67+
}
68+
res, err := up.ManagedInstance(context.Background(), existing)
6269
assert.NoError(t, err)
6370

6471
// verify
6572
assert.NotContains(t, res.Spec.Config, "queued_retry:")
6673
assert.Contains(t, res.Spec.Config, "otherprocessor:")
6774
assert.NotContains(t, res.Spec.Config, "queued_retry/second:")
6875
assert.NotContains(t, res.Spec.Config, "num_workers: 123") // checking one property is sufficient
69-
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 removed the processor")
7076
}
7177

7278
func TestMigrateResourceType(t *testing.T) {
@@ -90,7 +96,13 @@ func TestMigrateResourceType(t *testing.T) {
9096
existing.Status.Version = "0.18.0"
9197

9298
// test
93-
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
99+
up := &upgrade.VersionUpgrade{
100+
Log: logger,
101+
Version: version.Get(),
102+
Client: nil,
103+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
104+
}
105+
res, err := up.ManagedInstance(context.Background(), existing)
94106
assert.NoError(t, err)
95107

96108
// verify
@@ -101,7 +113,6 @@ func TestMigrateResourceType(t *testing.T) {
101113
key: opencensus.type
102114
value: some-type
103115
`, res.Spec.Config)
104-
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'type' for processor")
105116
}
106117

107118
func TestMigrateLabels(t *testing.T) {
@@ -127,7 +138,13 @@ func TestMigrateLabels(t *testing.T) {
127138
existing.Status.Version = "0.18.0"
128139

129140
// test
130-
res, err := upgrade.ManagedInstance(context.Background(), logger, version.Get(), nil, existing)
141+
up := &upgrade.VersionUpgrade{
142+
Log: logger,
143+
Version: version.Get(),
144+
Client: nil,
145+
Recorder: record.NewFakeRecorder(upgrade.RecordBufferSize),
146+
}
147+
res, err := up.ManagedInstance(context.Background(), existing)
131148
assert.NoError(t, err)
132149

133150
actual, err := adapters.ConfigFromString(res.Spec.Config)
@@ -139,5 +156,4 @@ func TestMigrateLabels(t *testing.T) {
139156
// verify
140157
assert.Len(t, actualAttrs, 2)
141158
assert.Nil(t, actualProcessor["labels"])
142-
assert.Contains(t, res.Status.Messages[0], "upgrade to v0.19.0 migrated the property 'labels' for processor")
143159
}

0 commit comments

Comments
 (0)