Skip to content

Commit 010d65c

Browse files
authored
Merge pull request #204 from DirectXMan12/bug/stop-channel-borked
fix nil stop value for source.Channel
2 parents 902ff11 + 071c3a2 commit 010d65c

File tree

7 files changed

+43
-30
lines changed

7 files changed

+43
-30
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ os:
55
- osx
66

77
go:
8-
- "1.10"
8+
- "1.11"
99

1010
git:
1111
depth: 3

pkg/cache/internal/cache_reader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.O
8686
}
8787

8888
// List lists items out of the indexer and writes them to out
89-
func (c *CacheReader) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
89+
func (c *CacheReader) List(_ context.Context, opts *client.ListOptions, out runtime.Object) error {
9090
var objs []interface{}
9191
var err error
9292

pkg/controller/controller.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
7878

7979
// Create controller with dependencies set
8080
c := &controller.Controller{
81-
Do: options.Reconciler,
82-
Cache: mgr.GetCache(),
83-
Config: mgr.GetConfig(),
84-
Scheme: mgr.GetScheme(),
85-
Client: mgr.GetClient(),
86-
Recorder: mgr.GetRecorder(name),
87-
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
81+
Do: options.Reconciler,
82+
Cache: mgr.GetCache(),
83+
Config: mgr.GetConfig(),
84+
Scheme: mgr.GetScheme(),
85+
Client: mgr.GetClient(),
86+
Recorder: mgr.GetRecorder(name),
87+
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
8888
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
89-
Name: name,
89+
Name: name,
9090
}
9191

9292
// Add the controller as a Manager components

pkg/internal/controller/controller_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ var _ = Describe("controller", func() {
6363
informers = &informertest.FakeInformers{}
6464
ctrl = &Controller{
6565
MaxConcurrentReconciles: 1,
66-
Do: fakeReconcile,
67-
Queue: queue,
68-
Cache: informers,
66+
Do: fakeReconcile,
67+
Queue: queue,
68+
Cache: informers,
6969
}
7070
ctrl.InjectFunc(func(interface{}) error { return nil })
7171
})

pkg/manager/internal.go

+25-15
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type controllerManager struct {
6969
// (and EventHandlers, Sources and Predicates).
7070
recorderProvider recorder.Provider
7171

72-
// resourceLock
72+
// resourceLock forms the basis for leader election
7373
resourceLock resourcelock.Interface
7474

7575
// mapper is used to map resources to kind, and map kind and version.
@@ -81,7 +81,16 @@ type controllerManager struct {
8181
mu sync.Mutex
8282
started bool
8383
errChan chan error
84-
stop <-chan struct{}
84+
85+
// internalStop is the stop channel *actually* used by everything involved
86+
// with the manager as a stop channel, so that we can pass a stop channel
87+
// to things that need it off the bat (like the Channel source). It can
88+
// be closed via `internalStopper` (by being the same underlying channel).
89+
internalStop <-chan struct{}
90+
91+
// internalStopper is the write side of the internal stop channel, allowing us to close it.
92+
// It and `internalStop` should point to the same channel.
93+
internalStopper chan<- struct{}
8594

8695
startCache func(stop <-chan struct{}) error
8796
}
@@ -101,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error {
101110
if cm.started {
102111
// If already started, start the controller
103112
go func() {
104-
cm.errChan <- r.Start(cm.stop)
113+
cm.errChan <- r.Start(cm.internalStop)
105114
}()
106115
}
107116

@@ -124,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
124133
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
125134
return err
126135
}
127-
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
136+
if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
128137
return err
129138
}
130139
if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
@@ -192,13 +201,16 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
192201
}
193202

194203
func (cm *controllerManager) Start(stop <-chan struct{}) error {
204+
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
205+
defer close(cm.internalStopper)
206+
195207
if cm.resourceLock != nil {
196-
err := cm.startLeaderElection(stop)
208+
err := cm.startLeaderElection()
197209
if err != nil {
198210
return err
199211
}
200212
} else {
201-
go cm.start(stop)
213+
go cm.start()
202214
}
203215

204216
select {
@@ -211,45 +223,43 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
211223
}
212224
}
213225

214-
func (cm *controllerManager) start(stop <-chan struct{}) {
226+
func (cm *controllerManager) start() {
215227
cm.mu.Lock()
216228
defer cm.mu.Unlock()
217229

218-
cm.stop = stop
219-
220230
// Start the Cache. Allow the function to start the cache to be mocked out for testing
221231
if cm.startCache == nil {
222232
cm.startCache = cm.cache.Start
223233
}
224234
go func() {
225-
if err := cm.startCache(stop); err != nil {
235+
if err := cm.startCache(cm.internalStop); err != nil {
226236
cm.errChan <- err
227237
}
228238
}()
229239

230240
// Start the metrics server
231241
if cm.metricsListener != nil {
232-
go cm.serveMetrics(stop)
242+
go cm.serveMetrics(cm.internalStop)
233243
}
234244

235245
// Wait for the caches to sync.
236246
// TODO(community): Check the return value and write a test
237-
cm.cache.WaitForCacheSync(stop)
247+
cm.cache.WaitForCacheSync(cm.internalStop)
238248

239249
// Start the runnables after the cache has synced
240250
for _, c := range cm.runnables {
241251
// Controllers block, but we want to return an error if any have an error starting.
242252
// Write any Start errors to a channel so we can return them
243253
ctrl := c
244254
go func() {
245-
cm.errChan <- ctrl.Start(stop)
255+
cm.errChan <- ctrl.Start(cm.internalStop)
246256
}()
247257
}
248258

249259
cm.started = true
250260
}
251261

252-
func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) {
262+
func (cm *controllerManager) startLeaderElection() (err error) {
253263
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
254264
Lock: cm.resourceLock,
255265
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
@@ -259,7 +269,7 @@ func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err erro
259269
RetryPeriod: 2 * time.Second,
260270
Callbacks: leaderelection.LeaderCallbacks{
261271
OnStartedLeading: func(_ <-chan struct{}) {
262-
cm.start(stop)
272+
cm.start()
263273
},
264274
OnStoppedLeading: func() {
265275
// Most implementations of leader election log.Fatal() here.

pkg/manager/manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
200200
return nil, err
201201
}
202202

203+
stop := make(chan struct{})
204+
203205
return &controllerManager{
204206
config: config,
205207
scheme: options.Scheme,
@@ -219,6 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
219221
resourceLock: resourceLock,
220222
mapper: mapper,
221223
metricsListener: metricsListener,
224+
internalStop: stop,
225+
internalStopper: stop,
222226
}, nil
223227
}
224228

pkg/manager/manager_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -497,8 +497,7 @@ var _ = Describe("manger.Manager", func() {
497497
},
498498
stop: func(stop <-chan struct{}) error {
499499
defer GinkgoRecover()
500-
// Manager stop chan has not been initialized.
501-
Expect(stop).To(BeNil())
500+
Expect(stop).NotTo(BeNil())
502501
return nil
503502
},
504503
f: func(f inject.Func) error {

0 commit comments

Comments
 (0)