Skip to content

Commit ef6d576

Browse files
committed
Add notify() in all the reconcilers
notify() is used to emit events for new artifact and failure recovery scenarios. It's implemented in all the reconcilers. Previously, when there used to be a failure due to any reason, on a subsequent successful reconciliation, no notification was sent to indicate that the failure has been resolved. With notify(), the old version of the object is compared with the new version of the object to determine if all, if any, of the failures have been resolved and a notification is sent. The notification message is the same that's sent in usual successful source reconciliation message about stored artifact. Signed-off-by: Sunny <darkowlzz@protonmail.com>
1 parent 362bc56 commit ef6d576

10 files changed

+723
-35
lines changed

controllers/bucket_controller.go

+42-5
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ var bucketReadyCondition = summarize.Conditions{
9999
},
100100
}
101101

102+
// bucketFailConditions contains the conditions that represent failure.
103+
var bucketFailConditions = []string{
104+
sourcev1.FetchFailedCondition,
105+
sourcev1.StorageOperationFailedCondition,
106+
}
107+
102108
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets,verbs=get;list;watch;create;update;patch;delete
103109
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/status,verbs=get;update;patch
104110
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=buckets/finalizers,verbs=get;create;update;patch;delete
@@ -307,14 +313,16 @@ func (r *BucketReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res
307313
return
308314
}
309315

310-
// reconcile iterates through the gitRepositoryReconcileFunc tasks for the
316+
// reconcile iterates through the bucketReconcileFunc tasks for the
311317
// object. It returns early on the first call that returns
312318
// reconcile.ResultRequeue, or produces an error.
313319
func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket, reconcilers []bucketReconcileFunc) (sreconcile.Result, error) {
314320
if obj.Generation != obj.Status.ObservedGeneration {
315321
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
316322
}
317323

324+
oldObj := obj.DeepCopy()
325+
318326
// Create temp working dir
319327
tmpDir, err := os.MkdirTemp("", fmt.Sprintf("%s-%s-%s-", obj.Kind, obj.Namespace, obj.Name))
320328
if err != nil {
@@ -355,9 +363,42 @@ func (r *BucketReconciler) reconcile(ctx context.Context, obj *sourcev1.Bucket,
355363
// Prioritize requeue request in the result.
356364
res = sreconcile.LowestRequeuingResult(res, recResult)
357365
}
366+
367+
r.notify(oldObj, obj, index, res, resErr)
368+
358369
return res, resErr
359370
}
360371

372+
// notify emits notification related to the reconciliation.
373+
func (r *BucketReconciler) notify(oldObj, newObj *sourcev1.Bucket, index *etagIndex, res sreconcile.Result, resErr error) {
374+
// Notify successful reconciliation for new artifact and recovery from any
375+
// failure.
376+
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
377+
annotations := map[string]string{
378+
"revision": newObj.Status.Artifact.Revision,
379+
"checksum": newObj.Status.Artifact.Checksum,
380+
}
381+
382+
var oldChecksum string
383+
if oldObj.GetArtifact() != nil {
384+
oldChecksum = oldObj.GetArtifact().Checksum
385+
}
386+
387+
message := fmt.Sprintf("stored %d files from '%s'", index.Len(), newObj.Spec.BucketName)
388+
389+
// Notify on new artifact and failure recovery.
390+
if oldChecksum != newObj.GetArtifact().Checksum {
391+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
392+
"NewArtifact", message)
393+
} else {
394+
if sreconcile.FailureRecovery(oldObj, newObj, bucketFailConditions) {
395+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
396+
meta.SucceededReason, message)
397+
}
398+
}
399+
}
400+
}
401+
361402
// reconcileStorage ensures the current state of the storage matches the
362403
// desired and previously observed state.
363404
//
@@ -574,10 +615,6 @@ func (r *BucketReconciler) reconcileArtifact(ctx context.Context, obj *sourcev1.
574615
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
575616
return sreconcile.ResultEmpty, e
576617
}
577-
r.annotatedEventLogf(ctx, obj, map[string]string{
578-
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
579-
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
580-
}, corev1.EventTypeNormal, "NewArtifact", "fetched %d files from '%s'", index.Len(), obj.Spec.BucketName)
581618

582619
// Record it on the object
583620
obj.Status.Artifact = artifact.DeepCopy()

controllers/bucket_controller_test.go

+115
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controllers
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"net/http"
2324
"net/url"
@@ -1163,3 +1164,117 @@ func TestBucketReconciler_statusConditions(t *testing.T) {
11631164
})
11641165
}
11651166
}
1167+
1168+
func TestBucketReconciler_notify(t *testing.T) {
1169+
tests := []struct {
1170+
name string
1171+
res sreconcile.Result
1172+
resErr error
1173+
oldObjBeforeFunc func(obj *sourcev1.Bucket)
1174+
newObjBeforeFunc func(obj *sourcev1.Bucket)
1175+
wantEvent string
1176+
}{
1177+
{
1178+
name: "error - no event",
1179+
res: sreconcile.ResultEmpty,
1180+
resErr: errors.New("some error"),
1181+
},
1182+
{
1183+
name: "new artifact",
1184+
res: sreconcile.ResultSuccess,
1185+
resErr: nil,
1186+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1187+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1188+
},
1189+
wantEvent: "Normal NewArtifact stored 2 files from",
1190+
},
1191+
{
1192+
name: "recovery from failure",
1193+
res: sreconcile.ResultSuccess,
1194+
resErr: nil,
1195+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1196+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1197+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
1198+
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
1199+
},
1200+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1201+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1202+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1203+
},
1204+
wantEvent: "Normal Succeeded stored 2 files from",
1205+
},
1206+
{
1207+
name: "recovery and new artifact",
1208+
res: sreconcile.ResultSuccess,
1209+
resErr: nil,
1210+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1211+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1212+
conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, sourcev1.GitOperationFailedReason, "fail")
1213+
conditions.MarkFalse(obj, meta.ReadyCondition, meta.FailedReason, "foo")
1214+
},
1215+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1216+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "aaa", Checksum: "bbb"}
1217+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1218+
},
1219+
wantEvent: "Normal NewArtifact stored 2 files from",
1220+
},
1221+
{
1222+
name: "no updates",
1223+
res: sreconcile.ResultSuccess,
1224+
resErr: nil,
1225+
oldObjBeforeFunc: func(obj *sourcev1.Bucket) {
1226+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1227+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1228+
},
1229+
newObjBeforeFunc: func(obj *sourcev1.Bucket) {
1230+
obj.Status.Artifact = &sourcev1.Artifact{Revision: "xxx", Checksum: "yyy"}
1231+
conditions.MarkTrue(obj, meta.ReadyCondition, meta.SucceededReason, "ready")
1232+
},
1233+
},
1234+
}
1235+
1236+
for _, tt := range tests {
1237+
t.Run(tt.name, func(t *testing.T) {
1238+
g := NewWithT(t)
1239+
1240+
recorder := record.NewFakeRecorder(32)
1241+
1242+
oldObj := &sourcev1.Bucket{
1243+
Spec: sourcev1.BucketSpec{
1244+
BucketName: "test-bucket",
1245+
},
1246+
}
1247+
newObj := oldObj.DeepCopy()
1248+
1249+
if tt.oldObjBeforeFunc != nil {
1250+
tt.oldObjBeforeFunc(oldObj)
1251+
}
1252+
if tt.newObjBeforeFunc != nil {
1253+
tt.newObjBeforeFunc(newObj)
1254+
}
1255+
1256+
reconciler := &BucketReconciler{
1257+
EventRecorder: recorder,
1258+
}
1259+
index := &etagIndex{
1260+
index: map[string]string{
1261+
"zzz": "qqq",
1262+
"bbb": "ddd",
1263+
},
1264+
}
1265+
reconciler.notify(oldObj, newObj, index, tt.res, tt.resErr)
1266+
1267+
select {
1268+
case x, ok := <-recorder.Events:
1269+
g.Expect(ok).To(Equal(tt.wantEvent != ""), "unexpected event received")
1270+
if tt.wantEvent != "" {
1271+
g.Expect(x).To(ContainSubstring(tt.wantEvent))
1272+
}
1273+
default:
1274+
if tt.wantEvent != "" {
1275+
t.Errorf("expected some event to be emitted")
1276+
}
1277+
}
1278+
})
1279+
}
1280+
}

controllers/gitrepository_controller.go

+42-4
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ var gitRepositoryReadyCondition = summarize.Conditions{
9191
},
9292
}
9393

94+
// gitRepositoryFailConditions contains the conditions that represent failure.
95+
var gitRepositoryFailConditions = []string{
96+
sourcev1.FetchFailedCondition,
97+
sourcev1.IncludeUnavailableCondition,
98+
sourcev1.StorageOperationFailedCondition,
99+
}
100+
94101
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch;create;update;patch;delete
95102
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get;update;patch
96103
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/finalizers,verbs=get;create;update;patch;delete
@@ -217,6 +224,8 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
217224
conditions.MarkReconciling(obj, "NewGeneration", "reconciling new object generation (%d)", obj.Generation)
218225
}
219226

227+
oldObj := obj.DeepCopy()
228+
220229
// Create temp dir for Git clone
221230
tmpDir, err := util.TempDirForObj("", obj)
222231
if err != nil {
@@ -258,9 +267,42 @@ func (r *GitRepositoryReconciler) reconcile(ctx context.Context, obj *sourcev1.G
258267
// Prioritize requeue request in the result.
259268
res = sreconcile.LowestRequeuingResult(res, recResult)
260269
}
270+
271+
r.notify(oldObj, obj, commit, res, resErr)
272+
261273
return res, resErr
262274
}
263275

276+
// notify emits notification related to the reconciliation.
277+
func (r *GitRepositoryReconciler) notify(oldObj, newObj *sourcev1.GitRepository, commit git.Commit, res sreconcile.Result, resErr error) {
278+
// Notify successful reconciliation for new artifact and recovery from any
279+
// failure.
280+
if resErr == nil && res == sreconcile.ResultSuccess && newObj.Status.Artifact != nil {
281+
annotations := map[string]string{
282+
"revision": newObj.Status.Artifact.Revision,
283+
"checksum": newObj.Status.Artifact.Checksum,
284+
}
285+
286+
var oldChecksum string
287+
if oldObj.GetArtifact() != nil {
288+
oldChecksum = oldObj.GetArtifact().Checksum
289+
}
290+
291+
message := fmt.Sprintf("stored artifact for commit '%s'", commit.ShortMessage())
292+
293+
// Notify on new artifact and failure recovery.
294+
if oldChecksum != newObj.GetArtifact().Checksum {
295+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
296+
"NewArtifact", message)
297+
} else {
298+
if sreconcile.FailureRecovery(oldObj, newObj, gitRepositoryFailConditions) {
299+
r.AnnotatedEventf(newObj, annotations, corev1.EventTypeNormal,
300+
meta.SucceededReason, message)
301+
}
302+
}
303+
}
304+
}
305+
264306
// reconcileStorage ensures the current state of the storage matches the
265307
// desired and previously observed state.
266308
//
@@ -523,10 +565,6 @@ func (r *GitRepositoryReconciler) reconcileArtifact(ctx context.Context,
523565
conditions.MarkTrue(obj, sourcev1.StorageOperationFailedCondition, e.Reason, e.Err.Error())
524566
return sreconcile.ResultEmpty, e
525567
}
526-
r.AnnotatedEventf(obj, map[string]string{
527-
sourcev1.GroupVersion.Group + "/revision": artifact.Revision,
528-
sourcev1.GroupVersion.Group + "/checksum": artifact.Checksum,
529-
}, corev1.EventTypeNormal, "NewArtifact", "stored artifact for commit '%s'", commit.ShortMessage())
530568

531569
// Record it on the object
532570
obj.Status.Artifact = artifact.DeepCopy()

0 commit comments

Comments
 (0)