Skip to content

Commit 9eabe9f

Browse files
authored
Add customized probe num support for circuit breaker (#428)
1 parent a409ca5 commit 9eabe9f

File tree

3 files changed

+170
-10
lines changed

3 files changed

+170
-10
lines changed

core/circuitbreaker/circuit_breaker.go

+39-7
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ type circuitBreakerBase struct {
136136
retryTimeoutMs uint32
137137
// nextRetryTimestampMs is the time circuit breaker could probe
138138
nextRetryTimestampMs uint64
139+
// probeNumber is the number of probe requests that are allowed to pass when the circuit breaker is half open.
140+
probeNumber uint64
141+
// curProbeNumber is the real-time probe number.
142+
curProbeNumber uint64
139143
// state is the state machine of circuit breaker
140144
state *State
141145
}
@@ -156,6 +160,14 @@ func (b *circuitBreakerBase) updateNextRetryTimestamp() {
156160
atomic.StoreUint64(&b.nextRetryTimestampMs, util.CurrentTimeMillis()+uint64(b.retryTimeoutMs))
157161
}
158162

163+
func (b *circuitBreakerBase) addCurProbeNum() {
164+
atomic.AddUint64(&b.curProbeNumber, 1)
165+
}
166+
167+
func (b *circuitBreakerBase) resetCurProbeNum() {
168+
atomic.StoreUint64(&b.curProbeNumber, 0)
169+
}
170+
159171
// fromClosedToOpen updates circuit breaker state machine from closed to open.
160172
// Return true only if current goroutine successfully accomplished the transformation.
161173
func (b *circuitBreakerBase) fromClosedToOpen(snapshot interface{}) bool {
@@ -206,6 +218,7 @@ func (b *circuitBreakerBase) fromOpenToHalfOpen(ctx *base.EntryContext) bool {
206218
// Return true only if current goroutine successfully accomplished the transformation.
207219
func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
208220
if b.state.cas(HalfOpen, Open) {
221+
b.resetCurProbeNum()
209222
b.updateNextRetryTimestamp()
210223
for _, listener := range stateChangeListeners {
211224
listener.OnTransformToOpen(HalfOpen, *b.rule, snapshot)
@@ -221,6 +234,7 @@ func (b *circuitBreakerBase) fromHalfOpenToOpen(snapshot interface{}) bool {
221234
// Return true only if current goroutine successfully accomplished the transformation.
222235
func (b *circuitBreakerBase) fromHalfOpenToClosed() bool {
223236
if b.state.cas(HalfOpen, Closed) {
237+
b.resetCurProbeNum()
224238
for _, listener := range stateChangeListeners {
225239
listener.OnTransformToClosed(HalfOpen, *b.rule)
226240
}
@@ -247,6 +261,7 @@ func newSlowRtCircuitBreakerWithStat(r *Rule, stat *slowRequestLeapArray) *slowR
247261
retryTimeoutMs: r.RetryTimeoutMs,
248262
nextRetryTimestampMs: 0,
249263
state: newState(),
264+
probeNumber: r.ProbeNum,
250265
},
251266
stat: stat,
252267
maxAllowedRt: r.MaxAllowedRtMs,
@@ -282,6 +297,8 @@ func (b *slowRtCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
282297
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
283298
return true
284299
}
300+
} else if curStatus == HalfOpen && b.probeNumber > 0 {
301+
return true
285302
}
286303
return false
287304
}
@@ -318,9 +335,12 @@ func (b *slowRtCircuitBreaker) OnRequestComplete(rt uint64, _ error) {
318335
// fail to probe
319336
b.fromHalfOpenToOpen(1.0)
320337
} else {
321-
// succeed to probe
322-
b.fromHalfOpenToClosed()
323-
b.resetMetric()
338+
b.addCurProbeNum()
339+
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
340+
// succeed to probe
341+
b.fromHalfOpenToClosed()
342+
b.resetMetric()
343+
}
324344
}
325345
return
326346
}
@@ -433,6 +453,7 @@ func newErrorRatioCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
433453
retryTimeoutMs: r.RetryTimeoutMs,
434454
nextRetryTimestampMs: 0,
435455
state: newState(),
456+
probeNumber: r.ProbeNum,
436457
},
437458
minRequestAmount: r.MinRequestAmount,
438459
errorRatioThreshold: r.Threshold,
@@ -465,6 +486,8 @@ func (b *errorRatioCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
465486
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
466487
return true
467488
}
489+
} else if curStatus == HalfOpen && b.probeNumber > 0 {
490+
return true
468491
}
469492
return false
470493
}
@@ -498,8 +521,11 @@ func (b *errorRatioCircuitBreaker) OnRequestComplete(_ uint64, err error) {
498521
}
499522
if curStatus == HalfOpen {
500523
if err == nil {
501-
b.fromHalfOpenToClosed()
502-
b.resetMetric()
524+
b.addCurProbeNum()
525+
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
526+
b.fromHalfOpenToClosed()
527+
b.resetMetric()
528+
}
503529
} else {
504530
b.fromHalfOpenToOpen(1.0)
505531
}
@@ -612,6 +638,7 @@ func newErrorCountCircuitBreakerWithStat(r *Rule, stat *errorCounterLeapArray) *
612638
retryTimeoutMs: r.RetryTimeoutMs,
613639
nextRetryTimestampMs: 0,
614640
state: newState(),
641+
probeNumber: r.ProbeNum,
615642
},
616643
minRequestAmount: r.MinRequestAmount,
617644
errorCountThreshold: uint64(r.Threshold),
@@ -644,6 +671,8 @@ func (b *errorCountCircuitBreaker) TryPass(ctx *base.EntryContext) bool {
644671
if b.retryTimeoutArrived() && b.fromOpenToHalfOpen(ctx) {
645672
return true
646673
}
674+
} else if curStatus == HalfOpen && b.probeNumber > 0 {
675+
return true
647676
}
648677
return false
649678
}
@@ -675,8 +704,11 @@ func (b *errorCountCircuitBreaker) OnRequestComplete(_ uint64, err error) {
675704
}
676705
if curStatus == HalfOpen {
677706
if err == nil {
678-
b.fromHalfOpenToClosed()
679-
b.resetMetric()
707+
b.addCurProbeNum()
708+
if b.probeNumber == 0 || atomic.LoadUint64(&b.curProbeNumber) >= b.probeNumber {
709+
b.fromHalfOpenToClosed()
710+
b.resetMetric()
711+
}
680712
} else {
681713
b.fromHalfOpenToOpen(1)
682714
}

core/circuitbreaker/circuit_breaker_test.go

+126-3
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,15 @@ type StateChangeListenerMock struct {
6161
}
6262

6363
func (s *StateChangeListenerMock) OnTransformToClosed(prev State, rule Rule) {
64-
_ = s.Called(prev, rule)
6564
logging.Debug("transform to closed", "strategy", rule.Strategy, "prevState", prev.String())
6665
return
6766
}
6867

6968
func (s *StateChangeListenerMock) OnTransformToOpen(prev State, rule Rule, snapshot interface{}) {
70-
_ = s.Called(prev, rule, snapshot)
7169
logging.Debug("transform to open", "strategy", rule.Strategy, "prevState", prev.String(), "snapshot", snapshot)
7270
}
7371

7472
func (s *StateChangeListenerMock) OnTransformToHalfOpen(prev State, rule Rule) {
75-
_ = s.Called(prev, rule)
7673
logging.Debug("transform to Half-Open", "strategy", rule.Strategy, "prevState", prev.String())
7774
}
7875

@@ -140,6 +137,35 @@ func TestSlowRtCircuitBreaker_TryPass(t *testing.T) {
140137
assert.True(t, pass)
141138
assert.True(t, b.state.get() == HalfOpen)
142139
})
140+
141+
t.Run("TryPass_ProbeNum", func(t *testing.T) {
142+
r := &Rule{
143+
Resource: "abc",
144+
Strategy: SlowRequestRatio,
145+
RetryTimeoutMs: 3000,
146+
MinRequestAmount: 10,
147+
StatIntervalMs: 10000,
148+
MaxAllowedRtMs: 50,
149+
Threshold: 0.5,
150+
ProbeNum: 10,
151+
}
152+
b, err := newSlowRtCircuitBreaker(r)
153+
assert.Nil(t, err)
154+
155+
b.state.set(Open)
156+
ctx := &base.EntryContext{
157+
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
158+
}
159+
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
160+
ctx.SetEntry(e)
161+
for i := 0; i < 10; i++ {
162+
pass := b.TryPass(ctx)
163+
assert.True(t, pass)
164+
assert.True(t, b.state.get() == HalfOpen)
165+
b.OnRequestComplete(1, nil)
166+
}
167+
assert.True(t, b.state.get() == Closed)
168+
})
143169
}
144170

145171
func TestSlowRt_OnRequestComplete(t *testing.T) {
@@ -169,6 +195,20 @@ func TestSlowRt_OnRequestComplete(t *testing.T) {
169195
b.OnRequestComplete(10, nil)
170196
assert.True(t, b.CurrentState() == Closed)
171197
})
198+
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
199+
b.probeNumber = 2
200+
b.state.set(HalfOpen)
201+
b.OnRequestComplete(10, nil)
202+
assert.True(t, b.CurrentState() == HalfOpen)
203+
assert.True(t, b.curProbeNumber == 1)
204+
})
205+
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
206+
b.probeNumber = 2
207+
b.state.set(HalfOpen)
208+
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
209+
assert.True(t, b.CurrentState() == Open)
210+
assert.True(t, b.curProbeNumber == 0)
211+
})
172212
}
173213

174214
func TestSlowRt_ResetBucketTo(t *testing.T) {
@@ -227,6 +267,33 @@ func TestErrorRatioCircuitBreaker_TryPass(t *testing.T) {
227267
assert.True(t, pass)
228268
assert.True(t, b.state.get() == HalfOpen)
229269
})
270+
t.Run("TryPass_ProbeNum", func(t *testing.T) {
271+
r := &Rule{
272+
Resource: "abc",
273+
Strategy: ErrorRatio,
274+
RetryTimeoutMs: 3000,
275+
MinRequestAmount: 10,
276+
StatIntervalMs: 10000,
277+
Threshold: 0.5,
278+
ProbeNum: 10,
279+
}
280+
b, err := newErrorRatioCircuitBreaker(r)
281+
assert.Nil(t, err)
282+
283+
b.state.set(Open)
284+
ctx := &base.EntryContext{
285+
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
286+
}
287+
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
288+
ctx.SetEntry(e)
289+
for i := 0; i < 10; i++ {
290+
pass := b.TryPass(ctx)
291+
assert.True(t, pass)
292+
assert.True(t, b.state.get() == HalfOpen)
293+
b.OnRequestComplete(1, nil)
294+
}
295+
assert.True(t, b.state.get() == Closed)
296+
})
230297
}
231298

232299
func TestErrorRatio_OnRequestComplete(t *testing.T) {
@@ -254,6 +321,20 @@ func TestErrorRatio_OnRequestComplete(t *testing.T) {
254321
b.OnRequestComplete(0, errors.New("errorRatio"))
255322
assert.True(t, b.CurrentState() == Open)
256323
})
324+
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
325+
b.probeNumber = 2
326+
b.state.set(HalfOpen)
327+
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
328+
assert.True(t, b.CurrentState() == HalfOpen)
329+
assert.True(t, b.curProbeNumber == 1)
330+
})
331+
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
332+
b.probeNumber = 2
333+
b.state.set(HalfOpen)
334+
b.OnRequestComplete(0, errors.New("errorRatio"))
335+
assert.True(t, b.CurrentState() == Open)
336+
assert.True(t, b.curProbeNumber == 0)
337+
})
257338
}
258339

259340
func TestErrorRatio_ResetBucketTo(t *testing.T) {
@@ -312,6 +393,34 @@ func TestErrorCountCircuitBreaker_TryPass(t *testing.T) {
312393
assert.True(t, pass)
313394
assert.True(t, b.state.get() == HalfOpen)
314395
})
396+
397+
t.Run("TryPass_ProbeNum", func(t *testing.T) {
398+
r := &Rule{
399+
Resource: "abc",
400+
Strategy: ErrorCount,
401+
RetryTimeoutMs: 3000,
402+
MinRequestAmount: 10,
403+
StatIntervalMs: 10000,
404+
Threshold: 1.0,
405+
ProbeNum: 10,
406+
}
407+
b, err := newErrorCountCircuitBreaker(r)
408+
assert.Nil(t, err)
409+
410+
b.state.set(Open)
411+
ctx := &base.EntryContext{
412+
Resource: base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound),
413+
}
414+
e := base.NewSentinelEntry(ctx, base.NewResourceWrapper("abc", base.ResTypeCommon, base.Inbound), nil)
415+
ctx.SetEntry(e)
416+
for i := 0; i < 10; i++ {
417+
pass := b.TryPass(ctx)
418+
assert.True(t, pass)
419+
assert.True(t, b.state.get() == HalfOpen)
420+
b.OnRequestComplete(1, nil)
421+
}
422+
assert.True(t, b.state.get() == Closed)
423+
})
315424
}
316425

317426
func TestErrorCount_OnRequestComplete(t *testing.T) {
@@ -339,6 +448,20 @@ func TestErrorCount_OnRequestComplete(t *testing.T) {
339448
b.OnRequestComplete(0, errors.New("errorCount"))
340449
assert.True(t, b.CurrentState() == Open)
341450
})
451+
t.Run("OnRequestComplete_ProbeNum_Success", func(t *testing.T) {
452+
b.probeNumber = 2
453+
b.state.set(HalfOpen)
454+
b.OnRequestComplete(base.NewEmptyEntryContext().Rt(), nil)
455+
assert.True(t, b.CurrentState() == HalfOpen)
456+
assert.True(t, b.curProbeNumber == 1)
457+
})
458+
t.Run("OnRequestComplete_ProbeNum_Failed", func(t *testing.T) {
459+
b.probeNumber = 2
460+
b.state.set(HalfOpen)
461+
b.OnRequestComplete(0, errors.New("errorCount"))
462+
assert.True(t, b.CurrentState() == Open)
463+
assert.True(t, b.curProbeNumber == 0)
464+
})
342465
}
343466

344467
func TestFromClosedToOpen(t *testing.T) {

core/circuitbreaker/rule.go

+5
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ type Rule struct {
7878
// for ErrorRatio, it represents the max error request ratio
7979
// for ErrorCount, it represents the max error request count
8080
Threshold float64 `json:"threshold"`
81+
//ProbeNum is number of probes required when the circuit breaker is half-open.
82+
//when the probe num are set and circuit breaker in the half-open state.
83+
//if err occurs during the probe, the circuit breaker is opened immediately.
84+
//otherwise,the circuit breaker is closed only after the number of probes is reached
85+
ProbeNum uint64 `json:"probeNum"`
8186
}
8287

8388
func (r *Rule) String() string {

0 commit comments

Comments
 (0)