Skip to content

Commit 2a910cb

Browse files
committed
Add notify() in all the reconcilers
notify() is used to emit events for new artifact and failure recovery scenarios. It's implemented in all the reconcilers. Previously, when there used to be a failure due to any reason, on a subsequent successful reconciliation, no notification was sent to indicate that the failure has been resolved. With notify(), the old version of the object is compared with the new version of the object to determine if all, if any, of the failures have been resolved and a notification is sent. The notification message is the same that's sent in usual successful source reconciliation message about stored artifact. Signed-off-by: Sunny <darkowlzz@protonmail.com>
1 parent 362bc56 commit 2a910cb

10 files changed

+721
-35
lines changed

controllers/bucket_controller.go

+42-5
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{
9999
},
100100
}
101101

102+
// bucketFailConditions contains the conditions that represent a failure.
103+
var bucketFailConditions = []string{
104+
sourcev1.FetchFailedCondition,
105+
sourcev1.StorageOperationFailedCondition,
106+
}
107+
102108
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
103109
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
104110
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
@@ -307,10 +313,12 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
307313
return
308314
}
309315

310-
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
316+
// reconcile iterates through the bucketReconcileFunc tasks for the
311317
// object. It returns early on the first call that returns
312318
// reconcile.ResultRequeue, or produces an error.
313319
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
320+
oldObj := obj.DeepCopy()
321+
314322
if obj.Generation != obj.Status.ObservedGeneration {
315323
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
316324
}
@@ -355,9 +363,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
355363
// Prioritize requeue request in the result.
356364
res = sreconcile.LowestRequeuingResult(res, recResult)
357365
}
366+
367+
r.notify(oldObj, obj, index, res, resErr)
368+
358369
return res, resErr
359370
}
360371

372+
// notify emits notification related to the reconciliation.
373+
func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
374+
// Notify successful reconciliation for new artifact and recovery from any
375+
// failure.
376+
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
377+
annotations := map[string]string{
378+
"revision": newObj.Status.Artifact.Revision,
379+
"checksum": newObj.Status.Artifact.Checksum,
380+
}
381+
382+
var oldChecksum string
383+
if oldObj.GetArtifact() != nil {
384+
oldChecksum = oldObj.GetArtifact().Checksum
385+
}
386+
387+
message := fmt.Sprintf("stored artifact with %d fetched files from '%s' bucket", index.Len(), newObj.Spec.BucketName)
388+
389+
// Notify on new artifact and failure recovery.
390+
if oldChecksum != newObj.GetArtifact().Checksum {
391+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
392+
"NewArtifact", message)
393+
} else {
394+
if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) {
395+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
396+
meta.SucceededReason, message)
397+
}
398+
}
399+
}
400+
}
401+
361402
// reconcileStorage ensures the current state of the storage matches the
362403
// desired and previously observed state.
363404
//
@@ -574,10 +615,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
574615
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
575616
return sreconcile.ResultEmpty, e
576617
}
577-
r.annotatedEventLogf(ctx, obj, map[string]string{
578-
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
579-
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
580-
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
581618

582619
// Record it on the object
583620
obj.Status.Artifact = artifact.DeepCopy()

controllers/bucket_controller_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controllers
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net/http"
2324
"net/url"
@@ -1163,3 +1164,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
11631164
})
11641165
}
11651166
}
1167+
1168+
func TestBucketReconciler_notify(t *testing.T) {
1169+
tests := []struct {
1170+
name string
1171+
res sreconcile.Result
1172+
resErr error
1173+
oldObjBeforeFunc func(obj *sourcev1.Bucket)
1174+
newObjBeforeFunc func(obj *sourcev1.Bucket)
1175+
wantEvent string
1176+
}{
1177+
{
1178+
name: "error - no event",
1179+
res: sreconcile.ResultEmpty,
1180+
resErr: errors.New("some error"),
1181+
},
1182+
{
1183+
name: "new artifact",
1184+
res: sreconcile.ResultSuccess,
1185+
resErr: nil,
1186+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1187+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1188+
},
1189+
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
1190+
},
1191+
{
1192+
name: "recovery from failure",
1193+
res: sreconcile.ResultSuccess,
1194+
resErr: nil,
1195+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1196+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1197+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
1198+
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
1199+
},
1200+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1201+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1202+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1203+
},
1204+
wantEvent: "Normal Succeeded stored artifact with 2 fetched files from",
1205+
},
1206+
{
1207+
name: "recovery and new artifact",
1208+
res: sreconcile.ResultSuccess,
1209+
resErr: nil,
1210+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1211+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1212+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
1213+
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
1214+
},
1215+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1216+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
1217+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1218+
},
1219+
wantEvent: "Normal NewArtifact stored artifact with 2 fetched files from",
1220+
},
1221+
{
1222+
name: "no updates",
1223+
res: sreconcile.ResultSuccess,
1224+
resErr: nil,
1225+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1226+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1227+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1228+
},
1229+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1230+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1231+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1232+
},
1233+
},
1234+
}
1235+
1236+
for _, tt := range tests {
1237+
t.Run(tt.name, func(t *testing.T) {
1238+
g := NewWithT(t)
1239+
1240+
recorder := record.NewFakeRecorder(32)
1241+
1242+
oldObj := &sourcev1.Bucket{
1243+
Spec: sourcev1.BucketSpec{
1244+
BucketName: "test-bucket",
1245+
},
1246+
}
1247+
newObj := oldObj.DeepCopy()
1248+
1249+
if tt.oldObjBeforeFunc != nil {
1250+
tt.oldObjBeforeFunc(oldObj)
1251+
}
1252+
if tt.newObjBeforeFunc != nil {
1253+
tt.newObjBeforeFunc(newObj)
1254+
}
1255+
1256+
reconciler := &BucketReconciler{
1257+
EventRecorder: recorder,
1258+
}
1259+
index := &etagIndex{
1260+
index: map[string]string{
1261+
"zzz": "qqq",
1262+
"bbb": "ddd",
1263+
},
1264+
}
1265+
reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr)
1266+
1267+
select {
1268+
case x, ok := <-recorder.Events:
1269+
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
1270+
if tt.wantEvent != "" {
1271+
g.Expect(x).To(ContainSubstring(tt.wantEvent))
1272+
}
1273+
default:
1274+
if tt.wantEvent != "" {
1275+
t.Errorf("expected some event to be emitted")
1276+
}
1277+
}
1278+
})
1279+
}
1280+
}

controllers/gitrepository_controller.go

+42-4
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{
9191
},
9292
}
9393

94+
// gitRepositoryFailConditions contains the conditions that represent a failure.
95+
var gitRepositoryFailConditions = []string{
96+
sourcev1.FetchFailedCondition,
97+
sourcev1.IncludeUnavailableCondition,
98+
sourcev1.StorageOperationFailedCondition,
99+
}
100+
94101
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
95102
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch
96103
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete
@@ -212,6 +219,8 @@ func (r *GitRepositoryReconciler) Reconcile(ctx context.Context, req ctrl.Reques
212219
// object. It returns early on the first call that returns
213220
// reconcile.ResultRequeue, or produces an error.
214221
func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.GitRepository, reconcilers []gitRepositoryReconcileFunc) (sreconcile.Result, error) {
222+
oldObj := obj.DeepCopy()
223+
215224
// Mark as reconciling if generation differs
216225
if obj.Generation != obj.Status.ObservedGeneration {
217226
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
@@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
258267
// Prioritize requeue request in the result.
259268
res = sreconcile.LowestRequeuingResult(res, recResult)
260269
}
270+
271+
r.notify(oldObj, obj, commit, res, resErr)
272+
261273
return res, resErr
262274
}
263275

276+
// notify emits notification related to the reconciliation.
277+
func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) {
278+
// Notify successful reconciliation for new artifact and recovery from any
279+
// failure.
280+
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
281+
annotations := map[string]string{
282+
"revision": newObj.Status.Artifact.Revision,
283+
"checksum": newObj.Status.Artifact.Checksum,
284+
}
285+
286+
var oldChecksum string
287+
if oldObj.GetArtifact() != nil {
288+
oldChecksum = oldObj.GetArtifact().Checksum
289+
}
290+
291+
message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())
292+
293+
// Notify on new artifact and failure recovery.
294+
if oldChecksum != newObj.GetArtifact().Checksum {
295+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
296+
"NewArtifact", message)
297+
} else {
298+
if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) {
299+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
300+
meta.SucceededReason, message)
301+
}
302+
}
303+
}
304+
}
305+
264306
// reconcileStorage ensures the current state of the storage matches the
265307
// desired and previously observed state.
266308
//
@@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
523565
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
524566
return sreconcile.ResultEmpty, e
525567
}
526-
r.AnnotatedEventf(obj, map[string]string{
527-
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
528-
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
529-
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())
530568

531569
// Record it on the object
532570
obj.Status.Artifact = artifact.DeepCopy()

0 commit comments

Comments
 (0)