From 5da74ca5a916d7e3ba5a1ddbfd774cb18580d447 Mon Sep 17 00:00:00 2001 From: Sunny Date: Fri, 18 Mar 2022 20:26:01 +0530 Subject: [PATCH] Add notify() in all the reconcilers notify() is used to emit events for new artifact and failure recovery scenarios. It's implemented in all the reconcilers. Previously, when there used to be a failure due to any reason, on a subsequent successful reconciliation, no notification was sent to indicate that the failure has been resolved. With notify(), the old version of the object is compared with the new version of the object to determine if all, if any, of the failures have been resolved and a notification is sent. The notification message is the same that's sent in usual successful source reconciliation message about stored artifact. Signed-off-by: Sunny --- controllers/bucket_controller.go | 48 +++++++- controllers/bucket_controller_test.go | 115 ++++++++++++++++++ controllers/gitrepository_controller.go | 46 ++++++- controllers/gitrepository_controller_test.go | 107 ++++++++++++++++ controllers/helmchart_controller.go | 49 ++++++-- controllers/helmchart_controller_test.go | 109 +++++++++++++++++ controllers/helmrepository_controller.go | 65 +++++++--- controllers/helmrepository_controller_test.go | 108 ++++++++++++++++ internal/reconcile/reconcile.go | 16 +++ internal/reconcile/reconcile_test.go | 96 +++++++++++++++ 10 files changed, 724 insertions(+), 35 deletions(-) diff --git a/controllers/bucket_controller.go b/controllers/bucket_controller.go index d1a0124a7..ced48ee31 100644 --- a/controllers/bucket_controller.go +++ b/controllers/bucket_controller.go @@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{ }, } +// bucketFailConditions contains the conditions that represent a failure. +var bucketFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete @@ -307,10 +313,13 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res return } -// reconcile iterates through the gitRepositoryReconcileFunc tasks for the +// reconcile iterates through the bucketReconcileFunc tasks for the // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + + // Mark as reconciling if generation differs. if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } @@ -355,9 +364,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, // Prioritize requeue request in the result. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, index, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, + sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored artifact with %d fetched files from '%s' bucket", index.Len(), newObj.Spec.BucketName) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -574,10 +616,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1. conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) return sreconcile.ResultEmpty, e } - r.annotatedEventLogf(ctx, obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName) // Record it on the object obj.Status.Artifact = artifact.DeepCopy() diff --git a/controllers/bucket_controller_test.go b/controllers/bucket_controller_test.go index c0b12bf90..70983231f 100644 --- a/controllers/bucket_controller_test.go +++ b/controllers/bucket_controller_test.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -1171,3 +1172,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) { }) } } + +func TestBucketReconciler_notify(t *testing.T) { + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.Bucket) + newObjBeforeFunc func(obj *sourcev1.Bucket) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + }, + wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal Succeeded stored artifact with 2 fetched files from", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.Bucket) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.Bucket{ + Spec: sourcev1.BucketSpec{ + BucketName: "test-bucket", + }, + } + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &BucketReconciler{ + EventRecorder: recorder, + } + index := &etagIndex{ + index: map[string]string{ + "zzz": "qqq", + "bbb": "ddd", + }, + } + reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/controllers/gitrepository_controller.go b/controllers/gitrepository_controller.go index dd7ff44a7..37f6c42ad 100644 --- a/controllers/gitrepository_controller.go +++ b/controllers/gitrepository_controller.go @@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{ }, } +// gitRepositoryFailConditions contains the conditions that represent a failure. +var gitRepositoryFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.IncludeUnavailableCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete @@ -212,6 +219,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + // Mark as reconciling if generation differs if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) @@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G // Prioritize requeue request in the result. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, commit, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, + sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage()) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context, conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) return sreconcile.ResultEmpty, e } - r.AnnotatedEventf(obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage()) // Record it on the object obj.Status.Artifact = artifact.DeepCopy() diff --git a/controllers/gitrepository_controller_test.go b/controllers/gitrepository_controller_test.go index 0ae071272..7d000b85d 100644 --- a/controllers/gitrepository_controller_test.go +++ b/controllers/gitrepository_controller_test.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "net/url" @@ -1776,3 +1777,109 @@ func TestGitRepositoryReconciler_statusConditions(t *testing.T) { }) } } + +func TestGitRepositoryReconciler_notify(t *testing.T) { + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.GitRepository) + newObjBeforeFunc func(obj *sourcev1.GitRepository) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + }, + wantEvent: "Normal NewArtifact stored artifact for commit", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal Succeeded stored artifact for commit", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal NewArtifact stored artifact for commit", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.GitRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.GitRepository{} + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &GitRepositoryReconciler{ + EventRecorder: recorder, + } + commit := &git.Commit{ + Message: "test commit", + } + reconciler.notify(oldObj, newObj, *commit, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/controllers/helmchart_controller.go b/controllers/helmchart_controller.go index b970c2923..1f701d615 100644 --- a/controllers/helmchart_controller.go +++ b/controllers/helmchart_controller.go @@ -99,6 +99,13 @@ var helmChartReadyCondition = summarize.Conditions{ }, } +// helmChartFailConditions contains the conditions that represent a failure. +var helmChartFailConditions = []string{ + sourcev1.BuildFailedCondition, + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmcharts/finalizers,verbs=get;create;update;patch;delete @@ -239,10 +246,13 @@ func (r *HelmChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( return } -// reconcile iterates through the gitRepositoryReconcileFunc tasks for the +// reconcile iterates through the helmChartReconcileFunc tasks for the // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmChart, reconcilers []helmChartReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + + // Mark as reconciling if generation differs. if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } @@ -269,9 +279,40 @@ func (r *HelmChartReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmC // Prioritize requeue request in the result. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, &build, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *HelmChartReconciler) notify(oldObj, newObj *sourcev1.HelmChart, build *chart.Build, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, + sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, + } + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + reasonForBuild(build), build.Summary()) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, helmChartFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + reasonForBuild(build), build.Summary()) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -714,12 +755,6 @@ func (r *HelmChartReconciler) reconcileArtifact(ctx context.Context, obj *source obj.Status.Artifact = artifact.DeepCopy() obj.Status.ObservedChartName = b.Name - // Publish an event - r.AnnotatedEventf(obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, reasonForBuild(b), b.Summary()) - // Update symlink on a "best effort" basis symURL, err := r.Storage.Symlink(artifact, "latest.tar.gz") if err != nil { diff --git a/controllers/helmchart_controller_test.go b/controllers/helmchart_controller_test.go index 1ecddcd8a..8938f493f 100644 --- a/controllers/helmchart_controller_test.go +++ b/controllers/helmchart_controller_test.go @@ -1585,3 +1585,112 @@ func TestHelmChartReconciler_statusConditions(t *testing.T) { }) } } + +func TestHelmChartReconciler_notify(t *testing.T) { + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.HelmChart) + newObjBeforeFunc func(obj *sourcev1.HelmChart) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + }, + wantEvent: "Normal ChartPackageSucceeded packaged", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal ChartPackageSucceeded packaged", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal ChartPackageSucceeded packaged", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmChart) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.HelmChart{} + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &HelmChartReconciler{ + EventRecorder: recorder, + } + build := &chart.Build{ + Name: "foo", + Version: "1.0.0", + Path: "some/path", + Packaged: true, + } + reconciler.notify(oldObj, newObj, build, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/controllers/helmrepository_controller.go b/controllers/helmrepository_controller.go index 17e11b6c0..63efd9d0c 100644 --- a/controllers/helmrepository_controller.go +++ b/controllers/helmrepository_controller.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "net/url" - "os" "time" "github.com/docker/go-units" @@ -82,6 +81,13 @@ var helmRepositoryReadyCondition = summarize.Conditions{ }, } +// helmRepositoryFailConditions contains the conditions that represent a +// failure. +var helmRepositoryFailConditions = []string{ + sourcev1.FetchFailedCondition, + sourcev1.StorageOperationFailedCondition, +} + // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/status,verbs=get;update;patch // +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=helmrepositories/finalizers,verbs=get;create;update;patch;delete @@ -195,10 +201,13 @@ func (r *HelmRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reque return } -// reconcile iterates through the gitRepositoryReconcileFunc tasks for the +// reconcile iterates through the helmRepositoryReconcileFunc tasks for the // object. It returns early on the first call that returns // reconcile.ResultRequeue, or produces an error. func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.HelmRepository, reconcilers []helmRepositoryReconcileFunc) (sreconcile.Result, error) { + oldObj := obj.DeepCopy() + + // Mark as reconciling if generation differs. if obj.Generation != obj.Status.ObservedGeneration { conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation) } @@ -225,9 +234,44 @@ func (r *HelmRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1. // Prioritize requeue request in the result for successful results. res = sreconcile.LowestRequeuingResult(res, recResult) } + + r.notify(oldObj, obj, chartRepo, res, resErr) + return res, resErr } +// notify emits notification related to the reconciliation. +func (r *HelmRepositoryReconciler) notify(oldObj, newObj *sourcev1.HelmRepository, chartRepo repository.ChartRepository, res sreconcile.Result, resErr error) { + // Notify successful reconciliation for new artifact and recovery from any + // failure. + if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil { + annotations := map[string]string{ + sourcev1.GroupVersion.Group + "/revision": newObj.Status.Artifact.Revision, + sourcev1.GroupVersion.Group + "/checksum": newObj.Status.Artifact.Checksum, + } + + size := units.HumanSize(float64(*newObj.Status.Artifact.Size)) + + var oldChecksum string + if oldObj.GetArtifact() != nil { + oldChecksum = oldObj.GetArtifact().Checksum + } + + message := fmt.Sprintf("stored fetched index of size %s from '%s'", size, chartRepo.URL) + + // Notify on new artifact and failure recovery. + if oldChecksum != newObj.GetArtifact().Checksum { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + "NewArtifact", message) + } else { + if sreconcile.FailureRecovery(oldObj, newObj, helmRepositoryFailConditions) { + r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal, + meta.SucceededReason, message) + } + } + } +} + // reconcileStorage ensures the current state of the storage matches the // desired and previously observed state. // @@ -448,23 +492,6 @@ func (r *HelmRepositoryReconciler) reconcileArtifact(ctx context.Context, obj *s return sreconcile.ResultEmpty, e } - // Calculate the artifact size to be included in the NewArtifact event. - fi, err := os.Stat(chartRepo.CachePath) - if err != nil { - e := &serror.Event{ - Err: fmt.Errorf("unable to read the artifact: %w", err), - Reason: sourcev1.ReadOperationFailedReason, - } - conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error()) - return sreconcile.ResultEmpty, e - } - size := units.HumanSize(float64(fi.Size())) - - r.AnnotatedEventf(obj, map[string]string{ - sourcev1.GroupVersion.Group + "/revision": artifact.Revision, - sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum, - }, corev1.EventTypeNormal, "NewArtifact", "fetched index of size %s from '%s'", size, chartRepo.URL) - // Record it on the object. obj.Status.Artifact = artifact.DeepCopy() diff --git a/controllers/helmrepository_controller_test.go b/controllers/helmrepository_controller_test.go index a4508d2f0..3f48dc2e2 100644 --- a/controllers/helmrepository_controller_test.go +++ b/controllers/helmrepository_controller_test.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "errors" "fmt" "net/http" "os" @@ -842,3 +843,110 @@ func TestHelmRepositoryReconciler_statusConditions(t *testing.T) { }) } } + +func TestHelmRepositoryReconciler_notify(t *testing.T) { + var aSize int64 = 30000 + tests := []struct { + name string + res sreconcile.Result + resErr error + oldObjBeforeFunc func(obj *sourcev1.HelmRepository) + newObjBeforeFunc func(obj *sourcev1.HelmRepository) + wantEvent string + }{ + { + name: "error - no event", + res: sreconcile.ResultEmpty, + resErr: errors.New("some error"), + }, + { + name: "new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + }, + wantEvent: "Normal NewArtifact stored fetched index of size", + }, + { + name: "recovery from failure", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal Succeeded stored fetched index of size", + }, + { + name: "recovery and new artifact", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + wantEvent: "Normal NewArtifact stored fetched index of size", + }, + { + name: "no updates", + res: sreconcile.ResultSuccess, + resErr: nil, + oldObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjBeforeFunc: func(obj *sourcev1.HelmRepository) { + obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy", Size: &aSize} + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + recorder := record.NewFakeRecorder(32) + + oldObj := &sourcev1.HelmRepository{} + newObj := oldObj.DeepCopy() + + if tt.oldObjBeforeFunc != nil { + tt.oldObjBeforeFunc(oldObj) + } + if tt.newObjBeforeFunc != nil { + tt.newObjBeforeFunc(newObj) + } + + reconciler := &HelmRepositoryReconciler{ + EventRecorder: recorder, + } + chartRepo := repository.ChartRepository{ + URL: "some-address", + } + reconciler.notify(oldObj, newObj, chartRepo, tt.res, tt.resErr) + + select { + case x, ok := <-recorder.Events: + g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received") + if tt.wantEvent != "" { + g.Expect(x).To(ContainSubstring(tt.wantEvent)) + } + default: + if tt.wantEvent != "" { + t.Errorf("expected some event to be emitted") + } + } + }) + } +} diff --git a/internal/reconcile/reconcile.go b/internal/reconcile/reconcile.go index af0c71b97..a3de4da95 100644 --- a/internal/reconcile/reconcile.go +++ b/internal/reconcile/reconcile.go @@ -158,3 +158,19 @@ func LowestRequeuingResult(i, j Result) Result { return j } } + +// FailureRecovery finds out if a failure recovery occurred by checking the fail +// conditions in the old object and the new object. +func FailureRecovery(oldObj, newObj conditions.Getter, failConditions []string) bool { + failuresBefore := 0 + for _, failCondition := range failConditions { + if conditions.Get(oldObj, failCondition) != nil { + failuresBefore++ + } + if conditions.Get(newObj, failCondition) != nil { + // Short-circuit, there is failure now, can't be a recovery. + return false + } + } + return failuresBefore > 0 +} diff --git a/internal/reconcile/reconcile_test.go b/internal/reconcile/reconcile_test.go index 127e3c186..26922f26d 100644 --- a/internal/reconcile/reconcile_test.go +++ b/internal/reconcile/reconcile_test.go @@ -202,3 +202,99 @@ func TestComputeReconcileResult(t *testing.T) { }) } } + +func TestFailureRecovery(t *testing.T) { + failCondns := []string{ + "FooFailed", + "BarFailed", + "BazFailed", + } + tests := []struct { + name string + oldObjFunc func(obj conditions.Setter) + newObjFunc func(obj conditions.Setter) + failConditions []string + result bool + }{ + { + name: "no failures", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + { + name: "no recovery", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + { + name: "different failure", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "BarFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + { + name: "failure recovery", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "FooFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: true, + }, + { + name: "ready to fail", + oldObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + newObjFunc: func(obj conditions.Setter) { + conditions.MarkTrue(obj, "BazFailed", "some-reason", "message") + conditions.MarkFalse(obj, meta.ReadyCondition, meta.SucceededReason, "ready") + }, + failConditions: failCondns, + result: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + oldObj := &sourcev1.GitRepository{} + newObj := oldObj.DeepCopy() + + if tt.oldObjFunc != nil { + tt.oldObjFunc(oldObj) + } + + if tt.newObjFunc != nil { + tt.newObjFunc(newObj) + } + + g.Expect(FailureRecovery(oldObj, newObj, tt.failConditions)).To(Equal(tt.result)) + }) + } +}