@@ -69,7 +69,7 @@ type controllerManager struct {
69
69
// (and EventHandlers, Sources and Predicates).
70
70
recorderProvider recorder.Provider
71
71
72
- // resourceLock
72
+ // resourceLock forms the basis for leader election
73
73
resourceLock resourcelock.Interface
74
74
75
75
// mapper is used to map resources to kind, and map kind and version.
@@ -81,10 +81,16 @@ type controllerManager struct {
81
81
mu sync.Mutex
82
82
started bool
83
83
errChan chan error
84
- stop <- chan struct {}
85
84
86
- // stopper is the write side of the stop channel. They should have the same value.
87
- stopper chan <- struct {}
85
+ // internalStop is the stop channel *actually* used by everything involved
86
+ // with the manager as a stop channel, so that we can pass a stop channel
87
+ // to things that need it off the bat (like the Channel source). It can
88
+ // be closed via `internalStopper` (by being the same underlying channel).
89
+ internalStop <- chan struct {}
90
+
91
+ // internalStopper is the write side of the internal stop channel, allowing us to close it.
92
+ // It and `internalStop` should point to the same channel.
93
+ internalStopper chan <- struct {}
88
94
89
95
startCache func (stop <- chan struct {}) error
90
96
}
@@ -104,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error {
104
110
if cm .started {
105
111
// If already started, start the controller
106
112
go func () {
107
- cm .errChan <- r .Start (cm .stop )
113
+ cm .errChan <- r .Start (cm .internalStop )
108
114
}()
109
115
}
110
116
@@ -127,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
127
133
if _ , err := inject .InjectorInto (cm .SetFields , i ); err != nil {
128
134
return err
129
135
}
130
- if _ , err := inject .StopChannelInto (cm .stop , i ); err != nil {
136
+ if _ , err := inject .StopChannelInto (cm .internalStop , i ); err != nil {
131
137
return err
132
138
}
133
139
if _ , err := inject .DecoderInto (cm .admissionDecoder , i ); err != nil {
@@ -195,8 +201,8 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
195
201
}
196
202
197
203
func (cm * controllerManager ) Start (stop <- chan struct {}) error {
198
- // join the passed-in stop channel as an upstream feeding into cm.stopper
199
- defer close (cm .stopper )
204
+ // join the passed-in stop channel as an upstream feeding into cm.internalStopper
205
+ defer close (cm .internalStopper )
200
206
201
207
if cm .resourceLock != nil {
202
208
err := cm .startLeaderElection ()
@@ -226,27 +232,27 @@ func (cm *controllerManager) start() {
226
232
cm .startCache = cm .cache .Start
227
233
}
228
234
go func () {
229
- if err := cm .startCache (cm .stop ); err != nil {
235
+ if err := cm .startCache (cm .internalStop ); err != nil {
230
236
cm .errChan <- err
231
237
}
232
238
}()
233
239
234
240
// Start the metrics server
235
241
if cm .metricsListener != nil {
236
- go cm .serveMetrics (cm .stop )
242
+ go cm .serveMetrics (cm .internalStop )
237
243
}
238
244
239
245
// Wait for the caches to sync.
240
246
// TODO(community): Check the return value and write a test
241
- cm .cache .WaitForCacheSync (cm .stop )
247
+ cm .cache .WaitForCacheSync (cm .internalStop )
242
248
243
249
// Start the runnables after the cache has synced
244
250
for _ , c := range cm .runnables {
245
251
// Controllers block, but we want to return an error if any have an error starting.
246
252
// Write any Start errors to a channel so we can return them
247
253
ctrl := c
248
254
go func () {
249
- cm .errChan <- ctrl .Start (cm .stop )
255
+ cm .errChan <- ctrl .Start (cm .internalStop )
250
256
}()
251
257
}
252
258
0 commit comments