Skip to content

Commit 732aa29

Browse files
mhrivnakDirectXMan12
authored andcommitted
fix nil stop value for source.Channel
fixes #103 Creates a stop channel for the manager in New(), which will get passed to any source.Channel instances that are added. When the manager's start method is called and a new stop channel is passed in, that channel will be joined in a goroutine with the manager's existing channel so that if the newer channel gets closed, so will the manager's.
1 parent 81f67a0 commit 732aa29

File tree

5 files changed

+31
-24
lines changed

5 files changed

+31
-24
lines changed

pkg/controller/controller.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
7878

7979
// Create controller with dependencies set
8080
c := &controller.Controller{
81-
Do: options.Reconciler,
82-
Cache: mgr.GetCache(),
83-
Config: mgr.GetConfig(),
84-
Scheme: mgr.GetScheme(),
85-
Client: mgr.GetClient(),
86-
Recorder: mgr.GetRecorder(name),
87-
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
81+
Do: options.Reconciler,
82+
Cache: mgr.GetCache(),
83+
Config: mgr.GetConfig(),
84+
Scheme: mgr.GetScheme(),
85+
Client: mgr.GetClient(),
86+
Recorder: mgr.GetRecorder(name),
87+
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
8888
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
89-
Name: name,
89+
Name: name,
9090
}
9191

9292
// Add the controller as a Manager components

pkg/internal/controller/controller_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ var _ = Describe("controller", func() {
6363
informers = &informertest.FakeInformers{}
6464
ctrl = &Controller{
6565
MaxConcurrentReconciles: 1,
66-
Do: fakeReconcile,
67-
Queue: queue,
68-
Cache: informers,
66+
Do: fakeReconcile,
67+
Queue: queue,
68+
Cache: informers,
6969
}
7070
ctrl.InjectFunc(func(interface{}) error { return nil })
7171
})

pkg/manager/internal.go

+15-11
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ type controllerManager struct {
8383
errChan chan error
8484
stop <-chan struct{}
8585

86+
// stopper is the write side of the stop channel. They should have the same value.
87+
stopper chan<- struct{}
88+
8689
startCache func(stop <-chan struct{}) error
8790
}
8891

@@ -192,13 +195,16 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
192195
}
193196

194197
func (cm *controllerManager) Start(stop <-chan struct{}) error {
198+
// join the passed-in stop channel as an upstream feeding into cm.stopper
199+
defer close(cm.stopper)
200+
195201
if cm.resourceLock != nil {
196-
err := cm.startLeaderElection(stop)
202+
err := cm.startLeaderElection()
197203
if err != nil {
198204
return err
199205
}
200206
} else {
201-
go cm.start(stop)
207+
go cm.start()
202208
}
203209

204210
select {
@@ -211,45 +217,43 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
211217
}
212218
}
213219

214-
func (cm *controllerManager) start(stop <-chan struct{}) {
220+
func (cm *controllerManager) start() {
215221
cm.mu.Lock()
216222
defer cm.mu.Unlock()
217223

218-
cm.stop = stop
219-
220224
// Start the Cache. Allow the function to start the cache to be mocked out for testing
221225
if cm.startCache == nil {
222226
cm.startCache = cm.cache.Start
223227
}
224228
go func() {
225-
if err := cm.startCache(stop); err != nil {
229+
if err := cm.startCache(cm.stop); err != nil {
226230
cm.errChan <- err
227231
}
228232
}()
229233

230234
// Start the metrics server
231235
if cm.metricsListener != nil {
232-
go cm.serveMetrics(stop)
236+
go cm.serveMetrics(cm.stop)
233237
}
234238

235239
// Wait for the caches to sync.
236240
// TODO(community): Check the return value and write a test
237-
cm.cache.WaitForCacheSync(stop)
241+
cm.cache.WaitForCacheSync(cm.stop)
238242

239243
// Start the runnables after the cache has synced
240244
for _, c := range cm.runnables {
241245
// Controllers block, but we want to return an error if any have an error starting.
242246
// Write any Start errors to a channel so we can return them
243247
ctrl := c
244248
go func() {
245-
cm.errChan <- ctrl.Start(stop)
249+
cm.errChan <- ctrl.Start(cm.stop)
246250
}()
247251
}
248252

249253
cm.started = true
250254
}
251255

252-
func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) {
256+
func (cm *controllerManager) startLeaderElection() (err error) {
253257
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
254258
Lock: cm.resourceLock,
255259
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
@@ -259,7 +263,7 @@ func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err erro
259263
RetryPeriod: 2 * time.Second,
260264
Callbacks: leaderelection.LeaderCallbacks{
261265
OnStartedLeading: func(_ <-chan struct{}) {
262-
cm.start(stop)
266+
cm.start()
263267
},
264268
OnStoppedLeading: func() {
265269
// Most implementations of leader election log.Fatal() here.

pkg/manager/manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
200200
return nil, err
201201
}
202202

203+
stop := make(chan struct{})
204+
203205
return &controllerManager{
204206
config: config,
205207
scheme: options.Scheme,
@@ -219,6 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
219221
resourceLock: resourceLock,
220222
mapper: mapper,
221223
metricsListener: metricsListener,
224+
stop: stop,
225+
stopper: stop,
222226
}, nil
223227
}
224228

pkg/manager/manager_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,7 @@ var _ = Describe("manger.Manager", func() {
497497
},
498498
stop: func(stop <-chan struct{}) error {
499499
defer GinkgoRecover()
500-
// Manager stop chan has not been initialized.
501-
Expect(stop).To(BeNil())
500+
Expect(stop).NotTo(BeNil())
502501
return nil
503502
},
504503
f: func(f inject.Func) error {

0 commit comments

Comments
 (0)