Skip to content

Commit f5833f3

Browse files
vinceprik8s-infra-cherrypick-robot
authored and
k8s-infra-cherrypick-robot
committedApr 5, 2024··
bug: Runnable group should check if stopped before enqueueing
Signed-off-by: Vince Prignano <vincepri@redhat.com>
1 parent 854a6b1 commit f5833f3

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed
 

‎pkg/manager/runnable_group.go

+14-1
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,15 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
263263
r.start.Unlock()
264264
}
265265

266+
// Recheck if we're stopped and hold the readlock, given that the stop and start can be called
267+
// at the same time, we can end up in a situation where the runnable is added
268+
// after the group is stopped and the channel is closed.
269+
r.stop.RLock()
270+
defer r.stop.RUnlock()
271+
if r.stopped {
272+
return errRunnableGroupStopped
273+
}
274+
266275
// Enqueue the runnable.
267276
r.ch <- readyRunnable
268277
return nil
@@ -272,7 +281,11 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
272281
func (r *runnableGroup) StopAndWait(ctx context.Context) {
273282
r.stopOnce.Do(func() {
274283
// Close the reconciler channel once we're done.
275-
defer close(r.ch)
284+
defer func() {
285+
r.stop.Lock()
286+
close(r.ch)
287+
r.stop.Unlock()
288+
}()
276289

277290
_ = r.Start(ctx)
278291
r.stop.Lock()

‎pkg/manager/runnable_group_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,42 @@ var _ = Describe("runnableGroup", func() {
161161
}
162162
})
163163

164+
It("should be able to handle adding runnables while stopping", func() {
165+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
166+
defer cancel()
167+
rg := newRunnableGroup(defaultBaseContext, errCh)
168+
169+
go func() {
170+
defer GinkgoRecover()
171+
<-time.After(1 * time.Millisecond)
172+
Expect(rg.Start(ctx)).To(Succeed())
173+
}()
174+
go func() {
175+
defer GinkgoRecover()
176+
<-time.After(1 * time.Millisecond)
177+
ctx, cancel := context.WithCancel(context.Background())
178+
cancel()
179+
rg.StopAndWait(ctx)
180+
}()
181+
182+
for i := 0; i < 200; i++ {
183+
go func(i int) {
184+
defer GinkgoRecover()
185+
186+
<-time.After(time.Duration(i) * time.Microsecond)
187+
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
188+
<-ctx.Done()
189+
return nil
190+
}), func(_ context.Context) bool {
191+
return true
192+
})).To(SatisfyAny(
193+
Succeed(),
194+
Equal(errRunnableGroupStopped),
195+
))
196+
}(i)
197+
}
198+
})
199+
164200
It("should not turn ready if some readiness check fail", func() {
165201
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
166202
defer cancel()

0 commit comments

Comments
 (0)
Please sign in to comment.