@@ -64,7 +64,7 @@ type controllerManager struct {
64
64
// (and EventHandlers, Sources and Predicates).
65
65
recorderProvider recorder.Provider
66
66
67
- // resourceLock
67
+ // resourceLock forms the basis for leader election
68
68
resourceLock resourcelock.Interface
69
69
70
70
// mapper is used to map resources to kind, and map kind and version.
@@ -73,10 +73,16 @@ type controllerManager struct {
73
73
mu sync.Mutex
74
74
started bool
75
75
errChan chan error
76
- stop <- chan struct {}
77
76
78
- // stopper is the write side of the stop channel. They should have the same value.
79
- stopper chan <- struct {}
77
+ // internalStop is the stop channel *actually* used by everything involved
78
+ // with the manager as a stop channel, so that we can pass a stop channel
79
+ // to things that need it off the bat (like the Channel source). It can
80
+ // be closed via `internalStopper` (by being the same underlying channel).
81
+ internalStop <- chan struct {}
82
+
83
+ // internalStopper is the write side of the internal stop channel, allowing us to close it.
84
+ // It and `internalStop` should point to the same channel.
85
+ internalStopper chan <- struct {}
80
86
81
87
startCache func (stop <- chan struct {}) error
82
88
}
@@ -96,7 +102,7 @@ func (cm *controllerManager) Add(r Runnable) error {
96
102
if cm .started {
97
103
// If already started, start the controller
98
104
go func () {
99
- cm .errChan <- r .Start (cm .stop )
105
+ cm .errChan <- r .Start (cm .internalStop )
100
106
}()
101
107
}
102
108
@@ -119,7 +125,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
119
125
if _ , err := inject .InjectorInto (cm .SetFields , i ); err != nil {
120
126
return err
121
127
}
122
- if _ , err := inject .StopChannelInto (cm .stop , i ); err != nil {
128
+ if _ , err := inject .StopChannelInto (cm .internalStop , i ); err != nil {
123
129
return err
124
130
}
125
131
if _ , err := inject .DecoderInto (cm .admissionDecoder , i ); err != nil {
@@ -161,13 +167,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
161
167
}
162
168
163
169
func (cm * controllerManager ) Start (stop <- chan struct {}) error {
164
- defer close (cm .stopper )
170
+ // stop everything we start when we exit this method for any reason
171
+ defer close (cm .internalStopper )
165
172
173
+ // if we're not using resource election, start directly
166
174
if cm .resourceLock == nil {
167
- go cm .start ()
175
+ cm .start ()
168
176
select {
169
177
// Only this function should receive from stop, and everything else
170
- // should receive from cm.stop .
178
+ // should receive from cm.internalStop .
171
179
case <- stop :
172
180
// we are done
173
181
return nil
@@ -177,7 +185,8 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
177
185
}
178
186
}
179
187
180
- l , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
188
+ // otherwise, start when we acquire a resource election lock
189
+ leaderElector , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
181
190
Lock : cm .resourceLock ,
182
191
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
183
192
// TODO(joelspeed): These timings should be configurable
@@ -204,7 +213,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
204
213
return err
205
214
}
206
215
207
- go l .Run ()
216
+ go leaderElector .Run ()
208
217
209
218
select {
210
219
case <- stop :
@@ -225,22 +234,22 @@ func (cm *controllerManager) start() {
225
234
cm .startCache = cm .cache .Start
226
235
}
227
236
go func () {
228
- if err := cm .startCache (cm .stop ); err != nil {
237
+ if err := cm .startCache (cm .internalStop ); err != nil {
229
238
cm .errChan <- err
230
239
}
231
240
}()
232
241
233
242
// Wait for the caches to sync.
234
243
// TODO(community): Check the return value and write a test
235
- cm .cache .WaitForCacheSync (cm .stop )
244
+ cm .cache .WaitForCacheSync (cm .internalStop )
236
245
237
246
// Start the runnables after the cache has synced
238
247
for _ , c := range cm .runnables {
239
248
// Controllers block, but we want to return an error if any have an error starting.
240
249
// Write any Start errors to a channel so we can return them
241
250
ctrl := c
242
251
go func () {
243
- cm .errChan <- ctrl .Start (cm .stop )
252
+ cm .errChan <- ctrl .Start (cm .internalStop )
244
253
}()
245
254
}
246
255
0 commit comments