-
Notifications
You must be signed in to change notification settings - Fork 237
/
Copy pathstate.go
280 lines (243 loc) · 7.6 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package resource
import (
"context"
"log"
"time"
)
var refreshGracePeriod = 30 * time.Second
// StateRefreshFunc is a function type used for StateChangeConf that is
// responsible for refreshing the item being watched for a state change.
//
// It returns three results. `result` is any object that will be returned
// as the final object after waiting for state change. This allows you to
// return the final updated object, for example an EC2 instance after refreshing
// it. A nil result represents not found.
//
// `state` is the latest state of that object. And `err` is any error that
// may have happened while refreshing the state.
type StateRefreshFunc func() (result interface{}, state string, err error)
// StateChangeConf is the configuration struct used for `WaitForState`.
type StateChangeConf struct {
Delay time.Duration // Wait this time before starting checks
Pending []string // States that are "allowed" and will continue trying
Refresh StateRefreshFunc // Refreshes the current state
Target []string // Target state
Timeout time.Duration // The amount of time to wait before timeout
MinTimeout time.Duration // Smallest time to wait before refreshes
PollInterval time.Duration // Override MinTimeout/backoff and only poll this often
NotFoundChecks int // Number of times to allow not found (nil result from Refresh)
// This is to work around inconsistent APIs
ContinuousTargetOccurence int // Number of times the Target state has to occur continuously
}
// WaitForStateContext watches an object and waits for it to achieve the state
// specified in the configuration using the specified Refresh() func,
// waiting the number of seconds specified in the timeout configuration.
//
// If the Refresh function returns an error, exit immediately with that error.
//
// If the Refresh function returns a state other than the Target state or one
// listed in Pending, return immediately with an error.
//
// If the Timeout is exceeded before reaching the Target state, return an
// error.
//
// Otherwise, the result is the result of the first call to the Refresh function to
// reach the target state.
//
// Cancellation from the passed in context will cancel the refresh loop
func (conf *StateChangeConf) WaitForStateContext(ctx context.Context) (interface{}, error) {
log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target)
notfoundTick := 0
targetOccurence := 0
// Set a default for times to check for not found
if conf.NotFoundChecks == 0 {
conf.NotFoundChecks = 20
}
if conf.ContinuousTargetOccurence == 0 {
conf.ContinuousTargetOccurence = 1
}
type Result struct {
Result interface{}
State string
Error error
Done bool
}
// Read every result from the refresh loop, waiting for a positive result.Done.
resCh := make(chan Result, 1)
// cancellation channel for the refresh loop
cancelCh := make(chan struct{})
result := Result{}
go func() {
defer close(resCh)
select {
case <-time.After(conf.Delay):
case <-cancelCh:
return
}
// start with 0 delay for the first loop
var wait time.Duration
for {
// store the last result
resCh <- result
// wait and watch for cancellation
select {
case <-cancelCh:
return
case <-time.After(wait):
// first round had no wait
if wait == 0 {
wait = 100 * time.Millisecond
}
}
res, currentState, err := conf.Refresh()
result = Result{
Result: res,
State: currentState,
Error: err,
}
if err != nil {
resCh <- result
return
}
// If we're waiting for the absence of a thing, then return
if res == nil && len(conf.Target) == 0 {
targetOccurence++
if conf.ContinuousTargetOccurence == targetOccurence {
result.Done = true
resCh <- result
return
}
continue
}
if res == nil {
// If we didn't find the resource, check if we have been
// not finding it for awhile, and if so, report an error.
notfoundTick++
if notfoundTick > conf.NotFoundChecks {
result.Error = &NotFoundError{
LastError: err,
Retries: notfoundTick,
}
resCh <- result
return
}
} else {
// Reset the counter for when a resource isn't found
notfoundTick = 0
found := false
for _, allowed := range conf.Target {
if currentState == allowed {
found = true
targetOccurence++
if conf.ContinuousTargetOccurence == targetOccurence {
result.Done = true
resCh <- result
return
}
continue
}
}
for _, allowed := range conf.Pending {
if currentState == allowed {
found = true
targetOccurence = 0
break
}
}
if !found && len(conf.Pending) > 0 {
result.Error = &UnexpectedStateError{
LastError: err,
State: result.State,
ExpectedState: conf.Target,
}
resCh <- result
return
}
}
// Wait between refreshes using exponential backoff, except when
// waiting for the target state to reoccur.
if targetOccurence == 0 {
wait *= 2
}
// If a poll interval has been specified, choose that interval.
// Otherwise bound the default value.
if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second {
wait = conf.PollInterval
} else {
if wait < conf.MinTimeout {
wait = conf.MinTimeout
} else if wait > 10*time.Second {
wait = 10 * time.Second
}
}
log.Printf("[TRACE] Waiting %s before next try", wait)
}
}()
// store the last value result from the refresh loop
lastResult := Result{}
timeout := time.After(conf.Timeout)
for {
select {
case r, ok := <-resCh:
// channel closed, so return the last result
if !ok {
return lastResult.Result, lastResult.Error
}
// we reached the intended state
if r.Done {
return r.Result, r.Error
}
// still waiting, store the last result
lastResult = r
case <-ctx.Done():
close(cancelCh)
return nil, ctx.Err()
case <-timeout:
log.Printf("[WARN] WaitForState timeout after %s", conf.Timeout)
log.Printf("[WARN] WaitForState starting %s refresh grace period", refreshGracePeriod)
// cancel the goroutine and start our grace period timer
close(cancelCh)
timeout := time.After(refreshGracePeriod)
// we need a for loop and a label to break on, because we may have
// an extra response value to read, but still want to wait for the
// channel to close.
forSelect:
for {
select {
case r, ok := <-resCh:
if r.Done {
// the last refresh loop reached the desired state
return r.Result, r.Error
}
if !ok {
// the goroutine returned
break forSelect
}
// target state not reached, save the result for the
// TimeoutError and wait for the channel to close
lastResult = r
case <-ctx.Done():
log.Println("[ERROR] Context cancelation detected, abandoning grace period")
break forSelect
case <-timeout:
log.Println("[ERROR] WaitForState exceeded refresh grace period")
break forSelect
}
}
return nil, &TimeoutError{
LastError: lastResult.Error,
LastState: lastResult.State,
Timeout: conf.Timeout,
ExpectedState: conf.Target,
}
}
}
}
// WaitForState watches an object and waits for it to achieve the state
// specified in the configuration using the specified Refresh() func,
// waiting the number of seconds specified in the timeout configuration.
//
// Deprecated: Please use WaitForStateContext to ensure proper plugin shutdown
func (conf *StateChangeConf) WaitForState() (interface{}, error) {
return conf.WaitForStateContext(context.Background())
}